##// END OF EJS Templates
Fixing another bug in msg_type refactoring.
Brian E. Granger -
Show More
@@ -1,440 +1,440
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 * Brian Granger
8 * Brian Granger
9 * Fernando Perez
9 * Fernando Perez
10 * Evan Patterson
10 * Evan Patterson
11 """
11 """
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2010-2011 The IPython Development Team
13 # Copyright (C) 2010-2011 The IPython Development Team
14 #
14 #
15 # Distributed under the terms of the BSD License. The full license is in
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Imports
20 # Imports
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 # Standard library imports.
23 # Standard library imports.
24 from __future__ import print_function
24 from __future__ import print_function
25
25
26 import sys
26 import sys
27 import time
27 import time
28
28
29 from code import CommandCompiler
29 from code import CommandCompiler
30 from datetime import datetime
30 from datetime import datetime
31 from pprint import pprint
31 from pprint import pprint
32
32
33 # System library imports.
33 # System library imports.
34 import zmq
34 import zmq
35 from zmq.eventloop import ioloop, zmqstream
35 from zmq.eventloop import ioloop, zmqstream
36
36
37 # Local imports.
37 # Local imports.
38 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode, CBytes
38 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode, CBytes
39 from IPython.zmq.completer import KernelCompleter
39 from IPython.zmq.completer import KernelCompleter
40
40
41 from IPython.parallel.error import wrap_exception
41 from IPython.parallel.error import wrap_exception
42 from IPython.parallel.factory import SessionFactory
42 from IPython.parallel.factory import SessionFactory
43 from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes
43 from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes
44
44
45 def printer(*args):
45 def printer(*args):
46 pprint(args, stream=sys.__stdout__)
46 pprint(args, stream=sys.__stdout__)
47
47
48
48
49 class _Passer(zmqstream.ZMQStream):
49 class _Passer(zmqstream.ZMQStream):
50 """Empty class that implements `send()` that does nothing.
50 """Empty class that implements `send()` that does nothing.
51
51
52 Subclass ZMQStream for Session typechecking
52 Subclass ZMQStream for Session typechecking
53
53
54 """
54 """
55 def __init__(self, *args, **kwargs):
55 def __init__(self, *args, **kwargs):
56 pass
56 pass
57
57
58 def send(self, *args, **kwargs):
58 def send(self, *args, **kwargs):
59 pass
59 pass
60 send_multipart = send
60 send_multipart = send
61
61
62
62
63 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
64 # Main kernel class
64 # Main kernel class
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66
66
67 class Kernel(SessionFactory):
67 class Kernel(SessionFactory):
68
68
69 #---------------------------------------------------------------------------
69 #---------------------------------------------------------------------------
70 # Kernel interface
70 # Kernel interface
71 #---------------------------------------------------------------------------
71 #---------------------------------------------------------------------------
72
72
73 # kwargs:
73 # kwargs:
74 exec_lines = List(Unicode, config=True,
74 exec_lines = List(Unicode, config=True,
75 help="List of lines to execute")
75 help="List of lines to execute")
76
76
77 # identities:
77 # identities:
78 int_id = Int(-1)
78 int_id = Int(-1)
79 bident = CBytes()
79 bident = CBytes()
80 ident = Unicode()
80 ident = Unicode()
81 def _ident_changed(self, name, old, new):
81 def _ident_changed(self, name, old, new):
82 self.bident = asbytes(new)
82 self.bident = asbytes(new)
83
83
84 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
84 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
85
85
86 control_stream = Instance(zmqstream.ZMQStream)
86 control_stream = Instance(zmqstream.ZMQStream)
87 task_stream = Instance(zmqstream.ZMQStream)
87 task_stream = Instance(zmqstream.ZMQStream)
88 iopub_stream = Instance(zmqstream.ZMQStream)
88 iopub_stream = Instance(zmqstream.ZMQStream)
89 client = Instance('IPython.parallel.Client')
89 client = Instance('IPython.parallel.Client')
90
90
91 # internals
91 # internals
92 shell_streams = List()
92 shell_streams = List()
93 compiler = Instance(CommandCompiler, (), {})
93 compiler = Instance(CommandCompiler, (), {})
94 completer = Instance(KernelCompleter)
94 completer = Instance(KernelCompleter)
95
95
96 aborted = Set()
96 aborted = Set()
97 shell_handlers = Dict()
97 shell_handlers = Dict()
98 control_handlers = Dict()
98 control_handlers = Dict()
99
99
100 def _set_prefix(self):
100 def _set_prefix(self):
101 self.prefix = "engine.%s"%self.int_id
101 self.prefix = "engine.%s"%self.int_id
102
102
103 def _connect_completer(self):
103 def _connect_completer(self):
104 self.completer = KernelCompleter(self.user_ns)
104 self.completer = KernelCompleter(self.user_ns)
105
105
106 def __init__(self, **kwargs):
106 def __init__(self, **kwargs):
107 super(Kernel, self).__init__(**kwargs)
107 super(Kernel, self).__init__(**kwargs)
108 self._set_prefix()
108 self._set_prefix()
109 self._connect_completer()
109 self._connect_completer()
110
110
111 self.on_trait_change(self._set_prefix, 'id')
111 self.on_trait_change(self._set_prefix, 'id')
112 self.on_trait_change(self._connect_completer, 'user_ns')
112 self.on_trait_change(self._connect_completer, 'user_ns')
113
113
114 # Build dict of handlers for message types
114 # Build dict of handlers for message types
115 for msg_type in ['execute_request', 'complete_request', 'apply_request',
115 for msg_type in ['execute_request', 'complete_request', 'apply_request',
116 'clear_request']:
116 'clear_request']:
117 self.shell_handlers[msg_type] = getattr(self, msg_type)
117 self.shell_handlers[msg_type] = getattr(self, msg_type)
118
118
119 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
119 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
120 self.control_handlers[msg_type] = getattr(self, msg_type)
120 self.control_handlers[msg_type] = getattr(self, msg_type)
121
121
122 self._initial_exec_lines()
122 self._initial_exec_lines()
123
123
124 def _wrap_exception(self, method=None):
124 def _wrap_exception(self, method=None):
125 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
125 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
126 content=wrap_exception(e_info)
126 content=wrap_exception(e_info)
127 return content
127 return content
128
128
129 def _initial_exec_lines(self):
129 def _initial_exec_lines(self):
130 s = _Passer()
130 s = _Passer()
131 content = dict(silent=True, user_variable=[],user_expressions=[])
131 content = dict(silent=True, user_variable=[],user_expressions=[])
132 for line in self.exec_lines:
132 for line in self.exec_lines:
133 self.log.debug("executing initialization: %s"%line)
133 self.log.debug("executing initialization: %s"%line)
134 content.update({'code':line})
134 content.update({'code':line})
135 msg = self.session.msg('execute_request', content)
135 msg = self.session.msg('execute_request', content)
136 self.execute_request(s, [], msg)
136 self.execute_request(s, [], msg)
137
137
138
138
139 #-------------------- control handlers -----------------------------
139 #-------------------- control handlers -----------------------------
140 def abort_queues(self):
140 def abort_queues(self):
141 for stream in self.shell_streams:
141 for stream in self.shell_streams:
142 if stream:
142 if stream:
143 self.abort_queue(stream)
143 self.abort_queue(stream)
144
144
145 def abort_queue(self, stream):
145 def abort_queue(self, stream):
146 while True:
146 while True:
147 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
147 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
148 if msg is None:
148 if msg is None:
149 return
149 return
150
150
151 self.log.info("Aborting:")
151 self.log.info("Aborting:")
152 self.log.info(str(msg))
152 self.log.info(str(msg))
153 msg_type = msg['header']['msg_type']
153 msg_type = msg['header']['msg_type']
154 reply_type = msg_type.split('_')[0] + '_reply'
154 reply_type = msg_type.split('_')[0] + '_reply'
155 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
155 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
156 # self.reply_socket.send(ident,zmq.SNDMORE)
156 # self.reply_socket.send(ident,zmq.SNDMORE)
157 # self.reply_socket.send_json(reply_msg)
157 # self.reply_socket.send_json(reply_msg)
158 reply_msg = self.session.send(stream, reply_type,
158 reply_msg = self.session.send(stream, reply_type,
159 content={'status' : 'aborted'}, parent=msg, ident=idents)
159 content={'status' : 'aborted'}, parent=msg, ident=idents)
160 self.log.debug(str(reply_msg))
160 self.log.debug(str(reply_msg))
161 # We need to wait a bit for requests to come in. This can probably
161 # We need to wait a bit for requests to come in. This can probably
162 # be set shorter for true asynchronous clients.
162 # be set shorter for true asynchronous clients.
163 time.sleep(0.05)
163 time.sleep(0.05)
164
164
165 def abort_request(self, stream, ident, parent):
165 def abort_request(self, stream, ident, parent):
166 """abort a specifig msg by id"""
166 """abort a specifig msg by id"""
167 msg_ids = parent['content'].get('msg_ids', None)
167 msg_ids = parent['content'].get('msg_ids', None)
168 if isinstance(msg_ids, basestring):
168 if isinstance(msg_ids, basestring):
169 msg_ids = [msg_ids]
169 msg_ids = [msg_ids]
170 if not msg_ids:
170 if not msg_ids:
171 self.abort_queues()
171 self.abort_queues()
172 for mid in msg_ids:
172 for mid in msg_ids:
173 self.aborted.add(str(mid))
173 self.aborted.add(str(mid))
174
174
175 content = dict(status='ok')
175 content = dict(status='ok')
176 reply_msg = self.session.send(stream, 'abort_reply', content=content,
176 reply_msg = self.session.send(stream, 'abort_reply', content=content,
177 parent=parent, ident=ident)
177 parent=parent, ident=ident)
178 self.log.debug(str(reply_msg))
178 self.log.debug(str(reply_msg))
179
179
180 def shutdown_request(self, stream, ident, parent):
180 def shutdown_request(self, stream, ident, parent):
181 """kill ourself. This should really be handled in an external process"""
181 """kill ourself. This should really be handled in an external process"""
182 try:
182 try:
183 self.abort_queues()
183 self.abort_queues()
184 except:
184 except:
185 content = self._wrap_exception('shutdown')
185 content = self._wrap_exception('shutdown')
186 else:
186 else:
187 content = dict(parent['content'])
187 content = dict(parent['content'])
188 content['status'] = 'ok'
188 content['status'] = 'ok'
189 msg = self.session.send(stream, 'shutdown_reply',
189 msg = self.session.send(stream, 'shutdown_reply',
190 content=content, parent=parent, ident=ident)
190 content=content, parent=parent, ident=ident)
191 self.log.debug(str(msg))
191 self.log.debug(str(msg))
192 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
192 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
193 dc.start()
193 dc.start()
194
194
195 def dispatch_control(self, msg):
195 def dispatch_control(self, msg):
196 idents,msg = self.session.feed_identities(msg, copy=False)
196 idents,msg = self.session.feed_identities(msg, copy=False)
197 try:
197 try:
198 msg = self.session.unserialize(msg, content=True, copy=False)
198 msg = self.session.unserialize(msg, content=True, copy=False)
199 except:
199 except:
200 self.log.error("Invalid Message", exc_info=True)
200 self.log.error("Invalid Message", exc_info=True)
201 return
201 return
202 else:
202 else:
203 self.log.debug("Control received, %s", msg)
203 self.log.debug("Control received, %s", msg)
204
204
205 header = msg['header']
205 header = msg['header']
206 msg_id = header['msg_id']
206 msg_id = header['msg_id']
207 msg_type = header['msg_type']
207 msg_type = header['msg_type']
208
208
209 handler = self.control_handlers.get(msg_type, None)
209 handler = self.control_handlers.get(msg_type, None)
210 if handler is None:
210 if handler is None:
211 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg_type)
211 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg_type)
212 else:
212 else:
213 handler(self.control_stream, idents, msg)
213 handler(self.control_stream, idents, msg)
214
214
215
215
216 #-------------------- queue helpers ------------------------------
216 #-------------------- queue helpers ------------------------------
217
217
218 def check_dependencies(self, dependencies):
218 def check_dependencies(self, dependencies):
219 if not dependencies:
219 if not dependencies:
220 return True
220 return True
221 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
221 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
222 anyorall = dependencies[0]
222 anyorall = dependencies[0]
223 dependencies = dependencies[1]
223 dependencies = dependencies[1]
224 else:
224 else:
225 anyorall = 'all'
225 anyorall = 'all'
226 results = self.client.get_results(dependencies,status_only=True)
226 results = self.client.get_results(dependencies,status_only=True)
227 if results['status'] != 'ok':
227 if results['status'] != 'ok':
228 return False
228 return False
229
229
230 if anyorall == 'any':
230 if anyorall == 'any':
231 if not results['completed']:
231 if not results['completed']:
232 return False
232 return False
233 else:
233 else:
234 if results['pending']:
234 if results['pending']:
235 return False
235 return False
236
236
237 return True
237 return True
238
238
239 def check_aborted(self, msg_id):
239 def check_aborted(self, msg_id):
240 return msg_id in self.aborted
240 return msg_id in self.aborted
241
241
242 #-------------------- queue handlers -----------------------------
242 #-------------------- queue handlers -----------------------------
243
243
244 def clear_request(self, stream, idents, parent):
244 def clear_request(self, stream, idents, parent):
245 """Clear our namespace."""
245 """Clear our namespace."""
246 self.user_ns = {}
246 self.user_ns = {}
247 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
247 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
248 content = dict(status='ok'))
248 content = dict(status='ok'))
249 self._initial_exec_lines()
249 self._initial_exec_lines()
250
250
251 def execute_request(self, stream, ident, parent):
251 def execute_request(self, stream, ident, parent):
252 self.log.debug('execute request %s'%parent)
252 self.log.debug('execute request %s'%parent)
253 try:
253 try:
254 code = parent[u'content'][u'code']
254 code = parent[u'content'][u'code']
255 except:
255 except:
256 self.log.error("Got bad msg: %s"%parent, exc_info=True)
256 self.log.error("Got bad msg: %s"%parent, exc_info=True)
257 return
257 return
258 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
258 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
259 ident=asbytes('%s.pyin'%self.prefix))
259 ident=asbytes('%s.pyin'%self.prefix))
260 started = datetime.now()
260 started = datetime.now()
261 try:
261 try:
262 comp_code = self.compiler(code, '<zmq-kernel>')
262 comp_code = self.compiler(code, '<zmq-kernel>')
263 # allow for not overriding displayhook
263 # allow for not overriding displayhook
264 if hasattr(sys.displayhook, 'set_parent'):
264 if hasattr(sys.displayhook, 'set_parent'):
265 sys.displayhook.set_parent(parent)
265 sys.displayhook.set_parent(parent)
266 sys.stdout.set_parent(parent)
266 sys.stdout.set_parent(parent)
267 sys.stderr.set_parent(parent)
267 sys.stderr.set_parent(parent)
268 exec comp_code in self.user_ns, self.user_ns
268 exec comp_code in self.user_ns, self.user_ns
269 except:
269 except:
270 exc_content = self._wrap_exception('execute')
270 exc_content = self._wrap_exception('execute')
271 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
271 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
272 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
272 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
273 ident=asbytes('%s.pyerr'%self.prefix))
273 ident=asbytes('%s.pyerr'%self.prefix))
274 reply_content = exc_content
274 reply_content = exc_content
275 else:
275 else:
276 reply_content = {'status' : 'ok'}
276 reply_content = {'status' : 'ok'}
277
277
278 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
278 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
279 ident=ident, subheader = dict(started=started))
279 ident=ident, subheader = dict(started=started))
280 self.log.debug(str(reply_msg))
280 self.log.debug(str(reply_msg))
281 if reply_msg['content']['status'] == u'error':
281 if reply_msg['content']['status'] == u'error':
282 self.abort_queues()
282 self.abort_queues()
283
283
284 def complete_request(self, stream, ident, parent):
284 def complete_request(self, stream, ident, parent):
285 matches = {'matches' : self.complete(parent),
285 matches = {'matches' : self.complete(parent),
286 'status' : 'ok'}
286 'status' : 'ok'}
287 completion_msg = self.session.send(stream, 'complete_reply',
287 completion_msg = self.session.send(stream, 'complete_reply',
288 matches, parent, ident)
288 matches, parent, ident)
289 # print >> sys.__stdout__, completion_msg
289 # print >> sys.__stdout__, completion_msg
290
290
291 def complete(self, msg):
291 def complete(self, msg):
292 return self.completer.complete(msg.content.line, msg.content.text)
292 return self.completer.complete(msg.content.line, msg.content.text)
293
293
294 def apply_request(self, stream, ident, parent):
294 def apply_request(self, stream, ident, parent):
295 # flush previous reply, so this request won't block it
295 # flush previous reply, so this request won't block it
296 stream.flush(zmq.POLLOUT)
296 stream.flush(zmq.POLLOUT)
297 try:
297 try:
298 content = parent[u'content']
298 content = parent[u'content']
299 bufs = parent[u'buffers']
299 bufs = parent[u'buffers']
300 msg_id = parent['header']['msg_id']
300 msg_id = parent['header']['msg_id']
301 # bound = parent['header'].get('bound', False)
301 # bound = parent['header'].get('bound', False)
302 except:
302 except:
303 self.log.error("Got bad msg: %s"%parent, exc_info=True)
303 self.log.error("Got bad msg: %s"%parent, exc_info=True)
304 return
304 return
305 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
305 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
306 # self.iopub_stream.send(pyin_msg)
306 # self.iopub_stream.send(pyin_msg)
307 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
307 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
308 sub = {'dependencies_met' : True, 'engine' : self.ident,
308 sub = {'dependencies_met' : True, 'engine' : self.ident,
309 'started': datetime.now()}
309 'started': datetime.now()}
310 try:
310 try:
311 # allow for not overriding displayhook
311 # allow for not overriding displayhook
312 if hasattr(sys.displayhook, 'set_parent'):
312 if hasattr(sys.displayhook, 'set_parent'):
313 sys.displayhook.set_parent(parent)
313 sys.displayhook.set_parent(parent)
314 sys.stdout.set_parent(parent)
314 sys.stdout.set_parent(parent)
315 sys.stderr.set_parent(parent)
315 sys.stderr.set_parent(parent)
316 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
316 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
317 working = self.user_ns
317 working = self.user_ns
318 # suffix =
318 # suffix =
319 prefix = "_"+str(msg_id).replace("-","")+"_"
319 prefix = "_"+str(msg_id).replace("-","")+"_"
320
320
321 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
321 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
322 # if bound:
322 # if bound:
323 # bound_ns = Namespace(working)
323 # bound_ns = Namespace(working)
324 # args = [bound_ns]+list(args)
324 # args = [bound_ns]+list(args)
325
325
326 fname = getattr(f, '__name__', 'f')
326 fname = getattr(f, '__name__', 'f')
327
327
328 fname = prefix+"f"
328 fname = prefix+"f"
329 argname = prefix+"args"
329 argname = prefix+"args"
330 kwargname = prefix+"kwargs"
330 kwargname = prefix+"kwargs"
331 resultname = prefix+"result"
331 resultname = prefix+"result"
332
332
333 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
333 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
334 # print ns
334 # print ns
335 working.update(ns)
335 working.update(ns)
336 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
336 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
337 try:
337 try:
338 exec code in working,working
338 exec code in working,working
339 result = working.get(resultname)
339 result = working.get(resultname)
340 finally:
340 finally:
341 for key in ns.iterkeys():
341 for key in ns.iterkeys():
342 working.pop(key)
342 working.pop(key)
343 # if bound:
343 # if bound:
344 # working.update(bound_ns)
344 # working.update(bound_ns)
345
345
346 packed_result,buf = serialize_object(result)
346 packed_result,buf = serialize_object(result)
347 result_buf = [packed_result]+buf
347 result_buf = [packed_result]+buf
348 except:
348 except:
349 exc_content = self._wrap_exception('apply')
349 exc_content = self._wrap_exception('apply')
350 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
350 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
351 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
351 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
352 ident=asbytes('%s.pyerr'%self.prefix))
352 ident=asbytes('%s.pyerr'%self.prefix))
353 reply_content = exc_content
353 reply_content = exc_content
354 result_buf = []
354 result_buf = []
355
355
356 if exc_content['ename'] == 'UnmetDependency':
356 if exc_content['ename'] == 'UnmetDependency':
357 sub['dependencies_met'] = False
357 sub['dependencies_met'] = False
358 else:
358 else:
359 reply_content = {'status' : 'ok'}
359 reply_content = {'status' : 'ok'}
360
360
361 # put 'ok'/'error' status in header, for scheduler introspection:
361 # put 'ok'/'error' status in header, for scheduler introspection:
362 sub['status'] = reply_content['status']
362 sub['status'] = reply_content['status']
363
363
364 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
364 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
365 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
365 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
366
366
367 # flush i/o
367 # flush i/o
368 # should this be before reply_msg is sent, like in the single-kernel code,
368 # should this be before reply_msg is sent, like in the single-kernel code,
369 # or should nothing get in the way of real results?
369 # or should nothing get in the way of real results?
370 sys.stdout.flush()
370 sys.stdout.flush()
371 sys.stderr.flush()
371 sys.stderr.flush()
372
372
373 def dispatch_queue(self, stream, msg):
373 def dispatch_queue(self, stream, msg):
374 self.control_stream.flush()
374 self.control_stream.flush()
375 idents,msg = self.session.feed_identities(msg, copy=False)
375 idents,msg = self.session.feed_identities(msg, copy=False)
376 try:
376 try:
377 msg = self.session.unserialize(msg, content=True, copy=False)
377 msg = self.session.unserialize(msg, content=True, copy=False)
378 except:
378 except:
379 self.log.error("Invalid Message", exc_info=True)
379 self.log.error("Invalid Message", exc_info=True)
380 return
380 return
381 else:
381 else:
382 self.log.debug("Message received, %s", msg)
382 self.log.debug("Message received, %s", msg)
383
383
384
384
385 header = msg['header']
385 header = msg['header']
386 msg_id = header['msg_id']
386 msg_id = header['msg_id']
387 msg['header']['msg_type']
387 msg_type = msg['header']['msg_type']
388 if self.check_aborted(msg_id):
388 if self.check_aborted(msg_id):
389 self.aborted.remove(msg_id)
389 self.aborted.remove(msg_id)
390 # is it safe to assume a msg_id will not be resubmitted?
390 # is it safe to assume a msg_id will not be resubmitted?
391 reply_type = msg_type.split('_')[0] + '_reply'
391 reply_type = msg_type.split('_')[0] + '_reply'
392 status = {'status' : 'aborted'}
392 status = {'status' : 'aborted'}
393 reply_msg = self.session.send(stream, reply_type, subheader=status,
393 reply_msg = self.session.send(stream, reply_type, subheader=status,
394 content=status, parent=msg, ident=idents)
394 content=status, parent=msg, ident=idents)
395 return
395 return
396 handler = self.shell_handlers.get(msg_type, None)
396 handler = self.shell_handlers.get(msg_type, None)
397 if handler is None:
397 if handler is None:
398 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg_type)
398 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg_type)
399 else:
399 else:
400 handler(stream, idents, msg)
400 handler(stream, idents, msg)
401
401
402 def start(self):
402 def start(self):
403 #### stream mode:
403 #### stream mode:
404 if self.control_stream:
404 if self.control_stream:
405 self.control_stream.on_recv(self.dispatch_control, copy=False)
405 self.control_stream.on_recv(self.dispatch_control, copy=False)
406 self.control_stream.on_err(printer)
406 self.control_stream.on_err(printer)
407
407
408 def make_dispatcher(stream):
408 def make_dispatcher(stream):
409 def dispatcher(msg):
409 def dispatcher(msg):
410 return self.dispatch_queue(stream, msg)
410 return self.dispatch_queue(stream, msg)
411 return dispatcher
411 return dispatcher
412
412
413 for s in self.shell_streams:
413 for s in self.shell_streams:
414 s.on_recv(make_dispatcher(s), copy=False)
414 s.on_recv(make_dispatcher(s), copy=False)
415 s.on_err(printer)
415 s.on_err(printer)
416
416
417 if self.iopub_stream:
417 if self.iopub_stream:
418 self.iopub_stream.on_err(printer)
418 self.iopub_stream.on_err(printer)
419
419
420 #### while True mode:
420 #### while True mode:
421 # while True:
421 # while True:
422 # idle = True
422 # idle = True
423 # try:
423 # try:
424 # msg = self.shell_stream.socket.recv_multipart(
424 # msg = self.shell_stream.socket.recv_multipart(
425 # zmq.NOBLOCK, copy=False)
425 # zmq.NOBLOCK, copy=False)
426 # except zmq.ZMQError, e:
426 # except zmq.ZMQError, e:
427 # if e.errno != zmq.EAGAIN:
427 # if e.errno != zmq.EAGAIN:
428 # raise e
428 # raise e
429 # else:
429 # else:
430 # idle=False
430 # idle=False
431 # self.dispatch_queue(self.shell_stream, msg)
431 # self.dispatch_queue(self.shell_stream, msg)
432 #
432 #
433 # if not self.task_stream.empty():
433 # if not self.task_stream.empty():
434 # idle=False
434 # idle=False
435 # msg = self.task_stream.recv_multipart()
435 # msg = self.task_stream.recv_multipart()
436 # self.dispatch_queue(self.task_stream, msg)
436 # self.dispatch_queue(self.task_stream, msg)
437 # if idle:
437 # if idle:
438 # # don't busywait
438 # # don't busywait
439 # time.sleep(1e-3)
439 # time.sleep(1e-3)
440
440
General Comments 0
You need to be logged in to leave comments. Login now