Show More
@@ -4,6 +4,8 b' from session import extract_header' | |||||
4 |
|
4 | |||
5 | class DisplayHook(object): |
|
5 | class DisplayHook(object): | |
6 |
|
6 | |||
|
7 | topic=None | |||
|
8 | ||||
7 | def __init__(self, session, pub_socket): |
|
9 | def __init__(self, session, pub_socket): | |
8 | self.session = session |
|
10 | self.session = session | |
9 | self.pub_socket = pub_socket |
|
11 | self.pub_socket = pub_socket | |
@@ -15,7 +17,7 b' class DisplayHook(object):' | |||||
15 |
|
17 | |||
16 | __builtin__._ = obj |
|
18 | __builtin__._ = obj | |
17 | msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)}, |
|
19 | msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)}, | |
18 | parent=self.parent_header) |
|
20 | parent=self.parent_header, ident=self.topic) | |
19 |
|
21 | |||
20 | def set_parent(self, parent): |
|
22 | def set_parent(self, parent): | |
21 | self.parent_header = extract_header(parent) No newline at end of file |
|
23 | self.parent_header = extract_header(parent) |
@@ -23,7 +23,8 b' class OutStream(object):' | |||||
23 |
|
23 | |||
24 | # The time interval between automatic flushes, in seconds. |
|
24 | # The time interval between automatic flushes, in seconds. | |
25 | flush_interval = 0.05 |
|
25 | flush_interval = 0.05 | |
26 |
|
26 | topic=None | ||
|
27 | ||||
27 | def __init__(self, session, pub_socket, name): |
|
28 | def __init__(self, session, pub_socket, name): | |
28 | self.session = session |
|
29 | self.session = session | |
29 | self.pub_socket = pub_socket |
|
30 | self.pub_socket = pub_socket | |
@@ -49,10 +50,10 b' class OutStream(object):' | |||||
49 | enc = sys.stdin.encoding or sys.getdefaultencoding() |
|
50 | enc = sys.stdin.encoding or sys.getdefaultencoding() | |
50 | data = data.decode(enc, 'replace') |
|
51 | data = data.decode(enc, 'replace') | |
51 | content = {u'name':self.name, u'data':data} |
|
52 | content = {u'name':self.name, u'data':data} | |
52 | msg = self.session.send(self.pub_socket, u'stream', |
|
53 | msg = self.session.send(self.pub_socket, u'stream', content=content, | |
53 |
|
|
54 | parent=self.parent_header, ident=self.topic) | |
54 | parent=self.parent_header) |
|
|||
55 | logger.debug(msg) |
|
55 | logger.debug(msg) | |
|
56 | ||||
56 | self._buffer.close() |
|
57 | self._buffer.close() | |
57 | self._new_buffer() |
|
58 | self._new_buffer() | |
58 |
|
59 |
@@ -137,11 +137,13 b' class AsyncResult(object):' | |||||
137 | return self._metadata |
|
137 | return self._metadata | |
138 |
|
138 | |||
139 | @property |
|
139 | @property | |
140 | @check_ready |
|
|||
141 | def result_dict(self): |
|
140 | def result_dict(self): | |
142 | """result property as a dict.""" |
|
141 | """result property as a dict.""" | |
143 | return self.get_dict(0) |
|
142 | return self.get_dict(0) | |
144 |
|
143 | |||
|
144 | def __dict__(self): | |||
|
145 | return self.get_dict(0) | |||
|
146 | ||||
145 | #------------------------------------- |
|
147 | #------------------------------------- | |
146 | # dict-access |
|
148 | # dict-access | |
147 | #------------------------------------- |
|
149 | #------------------------------------- |
@@ -103,6 +103,32 b' class ResultDict(dict):' | |||||
103 | raise res |
|
103 | raise res | |
104 | return res |
|
104 | return res | |
105 |
|
105 | |||
|
106 | class Metadata(dict): | |||
|
107 | """Subclass of dict for initializing metadata values.""" | |||
|
108 | def __init__(self, *args, **kwargs): | |||
|
109 | dict.__init__(self) | |||
|
110 | md = {'msg_id' : None, | |||
|
111 | 'submitted' : None, | |||
|
112 | 'started' : None, | |||
|
113 | 'completed' : None, | |||
|
114 | 'received' : None, | |||
|
115 | 'engine_uuid' : None, | |||
|
116 | 'engine_id' : None, | |||
|
117 | 'follow' : None, | |||
|
118 | 'after' : None, | |||
|
119 | 'status' : None, | |||
|
120 | ||||
|
121 | 'pyin' : None, | |||
|
122 | 'pyout' : None, | |||
|
123 | 'pyerr' : None, | |||
|
124 | 'stdout' : '', | |||
|
125 | 'stderr' : '', | |||
|
126 | } | |||
|
127 | self.update(md) | |||
|
128 | self.update(dict(*args, **kwargs)) | |||
|
129 | ||||
|
130 | ||||
|
131 | ||||
106 | class Client(object): |
|
132 | class Client(object): | |
107 | """A semi-synchronous client to the IPython ZMQ controller |
|
133 | """A semi-synchronous client to the IPython ZMQ controller | |
108 |
|
134 | |||
@@ -196,6 +222,7 b' class Client(object):' | |||||
196 | _registration_socket=None |
|
222 | _registration_socket=None | |
197 | _query_socket=None |
|
223 | _query_socket=None | |
198 | _control_socket=None |
|
224 | _control_socket=None | |
|
225 | _iopub_socket=None | |||
199 | _notification_socket=None |
|
226 | _notification_socket=None | |
200 | _mux_socket=None |
|
227 | _mux_socket=None | |
201 | _task_socket=None |
|
228 | _task_socket=None | |
@@ -325,6 +352,11 b' class Client(object):' | |||||
325 | self._control_socket = self.context.socket(zmq.PAIR) |
|
352 | self._control_socket = self.context.socket(zmq.PAIR) | |
326 | self._control_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
353 | self._control_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
327 | connect_socket(self._control_socket, content.control) |
|
354 | connect_socket(self._control_socket, content.control) | |
|
355 | if content.iopub: | |||
|
356 | self._iopub_socket = self.context.socket(zmq.SUB) | |||
|
357 | self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '') | |||
|
358 | self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session) | |||
|
359 | connect_socket(self._iopub_socket, content.iopub) | |||
328 | self._update_engines(dict(content.engines)) |
|
360 | self._update_engines(dict(content.engines)) | |
329 |
|
361 | |||
330 | else: |
|
362 | else: | |
@@ -350,8 +382,8 b' class Client(object):' | |||||
350 | if eid in self._ids: |
|
382 | if eid in self._ids: | |
351 | self._ids.remove(eid) |
|
383 | self._ids.remove(eid) | |
352 | self._engines.pop(eid) |
|
384 | self._engines.pop(eid) | |
353 |
|
|
385 | ||
354 |
def _ |
|
386 | def _extract_metadata(self, header, parent, content): | |
355 | md = {'msg_id' : parent['msg_id'], |
|
387 | md = {'msg_id' : parent['msg_id'], | |
356 | 'submitted' : datetime.strptime(parent['date'], ss.ISO8601), |
|
388 | 'submitted' : datetime.strptime(parent['date'], ss.ISO8601), | |
357 | 'started' : datetime.strptime(header['started'], ss.ISO8601), |
|
389 | 'started' : datetime.strptime(header['started'], ss.ISO8601), | |
@@ -361,8 +393,8 b' class Client(object):' | |||||
361 | 'engine_id' : self._engines.get(header['engine'], None), |
|
393 | 'engine_id' : self._engines.get(header['engine'], None), | |
362 | 'follow' : parent['follow'], |
|
394 | 'follow' : parent['follow'], | |
363 | 'after' : parent['after'], |
|
395 | 'after' : parent['after'], | |
364 | 'status' : content['status'] |
|
396 | 'status' : content['status'], | |
365 |
|
|
397 | } | |
366 | return md |
|
398 | return md | |
367 |
|
399 | |||
368 | def _handle_execute_reply(self, msg): |
|
400 | def _handle_execute_reply(self, msg): | |
@@ -390,8 +422,12 b' class Client(object):' | |||||
390 | content = msg['content'] |
|
422 | content = msg['content'] | |
391 | header = msg['header'] |
|
423 | header = msg['header'] | |
392 |
|
424 | |||
393 | self.metadata[msg_id] = self._build_metadata(header, parent, content) |
|
425 | # construct metadata: | |
|
426 | md = self.metadata.setdefault(msg_id, Metadata()) | |||
|
427 | md.update(self._extract_metadata(header, parent, content)) | |||
|
428 | self.metadata[msg_id] = md | |||
394 |
|
429 | |||
|
430 | # construct result: | |||
395 | if content['status'] == 'ok': |
|
431 | if content['status'] == 'ok': | |
396 | self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0] |
|
432 | self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0] | |
397 | elif content['status'] == 'aborted': |
|
433 | elif content['status'] == 'aborted': | |
@@ -448,6 +484,37 b' class Client(object):' | |||||
448 | pprint(msg) |
|
484 | pprint(msg) | |
449 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) |
|
485 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |
450 |
|
486 | |||
|
487 | def _flush_iopub(self, sock): | |||
|
488 | """Flush replies from the iopub channel waiting | |||
|
489 | in the ZMQ queue. | |||
|
490 | """ | |||
|
491 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |||
|
492 | while msg is not None: | |||
|
493 | if self.debug: | |||
|
494 | pprint(msg) | |||
|
495 | msg = msg[-1] | |||
|
496 | parent = msg['parent_header'] | |||
|
497 | msg_id = parent['msg_id'] | |||
|
498 | content = msg['content'] | |||
|
499 | header = msg['header'] | |||
|
500 | msg_type = msg['msg_type'] | |||
|
501 | ||||
|
502 | # init metadata: | |||
|
503 | md = self.metadata.setdefault(msg_id, Metadata()) | |||
|
504 | ||||
|
505 | if msg_type == 'stream': | |||
|
506 | name = content['name'] | |||
|
507 | s = md[name] or '' | |||
|
508 | md[name] = s + content['data'] | |||
|
509 | elif msg_type == 'pyerr': | |||
|
510 | md.update({'pyerr' : ss.unwrap_exception(content)}) | |||
|
511 | else: | |||
|
512 | md.update({msg_type : content['data']}) | |||
|
513 | ||||
|
514 | self.metadata[msg_id] = md | |||
|
515 | ||||
|
516 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |||
|
517 | ||||
451 | #-------------------------------------------------------------------------- |
|
518 | #-------------------------------------------------------------------------- | |
452 | # getitem |
|
519 | # getitem | |
453 | #-------------------------------------------------------------------------- |
|
520 | #-------------------------------------------------------------------------- | |
@@ -501,6 +568,8 b' class Client(object):' | |||||
501 | self._flush_results(self._task_socket) |
|
568 | self._flush_results(self._task_socket) | |
502 | if self._control_socket: |
|
569 | if self._control_socket: | |
503 | self._flush_control(self._control_socket) |
|
570 | self._flush_control(self._control_socket) | |
|
571 | if self._iopub_socket: | |||
|
572 | self._flush_iopub(self._iopub_socket) | |||
504 |
|
573 | |||
505 | def barrier(self, msg_ids=None, timeout=-1): |
|
574 | def barrier(self, msg_ids=None, timeout=-1): | |
506 | """waits on one or more `msg_ids`, for up to `timeout` seconds. |
|
575 | """waits on one or more `msg_ids`, for up to `timeout` seconds. | |
@@ -966,10 +1035,13 b' class Client(object):' | |||||
966 | parent = rec['header'] |
|
1035 | parent = rec['header'] | |
967 | header = rec['result_header'] |
|
1036 | header = rec['result_header'] | |
968 | rcontent = rec['result_content'] |
|
1037 | rcontent = rec['result_content'] | |
|
1038 | iodict = rec['io'] | |||
969 | if isinstance(rcontent, str): |
|
1039 | if isinstance(rcontent, str): | |
970 | rcontent = self.session.unpack(rcontent) |
|
1040 | rcontent = self.session.unpack(rcontent) | |
971 |
|
1041 | |||
972 | self.metadata[msg_id] = self._build_metadata(header, parent, rcontent) |
|
1042 | md = self.metadata.setdefault(msg_id, Metadata()) | |
|
1043 | md.update(self._extract_metadata(header, parent, rcontent)) | |||
|
1044 | md.update(iodict) | |||
973 |
|
1045 | |||
974 | if rcontent['status'] == 'ok': |
|
1046 | if rcontent['status'] == 'ok': | |
975 | res,buffers = ss.unserialize_object(buffers) |
|
1047 | res,buffers = ss.unserialize_object(buffers) |
@@ -58,8 +58,8 b' def make_argument_parser():' | |||||
58 | help='set the PUB socket for registration notification [default: random]') |
|
58 | help='set the PUB socket for registration notification [default: random]') | |
59 | parser.add_argument('--hb', type=str, metavar='PORTS', |
|
59 | parser.add_argument('--hb', type=str, metavar='PORTS', | |
60 | help='set the 2 ports for heartbeats [default: random]') |
|
60 | help='set the 2 ports for heartbeats [default: random]') | |
61 |
parser.add_argument('--ping', type=int, default= |
|
61 | parser.add_argument('--ping', type=int, default=100, | |
62 |
help='set the heartbeat period in ms [default: |
|
62 | help='set the heartbeat period in ms [default: 100]') | |
63 | parser.add_argument('--monitor', type=int, metavar='PORT', default=0, |
|
63 | parser.add_argument('--monitor', type=int, metavar='PORT', default=0, | |
64 | help='set the SUB port for queue monitoring [default: random]') |
|
64 | help='set the SUB port for queue monitoring [default: random]') | |
65 | parser.add_argument('--mux', type=str, metavar='PORTS', |
|
65 | parser.add_argument('--mux', type=str, metavar='PORTS', | |
@@ -68,11 +68,15 b' def make_argument_parser():' | |||||
68 | help='set the XREP/XREQ ports for the task queue [default: random]') |
|
68 | help='set the XREP/XREQ ports for the task queue [default: random]') | |
69 | parser.add_argument('--control', type=str, metavar='PORTS', |
|
69 | parser.add_argument('--control', type=str, metavar='PORTS', | |
70 | help='set the XREP ports for the control queue [default: random]') |
|
70 | help='set the XREP ports for the control queue [default: random]') | |
71 |
parser.add_argument('-- |
|
71 | parser.add_argument('--iopub', type=str, metavar='PORTS', | |
|
72 | help='set the PUB/SUB ports for the iopub relay [default: random]') | |||
|
73 | parser.add_argument('--scheduler', type=str, default='lru', | |||
72 | choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], |
|
74 | choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], | |
73 |
help='select the task scheduler [default: |
|
75 | help='select the task scheduler [default: Python LRU]') | |
74 | parser.add_argument('--mongodb', action='store_true', |
|
76 | parser.add_argument('--mongodb', action='store_true', | |
75 | help='Use MongoDB task storage [default: in-memory]') |
|
77 | help='Use MongoDB task storage [default: in-memory]') | |
|
78 | parser.add_argument('--session', type=str, default=None, | |||
|
79 | help='Manually specify the session id.') | |||
76 |
|
80 | |||
77 | return parser |
|
81 | return parser | |
78 |
|
82 | |||
@@ -95,6 +99,11 b' def main(argv=None):' | |||||
95 | else: |
|
99 | else: | |
96 | mux = None |
|
100 | mux = None | |
97 | random_ports += 2 |
|
101 | random_ports += 2 | |
|
102 | if args.iopub: | |||
|
103 | iopub = split_ports(args.iopub, 2) | |||
|
104 | else: | |||
|
105 | iopub = None | |||
|
106 | random_ports += 2 | |||
98 | if args.task: |
|
107 | if args.task: | |
99 | task = split_ports(args.task, 2) |
|
108 | task = split_ports(args.task, 2) | |
100 | else: |
|
109 | else: | |
@@ -139,7 +148,8 b' def main(argv=None):' | |||||
139 | if args.execkey and not os.path.isfile(args.execkey): |
|
148 | if args.execkey and not os.path.isfile(args.execkey): | |
140 | generate_exec_key(args.execkey) |
|
149 | generate_exec_key(args.execkey) | |
141 |
|
150 | |||
142 |
thesession = session.StreamSession(username=args.ident or "controller", |
|
151 | thesession = session.StreamSession(username=args.ident or "controller", | |
|
152 | keyfile=args.execkey, session=args.session) | |||
143 |
|
153 | |||
144 | ### build and launch the queues ### |
|
154 | ### build and launch the queues ### | |
145 |
|
155 | |||
@@ -151,6 +161,19 b' def main(argv=None):' | |||||
151 |
|
161 | |||
152 | ports = select_random_ports(random_ports) |
|
162 | ports = select_random_ports(random_ports) | |
153 | children = [] |
|
163 | children = [] | |
|
164 | ||||
|
165 | # IOPub relay (in a Process) | |||
|
166 | if not iopub: | |||
|
167 | iopub = (ports.pop(),ports.pop()) | |||
|
168 | q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A') | |||
|
169 | q.bind_in(iface%iopub[1]) | |||
|
170 | q.bind_out(iface%iopub[0]) | |||
|
171 | q.setsockopt_in(zmq.SUBSCRIBE, '') | |||
|
172 | q.connect_mon(iface%monport) | |||
|
173 | q.daemon=True | |||
|
174 | q.start() | |||
|
175 | children.append(q.launcher) | |||
|
176 | ||||
154 | # Multiplexer Queue (in a Process) |
|
177 | # Multiplexer Queue (in a Process) | |
155 | if not mux: |
|
178 | if not mux: | |
156 | mux = (ports.pop(),ports.pop()) |
|
179 | mux = (ports.pop(),ports.pop()) | |
@@ -204,6 +227,7 b' def main(argv=None):' | |||||
204 | 'queue': iface%mux[1], |
|
227 | 'queue': iface%mux[1], | |
205 | 'heartbeat': (iface%hb[0], iface%hb[1]), |
|
228 | 'heartbeat': (iface%hb[0], iface%hb[1]), | |
206 | 'task' : iface%task[1], |
|
229 | 'task' : iface%task[1], | |
|
230 | 'iopub' : iface%iopub[1], | |||
207 | 'monitor' : iface%monport, |
|
231 | 'monitor' : iface%monport, | |
208 | } |
|
232 | } | |
209 |
|
233 | |||
@@ -212,8 +236,11 b' def main(argv=None):' | |||||
212 | 'query': iface%cport, |
|
236 | 'query': iface%cport, | |
213 | 'queue': iface%mux[0], |
|
237 | 'queue': iface%mux[0], | |
214 | 'task' : iface%task[0], |
|
238 | 'task' : iface%task[0], | |
|
239 | 'iopub' : iface%iopub[0], | |||
215 | 'notification': iface%nport |
|
240 | 'notification': iface%nport | |
216 | } |
|
241 | } | |
|
242 | ||||
|
243 | # register relay of signals to the children | |||
217 | signal_children(children) |
|
244 | signal_children(children) | |
218 | hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs) |
|
245 | hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs) | |
219 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) |
|
246 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) |
@@ -26,7 +26,7 b' from entry_point import make_base_argument_parser, connect_logger, parse_url' | |||||
26 |
|
26 | |||
27 |
|
27 | |||
28 | def printer(*msg): |
|
28 | def printer(*msg): | |
29 | pprint(msg) |
|
29 | pprint(msg, stream=sys.__stdout__) | |
30 |
|
30 | |||
31 | class Engine(object): |
|
31 | class Engine(object): | |
32 | """IPython engine""" |
|
32 | """IPython engine""" | |
@@ -69,19 +69,12 b' class Engine(object):' | |||||
69 | shell_addrs = [str(queue_addr)] |
|
69 | shell_addrs = [str(queue_addr)] | |
70 | control_addr = str(msg.content.control) |
|
70 | control_addr = str(msg.content.control) | |
71 | task_addr = msg.content.task |
|
71 | task_addr = msg.content.task | |
|
72 | iopub_addr = msg.content.iopub | |||
72 | if task_addr: |
|
73 | if task_addr: | |
73 | shell_addrs.append(str(task_addr)) |
|
74 | shell_addrs.append(str(task_addr)) | |
74 |
|
75 | |||
75 | hb_addrs = msg.content.heartbeat |
|
76 | hb_addrs = msg.content.heartbeat | |
76 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() |
|
77 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() | |
77 |
|
||||
78 | # placeholder for no, since pub isn't hooked up: |
|
|||
79 | sub = self.context.socket(zmq.SUB) |
|
|||
80 | sub = zmqstream.ZMQStream(sub, self.loop) |
|
|||
81 | sub.on_recv(lambda *a: None) |
|
|||
82 | port = sub.bind_to_random_port("tcp://%s"%LOCALHOST) |
|
|||
83 | iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345) |
|
|||
84 |
|
||||
85 | k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, |
|
78 | k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, | |
86 | hb_addrs, client_addr=None, loop=self.loop, |
|
79 | hb_addrs, client_addr=None, loop=self.loop, | |
87 | context=self.context, key=self.session.key)[-1] |
|
80 | context=self.context, key=self.session.key)[-1] | |
@@ -96,7 +89,7 b' class Engine(object):' | |||||
96 |
|
89 | |||
97 | # logger.info("engine::completed registration with id %s"%self.session.username) |
|
90 | # logger.info("engine::completed registration with id %s"%self.session.username) | |
98 |
|
91 | |||
99 | print (msg) |
|
92 | print (msg,file=sys.__stdout__) | |
100 |
|
93 | |||
101 | def unregister(self): |
|
94 | def unregister(self): | |
102 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) |
|
95 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) | |
@@ -104,7 +97,7 b' class Engine(object):' | |||||
104 | sys.exit(0) |
|
97 | sys.exit(0) | |
105 |
|
98 | |||
106 | def start(self): |
|
99 | def start(self): | |
107 | print ("registering") |
|
100 | print ("registering",file=sys.__stdout__) | |
108 | self.register() |
|
101 | self.register() | |
109 |
|
102 | |||
110 |
|
103 | |||
@@ -128,7 +121,7 b' def main(argv=None, user_ns=None):' | |||||
128 | connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) |
|
121 | connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) | |
129 |
|
122 | |||
130 | reg_conn = iface % args.regport |
|
123 | reg_conn = iface % args.regport | |
131 | print (reg_conn) |
|
124 | print (reg_conn, file=sys.__stdout__) | |
132 | print ("Starting the engine...", file=sys.__stderr__) |
|
125 | print ("Starting the engine...", file=sys.__stderr__) | |
133 |
|
126 | |||
134 | reg = ctx.socket(zmq.PAIR) |
|
127 | reg = ctx.socket(zmq.PAIR) |
@@ -91,7 +91,7 b' def make_base_argument_parser():' | |||||
91 | help='set the XREP port for registration [default: 10101]') |
|
91 | help='set the XREP port for registration [default: 10101]') | |
92 | parser.add_argument('--logport', type=int, metavar='PORT', default=20202, |
|
92 | parser.add_argument('--logport', type=int, metavar='PORT', default=20202, | |
93 | help='set the PUB port for logging [default: 10201]') |
|
93 | help='set the PUB port for logging [default: 10201]') | |
94 |
parser.add_argument('--loglevel', type= |
|
94 | parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.DEBUG, | |
95 | help='set the log level [default: DEBUG]') |
|
95 | help='set the log level [default: DEBUG]') | |
96 | parser.add_argument('--ident', type=str, |
|
96 | parser.add_argument('--ident', type=str, | |
97 | help='set the ZMQ identity [default: random]') |
|
97 | help='set the ZMQ identity [default: random]') | |
@@ -107,6 +107,11 b' def make_base_argument_parser():' | |||||
107 |
|
107 | |||
108 |
|
108 | |||
109 | def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG): |
|
109 | def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG): | |
|
110 | try: | |||
|
111 | loglevel = int(loglevel) | |||
|
112 | except ValueError: | |||
|
113 | if isinstance(loglevel, str): | |||
|
114 | loglevel = getattr(logging, loglevel) | |||
110 | lsock = context.socket(zmq.PUB) |
|
115 | lsock = context.socket(zmq.PUB) | |
111 | lsock.connect(iface) |
|
116 | lsock.connect(iface) | |
112 | handler = handlers.PUBHandler(lsock) |
|
117 | handler = handlers.PUBHandler(lsock) |
@@ -41,6 +41,10 b' else:' | |||||
41 | def _passer(*args, **kwargs): |
|
41 | def _passer(*args, **kwargs): | |
42 | return |
|
42 | return | |
43 |
|
43 | |||
|
44 | def _printer(*args, **kwargs): | |||
|
45 | print (args) | |||
|
46 | print (kwargs) | |||
|
47 | ||||
44 | def init_record(msg): |
|
48 | def init_record(msg): | |
45 | """return an empty TaskRecord dict, with all keys initialized with None.""" |
|
49 | """return an empty TaskRecord dict, with all keys initialized with None.""" | |
46 | header = msg['header'] |
|
50 | header = msg['header'] | |
@@ -58,7 +62,12 b' def init_record(msg):' | |||||
58 | 'result_header' : None, |
|
62 | 'result_header' : None, | |
59 | 'result_content' : None, |
|
63 | 'result_content' : None, | |
60 | 'result_buffers' : None, |
|
64 | 'result_buffers' : None, | |
61 | 'queue' : None |
|
65 | 'queue' : None, | |
|
66 | 'pyin' : None, | |||
|
67 | 'pyout': None, | |||
|
68 | 'pyerr': None, | |||
|
69 | 'stdout': '', | |||
|
70 | 'stderr': '', | |||
62 | } |
|
71 | } | |
63 |
|
72 | |||
64 |
|
73 | |||
@@ -181,19 +190,20 b' class Hub(object):' | |||||
181 | # register our callbacks |
|
190 | # register our callbacks | |
182 | self.registrar.on_recv(self.dispatch_register_request) |
|
191 | self.registrar.on_recv(self.dispatch_register_request) | |
183 | self.clientele.on_recv(self.dispatch_client_msg) |
|
192 | self.clientele.on_recv(self.dispatch_client_msg) | |
184 |
self.queue.on_recv(self.dispatch_ |
|
193 | self.queue.on_recv(self.dispatch_monitor_traffic) | |
185 |
|
194 | |||
186 | if heartbeat is not None: |
|
195 | if heartbeat is not None: | |
187 | heartbeat.add_heart_failure_handler(self.handle_heart_failure) |
|
196 | heartbeat.add_heart_failure_handler(self.handle_heart_failure) | |
188 | heartbeat.add_new_heart_handler(self.handle_new_heart) |
|
197 | heartbeat.add_new_heart_handler(self.handle_new_heart) | |
189 |
|
198 | |||
190 |
self. |
|
199 | self.monitor_handlers = { 'in' : self.save_queue_request, | |
191 | 'out': self.save_queue_result, |
|
200 | 'out': self.save_queue_result, | |
192 | 'intask': self.save_task_request, |
|
201 | 'intask': self.save_task_request, | |
193 | 'outtask': self.save_task_result, |
|
202 | 'outtask': self.save_task_result, | |
194 | 'tracktask': self.save_task_destination, |
|
203 | 'tracktask': self.save_task_destination, | |
195 | 'incontrol': _passer, |
|
204 | 'incontrol': _passer, | |
196 | 'outcontrol': _passer, |
|
205 | 'outcontrol': _passer, | |
|
206 | 'iopub': self.save_iopub_message, | |||
197 | } |
|
207 | } | |
198 |
|
208 | |||
199 | self.client_handlers = {'queue_request': self.queue_status, |
|
209 | self.client_handlers = {'queue_request': self.queue_status, | |
@@ -262,7 +272,7 b' class Hub(object):' | |||||
262 | try: |
|
272 | try: | |
263 | msg = self.session.unpack_message(msg[1:], content=True) |
|
273 | msg = self.session.unpack_message(msg[1:], content=True) | |
264 | except: |
|
274 | except: | |
265 | logger.error("client::Invalid Message %s"%msg) |
|
275 | logger.error("client::Invalid Message %s"%msg, exc_info=True) | |
266 | return False |
|
276 | return False | |
267 |
|
277 | |||
268 | msg_type = msg.get('msg_type', None) |
|
278 | msg_type = msg.get('msg_type', None) | |
@@ -299,19 +309,20 b' class Hub(object):' | |||||
299 | else: |
|
309 | else: | |
300 | handler(idents, msg) |
|
310 | handler(idents, msg) | |
301 |
|
311 | |||
302 |
def dispatch_ |
|
312 | def dispatch_monitor_traffic(self, msg): | |
303 |
"""all ME and Task queue messages come through here |
|
313 | """all ME and Task queue messages come through here, as well as | |
304 | logger.debug("queue traffic: %s"%msg[:2]) |
|
314 | IOPub traffic.""" | |
|
315 | logger.debug("monitor traffic: %s"%msg[:2]) | |||
305 | switch = msg[0] |
|
316 | switch = msg[0] | |
306 | idents, msg = self.session.feed_identities(msg[1:]) |
|
317 | idents, msg = self.session.feed_identities(msg[1:]) | |
307 | if not idents: |
|
318 | if not idents: | |
308 |
logger.error("Bad |
|
319 | logger.error("Bad Monitor Message: %s"%msg) | |
309 | return |
|
320 | return | |
310 |
handler = self. |
|
321 | handler = self.monitor_handlers.get(switch, None) | |
311 | if handler is not None: |
|
322 | if handler is not None: | |
312 | handler(idents, msg) |
|
323 | handler(idents, msg) | |
313 | else: |
|
324 | else: | |
314 |
logger.error("Invalid m |
|
325 | logger.error("Invalid monitor topic: %s"%switch) | |
315 |
|
326 | |||
316 |
|
327 | |||
317 | def dispatch_client_msg(self, msg): |
|
328 | def dispatch_client_msg(self, msg): | |
@@ -486,7 +497,7 b' class Hub(object):' | |||||
486 | msg = self.session.unpack_message(msg, content=False) |
|
497 | msg = self.session.unpack_message(msg, content=False) | |
487 | except: |
|
498 | except: | |
488 | logger.error("task::invalid task result message send to %r: %s"%( |
|
499 | logger.error("task::invalid task result message send to %r: %s"%( | |
489 | client_id, msg)) |
|
500 | client_id, msg), exc_info=True) | |
490 | raise |
|
501 | raise | |
491 | return |
|
502 | return | |
492 |
|
503 | |||
@@ -532,7 +543,7 b' class Hub(object):' | |||||
532 | try: |
|
543 | try: | |
533 | msg = self.session.unpack_message(msg, content=True) |
|
544 | msg = self.session.unpack_message(msg, content=True) | |
534 | except: |
|
545 | except: | |
535 | logger.error("task::invalid task tracking message") |
|
546 | logger.error("task::invalid task tracking message", exc_info=True) | |
536 | return |
|
547 | return | |
537 | content = msg['content'] |
|
548 | content = msg['content'] | |
538 | print (content) |
|
549 | print (content) | |
@@ -557,6 +568,43 b' class Hub(object):' | |||||
557 | # self.session.send('mia_reply', content=content, idents=client_id) |
|
568 | # self.session.send('mia_reply', content=content, idents=client_id) | |
558 |
|
569 | |||
559 |
|
570 | |||
|
571 | #--------------------- IOPub Traffic ------------------------------ | |||
|
572 | ||||
|
573 | def save_iopub_message(self, topics, msg): | |||
|
574 | """save an iopub message into the db""" | |||
|
575 | print (topics) | |||
|
576 | try: | |||
|
577 | msg = self.session.unpack_message(msg, content=True) | |||
|
578 | except: | |||
|
579 | logger.error("iopub::invalid IOPub message", exc_info=True) | |||
|
580 | return | |||
|
581 | ||||
|
582 | parent = msg['parent_header'] | |||
|
583 | msg_id = parent['msg_id'] | |||
|
584 | msg_type = msg['msg_type'] | |||
|
585 | content = msg['content'] | |||
|
586 | ||||
|
587 | # ensure msg_id is in db | |||
|
588 | try: | |||
|
589 | rec = self.db.get_record(msg_id) | |||
|
590 | except: | |||
|
591 | logger.error("iopub::IOPub message has invalid parent", exc_info=True) | |||
|
592 | return | |||
|
593 | # stream | |||
|
594 | d = {} | |||
|
595 | if msg_type == 'stream': | |||
|
596 | name = content['name'] | |||
|
597 | s = rec[name] or '' | |||
|
598 | d[name] = s + content['data'] | |||
|
599 | ||||
|
600 | elif msg_type == 'pyerr': | |||
|
601 | d['pyerr'] = content | |||
|
602 | else: | |||
|
603 | d[msg_type] = content['data'] | |||
|
604 | ||||
|
605 | self.db.update_record(msg_id, d) | |||
|
606 | ||||
|
607 | ||||
560 |
|
608 | |||
561 | #------------------------------------------------------------------------- |
|
609 | #------------------------------------------------------------------------- | |
562 | # Registration requests |
|
610 | # Registration requests | |
@@ -579,7 +627,7 b' class Hub(object):' | |||||
579 | try: |
|
627 | try: | |
580 | queue = content['queue'] |
|
628 | queue = content['queue'] | |
581 | except KeyError: |
|
629 | except KeyError: | |
582 | logger.error("registration::queue not specified") |
|
630 | logger.error("registration::queue not specified", exc_info=True) | |
583 | return |
|
631 | return | |
584 | heart = content.get('heartbeat', None) |
|
632 | heart = content.get('heartbeat', None) | |
585 | """register a new engine, and create the socket(s) necessary""" |
|
633 | """register a new engine, and create the socket(s) necessary""" | |
@@ -639,7 +687,7 b' class Hub(object):' | |||||
639 | try: |
|
687 | try: | |
640 | eid = msg['content']['id'] |
|
688 | eid = msg['content']['id'] | |
641 | except: |
|
689 | except: | |
642 | logger.error("registration::bad engine id for unregistration: %s"%ident) |
|
690 | logger.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True) | |
643 | return |
|
691 | return | |
644 | logger.info("registration::unregister_engine(%s)"%eid) |
|
692 | logger.info("registration::unregister_engine(%s)"%eid) | |
645 | content=dict(id=eid, queue=self.engines[eid].queue) |
|
693 | content=dict(id=eid, queue=self.engines[eid].queue) | |
@@ -662,7 +710,7 b' class Hub(object):' | |||||
662 | try: |
|
710 | try: | |
663 | (eid,queue,reg,purge) = self.incoming_registrations.pop(heart) |
|
711 | (eid,queue,reg,purge) = self.incoming_registrations.pop(heart) | |
664 | except KeyError: |
|
712 | except KeyError: | |
665 | logger.error("registration::tried to finish nonexistant registration") |
|
713 | logger.error("registration::tried to finish nonexistant registration", exc_info=True) | |
666 | return |
|
714 | return | |
667 | logger.info("registration::finished registering engine %i:%r"%(eid,queue)) |
|
715 | logger.info("registration::finished registering engine %i:%r"%(eid,queue)) | |
668 | if purge is not None: |
|
716 | if purge is not None: | |
@@ -820,9 +868,13 b' class Hub(object):' | |||||
820 | completed.append(msg_id) |
|
868 | completed.append(msg_id) | |
821 | if not statusonly: |
|
869 | if not statusonly: | |
822 | rec = records[msg_id] |
|
870 | rec = records[msg_id] | |
|
871 | io_dict = {} | |||
|
872 | for key in 'pyin pyout pyerr stdout stderr'.split(): | |||
|
873 | io_dict[key] = rec[key] | |||
823 | content[msg_id] = { 'result_content': rec['result_content'], |
|
874 | content[msg_id] = { 'result_content': rec['result_content'], | |
824 | 'header': rec['header'], |
|
875 | 'header': rec['header'], | |
825 | 'result_header' : rec['result_header'], |
|
876 | 'result_header' : rec['result_header'], | |
|
877 | 'io' : io_dict, | |||
826 | } |
|
878 | } | |
827 | buffers.extend(map(str, rec['result_buffers'])) |
|
879 | buffers.extend(map(str, rec['result_buffers'])) | |
828 | else: |
|
880 | else: |
@@ -30,9 +30,9 b' from IPython.external.decorator import decorator' | |||||
30 |
|
30 | |||
31 | @decorator |
|
31 | @decorator | |
32 | def logged(f,self,*args,**kwargs): |
|
32 | def logged(f,self,*args,**kwargs): | |
33 | print ("#--------------------") |
|
33 | # print ("#--------------------") | |
34 | print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) |
|
34 | # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) | |
35 | print ("#--") |
|
35 | # print ("#--") | |
36 | return f(self,*args, **kwargs) |
|
36 | return f(self,*args, **kwargs) | |
37 |
|
37 | |||
38 | #---------------------------------------------------------------------- |
|
38 | #---------------------------------------------------------------------- |
@@ -28,6 +28,9 b' from IPython.core import ultratb' | |||||
28 | from IPython.utils.traitlets import HasTraits, Instance, List |
|
28 | from IPython.utils.traitlets import HasTraits, Instance, List | |
29 | from IPython.zmq.completer import KernelCompleter |
|
29 | from IPython.zmq.completer import KernelCompleter | |
30 | from IPython.zmq.log import logger # a Logger object |
|
30 | from IPython.zmq.log import logger # a Logger object | |
|
31 | from IPython.zmq.iostream import OutStream | |||
|
32 | from IPython.zmq.displayhook import DisplayHook | |||
|
33 | ||||
31 |
|
34 | |||
32 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ |
|
35 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ | |
33 | unpack_apply_message, ISO8601, wrap_exception |
|
36 | unpack_apply_message, ISO8601, wrap_exception | |
@@ -36,7 +39,7 b' import heartmonitor' | |||||
36 | from client import Client |
|
39 | from client import Client | |
37 |
|
40 | |||
38 | def printer(*args): |
|
41 | def printer(*args): | |
39 | pprint(args) |
|
42 | pprint(args, stream=sys.__stdout__) | |
40 |
|
43 | |||
41 | #----------------------------------------------------------------------------- |
|
44 | #----------------------------------------------------------------------------- | |
42 | # Main kernel class |
|
45 | # Main kernel class | |
@@ -59,6 +62,7 b' class Kernel(HasTraits):' | |||||
59 | def __init__(self, **kwargs): |
|
62 | def __init__(self, **kwargs): | |
60 | super(Kernel, self).__init__(**kwargs) |
|
63 | super(Kernel, self).__init__(**kwargs) | |
61 | self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY) |
|
64 | self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY) | |
|
65 | self.prefix = 'engine.%s'%self.identity | |||
62 | self.user_ns = {} |
|
66 | self.user_ns = {} | |
63 | self.history = [] |
|
67 | self.history = [] | |
64 | self.compiler = CommandCompiler() |
|
68 | self.compiler = CommandCompiler() | |
@@ -212,18 +216,22 b' class Kernel(HasTraits):' | |||||
212 | return |
|
216 | return | |
213 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
217 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
214 | # self.iopub_stream.send(pyin_msg) |
|
218 | # self.iopub_stream.send(pyin_msg) | |
215 |
self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent |
|
219 | self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, | |
|
220 | ident=self.identity+'.pyin') | |||
216 | started = datetime.now().strftime(ISO8601) |
|
221 | started = datetime.now().strftime(ISO8601) | |
217 | try: |
|
222 | try: | |
218 | comp_code = self.compiler(code, '<zmq-kernel>') |
|
223 | comp_code = self.compiler(code, '<zmq-kernel>') | |
219 | # allow for not overriding displayhook |
|
224 | # allow for not overriding displayhook | |
220 | if hasattr(sys.displayhook, 'set_parent'): |
|
225 | if hasattr(sys.displayhook, 'set_parent'): | |
221 | sys.displayhook.set_parent(parent) |
|
226 | sys.displayhook.set_parent(parent) | |
|
227 | sys.stdout.set_parent(parent) | |||
|
228 | sys.stderr.set_parent(parent) | |||
222 | exec comp_code in self.user_ns, self.user_ns |
|
229 | exec comp_code in self.user_ns, self.user_ns | |
223 | except: |
|
230 | except: | |
224 | exc_content = self._wrap_exception('execute') |
|
231 | exc_content = self._wrap_exception('execute') | |
225 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
232 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
226 |
self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent |
|
233 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, | |
|
234 | ident=self.identity+'.pyerr') | |||
227 | reply_content = exc_content |
|
235 | reply_content = exc_content | |
228 | else: |
|
236 | else: | |
229 | reply_content = {'status' : 'ok'} |
|
237 | reply_content = {'status' : 'ok'} | |
@@ -247,7 +255,7 b' class Kernel(HasTraits):' | |||||
247 | return self.completer.complete(msg.content.line, msg.content.text) |
|
255 | return self.completer.complete(msg.content.line, msg.content.text) | |
248 |
|
256 | |||
249 | def apply_request(self, stream, ident, parent): |
|
257 | def apply_request(self, stream, ident, parent): | |
250 | print (parent) |
|
258 | # print (parent) | |
251 | try: |
|
259 | try: | |
252 | content = parent[u'content'] |
|
260 | content = parent[u'content'] | |
253 | bufs = parent[u'buffers'] |
|
261 | bufs = parent[u'buffers'] | |
@@ -266,6 +274,8 b' class Kernel(HasTraits):' | |||||
266 | # allow for not overriding displayhook |
|
274 | # allow for not overriding displayhook | |
267 | if hasattr(sys.displayhook, 'set_parent'): |
|
275 | if hasattr(sys.displayhook, 'set_parent'): | |
268 | sys.displayhook.set_parent(parent) |
|
276 | sys.displayhook.set_parent(parent) | |
|
277 | sys.stdout.set_parent(parent) | |||
|
278 | sys.stderr.set_parent(parent) | |||
269 | # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns |
|
279 | # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns | |
270 | if bound: |
|
280 | if bound: | |
271 | working = self.user_ns |
|
281 | working = self.user_ns | |
@@ -305,7 +315,8 b' class Kernel(HasTraits):' | |||||
305 | except: |
|
315 | except: | |
306 | exc_content = self._wrap_exception('apply') |
|
316 | exc_content = self._wrap_exception('apply') | |
307 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
317 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
308 |
self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent |
|
318 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, | |
|
319 | ident=self.identity+'.pyerr') | |||
309 | reply_content = exc_content |
|
320 | reply_content = exc_content | |
310 | result_buf = [] |
|
321 | result_buf = [] | |
311 |
|
322 | |||
@@ -318,7 +329,7 b' class Kernel(HasTraits):' | |||||
318 | # self.reply_socket.send_json(reply_msg) |
|
329 | # self.reply_socket.send_json(reply_msg) | |
319 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, |
|
330 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, | |
320 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) |
|
331 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) | |
321 | print(Message(reply_msg), file=sys.__stdout__) |
|
332 | # print(Message(reply_msg), file=sys.__stdout__) | |
322 | # if reply_msg['content']['status'] == u'error': |
|
333 | # if reply_msg['content']['status'] == u'error': | |
323 | # self.abort_queues() |
|
334 | # self.abort_queues() | |
324 |
|
335 | |||
@@ -364,7 +375,7 b' class Kernel(HasTraits):' | |||||
364 |
|
375 | |||
365 | if self.iopub_stream: |
|
376 | if self.iopub_stream: | |
366 | self.iopub_stream.on_err(printer) |
|
377 | self.iopub_stream.on_err(printer) | |
367 | self.iopub_stream.on_send(printer) |
|
378 | # self.iopub_stream.on_send(printer) | |
368 |
|
379 | |||
369 | #### while True mode: |
|
380 | #### while True mode: | |
370 | # while True: |
|
381 | # while True: | |
@@ -388,7 +399,9 b' class Kernel(HasTraits):' | |||||
388 | # time.sleep(1e-3) |
|
399 | # time.sleep(1e-3) | |
389 |
|
400 | |||
390 | def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, |
|
401 | def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, | |
391 |
client_addr=None, loop=None, context=None, key=None |
|
402 | client_addr=None, loop=None, context=None, key=None, | |
|
403 | out_stream_factory=OutStream, display_hook_factory=DisplayHook): | |||
|
404 | ||||
392 | # create loop, context, and session: |
|
405 | # create loop, context, and session: | |
393 | if loop is None: |
|
406 | if loop is None: | |
394 | loop = ioloop.IOLoop.instance() |
|
407 | loop = ioloop.IOLoop.instance() | |
@@ -417,6 +430,17 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,' | |||||
417 | iopub_stream.setsockopt(zmq.IDENTITY, identity) |
|
430 | iopub_stream.setsockopt(zmq.IDENTITY, identity) | |
418 | iopub_stream.connect(iopub_addr) |
|
431 | iopub_stream.connect(iopub_addr) | |
419 |
|
432 | |||
|
433 | # Redirect input streams and set a display hook. | |||
|
434 | if out_stream_factory: | |||
|
435 | sys.stdout = out_stream_factory(session, iopub_stream, u'stdout') | |||
|
436 | sys.stdout.topic = identity+'.stdout' | |||
|
437 | sys.stderr = out_stream_factory(session, iopub_stream, u'stderr') | |||
|
438 | sys.stderr.topic = identity+'.stderr' | |||
|
439 | if display_hook_factory: | |||
|
440 | sys.displayhook = display_hook_factory(session, iopub_stream) | |||
|
441 | sys.displayhook.topic = identity+'.pyout' | |||
|
442 | ||||
|
443 | ||||
420 | # launch heartbeat |
|
444 | # launch heartbeat | |
421 | heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) |
|
445 | heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) | |
422 | heart.start() |
|
446 | heart.start() |
@@ -336,7 +336,7 b' class StreamSession(object):' | |||||
336 | return header.get('key', None) == self.key |
|
336 | return header.get('key', None) == self.key | |
337 |
|
337 | |||
338 |
|
338 | |||
339 | def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None): |
|
339 | def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None): | |
340 | """Build and send a message via stream or socket. |
|
340 | """Build and send a message via stream or socket. | |
341 |
|
341 | |||
342 | Parameters |
|
342 | Parameters | |
@@ -344,10 +344,9 b' class StreamSession(object):' | |||||
344 |
|
344 | |||
345 | stream : zmq.Socket or ZMQStream |
|
345 | stream : zmq.Socket or ZMQStream | |
346 | the socket-like object used to send the data |
|
346 | the socket-like object used to send the data | |
347 | msg_type : str or Message/dict |
|
347 | msg_or_type : str or Message/dict | |
348 | Normally, msg_type will be |
|
348 | Normally, msg_or_type will be a msg_type unless a message is being sent more | |
349 |
|
349 | than once. | ||
350 |
|
||||
351 |
|
350 | |||
352 | Returns |
|
351 | Returns | |
353 | ------- |
|
352 | ------- | |
@@ -356,13 +355,13 b' class StreamSession(object):' | |||||
356 | the nice wrapped dict-like object containing the headers |
|
355 | the nice wrapped dict-like object containing the headers | |
357 |
|
356 | |||
358 | """ |
|
357 | """ | |
359 | if isinstance(msg_type, (Message, dict)): |
|
358 | if isinstance(msg_or_type, (Message, dict)): | |
360 | # we got a Message, not a msg_type |
|
359 | # we got a Message, not a msg_type | |
361 | # don't build a new Message |
|
360 | # don't build a new Message | |
362 | msg = msg_type |
|
361 | msg = msg_or_type | |
363 | content = msg['content'] |
|
362 | content = msg['content'] | |
364 | else: |
|
363 | else: | |
365 | msg = self.msg(msg_type, content, parent, subheader) |
|
364 | msg = self.msg(msg_or_type, content, parent, subheader) | |
366 | buffers = [] if buffers is None else buffers |
|
365 | buffers = [] if buffers is None else buffers | |
367 | to_send = [] |
|
366 | to_send = [] | |
368 | if isinstance(ident, list): |
|
367 | if isinstance(ident, list): |
General Comments 0
You need to be logged in to leave comments.
Login now