Show More
@@ -21,7 +21,3 b' class EnginePUBHandler(PUBHandler):' | |||||
21 | else: |
|
21 | else: | |
22 | return "engine" |
|
22 | return "engine" | |
23 |
|
23 | |||
24 |
|
||||
25 | logger = logging.getLogger('ipzmq') |
|
|||
26 | logger.setLevel(logging.DEBUG) |
|
|||
27 |
|
@@ -90,11 +90,6 b' def defaultblock(f, self, *args, **kwargs):' | |||||
90 | # Classes |
|
90 | # Classes | |
91 | #-------------------------------------------------------------------------- |
|
91 | #-------------------------------------------------------------------------- | |
92 |
|
92 | |||
93 | class AbortedTask(object): |
|
|||
94 | """A basic wrapper object describing an aborted task.""" |
|
|||
95 | def __init__(self, msg_id): |
|
|||
96 | self.msg_id = msg_id |
|
|||
97 |
|
||||
98 | class ResultDict(dict): |
|
93 | class ResultDict(dict): | |
99 | """A subclass of dict that raises errors if it has them.""" |
|
94 | """A subclass of dict that raises errors if it has them.""" | |
100 | def __getitem__(self, key): |
|
95 | def __getitem__(self, key): | |
@@ -332,10 +327,10 b' class Client(object):' | |||||
332 | msg = ss.Message(msg) |
|
327 | msg = ss.Message(msg) | |
333 | content = msg.content |
|
328 | content = msg.content | |
334 | if content.status == 'ok': |
|
329 | if content.status == 'ok': | |
335 |
if content. |
|
330 | if content.mux: | |
336 | self._mux_socket = self.context.socket(zmq.PAIR) |
|
331 | self._mux_socket = self.context.socket(zmq.PAIR) | |
337 | self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
332 | self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
338 |
connect_socket(self._mux_socket, content. |
|
333 | connect_socket(self._mux_socket, content.mux) | |
339 | if content.task: |
|
334 | if content.task: | |
340 | self._task_socket = self.context.socket(zmq.PAIR) |
|
335 | self._task_socket = self.context.socket(zmq.PAIR) | |
341 | self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
336 | self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) |
@@ -17,6 +17,7 b' from __future__ import print_function' | |||||
17 |
|
17 | |||
18 | import os |
|
18 | import os | |
19 | import time |
|
19 | import time | |
|
20 | import logging | |||
20 | from multiprocessing import Process |
|
21 | from multiprocessing import Process | |
21 |
|
22 | |||
22 | import zmq |
|
23 | import zmq | |
@@ -29,7 +30,8 b' from IPython.zmq.entry_point import bind_port' | |||||
29 |
|
30 | |||
30 | from hub import Hub |
|
31 | from hub import Hub | |
31 | from entry_point import (make_base_argument_parser, select_random_ports, split_ports, |
|
32 | from entry_point import (make_base_argument_parser, select_random_ports, split_ports, | |
32 |
connect_logger, parse_url, signal_children, generate_exec_key |
|
33 | connect_logger, parse_url, signal_children, generate_exec_key, | |
|
34 | local_logger) | |||
33 |
|
35 | |||
34 |
|
36 | |||
35 | import streamsession as session |
|
37 | import streamsession as session | |
@@ -118,8 +120,6 b' def main(argv=None):' | |||||
118 | ctx = zmq.Context() |
|
120 | ctx = zmq.Context() | |
119 | loop = ioloop.IOLoop.instance() |
|
121 | loop = ioloop.IOLoop.instance() | |
120 |
|
122 | |||
121 | # setup logging |
|
|||
122 | connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel) |
|
|||
123 |
|
123 | |||
124 | # Registrar socket |
|
124 | # Registrar socket | |
125 | reg = ZMQStream(ctx.socket(zmq.XREP), loop) |
|
125 | reg = ZMQStream(ctx.socket(zmq.XREP), loop) | |
@@ -207,7 +207,9 b' def main(argv=None):' | |||||
207 | q.start() |
|
207 | q.start() | |
208 | children.append(q.launcher) |
|
208 | children.append(q.launcher) | |
209 | else: |
|
209 | else: | |
210 | sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler) |
|
210 | log_addr = iface%args.logport if args.logport else None | |
|
211 | sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport, | |||
|
212 | log_addr, args.loglevel, args.scheduler) | |||
211 | print (sargs) |
|
213 | print (sargs) | |
212 | q = Process(target=launch_scheduler, args=sargs) |
|
214 | q = Process(target=launch_scheduler, args=sargs) | |
213 | q.daemon=True |
|
215 | q.daemon=True | |
@@ -224,7 +226,7 b' def main(argv=None):' | |||||
224 | # build connection dicts |
|
226 | # build connection dicts | |
225 | engine_addrs = { |
|
227 | engine_addrs = { | |
226 | 'control' : iface%control[1], |
|
228 | 'control' : iface%control[1], | |
227 |
' |
|
229 | 'mux': iface%mux[1], | |
228 | 'heartbeat': (iface%hb[0], iface%hb[1]), |
|
230 | 'heartbeat': (iface%hb[0], iface%hb[1]), | |
229 | 'task' : iface%task[1], |
|
231 | 'task' : iface%task[1], | |
230 | 'iopub' : iface%iopub[1], |
|
232 | 'iopub' : iface%iopub[1], | |
@@ -234,15 +236,24 b' def main(argv=None):' | |||||
234 | client_addrs = { |
|
236 | client_addrs = { | |
235 | 'control' : iface%control[0], |
|
237 | 'control' : iface%control[0], | |
236 | 'query': iface%cport, |
|
238 | 'query': iface%cport, | |
237 |
' |
|
239 | 'mux': iface%mux[0], | |
238 | 'task' : iface%task[0], |
|
240 | 'task' : iface%task[0], | |
239 | 'iopub' : iface%iopub[0], |
|
241 | 'iopub' : iface%iopub[0], | |
240 | 'notification': iface%nport |
|
242 | 'notification': iface%nport | |
241 | } |
|
243 | } | |
242 |
|
244 | |||
|
245 | # setup logging | |||
|
246 | if args.logport: | |||
|
247 | connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel) | |||
|
248 | else: | |||
|
249 | local_logger(args.loglevel) | |||
|
250 | ||||
243 | # register relay of signals to the children |
|
251 | # register relay of signals to the children | |
244 | signal_children(children) |
|
252 | signal_children(children) | |
245 | hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs) |
|
253 | hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon, | |
|
254 | registrar=reg, clientele=c, notifier=n, db=db, | |||
|
255 | engine_addrs=engine_addrs, client_addrs=client_addrs) | |||
|
256 | ||||
246 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) |
|
257 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) | |
247 | dc.start() |
|
258 | dc.start() | |
248 | loop.start() |
|
259 | loop.start() |
@@ -8,49 +8,48 b' import sys' | |||||
8 | import time |
|
8 | import time | |
9 | import traceback |
|
9 | import traceback | |
10 | import uuid |
|
10 | import uuid | |
|
11 | import logging | |||
11 | from pprint import pprint |
|
12 | from pprint import pprint | |
12 |
|
13 | |||
13 | import zmq |
|
14 | import zmq | |
14 | from zmq.eventloop import ioloop, zmqstream |
|
15 | from zmq.eventloop import ioloop, zmqstream | |
15 |
|
16 | |||
16 | from IPython.utils.traitlets import HasTraits |
|
17 | # internal | |
17 | from IPython.utils.localinterfaces import LOCALHOST |
|
18 | from IPython.config.configurable import Configurable | |
|
19 | from IPython.utils.traitlets import Instance, Str, Dict | |||
|
20 | # from IPython.utils.localinterfaces import LOCALHOST | |||
18 |
|
21 | |||
19 | from streamsession import Message, StreamSession |
|
22 | from streamsession import Message, StreamSession | |
20 | from client import Client |
|
|||
21 | from streamkernel import Kernel, make_kernel |
|
23 | from streamkernel import Kernel, make_kernel | |
22 | import heartmonitor |
|
24 | import heartmonitor | |
23 | from entry_point import make_base_argument_parser, connect_logger, parse_url |
|
25 | from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url, | |
|
26 | local_logger) | |||
24 | # import taskthread |
|
27 | # import taskthread | |
25 | # from log import logger |
|
28 | logger = logging.getLogger() | |
26 |
|
||||
27 |
|
29 | |||
28 | def printer(*msg): |
|
30 | def printer(*msg): | |
29 |
|
|
31 | # print (logger.handlers, file=sys.__stdout__) | |
|
32 | logger.info(str(msg)) | |||
30 |
|
33 | |||
31 |
class Engine( |
|
34 | class Engine(Configurable): | |
32 | """IPython engine""" |
|
35 | """IPython engine""" | |
33 |
|
36 | |||
34 | id=None |
|
|||
35 | context=None |
|
|||
36 | loop=None |
|
|||
37 | session=None |
|
|||
38 | ident=None |
|
|||
39 | registrar=None |
|
|||
40 | heart=None |
|
|||
41 | kernel=None |
|
37 | kernel=None | |
42 |
|
|
38 | id=None | |
43 |
|
39 | |||
44 | def __init__(self, context, loop, session, registrar, client=None, ident=None, |
|
40 | # configurables: | |
45 | heart_id=None, user_ns=None): |
|
41 | context=Instance(zmq.Context) | |
46 | self.context = context |
|
42 | loop=Instance(ioloop.IOLoop) | |
47 | self.loop = loop |
|
43 | session=Instance(StreamSession) | |
48 | self.session = session |
|
44 | ident=Str() | |
49 | self.registrar = registrar |
|
45 | registrar=Instance(zmqstream.ZMQStream) | |
50 | self.client = client |
|
46 | user_ns=Dict() | |
51 | self.ident = ident if ident else str(uuid.uuid4()) |
|
47 | ||
|
48 | def __init__(self, **kwargs): | |||
|
49 | super(Engine, self).__init__(**kwargs) | |||
|
50 | if not self.ident: | |||
|
51 | self.ident = str(uuid.uuid4()) | |||
52 | self.registrar.on_send(printer) |
|
52 | self.registrar.on_send(printer) | |
53 | self.user_ns = user_ns |
|
|||
54 |
|
53 | |||
55 | def register(self): |
|
54 | def register(self): | |
56 |
|
55 | |||
@@ -64,9 +63,10 b' class Engine(object):' | |||||
64 | idents,msg = self.session.feed_identities(msg) |
|
63 | idents,msg = self.session.feed_identities(msg) | |
65 | msg = Message(self.session.unpack_message(msg)) |
|
64 | msg = Message(self.session.unpack_message(msg)) | |
66 | if msg.content.status == 'ok': |
|
65 | if msg.content.status == 'ok': | |
67 |
self. |
|
66 | self.id = int(msg.content.id) | |
68 | queue_addr = msg.content.queue |
|
67 | self.session.username = 'engine-%i'%self.id | |
69 | shell_addrs = [str(queue_addr)] |
|
68 | queue_addr = msg.content.mux | |
|
69 | shell_addrs = [ str(queue_addr) ] | |||
70 | control_addr = str(msg.content.control) |
|
70 | control_addr = str(msg.content.control) | |
71 | task_addr = msg.content.task |
|
71 | task_addr = msg.content.task | |
72 | iopub_addr = msg.content.iopub |
|
72 | iopub_addr = msg.content.iopub | |
@@ -75,7 +75,7 b' class Engine(object):' | |||||
75 |
|
75 | |||
76 | hb_addrs = msg.content.heartbeat |
|
76 | hb_addrs = msg.content.heartbeat | |
77 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() |
|
77 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() | |
78 | k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, |
|
78 | k = make_kernel(self.id, self.ident, control_addr, shell_addrs, iopub_addr, | |
79 | hb_addrs, client_addr=None, loop=self.loop, |
|
79 | hb_addrs, client_addr=None, loop=self.loop, | |
80 | context=self.context, key=self.session.key)[-1] |
|
80 | context=self.context, key=self.session.key)[-1] | |
81 | self.kernel = k |
|
81 | self.kernel = k | |
@@ -84,12 +84,12 b' class Engine(object):' | |||||
84 | self.kernel.user_ns = self.user_ns |
|
84 | self.kernel.user_ns = self.user_ns | |
85 |
|
85 | |||
86 | else: |
|
86 | else: | |
87 |
|
|
87 | logger.error("Registration Failed: %s"%msg) | |
88 | raise Exception("Registration Failed: %s"%msg) |
|
88 | raise Exception("Registration Failed: %s"%msg) | |
89 |
|
89 | |||
90 |
|
|
90 | logger.info("completed registration with id %i"%self.id) | |
91 |
|
91 | |||
92 | print (msg,file=sys.__stdout__) |
|
92 | # logger.info(str(msg)) | |
93 |
|
93 | |||
94 | def unregister(self): |
|
94 | def unregister(self): | |
95 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) |
|
95 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) | |
@@ -97,7 +97,7 b' class Engine(object):' | |||||
97 | sys.exit(0) |
|
97 | sys.exit(0) | |
98 |
|
98 | |||
99 | def start(self): |
|
99 | def start(self): | |
100 |
|
|
100 | logger.info("registering") | |
101 | self.register() |
|
101 | self.register() | |
102 |
|
102 | |||
103 |
|
103 | |||
@@ -118,7 +118,6 b' def main(argv=None, user_ns=None):' | |||||
118 | ctx = zmq.Context() |
|
118 | ctx = zmq.Context() | |
119 |
|
119 | |||
120 | # setup logging |
|
120 | # setup logging | |
121 | connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) |
|
|||
122 |
|
121 | |||
123 | reg_conn = iface % args.regport |
|
122 | reg_conn = iface % args.regport | |
124 | print (reg_conn, file=sys.__stdout__) |
|
123 | print (reg_conn, file=sys.__stdout__) | |
@@ -127,10 +126,16 b' def main(argv=None, user_ns=None):' | |||||
127 | reg = ctx.socket(zmq.PAIR) |
|
126 | reg = ctx.socket(zmq.PAIR) | |
128 | reg.connect(reg_conn) |
|
127 | reg.connect(reg_conn) | |
129 | reg = zmqstream.ZMQStream(reg, loop) |
|
128 | reg = zmqstream.ZMQStream(reg, loop) | |
130 | client = None |
|
|||
131 |
|
129 | |||
132 | e = Engine(ctx, loop, session, reg, client, args.ident, user_ns=user_ns) |
|
130 | e = Engine(context=ctx, loop=loop, session=session, registrar=reg, | |
133 | dc = ioloop.DelayedCallback(e.start, 100, loop) |
|
131 | ident=args.ident or '', user_ns=user_ns) | |
|
132 | if args.logport: | |||
|
133 | print ("connecting logger to %s"%(iface%args.logport), file=sys.__stdout__) | |||
|
134 | connect_engine_logger(ctx, iface%args.logport, e, loglevel=args.loglevel) | |||
|
135 | else: | |||
|
136 | local_logger(args.loglevel) | |||
|
137 | ||||
|
138 | dc = ioloop.DelayedCallback(e.start, 0, loop) | |||
134 | dc.start() |
|
139 | dc.start() | |
135 | loop.start() |
|
140 | loop.start() | |
136 |
|
141 |
@@ -22,7 +22,7 b' from zmq.log import handlers' | |||||
22 | # Local imports. |
|
22 | # Local imports. | |
23 | from IPython.core.ultratb import FormattedTB |
|
23 | from IPython.core.ultratb import FormattedTB | |
24 | from IPython.external.argparse import ArgumentParser |
|
24 | from IPython.external.argparse import ArgumentParser | |
25 |
from IPython.zmq.log import |
|
25 | from IPython.zmq.log import EnginePUBHandler | |
26 |
|
26 | |||
27 | def split_ports(s, n): |
|
27 | def split_ports(s, n): | |
28 | """Parser helper for multiport strings""" |
|
28 | """Parser helper for multiport strings""" | |
@@ -82,6 +82,7 b' def make_base_argument_parser():' | |||||
82 | """ Creates an ArgumentParser for the generic arguments supported by all |
|
82 | """ Creates an ArgumentParser for the generic arguments supported by all | |
83 | ipcluster entry points. |
|
83 | ipcluster entry points. | |
84 | """ |
|
84 | """ | |
|
85 | ||||
85 | parser = ArgumentParser() |
|
86 | parser = ArgumentParser() | |
86 | parser.add_argument('--ip', type=str, default='127.0.0.1', |
|
87 | parser.add_argument('--ip', type=str, default='127.0.0.1', | |
87 | help='set the controller\'s IP address [default: local]') |
|
88 | help='set the controller\'s IP address [default: local]') | |
@@ -89,10 +90,10 b' def make_base_argument_parser():' | |||||
89 | help='set the transport to use [default: tcp]') |
|
90 | help='set the transport to use [default: tcp]') | |
90 | parser.add_argument('--regport', type=int, metavar='PORT', default=10101, |
|
91 | parser.add_argument('--regport', type=int, metavar='PORT', default=10101, | |
91 | help='set the XREP port for registration [default: 10101]') |
|
92 | help='set the XREP port for registration [default: 10101]') | |
92 |
parser.add_argument('--logport', type=int, metavar='PORT', default= |
|
93 | parser.add_argument('--logport', type=int, metavar='PORT', default=0, | |
93 |
help='set the PUB port for logging [default: |
|
94 | help='set the PUB port for remote logging [default: log to stdout]') | |
94 |
parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging. |
|
95 | parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.INFO, | |
95 |
help='set the log level [default: |
|
96 | help='set the log level [default: INFO]') | |
96 | parser.add_argument('--ident', type=str, |
|
97 | parser.add_argument('--ident', type=str, | |
97 | help='set the ZMQ identity [default: random]') |
|
98 | help='set the ZMQ identity [default: random]') | |
98 | parser.add_argument('--packer', type=str, default='json', |
|
99 | parser.add_argument('--packer', type=str, default='json', | |
@@ -105,17 +106,42 b' def make_base_argument_parser():' | |||||
105 |
|
106 | |||
106 | return parser |
|
107 | return parser | |
107 |
|
108 | |||
108 |
|
109 | def integer_loglevel(loglevel): | ||
109 | def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG): |
|
|||
110 | try: |
|
110 | try: | |
111 | loglevel = int(loglevel) |
|
111 | loglevel = int(loglevel) | |
112 | except ValueError: |
|
112 | except ValueError: | |
113 | if isinstance(loglevel, str): |
|
113 | if isinstance(loglevel, str): | |
114 | loglevel = getattr(logging, loglevel) |
|
114 | loglevel = getattr(logging, loglevel) | |
|
115 | return loglevel | |||
|
116 | ||||
|
117 | def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG): | |||
|
118 | loglevel = integer_loglevel(loglevel) | |||
115 | lsock = context.socket(zmq.PUB) |
|
119 | lsock = context.socket(zmq.PUB) | |
116 | lsock.connect(iface) |
|
120 | lsock.connect(iface) | |
117 | handler = handlers.PUBHandler(lsock) |
|
121 | handler = handlers.PUBHandler(lsock) | |
118 | handler.setLevel(loglevel) |
|
122 | handler.setLevel(loglevel) | |
119 | handler.root_topic = root |
|
123 | handler.root_topic = root | |
|
124 | logger = logging.getLogger() | |||
|
125 | logger.addHandler(handler) | |||
|
126 | logger.setLevel(loglevel) | |||
|
127 | ||||
|
128 | def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG): | |||
|
129 | logger = logging.getLogger() | |||
|
130 | loglevel = integer_loglevel(loglevel) | |||
|
131 | lsock = context.socket(zmq.PUB) | |||
|
132 | lsock.connect(iface) | |||
|
133 | handler = EnginePUBHandler(engine, lsock) | |||
|
134 | handler.setLevel(loglevel) | |||
|
135 | logger.addHandler(handler) | |||
|
136 | logger.setLevel(loglevel) | |||
|
137 | ||||
|
138 | def local_logger(loglevel=logging.DEBUG): | |||
|
139 | loglevel = integer_loglevel(loglevel) | |||
|
140 | logger = logging.getLogger() | |||
|
141 | if logger.handlers: | |||
|
142 | # if there are any handlers, skip the hookup | |||
|
143 | return | |||
|
144 | handler = logging.StreamHandler() | |||
|
145 | handler.setLevel(loglevel) | |||
120 | logger.addHandler(handler) |
|
146 | logger.addHandler(handler) | |
121 | No newline at end of file |
|
147 | logger.setLevel(loglevel) |
@@ -7,13 +7,13 b' and hearts are tracked based on their XREQ identities.' | |||||
7 | from __future__ import print_function |
|
7 | from __future__ import print_function | |
8 | import time |
|
8 | import time | |
9 | import uuid |
|
9 | import uuid | |
|
10 | import logging | |||
10 |
|
11 | |||
11 | import zmq |
|
12 | import zmq | |
12 | from zmq.devices import ProcessDevice,ThreadDevice |
|
13 | from zmq.devices import ProcessDevice,ThreadDevice | |
13 | from zmq.eventloop import ioloop, zmqstream |
|
14 | from zmq.eventloop import ioloop, zmqstream | |
14 |
|
15 | |||
15 | #internal |
|
16 | logger = logging.getLogger() | |
16 | from IPython.zmq.log import logger |
|
|||
17 |
|
17 | |||
18 | class Heart(object): |
|
18 | class Heart(object): | |
19 | """A basic heart object for responding to a HeartMonitor. |
|
19 | """A basic heart object for responding to a HeartMonitor. | |
@@ -53,6 +53,7 b' class HeartMonitor(object):' | |||||
53 | hearts=None |
|
53 | hearts=None | |
54 | on_probation=None |
|
54 | on_probation=None | |
55 | last_ping=None |
|
55 | last_ping=None | |
|
56 | # debug=False | |||
56 |
|
57 | |||
57 | def __init__(self, loop, pingstream, pongstream, period=1000): |
|
58 | def __init__(self, loop, pingstream, pongstream, period=1000): | |
58 | self.loop = loop |
|
59 | self.loop = loop | |
@@ -84,19 +85,6 b' class HeartMonitor(object):' | |||||
84 | """add a new handler for heart failure""" |
|
85 | """add a new handler for heart failure""" | |
85 | logger.debug("heartbeat::new heart failure handler: %s"%handler) |
|
86 | logger.debug("heartbeat::new heart failure handler: %s"%handler) | |
86 | self._failure_handlers.add(handler) |
|
87 | self._failure_handlers.add(handler) | |
87 |
|
||||
88 | # def _flush(self): |
|
|||
89 | # """override IOLoop triggers""" |
|
|||
90 | # while True: |
|
|||
91 | # try: |
|
|||
92 | # msg = self.pongstream.socket.recv_multipart(zmq.NOBLOCK) |
|
|||
93 | # logger.warn("IOLoop triggered beat with incoming heartbeat waiting to be handled") |
|
|||
94 | # except zmq.ZMQError: |
|
|||
95 | # return |
|
|||
96 | # else: |
|
|||
97 | # self.handle_pong(msg) |
|
|||
98 | # # print '.' |
|
|||
99 | # |
|
|||
100 |
|
88 | |||
101 | def beat(self): |
|
89 | def beat(self): | |
102 | self.pongstream.flush() |
|
90 | self.pongstream.flush() | |
@@ -105,7 +93,7 b' class HeartMonitor(object):' | |||||
105 | toc = time.time() |
|
93 | toc = time.time() | |
106 | self.lifetime += toc-self.tic |
|
94 | self.lifetime += toc-self.tic | |
107 | self.tic = toc |
|
95 | self.tic = toc | |
108 | logger.debug("heartbeat::%s"%self.lifetime) |
|
96 | # logger.debug("heartbeat::%s"%self.lifetime) | |
109 | goodhearts = self.hearts.intersection(self.responses) |
|
97 | goodhearts = self.hearts.intersection(self.responses) | |
110 | missed_beats = self.hearts.difference(goodhearts) |
|
98 | missed_beats = self.hearts.difference(goodhearts) | |
111 | heartfailures = self.on_probation.intersection(missed_beats) |
|
99 | heartfailures = self.on_probation.intersection(missed_beats) | |
@@ -144,7 +132,7 b' class HeartMonitor(object):' | |||||
144 | "a heart just beat" |
|
132 | "a heart just beat" | |
145 | if msg[1] == str(self.lifetime): |
|
133 | if msg[1] == str(self.lifetime): | |
146 | delta = time.time()-self.tic |
|
134 | delta = time.time()-self.tic | |
147 | logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) |
|
135 | # logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) | |
148 | self.responses.add(msg[0]) |
|
136 | self.responses.add(msg[0]) | |
149 | elif msg[1] == str(self.last_ping): |
|
137 | elif msg[1] == str(self.last_ping): | |
150 | delta = time.time()-self.tic + (self.lifetime-self.last_ping) |
|
138 | delta = time.time()-self.tic + (self.lifetime-self.last_ping) |
@@ -18,14 +18,19 b' from __future__ import print_function' | |||||
18 | import sys |
|
18 | import sys | |
19 | from datetime import datetime |
|
19 | from datetime import datetime | |
20 | import time |
|
20 | import time | |
|
21 | import logging | |||
21 |
|
22 | |||
22 | import zmq |
|
23 | import zmq | |
23 | from zmq.eventloop import ioloop |
|
24 | from zmq.eventloop import ioloop, zmqstream | |
24 |
|
25 | |||
25 | # internal: |
|
26 | # internal: | |
26 | from IPython.zmq.log import logger # a Logger object |
|
27 | from IPython.config.configurable import Configurable | |
|
28 | from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict | |||
|
29 | # from IPython.zmq.log import logger # a Logger object | |||
27 |
|
30 | |||
28 | from streamsession import Message, wrap_exception, ISO8601 |
|
31 | from streamsession import Message, wrap_exception, ISO8601 | |
|
32 | from heartmonitor import HeartMonitor | |||
|
33 | from util import validate_url_container | |||
29 |
|
34 | |||
30 | try: |
|
35 | try: | |
31 | from pymongo.binary import Binary |
|
36 | from pymongo.binary import Binary | |
@@ -38,6 +43,8 b' else:' | |||||
38 | # Code |
|
43 | # Code | |
39 | #----------------------------------------------------------------------------- |
|
44 | #----------------------------------------------------------------------------- | |
40 |
|
45 | |||
|
46 | logger = logging.getLogger() | |||
|
47 | ||||
41 | def _passer(*args, **kwargs): |
|
48 | def _passer(*args, **kwargs): | |
42 | return |
|
49 | return | |
43 |
|
50 | |||
@@ -46,7 +53,7 b' def _printer(*args, **kwargs):' | |||||
46 | print (kwargs) |
|
53 | print (kwargs) | |
47 |
|
54 | |||
48 | def init_record(msg): |
|
55 | def init_record(msg): | |
49 | """return an empty TaskRecord dict, with all keys initialized with None.""" |
|
56 | """Initialize a TaskRecord based on a request.""" | |
50 | header = msg['header'] |
|
57 | header = msg['header'] | |
51 | return { |
|
58 | return { | |
52 | 'msg_id' : header['msg_id'], |
|
59 | 'msg_id' : header['msg_id'], | |
@@ -71,7 +78,7 b' def init_record(msg):' | |||||
71 | } |
|
78 | } | |
72 |
|
79 | |||
73 |
|
80 | |||
74 |
class EngineConnector( |
|
81 | class EngineConnector(HasTraits): | |
75 | """A simple object for accessing the various zmq connections of an object. |
|
82 | """A simple object for accessing the various zmq connections of an object. | |
76 | Attributes are: |
|
83 | Attributes are: | |
77 | id (int): engine ID |
|
84 | id (int): engine ID | |
@@ -80,22 +87,18 b' class EngineConnector(object):' | |||||
80 | registration (str): identity of registration XREQ socket |
|
87 | registration (str): identity of registration XREQ socket | |
81 | heartbeat (str): identity of heartbeat XREQ socket |
|
88 | heartbeat (str): identity of heartbeat XREQ socket | |
82 | """ |
|
89 | """ | |
83 | id=0 |
|
90 | id=Int(0) | |
84 |
queue= |
|
91 | queue=Str() | |
85 |
control= |
|
92 | control=Str() | |
86 |
registration= |
|
93 | registration=Str() | |
87 |
heartbeat= |
|
94 | heartbeat=Str() | |
88 |
pending= |
|
95 | pending=Instance(set) | |
89 |
|
96 | |||
90 | def __init__(self, id, queue, registration, control, heartbeat=None): |
|
97 | def __init__(self, **kwargs): | |
91 | logger.info("engine::Engine Connected: %i"%id) |
|
98 | super(EngineConnector, self).__init__(**kwargs) | |
92 | self.id = id |
|
99 | logger.info("engine::Engine Connected: %i"%self.id) | |
93 | self.queue = queue |
|
100 | ||
94 | self.registration = registration |
|
101 | class Hub(Configurable): | |
95 | self.control = control |
|
|||
96 | self.heartbeat = heartbeat |
|
|||
97 |
|
||||
98 | class Hub(object): |
|
|||
99 | """The IPython Controller Hub with 0MQ connections |
|
102 | """The IPython Controller Hub with 0MQ connections | |
100 |
|
103 | |||
101 | Parameters |
|
104 | Parameters | |
@@ -123,26 +126,25 b' class Hub(object):' | |||||
123 | clients=None |
|
126 | clients=None | |
124 | hearts=None |
|
127 | hearts=None | |
125 | pending=None |
|
128 | pending=None | |
126 | results=None |
|
|||
127 | tasks=None |
|
129 | tasks=None | |
128 | completed=None |
|
130 | completed=None | |
129 | mia=None |
|
131 | # mia=None | |
130 | incoming_registrations=None |
|
132 | incoming_registrations=None | |
131 | registration_timeout=None |
|
133 | registration_timeout=None | |
132 |
|
134 | |||
133 | #objects from constructor: |
|
135 | # objects from constructor: | |
134 | loop=None |
|
136 | loop=Instance(ioloop.IOLoop) | |
135 | registrar=None |
|
137 | registrar=Instance(zmqstream.ZMQStream) | |
136 | clientelle=None |
|
138 | clientele=Instance(zmqstream.ZMQStream) | |
137 | queue=None |
|
139 | monitor=Instance(zmqstream.ZMQStream) | |
138 | heartbeat=None |
|
140 | heartmonitor=Instance(HeartMonitor) | |
139 | notifier=None |
|
141 | notifier=Instance(zmqstream.ZMQStream) | |
140 | db=None |
|
142 | db=Instance(object) | |
141 |
client_addr= |
|
143 | client_addrs=Dict() | |
142 |
engine_addrs= |
|
144 | engine_addrs=Dict() | |
143 |
|
145 | |||
144 |
|
146 | |||
145 | def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs): |
|
147 | def __init__(self, **kwargs): | |
146 | """ |
|
148 | """ | |
147 | # universal: |
|
149 | # universal: | |
148 | loop: IOLoop for creating future connections |
|
150 | loop: IOLoop for creating future connections | |
@@ -158,6 +160,8 b' class Hub(object):' | |||||
158 | engine_addrs: zmq address/protocol dict for engine connections |
|
160 | engine_addrs: zmq address/protocol dict for engine connections | |
159 | client_addrs: zmq address/protocol dict for client connections |
|
161 | client_addrs: zmq address/protocol dict for client connections | |
160 | """ |
|
162 | """ | |
|
163 | ||||
|
164 | super(Hub, self).__init__(**kwargs) | |||
161 | self.ids = set() |
|
165 | self.ids = set() | |
162 | self.keytable={} |
|
166 | self.keytable={} | |
163 | self.incoming_registrations={} |
|
167 | self.incoming_registrations={} | |
@@ -166,35 +170,44 b' class Hub(object):' | |||||
166 | self.clients = {} |
|
170 | self.clients = {} | |
167 | self.hearts = {} |
|
171 | self.hearts = {} | |
168 | # self.mia = set() |
|
172 | # self.mia = set() | |
169 |
|
173 | self.registration_timeout = max(5000, 2*self.heartmonitor.period) | ||
|
174 | # this is the stuff that will move to DB: | |||
|
175 | self.pending = set() # pending messages, keyed by msg_id | |||
|
176 | self.queues = {} # pending msg_ids keyed by engine_id | |||
|
177 | self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id | |||
|
178 | self.completed = {} # completed msg_ids keyed by engine_id | |||
|
179 | self.all_completed = set() | |||
|
180 | self._idcounter = 0 | |||
170 | # self.sockets = {} |
|
181 | # self.sockets = {} | |
171 | self.loop = loop |
|
182 | # self.loop = loop | |
172 | self.session = session |
|
183 | # self.session = session | |
173 | self.registrar = registrar |
|
184 | # self.registrar = registrar | |
174 | self.clientele = clientele |
|
185 | # self.clientele = clientele | |
175 | self.queue = queue |
|
186 | # self.queue = queue | |
176 |
self.heart |
|
187 | # self.heartmonitor = heartbeat | |
177 | self.notifier = notifier |
|
188 | # self.notifier = notifier | |
178 | self.db = db |
|
189 | # self.db = db | |
179 |
|
190 | |||
180 | # validate connection dicts: |
|
191 | # validate connection dicts: | |
181 | self.client_addrs = client_addrs |
|
192 | # self.client_addrs = client_addrs | |
182 | assert isinstance(client_addrs['queue'], str) |
|
193 | validate_url_container(self.client_addrs) | |
183 | assert isinstance(client_addrs['control'], str) |
|
194 | ||
|
195 | # assert isinstance(self.client_addrs['queue'], str) | |||
|
196 | # assert isinstance(self.client_addrs['control'], str) | |||
184 | # self.hb_addrs = hb_addrs |
|
197 | # self.hb_addrs = hb_addrs | |
185 | self.engine_addrs = engine_addrs |
|
198 | validate_url_container(self.engine_addrs) | |
186 | assert isinstance(engine_addrs['queue'], str) |
|
199 | # self.engine_addrs = engine_addrs | |
187 |
assert isinstance( |
|
200 | # assert isinstance(self.engine_addrs['queue'], str) | |
188 |
assert |
|
201 | # assert isinstance(self.engine_addrs['control'], str) | |
|
202 | # assert len(engine_addrs['heartbeat']) == 2 | |||
189 |
|
203 | |||
190 | # register our callbacks |
|
204 | # register our callbacks | |
191 | self.registrar.on_recv(self.dispatch_register_request) |
|
205 | self.registrar.on_recv(self.dispatch_register_request) | |
192 | self.clientele.on_recv(self.dispatch_client_msg) |
|
206 | self.clientele.on_recv(self.dispatch_client_msg) | |
193 |
self. |
|
207 | self.monitor.on_recv(self.dispatch_monitor_traffic) | |
194 |
|
208 | |||
195 | if heartbeat is not None: |
|
209 | self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure) | |
196 |
|
|
210 | self.heartmonitor.add_new_heart_handler(self.handle_new_heart) | |
197 | heartbeat.add_new_heart_handler(self.handle_new_heart) |
|
|||
198 |
|
211 | |||
199 | self.monitor_handlers = { 'in' : self.save_queue_request, |
|
212 | self.monitor_handlers = { 'in' : self.save_queue_request, | |
200 | 'out': self.save_queue_result, |
|
213 | 'out': self.save_queue_result, | |
@@ -218,25 +231,21 b' class Hub(object):' | |||||
218 | 'unregistration_request' : self.unregister_engine, |
|
231 | 'unregistration_request' : self.unregister_engine, | |
219 | 'connection_request': self.connection_request, |
|
232 | 'connection_request': self.connection_request, | |
220 | } |
|
233 | } | |
221 | self.registration_timeout = max(5000, 2*self.heartbeat.period) |
|
|||
222 | # this is the stuff that will move to DB: |
|
|||
223 | # self.results = {} # completed results |
|
|||
224 | self.pending = set() # pending messages, keyed by msg_id |
|
|||
225 | self.queues = {} # pending msg_ids keyed by engine_id |
|
|||
226 | self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id |
|
|||
227 | self.completed = {} # completed msg_ids keyed by engine_id |
|
|||
228 | self.all_completed = set() |
|
|||
229 |
|
234 | |||
230 | logger.info("controller::created controller") |
|
235 | logger.info("controller::created controller") | |
231 |
|
236 | |||
232 | def _new_id(self): |
|
237 | @property | |
|
238 | def _next_id(self): | |||
233 | """gemerate a new ID""" |
|
239 | """gemerate a new ID""" | |
234 |
newid = |
|
240 | newid = self._idcounter | |
235 | incoming = [id[0] for id in self.incoming_registrations.itervalues()] |
|
241 | self._idcounter += 1 | |
236 | # print newid, self.ids, self.incoming_registrations |
|
|||
237 | while newid in self.ids or newid in incoming: |
|
|||
238 | newid += 1 |
|
|||
239 | return newid |
|
242 | return newid | |
|
243 | # newid = 0 | |||
|
244 | # incoming = [id[0] for id in self.incoming_registrations.itervalues()] | |||
|
245 | # # print newid, self.ids, self.incoming_registrations | |||
|
246 | # while newid in self.ids or newid in incoming: | |||
|
247 | # newid += 1 | |||
|
248 | # return newid | |||
240 |
|
249 | |||
241 | #----------------------------------------------------------------------------- |
|
250 | #----------------------------------------------------------------------------- | |
242 | # message validation |
|
251 | # message validation | |
@@ -580,6 +589,9 b' class Hub(object):' | |||||
580 | return |
|
589 | return | |
581 |
|
590 | |||
582 | parent = msg['parent_header'] |
|
591 | parent = msg['parent_header'] | |
|
592 | if not parent: | |||
|
593 | logger.error("iopub::invalid IOPub message: %s"%msg) | |||
|
594 | return | |||
583 | msg_id = parent['msg_id'] |
|
595 | msg_id = parent['msg_id'] | |
584 | msg_type = msg['msg_type'] |
|
596 | msg_type = msg['msg_type'] | |
585 | content = msg['content'] |
|
597 | content = msg['content'] | |
@@ -631,7 +643,7 b' class Hub(object):' | |||||
631 | return |
|
643 | return | |
632 | heart = content.get('heartbeat', None) |
|
644 | heart = content.get('heartbeat', None) | |
633 | """register a new engine, and create the socket(s) necessary""" |
|
645 | """register a new engine, and create the socket(s) necessary""" | |
634 |
eid = self._ne |
|
646 | eid = self._next_id | |
635 | # print (eid, queue, reg, heart) |
|
647 | # print (eid, queue, reg, heart) | |
636 |
|
648 | |||
637 | logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) |
|
649 | logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) | |
@@ -644,10 +656,12 b' class Hub(object):' | |||||
644 | raise KeyError("queue_id %r in use"%queue) |
|
656 | raise KeyError("queue_id %r in use"%queue) | |
645 | except: |
|
657 | except: | |
646 | content = wrap_exception() |
|
658 | content = wrap_exception() | |
|
659 | logger.error("queue_id %r in use"%queue, exc_info=True) | |||
647 | elif heart in self.hearts: # need to check unique hearts? |
|
660 | elif heart in self.hearts: # need to check unique hearts? | |
648 | try: |
|
661 | try: | |
649 | raise KeyError("heart_id %r in use"%heart) |
|
662 | raise KeyError("heart_id %r in use"%heart) | |
650 | except: |
|
663 | except: | |
|
664 | logger.error("heart_id %r in use"%heart, exc_info=True) | |||
651 | content = wrap_exception() |
|
665 | content = wrap_exception() | |
652 | else: |
|
666 | else: | |
653 | for h, pack in self.incoming_registrations.iteritems(): |
|
667 | for h, pack in self.incoming_registrations.iteritems(): | |
@@ -655,12 +669,14 b' class Hub(object):' | |||||
655 | try: |
|
669 | try: | |
656 | raise KeyError("heart_id %r in use"%heart) |
|
670 | raise KeyError("heart_id %r in use"%heart) | |
657 | except: |
|
671 | except: | |
|
672 | logger.error("heart_id %r in use"%heart, exc_info=True) | |||
658 | content = wrap_exception() |
|
673 | content = wrap_exception() | |
659 | break |
|
674 | break | |
660 | elif queue == pack[1]: |
|
675 | elif queue == pack[1]: | |
661 | try: |
|
676 | try: | |
662 | raise KeyError("queue_id %r in use"%queue) |
|
677 | raise KeyError("queue_id %r in use"%queue) | |
663 | except: |
|
678 | except: | |
|
679 | logger.error("queue_id %r in use"%queue, exc_info=True) | |||
664 | content = wrap_exception() |
|
680 | content = wrap_exception() | |
665 | break |
|
681 | break | |
666 |
|
682 | |||
@@ -669,15 +685,15 b' class Hub(object):' | |||||
669 | ident=reg) |
|
685 | ident=reg) | |
670 |
|
686 | |||
671 | if content['status'] == 'ok': |
|
687 | if content['status'] == 'ok': | |
672 |
if heart in self.heart |
|
688 | if heart in self.heartmonitor.hearts: | |
673 | # already beating |
|
689 | # already beating | |
674 | self.incoming_registrations[heart] = (eid,queue,reg,None) |
|
690 | self.incoming_registrations[heart] = (eid,queue,reg[0],None) | |
675 | self.finish_registration(heart) |
|
691 | self.finish_registration(heart) | |
676 | else: |
|
692 | else: | |
677 | purge = lambda : self._purge_stalled_registration(heart) |
|
693 | purge = lambda : self._purge_stalled_registration(heart) | |
678 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) |
|
694 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) | |
679 | dc.start() |
|
695 | dc.start() | |
680 | self.incoming_registrations[heart] = (eid,queue,reg,dc) |
|
696 | self.incoming_registrations[heart] = (eid,queue,reg[0],dc) | |
681 | else: |
|
697 | else: | |
682 | logger.error("registration::registration %i failed: %s"%(eid, content['evalue'])) |
|
698 | logger.error("registration::registration %i failed: %s"%(eid, content['evalue'])) | |
683 | return eid |
|
699 | return eid | |
@@ -718,7 +734,8 b' class Hub(object):' | |||||
718 | control = queue |
|
734 | control = queue | |
719 | self.ids.add(eid) |
|
735 | self.ids.add(eid) | |
720 | self.keytable[eid] = queue |
|
736 | self.keytable[eid] = queue | |
721 |
self.engines[eid] = EngineConnector(eid, queue, reg, |
|
737 | self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg, | |
|
738 | control=control, heartbeat=heart) | |||
722 | self.by_ident[queue] = eid |
|
739 | self.by_ident[queue] = eid | |
723 | self.queues[eid] = list() |
|
740 | self.queues[eid] = list() | |
724 | self.tasks[eid] = list() |
|
741 | self.tasks[eid] = list() |
@@ -11,6 +11,8 b' Python Scheduler exists.' | |||||
11 |
|
11 | |||
12 | from __future__ import print_function |
|
12 | from __future__ import print_function | |
13 | from random import randint,random |
|
13 | from random import randint,random | |
|
14 | import logging | |||
|
15 | from types import FunctionType | |||
14 |
|
16 | |||
15 | try: |
|
17 | try: | |
16 | import numpy |
|
18 | import numpy | |
@@ -21,17 +23,22 b' import zmq' | |||||
21 | from zmq.eventloop import ioloop, zmqstream |
|
23 | from zmq.eventloop import ioloop, zmqstream | |
22 |
|
24 | |||
23 | # local imports |
|
25 | # local imports | |
24 | from IPython.zmq.log import logger # a Logger object |
|
26 | from IPython.external.decorator import decorator | |
|
27 | from IPython.config.configurable import Configurable | |||
|
28 | from IPython.utils.traitlets import Instance | |||
|
29 | ||||
25 | from client import Client |
|
30 | from client import Client | |
26 | from dependency import Dependency |
|
31 | from dependency import Dependency | |
27 | import streamsession as ss |
|
32 | import streamsession as ss | |
|
33 | from entry_point import connect_logger, local_logger | |||
28 |
|
34 | |||
29 | from IPython.external.decorator import decorator |
|
35 | ||
|
36 | logger = logging.getLogger() | |||
30 |
|
37 | |||
31 | @decorator |
|
38 | @decorator | |
32 | def logged(f,self,*args,**kwargs): |
|
39 | def logged(f,self,*args,**kwargs): | |
33 | # print ("#--------------------") |
|
40 | # print ("#--------------------") | |
34 |
|
|
41 | logger.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs)) | |
35 | # print ("#--") |
|
42 | # print ("#--") | |
36 | return f(self,*args, **kwargs) |
|
43 | return f(self,*args, **kwargs) | |
37 |
|
44 | |||
@@ -99,7 +106,7 b' def leastload(loads):' | |||||
99 | #--------------------------------------------------------------------- |
|
106 | #--------------------------------------------------------------------- | |
100 | # Classes |
|
107 | # Classes | |
101 | #--------------------------------------------------------------------- |
|
108 | #--------------------------------------------------------------------- | |
102 |
class TaskScheduler( |
|
109 | class TaskScheduler(Configurable): | |
103 | """Python TaskScheduler object. |
|
110 | """Python TaskScheduler object. | |
104 |
|
111 | |||
105 | This is the simplest object that supports msg_id based |
|
112 | This is the simplest object that supports msg_id based | |
@@ -108,10 +115,15 b' class TaskScheduler(object):' | |||||
108 |
|
115 | |||
109 | """ |
|
116 | """ | |
110 |
|
117 | |||
111 | scheme = leastload # function for determining the destination |
|
118 | # configurables: | |
112 | client_stream = None # client-facing stream |
|
119 | scheme = Instance(FunctionType, default=leastload) # function for determining the destination | |
113 |
|
|
120 | client_stream = Instance(zmqstream.ZMQStream) # client-facing stream | |
114 |
|
|
121 | engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream | |
|
122 | notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream | |||
|
123 | mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream | |||
|
124 | io_loop = Instance(ioloop.IOLoop) | |||
|
125 | ||||
|
126 | # internals: | |||
115 | dependencies = None # dict by msg_id of [ msg_ids that depend on key ] |
|
127 | dependencies = None # dict by msg_id of [ msg_ids that depend on key ] | |
116 | depending = None # dict by msg_id of (msg_id, raw_msg, after, follow) |
|
128 | depending = None # dict by msg_id of (msg_id, raw_msg, after, follow) | |
117 | pending = None # dict by engine_uuid of submitted tasks |
|
129 | pending = None # dict by engine_uuid of submitted tasks | |
@@ -123,23 +135,10 b' class TaskScheduler(object):' | |||||
123 | blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency |
|
135 | blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency | |
124 |
|
136 | |||
125 |
|
137 | |||
126 | def __init__(self, client_stream, engine_stream, mon_stream, |
|
138 | def __init__(self, **kwargs): | |
127 | notifier_stream, scheme=None, io_loop=None): |
|
139 | super(TaskScheduler, self).__init__(**kwargs) | |
128 | if io_loop is None: |
|
|||
129 | io_loop = ioloop.IOLoop.instance() |
|
|||
130 | self.io_loop = io_loop |
|
|||
131 | self.client_stream = client_stream |
|
|||
132 | self.engine_stream = engine_stream |
|
|||
133 | self.mon_stream = mon_stream |
|
|||
134 | self.notifier_stream = notifier_stream |
|
|||
135 |
|
||||
136 | if scheme is not None: |
|
|||
137 | self.scheme = scheme |
|
|||
138 | else: |
|
|||
139 | self.scheme = TaskScheduler.scheme |
|
|||
140 |
|
140 | |||
141 | self.session = ss.StreamSession(username="TaskScheduler") |
|
141 | self.session = ss.StreamSession(username="TaskScheduler") | |
142 |
|
||||
143 | self.dependencies = {} |
|
142 | self.dependencies = {} | |
144 | self.depending = {} |
|
143 | self.depending = {} | |
145 | self.completed = {} |
|
144 | self.completed = {} | |
@@ -150,12 +149,13 b' class TaskScheduler(object):' | |||||
150 | self.targets = [] |
|
149 | self.targets = [] | |
151 | self.loads = [] |
|
150 | self.loads = [] | |
152 |
|
151 | |||
153 | engine_stream.on_recv(self.dispatch_result, copy=False) |
|
152 | self.engine_stream.on_recv(self.dispatch_result, copy=False) | |
154 | self._notification_handlers = dict( |
|
153 | self._notification_handlers = dict( | |
155 | registration_notification = self._register_engine, |
|
154 | registration_notification = self._register_engine, | |
156 | unregistration_notification = self._unregister_engine |
|
155 | unregistration_notification = self._unregister_engine | |
157 | ) |
|
156 | ) | |
158 | self.notifier_stream.on_recv(self.dispatch_notification) |
|
157 | self.notifier_stream.on_recv(self.dispatch_notification) | |
|
158 | logger.info("Scheduler started...%r"%self) | |||
159 |
|
159 | |||
160 | def resume_receiving(self): |
|
160 | def resume_receiving(self): | |
161 | """Resume accepting jobs.""" |
|
161 | """Resume accepting jobs.""" | |
@@ -183,6 +183,7 b' class TaskScheduler(object):' | |||||
183 | handler(str(msg['content']['queue'])) |
|
183 | handler(str(msg['content']['queue'])) | |
184 | except KeyError: |
|
184 | except KeyError: | |
185 | logger.error("task::Invalid notification msg: %s"%msg) |
|
185 | logger.error("task::Invalid notification msg: %s"%msg) | |
|
186 | ||||
186 | @logged |
|
187 | @logged | |
187 | def _register_engine(self, uid): |
|
188 | def _register_engine(self, uid): | |
188 | """New engine with ident `uid` became available.""" |
|
189 | """New engine with ident `uid` became available.""" | |
@@ -306,7 +307,8 b' class TaskScheduler(object):' | |||||
306 | self.add_job(idx) |
|
307 | self.add_job(idx) | |
307 | self.pending[target][msg_id] = (msg, follow) |
|
308 | self.pending[target][msg_id] = (msg, follow) | |
308 | content = dict(msg_id=msg_id, engine_id=target) |
|
309 | content = dict(msg_id=msg_id, engine_id=target) | |
309 |
self.session.send(self.mon_stream, 'task_destination', content=content, |
|
310 | self.session.send(self.mon_stream, 'task_destination', content=content, | |
|
311 | ident=['tracktask',self.session.session]) | |||
310 |
|
312 | |||
311 | #----------------------------------------------------------------------- |
|
313 | #----------------------------------------------------------------------- | |
312 | # Result Handling |
|
314 | # Result Handling | |
@@ -395,7 +397,7 b' class TaskScheduler(object):' | |||||
395 |
|
397 | |||
396 |
|
398 | |||
397 |
|
399 | |||
398 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'): |
|
400 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, loglevel=logging.DEBUG, scheme='weighted'): | |
399 | from zmq.eventloop import ioloop |
|
401 | from zmq.eventloop import ioloop | |
400 | from zmq.eventloop.zmqstream import ZMQStream |
|
402 | from zmq.eventloop.zmqstream import ZMQStream | |
401 |
|
403 | |||
@@ -414,7 +416,15 b" def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'):" | |||||
414 | nots.setsockopt(zmq.SUBSCRIBE, '') |
|
416 | nots.setsockopt(zmq.SUBSCRIBE, '') | |
415 | nots.connect(not_addr) |
|
417 | nots.connect(not_addr) | |
416 |
|
418 | |||
417 | scheduler = TaskScheduler(ins,outs,mons,nots,scheme,loop) |
|
419 | # setup logging | |
|
420 | if log_addr: | |||
|
421 | connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel) | |||
|
422 | else: | |||
|
423 | local_logger(loglevel) | |||
|
424 | ||||
|
425 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, | |||
|
426 | mon_stream=mons,notifier_stream=nots, | |||
|
427 | scheme=scheme,io_loop=loop) | |||
418 |
|
428 | |||
419 | loop.start() |
|
429 | loop.start() | |
420 |
|
430 |
@@ -15,6 +15,7 b' import os' | |||||
15 | import sys |
|
15 | import sys | |
16 | import time |
|
16 | import time | |
17 | import traceback |
|
17 | import traceback | |
|
18 | import logging | |||
18 | from datetime import datetime |
|
19 | from datetime import datetime | |
19 | from signal import SIGTERM, SIGKILL |
|
20 | from signal import SIGTERM, SIGKILL | |
20 | from pprint import pprint |
|
21 | from pprint import pprint | |
@@ -25,9 +26,8 b' from zmq.eventloop import ioloop, zmqstream' | |||||
25 |
|
26 | |||
26 | # Local imports. |
|
27 | # Local imports. | |
27 | from IPython.core import ultratb |
|
28 | from IPython.core import ultratb | |
28 | from IPython.utils.traitlets import HasTraits, Instance, List |
|
29 | from IPython.utils.traitlets import HasTraits, Instance, List, Int | |
29 | from IPython.zmq.completer import KernelCompleter |
|
30 | from IPython.zmq.completer import KernelCompleter | |
30 | from IPython.zmq.log import logger # a Logger object |
|
|||
31 | from IPython.zmq.iostream import OutStream |
|
31 | from IPython.zmq.iostream import OutStream | |
32 | from IPython.zmq.displayhook import DisplayHook |
|
32 | from IPython.zmq.displayhook import DisplayHook | |
33 |
|
33 | |||
@@ -38,6 +38,8 b' from dependency import UnmetDependency' | |||||
38 | import heartmonitor |
|
38 | import heartmonitor | |
39 | from client import Client |
|
39 | from client import Client | |
40 |
|
40 | |||
|
41 | logger = logging.getLogger() | |||
|
42 | ||||
41 | def printer(*args): |
|
43 | def printer(*args): | |
42 | pprint(args, stream=sys.__stdout__) |
|
44 | pprint(args, stream=sys.__stdout__) | |
43 |
|
45 | |||
@@ -51,8 +53,9 b' class Kernel(HasTraits):' | |||||
51 | # Kernel interface |
|
53 | # Kernel interface | |
52 | #--------------------------------------------------------------------------- |
|
54 | #--------------------------------------------------------------------------- | |
53 |
|
55 | |||
|
56 | id = Int(-1) | |||
54 | session = Instance(StreamSession) |
|
57 | session = Instance(StreamSession) | |
55 |
shell_streams = |
|
58 | shell_streams = List() | |
56 | control_stream = Instance(zmqstream.ZMQStream) |
|
59 | control_stream = Instance(zmqstream.ZMQStream) | |
57 | task_stream = Instance(zmqstream.ZMQStream) |
|
60 | task_stream = Instance(zmqstream.ZMQStream) | |
58 | iopub_stream = Instance(zmqstream.ZMQStream) |
|
61 | iopub_stream = Instance(zmqstream.ZMQStream) | |
@@ -62,7 +65,8 b' class Kernel(HasTraits):' | |||||
62 | def __init__(self, **kwargs): |
|
65 | def __init__(self, **kwargs): | |
63 | super(Kernel, self).__init__(**kwargs) |
|
66 | super(Kernel, self).__init__(**kwargs) | |
64 | self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY) |
|
67 | self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY) | |
65 |
self.prefix = 'engine.%s'%self.id |
|
68 | self.prefix = 'engine.%s'%self.id | |
|
69 | logger.root_topic = self.prefix | |||
66 | self.user_ns = {} |
|
70 | self.user_ns = {} | |
67 | self.history = [] |
|
71 | self.history = [] | |
68 | self.compiler = CommandCompiler() |
|
72 | self.compiler = CommandCompiler() | |
@@ -108,8 +112,8 b' class Kernel(HasTraits):' | |||||
108 |
|
112 | |||
109 | # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." |
|
113 | # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." | |
110 | # msg = self.reply_socket.recv_json() |
|
114 | # msg = self.reply_socket.recv_json() | |
111 |
|
|
115 | logger.info("Aborting:") | |
112 | print (Message(msg), file=sys.__stdout__) |
|
116 | logger.info(str(msg)) | |
113 | msg_type = msg['msg_type'] |
|
117 | msg_type = msg['msg_type'] | |
114 | reply_type = msg_type.split('_')[0] + '_reply' |
|
118 | reply_type = msg_type.split('_')[0] + '_reply' | |
115 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) |
|
119 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) | |
@@ -117,7 +121,7 b' class Kernel(HasTraits):' | |||||
117 | # self.reply_socket.send_json(reply_msg) |
|
121 | # self.reply_socket.send_json(reply_msg) | |
118 | reply_msg = self.session.send(stream, reply_type, |
|
122 | reply_msg = self.session.send(stream, reply_type, | |
119 | content={'status' : 'aborted'}, parent=msg, ident=idents)[0] |
|
123 | content={'status' : 'aborted'}, parent=msg, ident=idents)[0] | |
120 | print(Message(reply_msg), file=sys.__stdout__) |
|
124 | logger.debug(str(reply_msg)) | |
121 | # We need to wait a bit for requests to come in. This can probably |
|
125 | # We need to wait a bit for requests to come in. This can probably | |
122 | # be set shorter for true asynchronous clients. |
|
126 | # be set shorter for true asynchronous clients. | |
123 | time.sleep(0.05) |
|
127 | time.sleep(0.05) | |
@@ -135,7 +139,7 b' class Kernel(HasTraits):' | |||||
135 | content = dict(status='ok') |
|
139 | content = dict(status='ok') | |
136 | reply_msg = self.session.send(stream, 'abort_reply', content=content, |
|
140 | reply_msg = self.session.send(stream, 'abort_reply', content=content, | |
137 | parent=parent, ident=ident)[0] |
|
141 | parent=parent, ident=ident)[0] | |
138 |
|
|
142 | logger(Message(reply_msg), file=sys.__stdout__) | |
139 |
|
143 | |||
140 | def shutdown_request(self, stream, ident, parent): |
|
144 | def shutdown_request(self, stream, ident, parent): | |
141 | """kill ourself. This should really be handled in an external process""" |
|
145 | """kill ourself. This should really be handled in an external process""" | |
@@ -168,7 +172,7 b' class Kernel(HasTraits):' | |||||
168 |
|
172 | |||
169 | handler = self.control_handlers.get(msg['msg_type'], None) |
|
173 | handler = self.control_handlers.get(msg['msg_type'], None) | |
170 | if handler is None: |
|
174 | if handler is None: | |
171 |
|
|
175 | logger.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type']) | |
172 | else: |
|
176 | else: | |
173 | handler(self.control_stream, idents, msg) |
|
177 | handler(self.control_stream, idents, msg) | |
174 |
|
178 | |||
@@ -211,13 +215,12 b' class Kernel(HasTraits):' | |||||
211 | try: |
|
215 | try: | |
212 | code = parent[u'content'][u'code'] |
|
216 | code = parent[u'content'][u'code'] | |
213 | except: |
|
217 | except: | |
214 | print("Got bad msg: ", file=sys.__stderr__) |
|
218 | logger.error("Got bad msg: %s"%parent, exc_info=True) | |
215 | print(Message(parent), file=sys.__stderr__) |
|
|||
216 | return |
|
219 | return | |
217 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
220 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
218 | # self.iopub_stream.send(pyin_msg) |
|
221 | # self.iopub_stream.send(pyin_msg) | |
219 | self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, |
|
222 | self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, | |
220 |
ident=self. |
|
223 | ident='%s.pyin'%self.prefix) | |
221 | started = datetime.now().strftime(ISO8601) |
|
224 | started = datetime.now().strftime(ISO8601) | |
222 | try: |
|
225 | try: | |
223 | comp_code = self.compiler(code, '<zmq-kernel>') |
|
226 | comp_code = self.compiler(code, '<zmq-kernel>') | |
@@ -231,7 +234,7 b' class Kernel(HasTraits):' | |||||
231 | exc_content = self._wrap_exception('execute') |
|
234 | exc_content = self._wrap_exception('execute') | |
232 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
235 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
233 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, |
|
236 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, | |
234 |
ident=self. |
|
237 | ident='%s.pyerr'%self.prefix) | |
235 | reply_content = exc_content |
|
238 | reply_content = exc_content | |
236 | else: |
|
239 | else: | |
237 | reply_content = {'status' : 'ok'} |
|
240 | reply_content = {'status' : 'ok'} | |
@@ -240,7 +243,7 b' class Kernel(HasTraits):' | |||||
240 | # self.reply_socket.send_json(reply_msg) |
|
243 | # self.reply_socket.send_json(reply_msg) | |
241 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, |
|
244 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, | |
242 | ident=ident, subheader = dict(started=started)) |
|
245 | ident=ident, subheader = dict(started=started)) | |
243 | print(Message(reply_msg), file=sys.__stdout__) |
|
246 | logger.debug(str(reply_msg)) | |
244 | if reply_msg['content']['status'] == u'error': |
|
247 | if reply_msg['content']['status'] == u'error': | |
245 | self.abort_queues() |
|
248 | self.abort_queues() | |
246 |
|
249 | |||
@@ -262,8 +265,7 b' class Kernel(HasTraits):' | |||||
262 | msg_id = parent['header']['msg_id'] |
|
265 | msg_id = parent['header']['msg_id'] | |
263 | bound = content.get('bound', False) |
|
266 | bound = content.get('bound', False) | |
264 | except: |
|
267 | except: | |
265 | print("Got bad msg: ", file=sys.__stderr__) |
|
268 | logger.error("Got bad msg: %s"%parent, exc_info=True) | |
266 | print(Message(parent), file=sys.__stderr__) |
|
|||
267 | return |
|
269 | return | |
268 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
270 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
269 | # self.iopub_stream.send(pyin_msg) |
|
271 | # self.iopub_stream.send(pyin_msg) | |
@@ -316,7 +318,7 b' class Kernel(HasTraits):' | |||||
316 | exc_content = self._wrap_exception('apply') |
|
318 | exc_content = self._wrap_exception('apply') | |
317 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
319 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
318 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, |
|
320 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, | |
319 |
ident=self. |
|
321 | ident='%s.pyerr'%self.prefix) | |
320 | reply_content = exc_content |
|
322 | reply_content = exc_content | |
321 | result_buf = [] |
|
323 | result_buf = [] | |
322 |
|
324 | |||
@@ -354,7 +356,7 b' class Kernel(HasTraits):' | |||||
354 | return |
|
356 | return | |
355 | handler = self.shell_handlers.get(msg['msg_type'], None) |
|
357 | handler = self.shell_handlers.get(msg['msg_type'], None) | |
356 | if handler is None: |
|
358 | if handler is None: | |
357 |
|
|
359 | logger.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type']) | |
358 | else: |
|
360 | else: | |
359 | handler(stream, idents, msg) |
|
361 | handler(stream, idents, msg) | |
360 |
|
362 | |||
@@ -398,7 +400,7 b' class Kernel(HasTraits):' | |||||
398 | # # don't busywait |
|
400 | # # don't busywait | |
399 | # time.sleep(1e-3) |
|
401 | # time.sleep(1e-3) | |
400 |
|
402 | |||
401 | def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, |
|
403 | def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs, | |
402 | client_addr=None, loop=None, context=None, key=None, |
|
404 | client_addr=None, loop=None, context=None, key=None, | |
403 | out_stream_factory=OutStream, display_hook_factory=DisplayHook): |
|
405 | out_stream_factory=OutStream, display_hook_factory=DisplayHook): | |
404 |
|
406 | |||
@@ -410,7 +412,7 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,' | |||||
410 | c = context |
|
412 | c = context | |
411 | session = StreamSession(key=key) |
|
413 | session = StreamSession(key=key) | |
412 | # print (session.key) |
|
414 | # print (session.key) | |
413 | print (control_addr, shell_addrs, iopub_addr, hb_addrs) |
|
415 | # print (control_addr, shell_addrs, iopub_addr, hb_addrs) | |
414 |
|
416 | |||
415 | # create Control Stream |
|
417 | # create Control Stream | |
416 | control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) |
|
418 | control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) | |
@@ -433,12 +435,12 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,' | |||||
433 | # Redirect input streams and set a display hook. |
|
435 | # Redirect input streams and set a display hook. | |
434 | if out_stream_factory: |
|
436 | if out_stream_factory: | |
435 | sys.stdout = out_stream_factory(session, iopub_stream, u'stdout') |
|
437 | sys.stdout = out_stream_factory(session, iopub_stream, u'stdout') | |
436 |
sys.stdout.topic = |
|
438 | sys.stdout.topic = 'engine.%i.stdout'%int_id | |
437 | sys.stderr = out_stream_factory(session, iopub_stream, u'stderr') |
|
439 | sys.stderr = out_stream_factory(session, iopub_stream, u'stderr') | |
438 |
sys.stderr.topic = |
|
440 | sys.stderr.topic = 'engine.%i.stderr'%int_id | |
439 | if display_hook_factory: |
|
441 | if display_hook_factory: | |
440 | sys.displayhook = display_hook_factory(session, iopub_stream) |
|
442 | sys.displayhook = display_hook_factory(session, iopub_stream) | |
441 |
sys.displayhook.topic = |
|
443 | sys.displayhook.topic = 'engine.%i.pyout'%int_id | |
442 |
|
444 | |||
443 |
|
445 | |||
444 | # launch heartbeat |
|
446 | # launch heartbeat | |
@@ -451,7 +453,7 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,' | |||||
451 | else: |
|
453 | else: | |
452 | client = None |
|
454 | client = None | |
453 |
|
455 | |||
454 | kernel = Kernel(session=session, control_stream=control_stream, |
|
456 | kernel = Kernel(id=int_id, session=session, control_stream=control_stream, | |
455 | shell_streams=shell_streams, iopub_stream=iopub_stream, |
|
457 | shell_streams=shell_streams, iopub_stream=iopub_stream, | |
456 | client=client, loop=loop) |
|
458 | client=client, loop=loop) | |
457 | kernel.start() |
|
459 | kernel.start() |
@@ -1,4 +1,5 b'' | |||||
1 | """some generic utilities""" |
|
1 | """some generic utilities""" | |
|
2 | import re | |||
2 |
|
3 | |||
3 | class ReverseDict(dict): |
|
4 | class ReverseDict(dict): | |
4 | """simple double-keyed subset of dict methods.""" |
|
5 | """simple double-keyed subset of dict methods.""" | |
@@ -33,3 +34,46 b' class ReverseDict(dict):' | |||||
33 | return default |
|
34 | return default | |
34 |
|
35 | |||
35 |
|
36 | |||
|
37 | def validate_url(url): | |||
|
38 | """validate a url for zeromq""" | |||
|
39 | if not isinstance(url, basestring): | |||
|
40 | raise TypeError("url must be a string, not %r"%type(url)) | |||
|
41 | url = url.lower() | |||
|
42 | ||||
|
43 | proto_addr = url.split('://') | |||
|
44 | assert len(proto_addr) == 2, 'Invalid url: %r'%url | |||
|
45 | proto, addr = proto_addr | |||
|
46 | assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto | |||
|
47 | ||||
|
48 | # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391 | |||
|
49 | # author: Remi Sabourin | |||
|
50 | pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$') | |||
|
51 | ||||
|
52 | if proto == 'tcp': | |||
|
53 | lis = addr.split(':') | |||
|
54 | assert len(lis) == 2, 'Invalid url: %r'%url | |||
|
55 | addr,s_port = lis | |||
|
56 | try: | |||
|
57 | port = int(s_port) | |||
|
58 | except ValueError: | |||
|
59 | raise AssertionError("Invalid port %r in url: %r"%(port, url)) | |||
|
60 | ||||
|
61 | assert pat.match(addr) is not None, 'Invalid url: %r'%url | |||
|
62 | ||||
|
63 | else: | |||
|
64 | # only validate tcp urls currently | |||
|
65 | pass | |||
|
66 | ||||
|
67 | return True | |||
|
68 | ||||
|
69 | ||||
|
70 | def validate_url_container(container): | |||
|
71 | """validate a potentially nested collection of urls.""" | |||
|
72 | if isinstance(container, basestring): | |||
|
73 | url = container | |||
|
74 | return validate_url(url) | |||
|
75 | elif isinstance(container, dict): | |||
|
76 | container = container.itervalues() | |||
|
77 | ||||
|
78 | for element in container: | |||
|
79 | validate_url_container(element) No newline at end of file |
@@ -19,6 +19,7 b'' | |||||
19 | # You should have received a copy of the Lesser GNU General Public License |
|
19 | # You should have received a copy of the Lesser GNU General Public License | |
20 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
20 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
21 |
|
21 | |||
|
22 | import sys | |||
22 | import zmq |
|
23 | import zmq | |
23 | logport = 20202 |
|
24 | logport = 20202 | |
24 | def main(topics, addrs): |
|
25 | def main(topics, addrs): | |
@@ -26,20 +27,27 b' def main(topics, addrs):' | |||||
26 | context = zmq.Context() |
|
27 | context = zmq.Context() | |
27 | socket = context.socket(zmq.SUB) |
|
28 | socket = context.socket(zmq.SUB) | |
28 | for topic in topics: |
|
29 | for topic in topics: | |
|
30 | print "Subscribing to: %r"%topic | |||
29 | socket.setsockopt(zmq.SUBSCRIBE, topic) |
|
31 | socket.setsockopt(zmq.SUBSCRIBE, topic) | |
30 | if addrs: |
|
32 | if addrs: | |
31 | for addr in addrs: |
|
33 | for addr in addrs: | |
32 | print "Connecting to: ", addr |
|
34 | print "Connecting to: ", addr | |
33 | socket.connect(addr) |
|
35 | socket.connect(addr) | |
34 | else: |
|
36 | else: | |
35 |
socket.bind('tcp:// |
|
37 | socket.bind('tcp://*:%i'%logport) | |
36 |
|
38 | |||
37 | while True: |
|
39 | while True: | |
38 | # topic = socket.recv() |
|
40 | # topic = socket.recv() | |
39 | # print topic |
|
41 | # print topic | |
40 | topic, msg = socket.recv_multipart() |
|
42 | # print 'tic' | |
41 |
|
|
43 | raw = socket.recv_multipart() | |
42 | print "%s | %s " % (topic, msg), |
|
44 | if len(raw) != 2: | |
|
45 | print "!!! invalid log message: %s"%raw | |||
|
46 | else: | |||
|
47 | topic, msg = raw | |||
|
48 | # don't newline, since log messages always newline: | |||
|
49 | print "%s | %s" % (topic, msg), | |||
|
50 | sys.stdout.flush() | |||
43 |
|
51 | |||
44 | if __name__ == '__main__': |
|
52 | if __name__ == '__main__': | |
45 | import sys |
|
53 | import sys |
General Comments 0
You need to be logged in to leave comments.
Login now