##// END OF EJS Templates
Fixing code to assume msg_type and msg_id are top-level....
Brian E. Granger -
Show More
@@ -96,7 +96,7 b' class BaseFrontendMixin(object):'
96 """ Calls the frontend handler associated with the message type of the
96 """ Calls the frontend handler associated with the message type of the
97 given message.
97 given message.
98 """
98 """
99 msg_type = msg['msg_type']
99 msg_type = msg['header']['msg_type']
100 handler = getattr(self, '_handle_' + msg_type, None)
100 handler = getattr(self, '_handle_' + msg_type, None)
101 if handler:
101 if handler:
102 handler(msg)
102 handler(msg)
@@ -66,7 +66,7 b' class QtShellSocketChannel(SocketChannelQObject, ShellSocketChannel):'
66 self.message_received.emit(msg)
66 self.message_received.emit(msg)
67
67
68 # Emit signals for specialized message types.
68 # Emit signals for specialized message types.
69 msg_type = msg['msg_type']
69 msg_type = msg['header']['msg_type']
70 signal = getattr(self, msg_type, None)
70 signal = getattr(self, msg_type, None)
71 if signal:
71 if signal:
72 signal.emit(msg)
72 signal.emit(msg)
@@ -122,7 +122,7 b' class QtSubSocketChannel(SocketChannelQObject, SubSocketChannel):'
122 # Emit the generic signal.
122 # Emit the generic signal.
123 self.message_received.emit(msg)
123 self.message_received.emit(msg)
124 # Emit signals for specialized message types.
124 # Emit signals for specialized message types.
125 msg_type = msg['msg_type']
125 msg_type = msg['header']['msg_type']
126 signal = getattr(self, msg_type + '_received', None)
126 signal = getattr(self, msg_type + '_received', None)
127 if signal:
127 if signal:
128 signal.emit(msg)
128 signal.emit(msg)
@@ -155,7 +155,7 b' class QtStdInSocketChannel(SocketChannelQObject, StdInSocketChannel):'
155 self.message_received.emit(msg)
155 self.message_received.emit(msg)
156
156
157 # Emit signals for specialized message types.
157 # Emit signals for specialized message types.
158 msg_type = msg['msg_type']
158 msg_type = msg['header']['msg_type']
159 if msg_type == 'input_request':
159 if msg_type == 'input_request':
160 self.input_requested.emit(msg)
160 self.input_requested.emit(msg)
161
161
@@ -670,7 +670,7 b' class Client(HasTraits):'
670 while msg is not None:
670 while msg is not None:
671 if self.debug:
671 if self.debug:
672 pprint(msg)
672 pprint(msg)
673 msg_type = msg['msg_type']
673 msg_type = msg['header']['msg_type']
674 handler = self._notification_handlers.get(msg_type, None)
674 handler = self._notification_handlers.get(msg_type, None)
675 if handler is None:
675 if handler is None:
676 raise Exception("Unhandled message type: %s"%msg.msg_type)
676 raise Exception("Unhandled message type: %s"%msg.msg_type)
@@ -684,7 +684,7 b' class Client(HasTraits):'
684 while msg is not None:
684 while msg is not None:
685 if self.debug:
685 if self.debug:
686 pprint(msg)
686 pprint(msg)
687 msg_type = msg['msg_type']
687 msg_type = msg['header']['msg_type']
688 handler = self._queue_handlers.get(msg_type, None)
688 handler = self._queue_handlers.get(msg_type, None)
689 if handler is None:
689 if handler is None:
690 raise Exception("Unhandled message type: %s"%msg.msg_type)
690 raise Exception("Unhandled message type: %s"%msg.msg_type)
@@ -729,7 +729,7 b' class Client(HasTraits):'
729 msg_id = parent['msg_id']
729 msg_id = parent['msg_id']
730 content = msg['content']
730 content = msg['content']
731 header = msg['header']
731 header = msg['header']
732 msg_type = msg['msg_type']
732 msg_type = msg['header']['msg_type']
733
733
734 # init metadata:
734 # init metadata:
735 md = self.metadata[msg_id]
735 md = self.metadata[msg_id]
@@ -994,7 +994,7 b' class Client(HasTraits):'
994 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
994 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
995 subheader=subheader, track=track)
995 subheader=subheader, track=track)
996
996
997 msg_id = msg['msg_id']
997 msg_id = msg['header']['msg_id']
998 self.outstanding.add(msg_id)
998 self.outstanding.add(msg_id)
999 if ident:
999 if ident:
1000 # possibly routed to a specific engine
1000 # possibly routed to a specific engine
@@ -523,7 +523,7 b' class DirectView(View):'
523 ident=ident)
523 ident=ident)
524 if track:
524 if track:
525 trackers.append(msg['tracker'])
525 trackers.append(msg['tracker'])
526 msg_ids.append(msg['msg_id'])
526 msg_ids.append(msg['header']['msg_id'])
527 tracker = None if track is False else zmq.MessageTracker(*trackers)
527 tracker = None if track is False else zmq.MessageTracker(*trackers)
528 ar = AsyncResult(self.client, msg_ids, fname=f.__name__, targets=targets, tracker=tracker)
528 ar = AsyncResult(self.client, msg_ids, fname=f.__name__, targets=targets, tracker=tracker)
529 if block:
529 if block:
@@ -980,7 +980,7 b' class LoadBalancedView(View):'
980 subheader=subheader)
980 subheader=subheader)
981 tracker = None if track is False else msg['tracker']
981 tracker = None if track is False else msg['tracker']
982
982
983 ar = AsyncResult(self.client, msg['msg_id'], fname=f.__name__, targets=None, tracker=tracker)
983 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=f.__name__, targets=None, tracker=tracker)
984
984
985 if block:
985 if block:
986 try:
986 try:
@@ -494,7 +494,7 b' class Hub(SessionFactory):'
494 return
494 return
495 # print client_id, header, parent, content
495 # print client_id, header, parent, content
496 #switch on message type:
496 #switch on message type:
497 msg_type = msg['msg_type']
497 msg_type = msg['header']['msg_type']
498 self.log.info("client::client %r requested %r"%(client_id, msg_type))
498 self.log.info("client::client %r requested %r"%(client_id, msg_type))
499 handler = self.query_handlers.get(msg_type, None)
499 handler = self.query_handlers.get(msg_type, None)
500 try:
500 try:
@@ -791,7 +791,7 b' class Hub(SessionFactory):'
791 self.log.error("iopub::invalid IOPub message: %r"%msg)
791 self.log.error("iopub::invalid IOPub message: %r"%msg)
792 return
792 return
793 msg_id = parent['msg_id']
793 msg_id = parent['msg_id']
794 msg_type = msg['msg_type']
794 msg_type = msg['header']['msg_type']
795 content = msg['content']
795 content = msg['content']
796
796
797 # ensure msg_id is in db
797 # ensure msg_id is in db
@@ -216,7 +216,7 b' class TaskScheduler(SessionFactory):'
216 self.log.warn("task::Unauthorized message from: %r"%idents)
216 self.log.warn("task::Unauthorized message from: %r"%idents)
217 return
217 return
218
218
219 msg_type = msg['msg_type']
219 msg_type = msg['header']['msg_type']
220
220
221 handler = self._notification_handlers.get(msg_type, None)
221 handler = self._notification_handlers.get(msg_type, None)
222 if handler is None:
222 if handler is None:
@@ -44,7 +44,7 b' class KernelStarter(object):'
44 except:
44 except:
45 print ("bad msg: %s"%msg)
45 print ("bad msg: %s"%msg)
46
46
47 msgtype = msg['msg_type']
47 msgtype = msg['header']['msg_type']
48 handler = self.handlers.get(msgtype, None)
48 handler = self.handlers.get(msgtype, None)
49 if handler is None:
49 if handler is None:
50 self.downstream.send_multipart(raw_msg, copy=False)
50 self.downstream.send_multipart(raw_msg, copy=False)
@@ -58,7 +58,7 b' class KernelStarter(object):'
58 except:
58 except:
59 print ("bad msg: %s"%msg)
59 print ("bad msg: %s"%msg)
60
60
61 msgtype = msg['msg_type']
61 msgtype = msg['header']['msg_type']
62 handler = self.handlers.get(msgtype, None)
62 handler = self.handlers.get(msgtype, None)
63 if handler is None:
63 if handler is None:
64 self.upstream.send_multipart(raw_msg, copy=False)
64 self.upstream.send_multipart(raw_msg, copy=False)
@@ -227,4 +227,4 b' def make_starter(up_addr, down_addr, *args, **kwargs):'
227 starter = KernelStarter(session, upstream, downstream, *args, **kwargs)
227 starter = KernelStarter(session, upstream, downstream, *args, **kwargs)
228 starter.start()
228 starter.start()
229 loop.start()
229 loop.start()
230 No newline at end of file
230
@@ -150,7 +150,7 b' class Kernel(SessionFactory):'
150
150
151 self.log.info("Aborting:")
151 self.log.info("Aborting:")
152 self.log.info(str(msg))
152 self.log.info(str(msg))
153 msg_type = msg['msg_type']
153 msg_type = msg['header']['msg_type']
154 reply_type = msg_type.split('_')[0] + '_reply'
154 reply_type = msg_type.split('_')[0] + '_reply'
155 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
155 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
156 # self.reply_socket.send(ident,zmq.SNDMORE)
156 # self.reply_socket.send(ident,zmq.SNDMORE)
@@ -205,9 +205,9 b' class Kernel(SessionFactory):'
205 header = msg['header']
205 header = msg['header']
206 msg_id = header['msg_id']
206 msg_id = header['msg_id']
207
207
208 handler = self.control_handlers.get(msg['msg_type'], None)
208 handler = self.control_handlers.get(msg['header']['msg_type'], None)
209 if handler is None:
209 if handler is None:
210 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
210 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['header']['msg_type'])
211 else:
211 else:
212 handler(self.control_stream, idents, msg)
212 handler(self.control_stream, idents, msg)
213
213
@@ -386,14 +386,14 b' class Kernel(SessionFactory):'
386 if self.check_aborted(msg_id):
386 if self.check_aborted(msg_id):
387 self.aborted.remove(msg_id)
387 self.aborted.remove(msg_id)
388 # is it safe to assume a msg_id will not be resubmitted?
388 # is it safe to assume a msg_id will not be resubmitted?
389 reply_type = msg['msg_type'].split('_')[0] + '_reply'
389 reply_type = msg['header']['msg_type'].split('_')[0] + '_reply'
390 status = {'status' : 'aborted'}
390 status = {'status' : 'aborted'}
391 reply_msg = self.session.send(stream, reply_type, subheader=status,
391 reply_msg = self.session.send(stream, reply_type, subheader=status,
392 content=status, parent=msg, ident=idents)
392 content=status, parent=msg, ident=idents)
393 return
393 return
394 handler = self.shell_handlers.get(msg['msg_type'], None)
394 handler = self.shell_handlers.get(msg['header']['msg_type'], None)
395 if handler is None:
395 if handler is None:
396 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
396 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['header']['msg_type'])
397 else:
397 else:
398 handler(stream, idents, msg)
398 handler(stream, idents, msg)
399
399
@@ -56,8 +56,8 b' class TestDictBackend(TestCase):'
56 msg = self.session.msg('apply_request', content=dict(a=5))
56 msg = self.session.msg('apply_request', content=dict(a=5))
57 msg['buffers'] = []
57 msg['buffers'] = []
58 rec = init_record(msg)
58 rec = init_record(msg)
59 msg_ids.append(msg['msg_id'])
59 msg_ids.append(msg['header']['msg_id'])
60 self.db.add_record(msg['msg_id'], rec)
60 self.db.add_record(msg['header']['msg_id'], rec)
61 return msg_ids
61 return msg_ids
62
62
63 def test_add_record(self):
63 def test_add_record(self):
@@ -133,11 +133,11 b' class Kernel(Configurable):'
133 # Print some info about this message and leave a '--->' marker, so it's
133 # Print some info about this message and leave a '--->' marker, so it's
134 # easier to trace visually the message chain when debugging. Each
134 # easier to trace visually the message chain when debugging. Each
135 # handler prints its message at the end.
135 # handler prints its message at the end.
136 self.log.debug('\n*** MESSAGE TYPE:'+str(msg['msg_type'])+'***')
136 self.log.debug('\n*** MESSAGE TYPE:'+str(msg['header']['msg_type'])+'***')
137 self.log.debug(' Content: '+str(msg['content'])+'\n --->\n ')
137 self.log.debug(' Content: '+str(msg['content'])+'\n --->\n ')
138
138
139 # Find and call actual handler for message
139 # Find and call actual handler for message
140 handler = self.handlers.get(msg['msg_type'], None)
140 handler = self.handlers.get(msg['header']['msg_type'], None)
141 if handler is None:
141 if handler is None:
142 self.log.error("UNKNOWN MESSAGE TYPE:" +str(msg))
142 self.log.error("UNKNOWN MESSAGE TYPE:" +str(msg))
143 else:
143 else:
@@ -375,7 +375,7 b' class Kernel(Configurable):'
375 "Unexpected missing message part."
375 "Unexpected missing message part."
376
376
377 self.log.debug("Aborting:\n"+str(Message(msg)))
377 self.log.debug("Aborting:\n"+str(Message(msg)))
378 msg_type = msg['msg_type']
378 msg_type = msg['header']['msg_type']
379 reply_type = msg_type.split('_')[0] + '_reply'
379 reply_type = msg_type.split('_')[0] + '_reply'
380 reply_msg = self.session.send(self.shell_socket, reply_type,
380 reply_msg = self.session.send(self.shell_socket, reply_type,
381 {'status' : 'aborted'}, msg, ident=ident)
381 {'status' : 'aborted'}, msg, ident=ident)
@@ -190,7 +190,7 b' class Kernel(HasTraits):'
190 else:
190 else:
191 assert ident is not None, "Missing message part."
191 assert ident is not None, "Missing message part."
192 self.log.debug("Aborting: %s"%Message(msg))
192 self.log.debug("Aborting: %s"%Message(msg))
193 msg_type = msg['msg_type']
193 msg_type = msg['header']['msg_type']
194 reply_type = msg_type.split('_')[0] + '_reply'
194 reply_type = msg_type.split('_')[0] + '_reply'
195 reply_msg = self.session.send(self.shell_socket, reply_type, {'status':'aborted'}, msg, ident=ident)
195 reply_msg = self.session.send(self.shell_socket, reply_type, {'status':'aborted'}, msg, ident=ident)
196 self.log.debug(Message(reply_msg))
196 self.log.debug(Message(reply_msg))
@@ -359,9 +359,7 b' class Session(Configurable):'
359 """
359 """
360 msg = {}
360 msg = {}
361 msg['header'] = self.msg_header(msg_type)
361 msg['header'] = self.msg_header(msg_type)
362 msg['msg_id'] = msg['header']['msg_id']
363 msg['parent_header'] = {} if parent is None else extract_header(parent)
362 msg['parent_header'] = {} if parent is None else extract_header(parent)
364 msg['msg_type'] = msg_type
365 msg['content'] = {} if content is None else content
363 msg['content'] = {} if content is None else content
366 sub = {} if subheader is None else subheader
364 sub = {} if subheader is None else subheader
367 msg['header'].update(sub)
365 msg['header'].update(sub)
@@ -651,7 +649,6 b' class Session(Configurable):'
651 if not len(msg_list) >= minlen:
649 if not len(msg_list) >= minlen:
652 raise TypeError("malformed message, must have at least %i elements"%minlen)
650 raise TypeError("malformed message, must have at least %i elements"%minlen)
653 message['header'] = self.unpack(msg_list[1])
651 message['header'] = self.unpack(msg_list[1])
654 message['msg_type'] = message['header']['msg_type']
655 message['parent_header'] = self.unpack(msg_list[2])
652 message['parent_header'] = self.unpack(msg_list[2])
656 if content:
653 if content:
657 message['content'] = self.unpack(msg_list[3])
654 message['content'] = self.unpack(msg_list[3])
@@ -37,10 +37,8 b' class TestSession(SessionTestCase):'
37 self.assertTrue(isinstance(msg['content'],dict))
37 self.assertTrue(isinstance(msg['content'],dict))
38 self.assertTrue(isinstance(msg['header'],dict))
38 self.assertTrue(isinstance(msg['header'],dict))
39 self.assertTrue(isinstance(msg['parent_header'],dict))
39 self.assertTrue(isinstance(msg['parent_header'],dict))
40 self.assertEquals(msg['msg_type'], 'execute')
40 self.assertEquals(msg['header']['msg_type'], 'execute')
41
41
42
43
44 def test_args(self):
42 def test_args(self):
45 """initialization arguments for Session"""
43 """initialization arguments for Session"""
46 s = self.session
44 s = self.session
General Comments 0
You need to be logged in to leave comments. Login now