Show More
@@ -21,7 +21,3 b' class EnginePUBHandler(PUBHandler):' | |||
|
21 | 21 | else: |
|
22 | 22 | return "engine" |
|
23 | 23 | |
|
24 | ||
|
25 | logger = logging.getLogger('ipzmq') | |
|
26 | logger.setLevel(logging.DEBUG) | |
|
27 |
@@ -90,11 +90,6 b' def defaultblock(f, self, *args, **kwargs):' | |||
|
90 | 90 | # Classes |
|
91 | 91 | #-------------------------------------------------------------------------- |
|
92 | 92 | |
|
93 | class AbortedTask(object): | |
|
94 | """A basic wrapper object describing an aborted task.""" | |
|
95 | def __init__(self, msg_id): | |
|
96 | self.msg_id = msg_id | |
|
97 | ||
|
98 | 93 | class ResultDict(dict): |
|
99 | 94 | """A subclass of dict that raises errors if it has them.""" |
|
100 | 95 | def __getitem__(self, key): |
@@ -332,10 +327,10 b' class Client(object):' | |||
|
332 | 327 | msg = ss.Message(msg) |
|
333 | 328 | content = msg.content |
|
334 | 329 | if content.status == 'ok': |
|
335 |
if content. |
|
|
330 | if content.mux: | |
|
336 | 331 | self._mux_socket = self.context.socket(zmq.PAIR) |
|
337 | 332 | self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
338 |
connect_socket(self._mux_socket, content. |
|
|
333 | connect_socket(self._mux_socket, content.mux) | |
|
339 | 334 | if content.task: |
|
340 | 335 | self._task_socket = self.context.socket(zmq.PAIR) |
|
341 | 336 | self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) |
@@ -17,6 +17,7 b' from __future__ import print_function' | |||
|
17 | 17 | |
|
18 | 18 | import os |
|
19 | 19 | import time |
|
20 | import logging | |
|
20 | 21 | from multiprocessing import Process |
|
21 | 22 | |
|
22 | 23 | import zmq |
@@ -29,7 +30,8 b' from IPython.zmq.entry_point import bind_port' | |||
|
29 | 30 | |
|
30 | 31 | from hub import Hub |
|
31 | 32 | from entry_point import (make_base_argument_parser, select_random_ports, split_ports, |
|
32 |
connect_logger, parse_url, signal_children, generate_exec_key |
|
|
33 | connect_logger, parse_url, signal_children, generate_exec_key, | |
|
34 | local_logger) | |
|
33 | 35 | |
|
34 | 36 | |
|
35 | 37 | import streamsession as session |
@@ -118,8 +120,6 b' def main(argv=None):' | |||
|
118 | 120 | ctx = zmq.Context() |
|
119 | 121 | loop = ioloop.IOLoop.instance() |
|
120 | 122 | |
|
121 | # setup logging | |
|
122 | connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel) | |
|
123 | 123 | |
|
124 | 124 | # Registrar socket |
|
125 | 125 | reg = ZMQStream(ctx.socket(zmq.XREP), loop) |
@@ -207,7 +207,9 b' def main(argv=None):' | |||
|
207 | 207 | q.start() |
|
208 | 208 | children.append(q.launcher) |
|
209 | 209 | else: |
|
210 | sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler) | |
|
210 | log_addr = iface%args.logport if args.logport else None | |
|
211 | sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport, | |
|
212 | log_addr, args.loglevel, args.scheduler) | |
|
211 | 213 | print (sargs) |
|
212 | 214 | q = Process(target=launch_scheduler, args=sargs) |
|
213 | 215 | q.daemon=True |
@@ -224,7 +226,7 b' def main(argv=None):' | |||
|
224 | 226 | # build connection dicts |
|
225 | 227 | engine_addrs = { |
|
226 | 228 | 'control' : iface%control[1], |
|
227 |
' |
|
|
229 | 'mux': iface%mux[1], | |
|
228 | 230 | 'heartbeat': (iface%hb[0], iface%hb[1]), |
|
229 | 231 | 'task' : iface%task[1], |
|
230 | 232 | 'iopub' : iface%iopub[1], |
@@ -234,15 +236,24 b' def main(argv=None):' | |||
|
234 | 236 | client_addrs = { |
|
235 | 237 | 'control' : iface%control[0], |
|
236 | 238 | 'query': iface%cport, |
|
237 |
' |
|
|
239 | 'mux': iface%mux[0], | |
|
238 | 240 | 'task' : iface%task[0], |
|
239 | 241 | 'iopub' : iface%iopub[0], |
|
240 | 242 | 'notification': iface%nport |
|
241 | 243 | } |
|
242 | 244 | |
|
245 | # setup logging | |
|
246 | if args.logport: | |
|
247 | connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel) | |
|
248 | else: | |
|
249 | local_logger(args.loglevel) | |
|
250 | ||
|
243 | 251 | # register relay of signals to the children |
|
244 | 252 | signal_children(children) |
|
245 | hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs) | |
|
253 | hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon, | |
|
254 | registrar=reg, clientele=c, notifier=n, db=db, | |
|
255 | engine_addrs=engine_addrs, client_addrs=client_addrs) | |
|
256 | ||
|
246 | 257 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) |
|
247 | 258 | dc.start() |
|
248 | 259 | loop.start() |
@@ -8,49 +8,48 b' import sys' | |||
|
8 | 8 | import time |
|
9 | 9 | import traceback |
|
10 | 10 | import uuid |
|
11 | import logging | |
|
11 | 12 | from pprint import pprint |
|
12 | 13 | |
|
13 | 14 | import zmq |
|
14 | 15 | from zmq.eventloop import ioloop, zmqstream |
|
15 | 16 | |
|
16 | from IPython.utils.traitlets import HasTraits | |
|
17 | from IPython.utils.localinterfaces import LOCALHOST | |
|
17 | # internal | |
|
18 | from IPython.config.configurable import Configurable | |
|
19 | from IPython.utils.traitlets import Instance, Str, Dict | |
|
20 | # from IPython.utils.localinterfaces import LOCALHOST | |
|
18 | 21 | |
|
19 | 22 | from streamsession import Message, StreamSession |
|
20 | from client import Client | |
|
21 | 23 | from streamkernel import Kernel, make_kernel |
|
22 | 24 | import heartmonitor |
|
23 | from entry_point import make_base_argument_parser, connect_logger, parse_url | |
|
25 | from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url, | |
|
26 | local_logger) | |
|
24 | 27 | # import taskthread |
|
25 | # from log import logger | |
|
26 | ||
|
28 | logger = logging.getLogger() | |
|
27 | 29 | |
|
28 | 30 | def printer(*msg): |
|
29 |
|
|
|
31 | # print (logger.handlers, file=sys.__stdout__) | |
|
32 | logger.info(str(msg)) | |
|
30 | 33 | |
|
31 |
class Engine( |
|
|
34 | class Engine(Configurable): | |
|
32 | 35 | """IPython engine""" |
|
33 | 36 | |
|
34 | id=None | |
|
35 | context=None | |
|
36 | loop=None | |
|
37 | session=None | |
|
38 | ident=None | |
|
39 | registrar=None | |
|
40 | heart=None | |
|
41 | 37 | kernel=None |
|
42 |
|
|
|
43 | ||
|
44 | def __init__(self, context, loop, session, registrar, client=None, ident=None, | |
|
45 | heart_id=None, user_ns=None): | |
|
46 | self.context = context | |
|
47 | self.loop = loop | |
|
48 | self.session = session | |
|
49 | self.registrar = registrar | |
|
50 | self.client = client | |
|
51 | self.ident = ident if ident else str(uuid.uuid4()) | |
|
38 | id=None | |
|
39 | ||
|
40 | # configurables: | |
|
41 | context=Instance(zmq.Context) | |
|
42 | loop=Instance(ioloop.IOLoop) | |
|
43 | session=Instance(StreamSession) | |
|
44 | ident=Str() | |
|
45 | registrar=Instance(zmqstream.ZMQStream) | |
|
46 | user_ns=Dict() | |
|
47 | ||
|
48 | def __init__(self, **kwargs): | |
|
49 | super(Engine, self).__init__(**kwargs) | |
|
50 | if not self.ident: | |
|
51 | self.ident = str(uuid.uuid4()) | |
|
52 | 52 | self.registrar.on_send(printer) |
|
53 | self.user_ns = user_ns | |
|
54 | 53 | |
|
55 | 54 | def register(self): |
|
56 | 55 | |
@@ -64,8 +63,9 b' class Engine(object):' | |||
|
64 | 63 | idents,msg = self.session.feed_identities(msg) |
|
65 | 64 | msg = Message(self.session.unpack_message(msg)) |
|
66 | 65 | if msg.content.status == 'ok': |
|
67 |
self. |
|
|
68 | queue_addr = msg.content.queue | |
|
66 | self.id = int(msg.content.id) | |
|
67 | self.session.username = 'engine-%i'%self.id | |
|
68 | queue_addr = msg.content.mux | |
|
69 | 69 | shell_addrs = [str(queue_addr)] |
|
70 | 70 | control_addr = str(msg.content.control) |
|
71 | 71 | task_addr = msg.content.task |
@@ -75,7 +75,7 b' class Engine(object):' | |||
|
75 | 75 | |
|
76 | 76 | hb_addrs = msg.content.heartbeat |
|
77 | 77 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() |
|
78 | k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, | |
|
78 | k = make_kernel(self.id, self.ident, control_addr, shell_addrs, iopub_addr, | |
|
79 | 79 | hb_addrs, client_addr=None, loop=self.loop, |
|
80 | 80 | context=self.context, key=self.session.key)[-1] |
|
81 | 81 | self.kernel = k |
@@ -84,12 +84,12 b' class Engine(object):' | |||
|
84 | 84 | self.kernel.user_ns = self.user_ns |
|
85 | 85 | |
|
86 | 86 | else: |
|
87 |
|
|
|
87 | logger.error("Registration Failed: %s"%msg) | |
|
88 | 88 | raise Exception("Registration Failed: %s"%msg) |
|
89 | 89 | |
|
90 |
|
|
|
90 | logger.info("completed registration with id %i"%self.id) | |
|
91 | 91 | |
|
92 | print (msg,file=sys.__stdout__) | |
|
92 | # logger.info(str(msg)) | |
|
93 | 93 | |
|
94 | 94 | def unregister(self): |
|
95 | 95 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) |
@@ -97,7 +97,7 b' class Engine(object):' | |||
|
97 | 97 | sys.exit(0) |
|
98 | 98 | |
|
99 | 99 | def start(self): |
|
100 |
|
|
|
100 | logger.info("registering") | |
|
101 | 101 | self.register() |
|
102 | 102 | |
|
103 | 103 | |
@@ -118,7 +118,6 b' def main(argv=None, user_ns=None):' | |||
|
118 | 118 | ctx = zmq.Context() |
|
119 | 119 | |
|
120 | 120 | # setup logging |
|
121 | connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) | |
|
122 | 121 | |
|
123 | 122 | reg_conn = iface % args.regport |
|
124 | 123 | print (reg_conn, file=sys.__stdout__) |
@@ -127,10 +126,16 b' def main(argv=None, user_ns=None):' | |||
|
127 | 126 | reg = ctx.socket(zmq.PAIR) |
|
128 | 127 | reg.connect(reg_conn) |
|
129 | 128 | reg = zmqstream.ZMQStream(reg, loop) |
|
130 | client = None | |
|
131 | 129 | |
|
132 | e = Engine(ctx, loop, session, reg, client, args.ident, user_ns=user_ns) | |
|
133 | dc = ioloop.DelayedCallback(e.start, 100, loop) | |
|
130 | e = Engine(context=ctx, loop=loop, session=session, registrar=reg, | |
|
131 | ident=args.ident or '', user_ns=user_ns) | |
|
132 | if args.logport: | |
|
133 | print ("connecting logger to %s"%(iface%args.logport), file=sys.__stdout__) | |
|
134 | connect_engine_logger(ctx, iface%args.logport, e, loglevel=args.loglevel) | |
|
135 | else: | |
|
136 | local_logger(args.loglevel) | |
|
137 | ||
|
138 | dc = ioloop.DelayedCallback(e.start, 0, loop) | |
|
134 | 139 | dc.start() |
|
135 | 140 | loop.start() |
|
136 | 141 |
@@ -22,7 +22,7 b' from zmq.log import handlers' | |||
|
22 | 22 | # Local imports. |
|
23 | 23 | from IPython.core.ultratb import FormattedTB |
|
24 | 24 | from IPython.external.argparse import ArgumentParser |
|
25 |
from IPython.zmq.log import |
|
|
25 | from IPython.zmq.log import EnginePUBHandler | |
|
26 | 26 | |
|
27 | 27 | def split_ports(s, n): |
|
28 | 28 | """Parser helper for multiport strings""" |
@@ -82,6 +82,7 b' def make_base_argument_parser():' | |||
|
82 | 82 | """ Creates an ArgumentParser for the generic arguments supported by all |
|
83 | 83 | ipcluster entry points. |
|
84 | 84 | """ |
|
85 | ||
|
85 | 86 | parser = ArgumentParser() |
|
86 | 87 | parser.add_argument('--ip', type=str, default='127.0.0.1', |
|
87 | 88 | help='set the controller\'s IP address [default: local]') |
@@ -89,10 +90,10 b' def make_base_argument_parser():' | |||
|
89 | 90 | help='set the transport to use [default: tcp]') |
|
90 | 91 | parser.add_argument('--regport', type=int, metavar='PORT', default=10101, |
|
91 | 92 | help='set the XREP port for registration [default: 10101]') |
|
92 |
parser.add_argument('--logport', type=int, metavar='PORT', default= |
|
|
93 |
help='set the PUB port for logging [default: |
|
|
94 |
parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging. |
|
|
95 |
help='set the log level [default: |
|
|
93 | parser.add_argument('--logport', type=int, metavar='PORT', default=0, | |
|
94 | help='set the PUB port for remote logging [default: log to stdout]') | |
|
95 | parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.INFO, | |
|
96 | help='set the log level [default: INFO]') | |
|
96 | 97 | parser.add_argument('--ident', type=str, |
|
97 | 98 | help='set the ZMQ identity [default: random]') |
|
98 | 99 | parser.add_argument('--packer', type=str, default='json', |
@@ -105,17 +106,42 b' def make_base_argument_parser():' | |||
|
105 | 106 | |
|
106 | 107 | return parser |
|
107 | 108 | |
|
108 | ||
|
109 | def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG): | |
|
109 | def integer_loglevel(loglevel): | |
|
110 | 110 | try: |
|
111 | 111 | loglevel = int(loglevel) |
|
112 | 112 | except ValueError: |
|
113 | 113 | if isinstance(loglevel, str): |
|
114 | 114 | loglevel = getattr(logging, loglevel) |
|
115 | return loglevel | |
|
116 | ||
|
117 | def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG): | |
|
118 | loglevel = integer_loglevel(loglevel) | |
|
115 | 119 | lsock = context.socket(zmq.PUB) |
|
116 | 120 | lsock.connect(iface) |
|
117 | 121 | handler = handlers.PUBHandler(lsock) |
|
118 | 122 | handler.setLevel(loglevel) |
|
119 | 123 | handler.root_topic = root |
|
124 | logger = logging.getLogger() | |
|
125 | logger.addHandler(handler) | |
|
126 | logger.setLevel(loglevel) | |
|
127 | ||
|
128 | def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG): | |
|
129 | logger = logging.getLogger() | |
|
130 | loglevel = integer_loglevel(loglevel) | |
|
131 | lsock = context.socket(zmq.PUB) | |
|
132 | lsock.connect(iface) | |
|
133 | handler = EnginePUBHandler(engine, lsock) | |
|
134 | handler.setLevel(loglevel) | |
|
120 | 135 | logger.addHandler(handler) |
|
136 | logger.setLevel(loglevel) | |
|
121 | 137 | |
|
138 | def local_logger(loglevel=logging.DEBUG): | |
|
139 | loglevel = integer_loglevel(loglevel) | |
|
140 | logger = logging.getLogger() | |
|
141 | if logger.handlers: | |
|
142 | # if there are any handlers, skip the hookup | |
|
143 | return | |
|
144 | handler = logging.StreamHandler() | |
|
145 | handler.setLevel(loglevel) | |
|
146 | logger.addHandler(handler) | |
|
147 | logger.setLevel(loglevel) |
@@ -7,13 +7,13 b' and hearts are tracked based on their XREQ identities.' | |||
|
7 | 7 | from __future__ import print_function |
|
8 | 8 | import time |
|
9 | 9 | import uuid |
|
10 | import logging | |
|
10 | 11 | |
|
11 | 12 | import zmq |
|
12 | 13 | from zmq.devices import ProcessDevice,ThreadDevice |
|
13 | 14 | from zmq.eventloop import ioloop, zmqstream |
|
14 | 15 | |
|
15 | #internal | |
|
16 | from IPython.zmq.log import logger | |
|
16 | logger = logging.getLogger() | |
|
17 | 17 | |
|
18 | 18 | class Heart(object): |
|
19 | 19 | """A basic heart object for responding to a HeartMonitor. |
@@ -53,6 +53,7 b' class HeartMonitor(object):' | |||
|
53 | 53 | hearts=None |
|
54 | 54 | on_probation=None |
|
55 | 55 | last_ping=None |
|
56 | # debug=False | |
|
56 | 57 | |
|
57 | 58 | def __init__(self, loop, pingstream, pongstream, period=1000): |
|
58 | 59 | self.loop = loop |
@@ -85,19 +86,6 b' class HeartMonitor(object):' | |||
|
85 | 86 | logger.debug("heartbeat::new heart failure handler: %s"%handler) |
|
86 | 87 | self._failure_handlers.add(handler) |
|
87 | 88 | |
|
88 | # def _flush(self): | |
|
89 | # """override IOLoop triggers""" | |
|
90 | # while True: | |
|
91 | # try: | |
|
92 | # msg = self.pongstream.socket.recv_multipart(zmq.NOBLOCK) | |
|
93 | # logger.warn("IOLoop triggered beat with incoming heartbeat waiting to be handled") | |
|
94 | # except zmq.ZMQError: | |
|
95 | # return | |
|
96 | # else: | |
|
97 | # self.handle_pong(msg) | |
|
98 | # # print '.' | |
|
99 | # | |
|
100 | ||
|
101 | 89 | def beat(self): |
|
102 | 90 | self.pongstream.flush() |
|
103 | 91 | self.last_ping = self.lifetime |
@@ -105,7 +93,7 b' class HeartMonitor(object):' | |||
|
105 | 93 | toc = time.time() |
|
106 | 94 | self.lifetime += toc-self.tic |
|
107 | 95 | self.tic = toc |
|
108 | logger.debug("heartbeat::%s"%self.lifetime) | |
|
96 | # logger.debug("heartbeat::%s"%self.lifetime) | |
|
109 | 97 | goodhearts = self.hearts.intersection(self.responses) |
|
110 | 98 | missed_beats = self.hearts.difference(goodhearts) |
|
111 | 99 | heartfailures = self.on_probation.intersection(missed_beats) |
@@ -144,7 +132,7 b' class HeartMonitor(object):' | |||
|
144 | 132 | "a heart just beat" |
|
145 | 133 | if msg[1] == str(self.lifetime): |
|
146 | 134 | delta = time.time()-self.tic |
|
147 | logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) | |
|
135 | # logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) | |
|
148 | 136 | self.responses.add(msg[0]) |
|
149 | 137 | elif msg[1] == str(self.last_ping): |
|
150 | 138 | delta = time.time()-self.tic + (self.lifetime-self.last_ping) |
@@ -18,14 +18,19 b' from __future__ import print_function' | |||
|
18 | 18 | import sys |
|
19 | 19 | from datetime import datetime |
|
20 | 20 | import time |
|
21 | import logging | |
|
21 | 22 | |
|
22 | 23 | import zmq |
|
23 | from zmq.eventloop import ioloop | |
|
24 | from zmq.eventloop import ioloop, zmqstream | |
|
24 | 25 | |
|
25 | 26 | # internal: |
|
26 | from IPython.zmq.log import logger # a Logger object | |
|
27 | from IPython.config.configurable import Configurable | |
|
28 | from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict | |
|
29 | # from IPython.zmq.log import logger # a Logger object | |
|
27 | 30 | |
|
28 | 31 | from streamsession import Message, wrap_exception, ISO8601 |
|
32 | from heartmonitor import HeartMonitor | |
|
33 | from util import validate_url_container | |
|
29 | 34 | |
|
30 | 35 | try: |
|
31 | 36 | from pymongo.binary import Binary |
@@ -38,6 +43,8 b' else:' | |||
|
38 | 43 | # Code |
|
39 | 44 | #----------------------------------------------------------------------------- |
|
40 | 45 | |
|
46 | logger = logging.getLogger() | |
|
47 | ||
|
41 | 48 | def _passer(*args, **kwargs): |
|
42 | 49 | return |
|
43 | 50 | |
@@ -46,7 +53,7 b' def _printer(*args, **kwargs):' | |||
|
46 | 53 | print (kwargs) |
|
47 | 54 | |
|
48 | 55 | def init_record(msg): |
|
49 | """return an empty TaskRecord dict, with all keys initialized with None.""" | |
|
56 | """Initialize a TaskRecord based on a request.""" | |
|
50 | 57 | header = msg['header'] |
|
51 | 58 | return { |
|
52 | 59 | 'msg_id' : header['msg_id'], |
@@ -71,7 +78,7 b' def init_record(msg):' | |||
|
71 | 78 | } |
|
72 | 79 | |
|
73 | 80 | |
|
74 |
class EngineConnector( |
|
|
81 | class EngineConnector(HasTraits): | |
|
75 | 82 | """A simple object for accessing the various zmq connections of an object. |
|
76 | 83 | Attributes are: |
|
77 | 84 | id (int): engine ID |
@@ -80,22 +87,18 b' class EngineConnector(object):' | |||
|
80 | 87 | registration (str): identity of registration XREQ socket |
|
81 | 88 | heartbeat (str): identity of heartbeat XREQ socket |
|
82 | 89 | """ |
|
83 | id=0 | |
|
84 |
queue= |
|
|
85 |
control= |
|
|
86 |
registration= |
|
|
87 |
heartbeat= |
|
|
88 |
pending= |
|
|
89 | ||
|
90 | def __init__(self, id, queue, registration, control, heartbeat=None): | |
|
91 | logger.info("engine::Engine Connected: %i"%id) | |
|
92 | self.id = id | |
|
93 | self.queue = queue | |
|
94 | self.registration = registration | |
|
95 | self.control = control | |
|
96 | self.heartbeat = heartbeat | |
|
97 | ||
|
98 | class Hub(object): | |
|
90 | id=Int(0) | |
|
91 | queue=Str() | |
|
92 | control=Str() | |
|
93 | registration=Str() | |
|
94 | heartbeat=Str() | |
|
95 | pending=Instance(set) | |
|
96 | ||
|
97 | def __init__(self, **kwargs): | |
|
98 | super(EngineConnector, self).__init__(**kwargs) | |
|
99 | logger.info("engine::Engine Connected: %i"%self.id) | |
|
100 | ||
|
101 | class Hub(Configurable): | |
|
99 | 102 | """The IPython Controller Hub with 0MQ connections |
|
100 | 103 | |
|
101 | 104 | Parameters |
@@ -123,26 +126,25 b' class Hub(object):' | |||
|
123 | 126 | clients=None |
|
124 | 127 | hearts=None |
|
125 | 128 | pending=None |
|
126 | results=None | |
|
127 | 129 | tasks=None |
|
128 | 130 | completed=None |
|
129 | mia=None | |
|
131 | # mia=None | |
|
130 | 132 | incoming_registrations=None |
|
131 | 133 | registration_timeout=None |
|
132 | 134 | |
|
133 | 135 | #objects from constructor: |
|
134 | loop=None | |
|
135 | registrar=None | |
|
136 | clientelle=None | |
|
137 | queue=None | |
|
138 | heartbeat=None | |
|
139 | notifier=None | |
|
140 | db=None | |
|
141 |
client_addr= |
|
|
142 |
engine_addrs= |
|
|
143 | ||
|
144 | ||
|
145 | def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs): | |
|
136 | loop=Instance(ioloop.IOLoop) | |
|
137 | registrar=Instance(zmqstream.ZMQStream) | |
|
138 | clientele=Instance(zmqstream.ZMQStream) | |
|
139 | monitor=Instance(zmqstream.ZMQStream) | |
|
140 | heartmonitor=Instance(HeartMonitor) | |
|
141 | notifier=Instance(zmqstream.ZMQStream) | |
|
142 | db=Instance(object) | |
|
143 | client_addrs=Dict() | |
|
144 | engine_addrs=Dict() | |
|
145 | ||
|
146 | ||
|
147 | def __init__(self, **kwargs): | |
|
146 | 148 | """ |
|
147 | 149 | # universal: |
|
148 | 150 | loop: IOLoop for creating future connections |
@@ -158,6 +160,8 b' class Hub(object):' | |||
|
158 | 160 | engine_addrs: zmq address/protocol dict for engine connections |
|
159 | 161 | client_addrs: zmq address/protocol dict for client connections |
|
160 | 162 | """ |
|
163 | ||
|
164 | super(Hub, self).__init__(**kwargs) | |
|
161 | 165 | self.ids = set() |
|
162 | 166 | self.keytable={} |
|
163 | 167 | self.incoming_registrations={} |
@@ -166,35 +170,44 b' class Hub(object):' | |||
|
166 | 170 | self.clients = {} |
|
167 | 171 | self.hearts = {} |
|
168 | 172 | # self.mia = set() |
|
169 | ||
|
173 | self.registration_timeout = max(5000, 2*self.heartmonitor.period) | |
|
174 | # this is the stuff that will move to DB: | |
|
175 | self.pending = set() # pending messages, keyed by msg_id | |
|
176 | self.queues = {} # pending msg_ids keyed by engine_id | |
|
177 | self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id | |
|
178 | self.completed = {} # completed msg_ids keyed by engine_id | |
|
179 | self.all_completed = set() | |
|
180 | self._idcounter = 0 | |
|
170 | 181 | # self.sockets = {} |
|
171 | self.loop = loop | |
|
172 | self.session = session | |
|
173 | self.registrar = registrar | |
|
174 | self.clientele = clientele | |
|
175 | self.queue = queue | |
|
176 |
self.heart |
|
|
177 | self.notifier = notifier | |
|
178 | self.db = db | |
|
182 | # self.loop = loop | |
|
183 | # self.session = session | |
|
184 | # self.registrar = registrar | |
|
185 | # self.clientele = clientele | |
|
186 | # self.queue = queue | |
|
187 | # self.heartmonitor = heartbeat | |
|
188 | # self.notifier = notifier | |
|
189 | # self.db = db | |
|
179 | 190 | |
|
180 | 191 | # validate connection dicts: |
|
181 | self.client_addrs = client_addrs | |
|
182 | assert isinstance(client_addrs['queue'], str) | |
|
183 | assert isinstance(client_addrs['control'], str) | |
|
192 | # self.client_addrs = client_addrs | |
|
193 | validate_url_container(self.client_addrs) | |
|
194 | ||
|
195 | # assert isinstance(self.client_addrs['queue'], str) | |
|
196 | # assert isinstance(self.client_addrs['control'], str) | |
|
184 | 197 | # self.hb_addrs = hb_addrs |
|
185 | self.engine_addrs = engine_addrs | |
|
186 | assert isinstance(engine_addrs['queue'], str) | |
|
187 |
assert isinstance( |
|
|
188 |
assert |
|
|
198 | validate_url_container(self.engine_addrs) | |
|
199 | # self.engine_addrs = engine_addrs | |
|
200 | # assert isinstance(self.engine_addrs['queue'], str) | |
|
201 | # assert isinstance(self.engine_addrs['control'], str) | |
|
202 | # assert len(engine_addrs['heartbeat']) == 2 | |
|
189 | 203 | |
|
190 | 204 | # register our callbacks |
|
191 | 205 | self.registrar.on_recv(self.dispatch_register_request) |
|
192 | 206 | self.clientele.on_recv(self.dispatch_client_msg) |
|
193 |
self. |
|
|
207 | self.monitor.on_recv(self.dispatch_monitor_traffic) | |
|
194 | 208 | |
|
195 | if heartbeat is not None: | |
|
196 |
|
|
|
197 | heartbeat.add_new_heart_handler(self.handle_new_heart) | |
|
209 | self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure) | |
|
210 | self.heartmonitor.add_new_heart_handler(self.handle_new_heart) | |
|
198 | 211 | |
|
199 | 212 | self.monitor_handlers = { 'in' : self.save_queue_request, |
|
200 | 213 | 'out': self.save_queue_result, |
@@ -218,25 +231,21 b' class Hub(object):' | |||
|
218 | 231 | 'unregistration_request' : self.unregister_engine, |
|
219 | 232 | 'connection_request': self.connection_request, |
|
220 | 233 | } |
|
221 | self.registration_timeout = max(5000, 2*self.heartbeat.period) | |
|
222 | # this is the stuff that will move to DB: | |
|
223 | # self.results = {} # completed results | |
|
224 | self.pending = set() # pending messages, keyed by msg_id | |
|
225 | self.queues = {} # pending msg_ids keyed by engine_id | |
|
226 | self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id | |
|
227 | self.completed = {} # completed msg_ids keyed by engine_id | |
|
228 | self.all_completed = set() | |
|
229 | 234 | |
|
230 | 235 | logger.info("controller::created controller") |
|
231 | 236 | |
|
232 | def _new_id(self): | |
|
237 | @property | |
|
238 | def _next_id(self): | |
|
233 | 239 | """gemerate a new ID""" |
|
234 |
newid = |
|
|
235 | incoming = [id[0] for id in self.incoming_registrations.itervalues()] | |
|
236 | # print newid, self.ids, self.incoming_registrations | |
|
237 | while newid in self.ids or newid in incoming: | |
|
238 | newid += 1 | |
|
240 | newid = self._idcounter | |
|
241 | self._idcounter += 1 | |
|
239 | 242 | return newid |
|
243 | # newid = 0 | |
|
244 | # incoming = [id[0] for id in self.incoming_registrations.itervalues()] | |
|
245 | # # print newid, self.ids, self.incoming_registrations | |
|
246 | # while newid in self.ids or newid in incoming: | |
|
247 | # newid += 1 | |
|
248 | # return newid | |
|
240 | 249 | |
|
241 | 250 | #----------------------------------------------------------------------------- |
|
242 | 251 | # message validation |
@@ -580,6 +589,9 b' class Hub(object):' | |||
|
580 | 589 | return |
|
581 | 590 | |
|
582 | 591 | parent = msg['parent_header'] |
|
592 | if not parent: | |
|
593 | logger.error("iopub::invalid IOPub message: %s"%msg) | |
|
594 | return | |
|
583 | 595 | msg_id = parent['msg_id'] |
|
584 | 596 | msg_type = msg['msg_type'] |
|
585 | 597 | content = msg['content'] |
@@ -631,7 +643,7 b' class Hub(object):' | |||
|
631 | 643 | return |
|
632 | 644 | heart = content.get('heartbeat', None) |
|
633 | 645 | """register a new engine, and create the socket(s) necessary""" |
|
634 |
eid = self._ne |
|
|
646 | eid = self._next_id | |
|
635 | 647 | # print (eid, queue, reg, heart) |
|
636 | 648 | |
|
637 | 649 | logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) |
@@ -644,10 +656,12 b' class Hub(object):' | |||
|
644 | 656 | raise KeyError("queue_id %r in use"%queue) |
|
645 | 657 | except: |
|
646 | 658 | content = wrap_exception() |
|
659 | logger.error("queue_id %r in use"%queue, exc_info=True) | |
|
647 | 660 | elif heart in self.hearts: # need to check unique hearts? |
|
648 | 661 | try: |
|
649 | 662 | raise KeyError("heart_id %r in use"%heart) |
|
650 | 663 | except: |
|
664 | logger.error("heart_id %r in use"%heart, exc_info=True) | |
|
651 | 665 | content = wrap_exception() |
|
652 | 666 | else: |
|
653 | 667 | for h, pack in self.incoming_registrations.iteritems(): |
@@ -655,12 +669,14 b' class Hub(object):' | |||
|
655 | 669 | try: |
|
656 | 670 | raise KeyError("heart_id %r in use"%heart) |
|
657 | 671 | except: |
|
672 | logger.error("heart_id %r in use"%heart, exc_info=True) | |
|
658 | 673 | content = wrap_exception() |
|
659 | 674 | break |
|
660 | 675 | elif queue == pack[1]: |
|
661 | 676 | try: |
|
662 | 677 | raise KeyError("queue_id %r in use"%queue) |
|
663 | 678 | except: |
|
679 | logger.error("queue_id %r in use"%queue, exc_info=True) | |
|
664 | 680 | content = wrap_exception() |
|
665 | 681 | break |
|
666 | 682 | |
@@ -669,15 +685,15 b' class Hub(object):' | |||
|
669 | 685 | ident=reg) |
|
670 | 686 | |
|
671 | 687 | if content['status'] == 'ok': |
|
672 |
if heart in self.heart |
|
|
688 | if heart in self.heartmonitor.hearts: | |
|
673 | 689 | # already beating |
|
674 | self.incoming_registrations[heart] = (eid,queue,reg,None) | |
|
690 | self.incoming_registrations[heart] = (eid,queue,reg[0],None) | |
|
675 | 691 | self.finish_registration(heart) |
|
676 | 692 | else: |
|
677 | 693 | purge = lambda : self._purge_stalled_registration(heart) |
|
678 | 694 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) |
|
679 | 695 | dc.start() |
|
680 | self.incoming_registrations[heart] = (eid,queue,reg,dc) | |
|
696 | self.incoming_registrations[heart] = (eid,queue,reg[0],dc) | |
|
681 | 697 | else: |
|
682 | 698 | logger.error("registration::registration %i failed: %s"%(eid, content['evalue'])) |
|
683 | 699 | return eid |
@@ -718,7 +734,8 b' class Hub(object):' | |||
|
718 | 734 | control = queue |
|
719 | 735 | self.ids.add(eid) |
|
720 | 736 | self.keytable[eid] = queue |
|
721 |
self.engines[eid] = EngineConnector(eid, queue, reg, |
|
|
737 | self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg, | |
|
738 | control=control, heartbeat=heart) | |
|
722 | 739 | self.by_ident[queue] = eid |
|
723 | 740 | self.queues[eid] = list() |
|
724 | 741 | self.tasks[eid] = list() |
@@ -11,6 +11,8 b' Python Scheduler exists.' | |||
|
11 | 11 | |
|
12 | 12 | from __future__ import print_function |
|
13 | 13 | from random import randint,random |
|
14 | import logging | |
|
15 | from types import FunctionType | |
|
14 | 16 | |
|
15 | 17 | try: |
|
16 | 18 | import numpy |
@@ -21,17 +23,22 b' import zmq' | |||
|
21 | 23 | from zmq.eventloop import ioloop, zmqstream |
|
22 | 24 | |
|
23 | 25 | # local imports |
|
24 | from IPython.zmq.log import logger # a Logger object | |
|
26 | from IPython.external.decorator import decorator | |
|
27 | from IPython.config.configurable import Configurable | |
|
28 | from IPython.utils.traitlets import Instance | |
|
29 | ||
|
25 | 30 | from client import Client |
|
26 | 31 | from dependency import Dependency |
|
27 | 32 | import streamsession as ss |
|
33 | from entry_point import connect_logger, local_logger | |
|
28 | 34 | |
|
29 | from IPython.external.decorator import decorator | |
|
35 | ||
|
36 | logger = logging.getLogger() | |
|
30 | 37 | |
|
31 | 38 | @decorator |
|
32 | 39 | def logged(f,self,*args,**kwargs): |
|
33 | 40 | # print ("#--------------------") |
|
34 |
|
|
|
41 | logger.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs)) | |
|
35 | 42 | # print ("#--") |
|
36 | 43 | return f(self,*args, **kwargs) |
|
37 | 44 | |
@@ -99,7 +106,7 b' def leastload(loads):' | |||
|
99 | 106 | #--------------------------------------------------------------------- |
|
100 | 107 | # Classes |
|
101 | 108 | #--------------------------------------------------------------------- |
|
102 |
class TaskScheduler( |
|
|
109 | class TaskScheduler(Configurable): | |
|
103 | 110 | """Python TaskScheduler object. |
|
104 | 111 | |
|
105 | 112 | This is the simplest object that supports msg_id based |
@@ -108,10 +115,15 b' class TaskScheduler(object):' | |||
|
108 | 115 | |
|
109 | 116 | """ |
|
110 | 117 | |
|
111 | scheme = leastload # function for determining the destination | |
|
112 | client_stream = None # client-facing stream | |
|
113 |
|
|
|
114 |
|
|
|
118 | # configurables: | |
|
119 | scheme = Instance(FunctionType, default=leastload) # function for determining the destination | |
|
120 | client_stream = Instance(zmqstream.ZMQStream) # client-facing stream | |
|
121 | engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream | |
|
122 | notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream | |
|
123 | mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream | |
|
124 | io_loop = Instance(ioloop.IOLoop) | |
|
125 | ||
|
126 | # internals: | |
|
115 | 127 | dependencies = None # dict by msg_id of [ msg_ids that depend on key ] |
|
116 | 128 | depending = None # dict by msg_id of (msg_id, raw_msg, after, follow) |
|
117 | 129 | pending = None # dict by engine_uuid of submitted tasks |
@@ -123,23 +135,10 b' class TaskScheduler(object):' | |||
|
123 | 135 | blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency |
|
124 | 136 | |
|
125 | 137 | |
|
126 | def __init__(self, client_stream, engine_stream, mon_stream, | |
|
127 | notifier_stream, scheme=None, io_loop=None): | |
|
128 | if io_loop is None: | |
|
129 | io_loop = ioloop.IOLoop.instance() | |
|
130 | self.io_loop = io_loop | |
|
131 | self.client_stream = client_stream | |
|
132 | self.engine_stream = engine_stream | |
|
133 | self.mon_stream = mon_stream | |
|
134 | self.notifier_stream = notifier_stream | |
|
135 | ||
|
136 | if scheme is not None: | |
|
137 | self.scheme = scheme | |
|
138 | else: | |
|
139 | self.scheme = TaskScheduler.scheme | |
|
138 | def __init__(self, **kwargs): | |
|
139 | super(TaskScheduler, self).__init__(**kwargs) | |
|
140 | 140 | |
|
141 | 141 | self.session = ss.StreamSession(username="TaskScheduler") |
|
142 | ||
|
143 | 142 | self.dependencies = {} |
|
144 | 143 | self.depending = {} |
|
145 | 144 | self.completed = {} |
@@ -150,12 +149,13 b' class TaskScheduler(object):' | |||
|
150 | 149 | self.targets = [] |
|
151 | 150 | self.loads = [] |
|
152 | 151 | |
|
153 | engine_stream.on_recv(self.dispatch_result, copy=False) | |
|
152 | self.engine_stream.on_recv(self.dispatch_result, copy=False) | |
|
154 | 153 | self._notification_handlers = dict( |
|
155 | 154 | registration_notification = self._register_engine, |
|
156 | 155 | unregistration_notification = self._unregister_engine |
|
157 | 156 | ) |
|
158 | 157 | self.notifier_stream.on_recv(self.dispatch_notification) |
|
158 | logger.info("Scheduler started...%r"%self) | |
|
159 | 159 | |
|
160 | 160 | def resume_receiving(self): |
|
161 | 161 | """Resume accepting jobs.""" |
@@ -183,6 +183,7 b' class TaskScheduler(object):' | |||
|
183 | 183 | handler(str(msg['content']['queue'])) |
|
184 | 184 | except KeyError: |
|
185 | 185 | logger.error("task::Invalid notification msg: %s"%msg) |
|
186 | ||
|
186 | 187 | @logged |
|
187 | 188 | def _register_engine(self, uid): |
|
188 | 189 | """New engine with ident `uid` became available.""" |
@@ -306,7 +307,8 b' class TaskScheduler(object):' | |||
|
306 | 307 | self.add_job(idx) |
|
307 | 308 | self.pending[target][msg_id] = (msg, follow) |
|
308 | 309 | content = dict(msg_id=msg_id, engine_id=target) |
|
309 |
self.session.send(self.mon_stream, 'task_destination', content=content, |
|
|
310 | self.session.send(self.mon_stream, 'task_destination', content=content, | |
|
311 | ident=['tracktask',self.session.session]) | |
|
310 | 312 | |
|
311 | 313 | #----------------------------------------------------------------------- |
|
312 | 314 | # Result Handling |
@@ -395,7 +397,7 b' class TaskScheduler(object):' | |||
|
395 | 397 | |
|
396 | 398 | |
|
397 | 399 | |
|
398 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'): | |
|
400 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, loglevel=logging.DEBUG, scheme='weighted'): | |
|
399 | 401 | from zmq.eventloop import ioloop |
|
400 | 402 | from zmq.eventloop.zmqstream import ZMQStream |
|
401 | 403 | |
@@ -414,7 +416,15 b" def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'):" | |||
|
414 | 416 | nots.setsockopt(zmq.SUBSCRIBE, '') |
|
415 | 417 | nots.connect(not_addr) |
|
416 | 418 | |
|
417 | scheduler = TaskScheduler(ins,outs,mons,nots,scheme,loop) | |
|
419 | # setup logging | |
|
420 | if log_addr: | |
|
421 | connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel) | |
|
422 | else: | |
|
423 | local_logger(loglevel) | |
|
424 | ||
|
425 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, | |
|
426 | mon_stream=mons,notifier_stream=nots, | |
|
427 | scheme=scheme,io_loop=loop) | |
|
418 | 428 | |
|
419 | 429 | loop.start() |
|
420 | 430 |
@@ -15,6 +15,7 b' import os' | |||
|
15 | 15 | import sys |
|
16 | 16 | import time |
|
17 | 17 | import traceback |
|
18 | import logging | |
|
18 | 19 | from datetime import datetime |
|
19 | 20 | from signal import SIGTERM, SIGKILL |
|
20 | 21 | from pprint import pprint |
@@ -25,9 +26,8 b' from zmq.eventloop import ioloop, zmqstream' | |||
|
25 | 26 | |
|
26 | 27 | # Local imports. |
|
27 | 28 | from IPython.core import ultratb |
|
28 | from IPython.utils.traitlets import HasTraits, Instance, List | |
|
29 | from IPython.utils.traitlets import HasTraits, Instance, List, Int | |
|
29 | 30 | from IPython.zmq.completer import KernelCompleter |
|
30 | from IPython.zmq.log import logger # a Logger object | |
|
31 | 31 | from IPython.zmq.iostream import OutStream |
|
32 | 32 | from IPython.zmq.displayhook import DisplayHook |
|
33 | 33 | |
@@ -38,6 +38,8 b' from dependency import UnmetDependency' | |||
|
38 | 38 | import heartmonitor |
|
39 | 39 | from client import Client |
|
40 | 40 | |
|
41 | logger = logging.getLogger() | |
|
42 | ||
|
41 | 43 | def printer(*args): |
|
42 | 44 | pprint(args, stream=sys.__stdout__) |
|
43 | 45 | |
@@ -51,8 +53,9 b' class Kernel(HasTraits):' | |||
|
51 | 53 | # Kernel interface |
|
52 | 54 | #--------------------------------------------------------------------------- |
|
53 | 55 | |
|
56 | id = Int(-1) | |
|
54 | 57 | session = Instance(StreamSession) |
|
55 |
shell_streams = |
|
|
58 | shell_streams = List() | |
|
56 | 59 | control_stream = Instance(zmqstream.ZMQStream) |
|
57 | 60 | task_stream = Instance(zmqstream.ZMQStream) |
|
58 | 61 | iopub_stream = Instance(zmqstream.ZMQStream) |
@@ -62,7 +65,8 b' class Kernel(HasTraits):' | |||
|
62 | 65 | def __init__(self, **kwargs): |
|
63 | 66 | super(Kernel, self).__init__(**kwargs) |
|
64 | 67 | self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY) |
|
65 |
self.prefix = 'engine.%s'%self.id |
|
|
68 | self.prefix = 'engine.%s'%self.id | |
|
69 | logger.root_topic = self.prefix | |
|
66 | 70 | self.user_ns = {} |
|
67 | 71 | self.history = [] |
|
68 | 72 | self.compiler = CommandCompiler() |
@@ -108,8 +112,8 b' class Kernel(HasTraits):' | |||
|
108 | 112 | |
|
109 | 113 | # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." |
|
110 | 114 | # msg = self.reply_socket.recv_json() |
|
111 |
|
|
|
112 | print (Message(msg), file=sys.__stdout__) | |
|
115 | logger.info("Aborting:") | |
|
116 | logger.info(str(msg)) | |
|
113 | 117 | msg_type = msg['msg_type'] |
|
114 | 118 | reply_type = msg_type.split('_')[0] + '_reply' |
|
115 | 119 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) |
@@ -117,7 +121,7 b' class Kernel(HasTraits):' | |||
|
117 | 121 | # self.reply_socket.send_json(reply_msg) |
|
118 | 122 | reply_msg = self.session.send(stream, reply_type, |
|
119 | 123 | content={'status' : 'aborted'}, parent=msg, ident=idents)[0] |
|
120 | print(Message(reply_msg), file=sys.__stdout__) | |
|
124 | logger.debug(str(reply_msg)) | |
|
121 | 125 | # We need to wait a bit for requests to come in. This can probably |
|
122 | 126 | # be set shorter for true asynchronous clients. |
|
123 | 127 | time.sleep(0.05) |
@@ -135,7 +139,7 b' class Kernel(HasTraits):' | |||
|
135 | 139 | content = dict(status='ok') |
|
136 | 140 | reply_msg = self.session.send(stream, 'abort_reply', content=content, |
|
137 | 141 | parent=parent, ident=ident)[0] |
|
138 |
|
|
|
142 | logger(Message(reply_msg), file=sys.__stdout__) | |
|
139 | 143 | |
|
140 | 144 | def shutdown_request(self, stream, ident, parent): |
|
141 | 145 | """kill ourself. This should really be handled in an external process""" |
@@ -168,7 +172,7 b' class Kernel(HasTraits):' | |||
|
168 | 172 | |
|
169 | 173 | handler = self.control_handlers.get(msg['msg_type'], None) |
|
170 | 174 | if handler is None: |
|
171 |
|
|
|
175 | logger.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type']) | |
|
172 | 176 | else: |
|
173 | 177 | handler(self.control_stream, idents, msg) |
|
174 | 178 | |
@@ -211,13 +215,12 b' class Kernel(HasTraits):' | |||
|
211 | 215 | try: |
|
212 | 216 | code = parent[u'content'][u'code'] |
|
213 | 217 | except: |
|
214 | print("Got bad msg: ", file=sys.__stderr__) | |
|
215 | print(Message(parent), file=sys.__stderr__) | |
|
218 | logger.error("Got bad msg: %s"%parent, exc_info=True) | |
|
216 | 219 | return |
|
217 | 220 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
218 | 221 | # self.iopub_stream.send(pyin_msg) |
|
219 | 222 | self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, |
|
220 |
ident=self. |
|
|
223 | ident='%s.pyin'%self.prefix) | |
|
221 | 224 | started = datetime.now().strftime(ISO8601) |
|
222 | 225 | try: |
|
223 | 226 | comp_code = self.compiler(code, '<zmq-kernel>') |
@@ -231,7 +234,7 b' class Kernel(HasTraits):' | |||
|
231 | 234 | exc_content = self._wrap_exception('execute') |
|
232 | 235 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
233 | 236 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, |
|
234 |
ident=self. |
|
|
237 | ident='%s.pyerr'%self.prefix) | |
|
235 | 238 | reply_content = exc_content |
|
236 | 239 | else: |
|
237 | 240 | reply_content = {'status' : 'ok'} |
@@ -240,7 +243,7 b' class Kernel(HasTraits):' | |||
|
240 | 243 | # self.reply_socket.send_json(reply_msg) |
|
241 | 244 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, |
|
242 | 245 | ident=ident, subheader = dict(started=started)) |
|
243 | print(Message(reply_msg), file=sys.__stdout__) | |
|
246 | logger.debug(str(reply_msg)) | |
|
244 | 247 | if reply_msg['content']['status'] == u'error': |
|
245 | 248 | self.abort_queues() |
|
246 | 249 | |
@@ -262,8 +265,7 b' class Kernel(HasTraits):' | |||
|
262 | 265 | msg_id = parent['header']['msg_id'] |
|
263 | 266 | bound = content.get('bound', False) |
|
264 | 267 | except: |
|
265 | print("Got bad msg: ", file=sys.__stderr__) | |
|
266 | print(Message(parent), file=sys.__stderr__) | |
|
268 | logger.error("Got bad msg: %s"%parent, exc_info=True) | |
|
267 | 269 | return |
|
268 | 270 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
269 | 271 | # self.iopub_stream.send(pyin_msg) |
@@ -316,7 +318,7 b' class Kernel(HasTraits):' | |||
|
316 | 318 | exc_content = self._wrap_exception('apply') |
|
317 | 319 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
318 | 320 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, |
|
319 |
ident=self. |
|
|
321 | ident='%s.pyerr'%self.prefix) | |
|
320 | 322 | reply_content = exc_content |
|
321 | 323 | result_buf = [] |
|
322 | 324 | |
@@ -354,7 +356,7 b' class Kernel(HasTraits):' | |||
|
354 | 356 | return |
|
355 | 357 | handler = self.shell_handlers.get(msg['msg_type'], None) |
|
356 | 358 | if handler is None: |
|
357 |
|
|
|
359 | logger.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type']) | |
|
358 | 360 | else: |
|
359 | 361 | handler(stream, idents, msg) |
|
360 | 362 | |
@@ -398,7 +400,7 b' class Kernel(HasTraits):' | |||
|
398 | 400 | # # don't busywait |
|
399 | 401 | # time.sleep(1e-3) |
|
400 | 402 | |
|
401 | def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, | |
|
403 | def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs, | |
|
402 | 404 | client_addr=None, loop=None, context=None, key=None, |
|
403 | 405 | out_stream_factory=OutStream, display_hook_factory=DisplayHook): |
|
404 | 406 | |
@@ -410,7 +412,7 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,' | |||
|
410 | 412 | c = context |
|
411 | 413 | session = StreamSession(key=key) |
|
412 | 414 | # print (session.key) |
|
413 | print (control_addr, shell_addrs, iopub_addr, hb_addrs) | |
|
415 | # print (control_addr, shell_addrs, iopub_addr, hb_addrs) | |
|
414 | 416 | |
|
415 | 417 | # create Control Stream |
|
416 | 418 | control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) |
@@ -433,12 +435,12 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,' | |||
|
433 | 435 | # Redirect input streams and set a display hook. |
|
434 | 436 | if out_stream_factory: |
|
435 | 437 | sys.stdout = out_stream_factory(session, iopub_stream, u'stdout') |
|
436 |
sys.stdout.topic = |
|
|
438 | sys.stdout.topic = 'engine.%i.stdout'%int_id | |
|
437 | 439 | sys.stderr = out_stream_factory(session, iopub_stream, u'stderr') |
|
438 |
sys.stderr.topic = |
|
|
440 | sys.stderr.topic = 'engine.%i.stderr'%int_id | |
|
439 | 441 | if display_hook_factory: |
|
440 | 442 | sys.displayhook = display_hook_factory(session, iopub_stream) |
|
441 |
sys.displayhook.topic = |
|
|
443 | sys.displayhook.topic = 'engine.%i.pyout'%int_id | |
|
442 | 444 | |
|
443 | 445 | |
|
444 | 446 | # launch heartbeat |
@@ -451,7 +453,7 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,' | |||
|
451 | 453 | else: |
|
452 | 454 | client = None |
|
453 | 455 | |
|
454 | kernel = Kernel(session=session, control_stream=control_stream, | |
|
456 | kernel = Kernel(id=int_id, session=session, control_stream=control_stream, | |
|
455 | 457 | shell_streams=shell_streams, iopub_stream=iopub_stream, |
|
456 | 458 | client=client, loop=loop) |
|
457 | 459 | kernel.start() |
@@ -1,4 +1,5 b'' | |||
|
1 | 1 | """some generic utilities""" |
|
2 | import re | |
|
2 | 3 | |
|
3 | 4 | class ReverseDict(dict): |
|
4 | 5 | """simple double-keyed subset of dict methods.""" |
@@ -33,3 +34,46 b' class ReverseDict(dict):' | |||
|
33 | 34 | return default |
|
34 | 35 | |
|
35 | 36 | |
|
37 | def validate_url(url): | |
|
38 | """validate a url for zeromq""" | |
|
39 | if not isinstance(url, basestring): | |
|
40 | raise TypeError("url must be a string, not %r"%type(url)) | |
|
41 | url = url.lower() | |
|
42 | ||
|
43 | proto_addr = url.split('://') | |
|
44 | assert len(proto_addr) == 2, 'Invalid url: %r'%url | |
|
45 | proto, addr = proto_addr | |
|
46 | assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto | |
|
47 | ||
|
48 | # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391 | |
|
49 | # author: Remi Sabourin | |
|
50 | pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$') | |
|
51 | ||
|
52 | if proto == 'tcp': | |
|
53 | lis = addr.split(':') | |
|
54 | assert len(lis) == 2, 'Invalid url: %r'%url | |
|
55 | addr,s_port = lis | |
|
56 | try: | |
|
57 | port = int(s_port) | |
|
58 | except ValueError: | |
|
59 | raise AssertionError("Invalid port %r in url: %r"%(port, url)) | |
|
60 | ||
|
61 | assert pat.match(addr) is not None, 'Invalid url: %r'%url | |
|
62 | ||
|
63 | else: | |
|
64 | # only validate tcp urls currently | |
|
65 | pass | |
|
66 | ||
|
67 | return True | |
|
68 | ||
|
69 | ||
|
70 | def validate_url_container(container): | |
|
71 | """validate a potentially nested collection of urls.""" | |
|
72 | if isinstance(container, basestring): | |
|
73 | url = container | |
|
74 | return validate_url(url) | |
|
75 | elif isinstance(container, dict): | |
|
76 | container = container.itervalues() | |
|
77 | ||
|
78 | for element in container: | |
|
79 | validate_url_container(element) No newline at end of file |
@@ -19,6 +19,7 b'' | |||
|
19 | 19 | # You should have received a copy of the Lesser GNU General Public License |
|
20 | 20 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
21 | 21 | |
|
22 | import sys | |
|
22 | 23 | import zmq |
|
23 | 24 | logport = 20202 |
|
24 | 25 | def main(topics, addrs): |
@@ -26,20 +27,27 b' def main(topics, addrs):' | |||
|
26 | 27 | context = zmq.Context() |
|
27 | 28 | socket = context.socket(zmq.SUB) |
|
28 | 29 | for topic in topics: |
|
30 | print "Subscribing to: %r"%topic | |
|
29 | 31 | socket.setsockopt(zmq.SUBSCRIBE, topic) |
|
30 | 32 | if addrs: |
|
31 | 33 | for addr in addrs: |
|
32 | 34 | print "Connecting to: ", addr |
|
33 | 35 | socket.connect(addr) |
|
34 | 36 | else: |
|
35 |
socket.bind('tcp:// |
|
|
37 | socket.bind('tcp://*:%i'%logport) | |
|
36 | 38 | |
|
37 | 39 | while True: |
|
38 | 40 | # topic = socket.recv() |
|
39 | 41 | # print topic |
|
40 | topic, msg = socket.recv_multipart() | |
|
41 |
|
|
|
42 | # print 'tic' | |
|
43 | raw = socket.recv_multipart() | |
|
44 | if len(raw) != 2: | |
|
45 | print "!!! invalid log message: %s"%raw | |
|
46 | else: | |
|
47 | topic, msg = raw | |
|
48 | # don't newline, since log messages always newline: | |
|
42 | 49 |
print "%s | %s |
|
50 | sys.stdout.flush() | |
|
43 | 51 | |
|
44 | 52 | if __name__ == '__main__': |
|
45 | 53 | import sys |
General Comments 0
You need to be logged in to leave comments.
Login now