##// END OF EJS Templates
Merge pull request #1374 from minrk/on_err...
Brian E. Granger -
r6079:64c7b29c merge
parent child Browse files
Show More
@@ -1,439 +1,414
1 """
1 """
2 Kernel adapted from kernel.py to use ZMQ Streams
2 Kernel adapted from kernel.py to use ZMQ Streams
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 * Brian Granger
7 * Brian Granger
8 * Fernando Perez
8 * Fernando Perez
9 * Evan Patterson
9 * Evan Patterson
10 """
10 """
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2010-2011 The IPython Development Team
12 # Copyright (C) 2010-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 # Standard library imports.
22 # Standard library imports.
23 from __future__ import print_function
23 from __future__ import print_function
24
24
25 import sys
25 import sys
26 import time
26 import time
27
27
28 from code import CommandCompiler
28 from code import CommandCompiler
29 from datetime import datetime
29 from datetime import datetime
30 from pprint import pprint
30 from pprint import pprint
31
31
32 # System library imports.
32 # System library imports.
33 import zmq
33 import zmq
34 from zmq.eventloop import ioloop, zmqstream
34 from zmq.eventloop import ioloop, zmqstream
35
35
36 # Local imports.
36 # Local imports.
37 from IPython.utils.traitlets import Instance, List, Integer, Dict, Set, Unicode, CBytes
37 from IPython.utils.traitlets import Instance, List, Integer, Dict, Set, Unicode, CBytes
38 from IPython.zmq.completer import KernelCompleter
38 from IPython.zmq.completer import KernelCompleter
39
39
40 from IPython.parallel.error import wrap_exception
40 from IPython.parallel.error import wrap_exception
41 from IPython.parallel.factory import SessionFactory
41 from IPython.parallel.factory import SessionFactory
42 from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes
42 from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes
43
43
44 def printer(*args):
44 def printer(*args):
45 pprint(args, stream=sys.__stdout__)
45 pprint(args, stream=sys.__stdout__)
46
46
47
47
48 class _Passer(zmqstream.ZMQStream):
48 class _Passer(zmqstream.ZMQStream):
49 """Empty class that implements `send()` that does nothing.
49 """Empty class that implements `send()` that does nothing.
50
50
51 Subclass ZMQStream for Session typechecking
51 Subclass ZMQStream for Session typechecking
52
52
53 """
53 """
54 def __init__(self, *args, **kwargs):
54 def __init__(self, *args, **kwargs):
55 pass
55 pass
56
56
57 def send(self, *args, **kwargs):
57 def send(self, *args, **kwargs):
58 pass
58 pass
59 send_multipart = send
59 send_multipart = send
60
60
61
61
62 #-----------------------------------------------------------------------------
62 #-----------------------------------------------------------------------------
63 # Main kernel class
63 # Main kernel class
64 #-----------------------------------------------------------------------------
64 #-----------------------------------------------------------------------------
65
65
66 class Kernel(SessionFactory):
66 class Kernel(SessionFactory):
67
67
68 #---------------------------------------------------------------------------
68 #---------------------------------------------------------------------------
69 # Kernel interface
69 # Kernel interface
70 #---------------------------------------------------------------------------
70 #---------------------------------------------------------------------------
71
71
72 # kwargs:
72 # kwargs:
73 exec_lines = List(Unicode, config=True,
73 exec_lines = List(Unicode, config=True,
74 help="List of lines to execute")
74 help="List of lines to execute")
75
75
76 # identities:
76 # identities:
77 int_id = Integer(-1)
77 int_id = Integer(-1)
78 bident = CBytes()
78 bident = CBytes()
79 ident = Unicode()
79 ident = Unicode()
80 def _ident_changed(self, name, old, new):
80 def _ident_changed(self, name, old, new):
81 self.bident = asbytes(new)
81 self.bident = asbytes(new)
82
82
83 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
83 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
84
84
85 control_stream = Instance(zmqstream.ZMQStream)
85 control_stream = Instance(zmqstream.ZMQStream)
86 task_stream = Instance(zmqstream.ZMQStream)
86 task_stream = Instance(zmqstream.ZMQStream)
87 iopub_stream = Instance(zmqstream.ZMQStream)
87 iopub_stream = Instance(zmqstream.ZMQStream)
88 client = Instance('IPython.parallel.Client')
88 client = Instance('IPython.parallel.Client')
89
89
90 # internals
90 # internals
91 shell_streams = List()
91 shell_streams = List()
92 compiler = Instance(CommandCompiler, (), {})
92 compiler = Instance(CommandCompiler, (), {})
93 completer = Instance(KernelCompleter)
93 completer = Instance(KernelCompleter)
94
94
95 aborted = Set()
95 aborted = Set()
96 shell_handlers = Dict()
96 shell_handlers = Dict()
97 control_handlers = Dict()
97 control_handlers = Dict()
98
98
99 def _set_prefix(self):
99 def _set_prefix(self):
100 self.prefix = "engine.%s"%self.int_id
100 self.prefix = "engine.%s"%self.int_id
101
101
102 def _connect_completer(self):
102 def _connect_completer(self):
103 self.completer = KernelCompleter(self.user_ns)
103 self.completer = KernelCompleter(self.user_ns)
104
104
105 def __init__(self, **kwargs):
105 def __init__(self, **kwargs):
106 super(Kernel, self).__init__(**kwargs)
106 super(Kernel, self).__init__(**kwargs)
107 self._set_prefix()
107 self._set_prefix()
108 self._connect_completer()
108 self._connect_completer()
109
109
110 self.on_trait_change(self._set_prefix, 'id')
110 self.on_trait_change(self._set_prefix, 'id')
111 self.on_trait_change(self._connect_completer, 'user_ns')
111 self.on_trait_change(self._connect_completer, 'user_ns')
112
112
113 # Build dict of handlers for message types
113 # Build dict of handlers for message types
114 for msg_type in ['execute_request', 'complete_request', 'apply_request',
114 for msg_type in ['execute_request', 'complete_request', 'apply_request',
115 'clear_request']:
115 'clear_request']:
116 self.shell_handlers[msg_type] = getattr(self, msg_type)
116 self.shell_handlers[msg_type] = getattr(self, msg_type)
117
117
118 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
118 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
119 self.control_handlers[msg_type] = getattr(self, msg_type)
119 self.control_handlers[msg_type] = getattr(self, msg_type)
120
120
121 self._initial_exec_lines()
121 self._initial_exec_lines()
122
122
123 def _wrap_exception(self, method=None):
123 def _wrap_exception(self, method=None):
124 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
124 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
125 content=wrap_exception(e_info)
125 content=wrap_exception(e_info)
126 return content
126 return content
127
127
128 def _initial_exec_lines(self):
128 def _initial_exec_lines(self):
129 s = _Passer()
129 s = _Passer()
130 content = dict(silent=True, user_variable=[],user_expressions=[])
130 content = dict(silent=True, user_variable=[],user_expressions=[])
131 for line in self.exec_lines:
131 for line in self.exec_lines:
132 self.log.debug("executing initialization: %s"%line)
132 self.log.debug("executing initialization: %s"%line)
133 content.update({'code':line})
133 content.update({'code':line})
134 msg = self.session.msg('execute_request', content)
134 msg = self.session.msg('execute_request', content)
135 self.execute_request(s, [], msg)
135 self.execute_request(s, [], msg)
136
136
137
137
138 #-------------------- control handlers -----------------------------
138 #-------------------- control handlers -----------------------------
139 def abort_queues(self):
139 def abort_queues(self):
140 for stream in self.shell_streams:
140 for stream in self.shell_streams:
141 if stream:
141 if stream:
142 self.abort_queue(stream)
142 self.abort_queue(stream)
143
143
144 def abort_queue(self, stream):
144 def abort_queue(self, stream):
145 while True:
145 while True:
146 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
146 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
147 if msg is None:
147 if msg is None:
148 return
148 return
149
149
150 self.log.info("Aborting:")
150 self.log.info("Aborting:")
151 self.log.info(str(msg))
151 self.log.info(str(msg))
152 msg_type = msg['header']['msg_type']
152 msg_type = msg['header']['msg_type']
153 reply_type = msg_type.split('_')[0] + '_reply'
153 reply_type = msg_type.split('_')[0] + '_reply'
154 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
154 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
155 # self.reply_socket.send(ident,zmq.SNDMORE)
155 # self.reply_socket.send(ident,zmq.SNDMORE)
156 # self.reply_socket.send_json(reply_msg)
156 # self.reply_socket.send_json(reply_msg)
157 reply_msg = self.session.send(stream, reply_type,
157 reply_msg = self.session.send(stream, reply_type,
158 content={'status' : 'aborted'}, parent=msg, ident=idents)
158 content={'status' : 'aborted'}, parent=msg, ident=idents)
159 self.log.debug(str(reply_msg))
159 self.log.debug(str(reply_msg))
160 # We need to wait a bit for requests to come in. This can probably
160 # We need to wait a bit for requests to come in. This can probably
161 # be set shorter for true asynchronous clients.
161 # be set shorter for true asynchronous clients.
162 time.sleep(0.05)
162 time.sleep(0.05)
163
163
164 def abort_request(self, stream, ident, parent):
164 def abort_request(self, stream, ident, parent):
165 """abort a specifig msg by id"""
165 """abort a specifig msg by id"""
166 msg_ids = parent['content'].get('msg_ids', None)
166 msg_ids = parent['content'].get('msg_ids', None)
167 if isinstance(msg_ids, basestring):
167 if isinstance(msg_ids, basestring):
168 msg_ids = [msg_ids]
168 msg_ids = [msg_ids]
169 if not msg_ids:
169 if not msg_ids:
170 self.abort_queues()
170 self.abort_queues()
171 for mid in msg_ids:
171 for mid in msg_ids:
172 self.aborted.add(str(mid))
172 self.aborted.add(str(mid))
173
173
174 content = dict(status='ok')
174 content = dict(status='ok')
175 reply_msg = self.session.send(stream, 'abort_reply', content=content,
175 reply_msg = self.session.send(stream, 'abort_reply', content=content,
176 parent=parent, ident=ident)
176 parent=parent, ident=ident)
177 self.log.debug(str(reply_msg))
177 self.log.debug(str(reply_msg))
178
178
179 def shutdown_request(self, stream, ident, parent):
179 def shutdown_request(self, stream, ident, parent):
180 """kill ourself. This should really be handled in an external process"""
180 """kill ourself. This should really be handled in an external process"""
181 try:
181 try:
182 self.abort_queues()
182 self.abort_queues()
183 except:
183 except:
184 content = self._wrap_exception('shutdown')
184 content = self._wrap_exception('shutdown')
185 else:
185 else:
186 content = dict(parent['content'])
186 content = dict(parent['content'])
187 content['status'] = 'ok'
187 content['status'] = 'ok'
188 msg = self.session.send(stream, 'shutdown_reply',
188 msg = self.session.send(stream, 'shutdown_reply',
189 content=content, parent=parent, ident=ident)
189 content=content, parent=parent, ident=ident)
190 self.log.debug(str(msg))
190 self.log.debug(str(msg))
191 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
191 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
192 dc.start()
192 dc.start()
193
193
194 def dispatch_control(self, msg):
194 def dispatch_control(self, msg):
195 idents,msg = self.session.feed_identities(msg, copy=False)
195 idents,msg = self.session.feed_identities(msg, copy=False)
196 try:
196 try:
197 msg = self.session.unserialize(msg, content=True, copy=False)
197 msg = self.session.unserialize(msg, content=True, copy=False)
198 except:
198 except:
199 self.log.error("Invalid Message", exc_info=True)
199 self.log.error("Invalid Message", exc_info=True)
200 return
200 return
201 else:
201 else:
202 self.log.debug("Control received, %s", msg)
202 self.log.debug("Control received, %s", msg)
203
203
204 header = msg['header']
204 header = msg['header']
205 msg_id = header['msg_id']
205 msg_id = header['msg_id']
206 msg_type = header['msg_type']
206 msg_type = header['msg_type']
207
207
208 handler = self.control_handlers.get(msg_type, None)
208 handler = self.control_handlers.get(msg_type, None)
209 if handler is None:
209 if handler is None:
210 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg_type)
210 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg_type)
211 else:
211 else:
212 handler(self.control_stream, idents, msg)
212 handler(self.control_stream, idents, msg)
213
213
214
214
215 #-------------------- queue helpers ------------------------------
215 #-------------------- queue helpers ------------------------------
216
216
217 def check_dependencies(self, dependencies):
217 def check_dependencies(self, dependencies):
218 if not dependencies:
218 if not dependencies:
219 return True
219 return True
220 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
220 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
221 anyorall = dependencies[0]
221 anyorall = dependencies[0]
222 dependencies = dependencies[1]
222 dependencies = dependencies[1]
223 else:
223 else:
224 anyorall = 'all'
224 anyorall = 'all'
225 results = self.client.get_results(dependencies,status_only=True)
225 results = self.client.get_results(dependencies,status_only=True)
226 if results['status'] != 'ok':
226 if results['status'] != 'ok':
227 return False
227 return False
228
228
229 if anyorall == 'any':
229 if anyorall == 'any':
230 if not results['completed']:
230 if not results['completed']:
231 return False
231 return False
232 else:
232 else:
233 if results['pending']:
233 if results['pending']:
234 return False
234 return False
235
235
236 return True
236 return True
237
237
238 def check_aborted(self, msg_id):
238 def check_aborted(self, msg_id):
239 return msg_id in self.aborted
239 return msg_id in self.aborted
240
240
241 #-------------------- queue handlers -----------------------------
241 #-------------------- queue handlers -----------------------------
242
242
243 def clear_request(self, stream, idents, parent):
243 def clear_request(self, stream, idents, parent):
244 """Clear our namespace."""
244 """Clear our namespace."""
245 self.user_ns = {}
245 self.user_ns = {}
246 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
246 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
247 content = dict(status='ok'))
247 content = dict(status='ok'))
248 self._initial_exec_lines()
248 self._initial_exec_lines()
249
249
250 def execute_request(self, stream, ident, parent):
250 def execute_request(self, stream, ident, parent):
251 self.log.debug('execute request %s'%parent)
251 self.log.debug('execute request %s'%parent)
252 try:
252 try:
253 code = parent[u'content'][u'code']
253 code = parent[u'content'][u'code']
254 except:
254 except:
255 self.log.error("Got bad msg: %s"%parent, exc_info=True)
255 self.log.error("Got bad msg: %s"%parent, exc_info=True)
256 return
256 return
257 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
257 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
258 ident=asbytes('%s.pyin'%self.prefix))
258 ident=asbytes('%s.pyin'%self.prefix))
259 started = datetime.now()
259 started = datetime.now()
260 try:
260 try:
261 comp_code = self.compiler(code, '<zmq-kernel>')
261 comp_code = self.compiler(code, '<zmq-kernel>')
262 # allow for not overriding displayhook
262 # allow for not overriding displayhook
263 if hasattr(sys.displayhook, 'set_parent'):
263 if hasattr(sys.displayhook, 'set_parent'):
264 sys.displayhook.set_parent(parent)
264 sys.displayhook.set_parent(parent)
265 sys.stdout.set_parent(parent)
265 sys.stdout.set_parent(parent)
266 sys.stderr.set_parent(parent)
266 sys.stderr.set_parent(parent)
267 exec comp_code in self.user_ns, self.user_ns
267 exec comp_code in self.user_ns, self.user_ns
268 except:
268 except:
269 exc_content = self._wrap_exception('execute')
269 exc_content = self._wrap_exception('execute')
270 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
270 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
271 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
271 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
272 ident=asbytes('%s.pyerr'%self.prefix))
272 ident=asbytes('%s.pyerr'%self.prefix))
273 reply_content = exc_content
273 reply_content = exc_content
274 else:
274 else:
275 reply_content = {'status' : 'ok'}
275 reply_content = {'status' : 'ok'}
276
276
277 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
277 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
278 ident=ident, subheader = dict(started=started))
278 ident=ident, subheader = dict(started=started))
279 self.log.debug(str(reply_msg))
279 self.log.debug(str(reply_msg))
280 if reply_msg['content']['status'] == u'error':
280 if reply_msg['content']['status'] == u'error':
281 self.abort_queues()
281 self.abort_queues()
282
282
283 def complete_request(self, stream, ident, parent):
283 def complete_request(self, stream, ident, parent):
284 matches = {'matches' : self.complete(parent),
284 matches = {'matches' : self.complete(parent),
285 'status' : 'ok'}
285 'status' : 'ok'}
286 completion_msg = self.session.send(stream, 'complete_reply',
286 completion_msg = self.session.send(stream, 'complete_reply',
287 matches, parent, ident)
287 matches, parent, ident)
288 # print >> sys.__stdout__, completion_msg
288 # print >> sys.__stdout__, completion_msg
289
289
290 def complete(self, msg):
290 def complete(self, msg):
291 return self.completer.complete(msg.content.line, msg.content.text)
291 return self.completer.complete(msg.content.line, msg.content.text)
292
292
293 def apply_request(self, stream, ident, parent):
293 def apply_request(self, stream, ident, parent):
294 # flush previous reply, so this request won't block it
294 # flush previous reply, so this request won't block it
295 stream.flush(zmq.POLLOUT)
295 stream.flush(zmq.POLLOUT)
296 try:
296 try:
297 content = parent[u'content']
297 content = parent[u'content']
298 bufs = parent[u'buffers']
298 bufs = parent[u'buffers']
299 msg_id = parent['header']['msg_id']
299 msg_id = parent['header']['msg_id']
300 # bound = parent['header'].get('bound', False)
300 # bound = parent['header'].get('bound', False)
301 except:
301 except:
302 self.log.error("Got bad msg: %s"%parent, exc_info=True)
302 self.log.error("Got bad msg: %s"%parent, exc_info=True)
303 return
303 return
304 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
304 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
305 # self.iopub_stream.send(pyin_msg)
305 # self.iopub_stream.send(pyin_msg)
306 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
306 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
307 sub = {'dependencies_met' : True, 'engine' : self.ident,
307 sub = {'dependencies_met' : True, 'engine' : self.ident,
308 'started': datetime.now()}
308 'started': datetime.now()}
309 try:
309 try:
310 # allow for not overriding displayhook
310 # allow for not overriding displayhook
311 if hasattr(sys.displayhook, 'set_parent'):
311 if hasattr(sys.displayhook, 'set_parent'):
312 sys.displayhook.set_parent(parent)
312 sys.displayhook.set_parent(parent)
313 sys.stdout.set_parent(parent)
313 sys.stdout.set_parent(parent)
314 sys.stderr.set_parent(parent)
314 sys.stderr.set_parent(parent)
315 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
315 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
316 working = self.user_ns
316 working = self.user_ns
317 # suffix =
317 # suffix =
318 prefix = "_"+str(msg_id).replace("-","")+"_"
318 prefix = "_"+str(msg_id).replace("-","")+"_"
319
319
320 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
320 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
321 # if bound:
321 # if bound:
322 # bound_ns = Namespace(working)
322 # bound_ns = Namespace(working)
323 # args = [bound_ns]+list(args)
323 # args = [bound_ns]+list(args)
324
324
325 fname = getattr(f, '__name__', 'f')
325 fname = getattr(f, '__name__', 'f')
326
326
327 fname = prefix+"f"
327 fname = prefix+"f"
328 argname = prefix+"args"
328 argname = prefix+"args"
329 kwargname = prefix+"kwargs"
329 kwargname = prefix+"kwargs"
330 resultname = prefix+"result"
330 resultname = prefix+"result"
331
331
332 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
332 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
333 # print ns
333 # print ns
334 working.update(ns)
334 working.update(ns)
335 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
335 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
336 try:
336 try:
337 exec code in working,working
337 exec code in working,working
338 result = working.get(resultname)
338 result = working.get(resultname)
339 finally:
339 finally:
340 for key in ns.iterkeys():
340 for key in ns.iterkeys():
341 working.pop(key)
341 working.pop(key)
342 # if bound:
342 # if bound:
343 # working.update(bound_ns)
343 # working.update(bound_ns)
344
344
345 packed_result,buf = serialize_object(result)
345 packed_result,buf = serialize_object(result)
346 result_buf = [packed_result]+buf
346 result_buf = [packed_result]+buf
347 except:
347 except:
348 exc_content = self._wrap_exception('apply')
348 exc_content = self._wrap_exception('apply')
349 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
349 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
350 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
350 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
351 ident=asbytes('%s.pyerr'%self.prefix))
351 ident=asbytes('%s.pyerr'%self.prefix))
352 reply_content = exc_content
352 reply_content = exc_content
353 result_buf = []
353 result_buf = []
354
354
355 if exc_content['ename'] == 'UnmetDependency':
355 if exc_content['ename'] == 'UnmetDependency':
356 sub['dependencies_met'] = False
356 sub['dependencies_met'] = False
357 else:
357 else:
358 reply_content = {'status' : 'ok'}
358 reply_content = {'status' : 'ok'}
359
359
360 # put 'ok'/'error' status in header, for scheduler introspection:
360 # put 'ok'/'error' status in header, for scheduler introspection:
361 sub['status'] = reply_content['status']
361 sub['status'] = reply_content['status']
362
362
363 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
363 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
364 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
364 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
365
365
366 # flush i/o
366 # flush i/o
367 # should this be before reply_msg is sent, like in the single-kernel code,
367 # should this be before reply_msg is sent, like in the single-kernel code,
368 # or should nothing get in the way of real results?
368 # or should nothing get in the way of real results?
369 sys.stdout.flush()
369 sys.stdout.flush()
370 sys.stderr.flush()
370 sys.stderr.flush()
371
371
372 def dispatch_queue(self, stream, msg):
372 def dispatch_queue(self, stream, msg):
373 self.control_stream.flush()
373 self.control_stream.flush()
374 idents,msg = self.session.feed_identities(msg, copy=False)
374 idents,msg = self.session.feed_identities(msg, copy=False)
375 try:
375 try:
376 msg = self.session.unserialize(msg, content=True, copy=False)
376 msg = self.session.unserialize(msg, content=True, copy=False)
377 except:
377 except:
378 self.log.error("Invalid Message", exc_info=True)
378 self.log.error("Invalid Message", exc_info=True)
379 return
379 return
380 else:
380 else:
381 self.log.debug("Message received, %s", msg)
381 self.log.debug("Message received, %s", msg)
382
382
383
383
384 header = msg['header']
384 header = msg['header']
385 msg_id = header['msg_id']
385 msg_id = header['msg_id']
386 msg_type = msg['header']['msg_type']
386 msg_type = msg['header']['msg_type']
387 if self.check_aborted(msg_id):
387 if self.check_aborted(msg_id):
388 self.aborted.remove(msg_id)
388 self.aborted.remove(msg_id)
389 # is it safe to assume a msg_id will not be resubmitted?
389 # is it safe to assume a msg_id will not be resubmitted?
390 reply_type = msg_type.split('_')[0] + '_reply'
390 reply_type = msg_type.split('_')[0] + '_reply'
391 status = {'status' : 'aborted'}
391 status = {'status' : 'aborted'}
392 reply_msg = self.session.send(stream, reply_type, subheader=status,
392 reply_msg = self.session.send(stream, reply_type, subheader=status,
393 content=status, parent=msg, ident=idents)
393 content=status, parent=msg, ident=idents)
394 return
394 return
395 handler = self.shell_handlers.get(msg_type, None)
395 handler = self.shell_handlers.get(msg_type, None)
396 if handler is None:
396 if handler is None:
397 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg_type)
397 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg_type)
398 else:
398 else:
399 handler(stream, idents, msg)
399 handler(stream, idents, msg)
400
400
401 def start(self):
401 def start(self):
402 #### stream mode:
402 #### stream mode:
403 if self.control_stream:
403 if self.control_stream:
404 self.control_stream.on_recv(self.dispatch_control, copy=False)
404 self.control_stream.on_recv(self.dispatch_control, copy=False)
405 self.control_stream.on_err(printer)
406
405
407 def make_dispatcher(stream):
406 def make_dispatcher(stream):
408 def dispatcher(msg):
407 def dispatcher(msg):
409 return self.dispatch_queue(stream, msg)
408 return self.dispatch_queue(stream, msg)
410 return dispatcher
409 return dispatcher
411
410
412 for s in self.shell_streams:
411 for s in self.shell_streams:
413 s.on_recv(make_dispatcher(s), copy=False)
412 s.on_recv(make_dispatcher(s), copy=False)
414 s.on_err(printer)
413
415
416 if self.iopub_stream:
417 self.iopub_stream.on_err(printer)
418
419 #### while True mode:
420 # while True:
421 # idle = True
422 # try:
423 # msg = self.shell_stream.socket.recv_multipart(
424 # zmq.NOBLOCK, copy=False)
425 # except zmq.ZMQError, e:
426 # if e.errno != zmq.EAGAIN:
427 # raise e
428 # else:
429 # idle=False
430 # self.dispatch_queue(self.shell_stream, msg)
431 #
432 # if not self.task_stream.empty():
433 # idle=False
434 # msg = self.task_stream.recv_multipart()
435 # self.dispatch_queue(self.task_stream, msg)
436 # if idle:
437 # # don't busywait
438 # time.sleep(1e-3)
439
414
General Comments 0
You need to be logged in to leave comments. Login now