##// END OF EJS Templates
fix typo in StreamKernel.abort_queue...
MinRK -
Show More
@@ -1,429 +1,429 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 * Brian Granger
8 * Brian Granger
9 * Fernando Perez
9 * Fernando Perez
10 * Evan Patterson
10 * Evan Patterson
11 """
11 """
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2010-2011 The IPython Development Team
13 # Copyright (C) 2010-2011 The IPython Development Team
14 #
14 #
15 # Distributed under the terms of the BSD License. The full license is in
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Imports
20 # Imports
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 # Standard library imports.
23 # Standard library imports.
24 from __future__ import print_function
24 from __future__ import print_function
25
25
26 import sys
26 import sys
27 import time
27 import time
28
28
29 from code import CommandCompiler
29 from code import CommandCompiler
30 from datetime import datetime
30 from datetime import datetime
31 from pprint import pprint
31 from pprint import pprint
32
32
33 # System library imports.
33 # System library imports.
34 import zmq
34 import zmq
35 from zmq.eventloop import ioloop, zmqstream
35 from zmq.eventloop import ioloop, zmqstream
36
36
37 # Local imports.
37 # Local imports.
38 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode
38 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode
39 from IPython.zmq.completer import KernelCompleter
39 from IPython.zmq.completer import KernelCompleter
40
40
41 from IPython.parallel.error import wrap_exception
41 from IPython.parallel.error import wrap_exception
42 from IPython.parallel.factory import SessionFactory
42 from IPython.parallel.factory import SessionFactory
43 from IPython.parallel.util import serialize_object, unpack_apply_message
43 from IPython.parallel.util import serialize_object, unpack_apply_message
44
44
45 def printer(*args):
45 def printer(*args):
46 pprint(args, stream=sys.__stdout__)
46 pprint(args, stream=sys.__stdout__)
47
47
48
48
49 class _Passer(zmqstream.ZMQStream):
49 class _Passer(zmqstream.ZMQStream):
50 """Empty class that implements `send()` that does nothing.
50 """Empty class that implements `send()` that does nothing.
51
51
52 Subclass ZMQStream for Session typechecking
52 Subclass ZMQStream for Session typechecking
53
53
54 """
54 """
55 def __init__(self, *args, **kwargs):
55 def __init__(self, *args, **kwargs):
56 pass
56 pass
57
57
58 def send(self, *args, **kwargs):
58 def send(self, *args, **kwargs):
59 pass
59 pass
60 send_multipart = send
60 send_multipart = send
61
61
62
62
63 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
64 # Main kernel class
64 # Main kernel class
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66
66
67 class Kernel(SessionFactory):
67 class Kernel(SessionFactory):
68
68
69 #---------------------------------------------------------------------------
69 #---------------------------------------------------------------------------
70 # Kernel interface
70 # Kernel interface
71 #---------------------------------------------------------------------------
71 #---------------------------------------------------------------------------
72
72
73 # kwargs:
73 # kwargs:
74 exec_lines = List(Unicode, config=True,
74 exec_lines = List(Unicode, config=True,
75 help="List of lines to execute")
75 help="List of lines to execute")
76
76
77 int_id = Int(-1)
77 int_id = Int(-1)
78 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
78 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
79
79
80 control_stream = Instance(zmqstream.ZMQStream)
80 control_stream = Instance(zmqstream.ZMQStream)
81 task_stream = Instance(zmqstream.ZMQStream)
81 task_stream = Instance(zmqstream.ZMQStream)
82 iopub_stream = Instance(zmqstream.ZMQStream)
82 iopub_stream = Instance(zmqstream.ZMQStream)
83 client = Instance('IPython.parallel.Client')
83 client = Instance('IPython.parallel.Client')
84
84
85 # internals
85 # internals
86 shell_streams = List()
86 shell_streams = List()
87 compiler = Instance(CommandCompiler, (), {})
87 compiler = Instance(CommandCompiler, (), {})
88 completer = Instance(KernelCompleter)
88 completer = Instance(KernelCompleter)
89
89
90 aborted = Set()
90 aborted = Set()
91 shell_handlers = Dict()
91 shell_handlers = Dict()
92 control_handlers = Dict()
92 control_handlers = Dict()
93
93
94 def _set_prefix(self):
94 def _set_prefix(self):
95 self.prefix = "engine.%s"%self.int_id
95 self.prefix = "engine.%s"%self.int_id
96
96
97 def _connect_completer(self):
97 def _connect_completer(self):
98 self.completer = KernelCompleter(self.user_ns)
98 self.completer = KernelCompleter(self.user_ns)
99
99
100 def __init__(self, **kwargs):
100 def __init__(self, **kwargs):
101 super(Kernel, self).__init__(**kwargs)
101 super(Kernel, self).__init__(**kwargs)
102 self._set_prefix()
102 self._set_prefix()
103 self._connect_completer()
103 self._connect_completer()
104
104
105 self.on_trait_change(self._set_prefix, 'id')
105 self.on_trait_change(self._set_prefix, 'id')
106 self.on_trait_change(self._connect_completer, 'user_ns')
106 self.on_trait_change(self._connect_completer, 'user_ns')
107
107
108 # Build dict of handlers for message types
108 # Build dict of handlers for message types
109 for msg_type in ['execute_request', 'complete_request', 'apply_request',
109 for msg_type in ['execute_request', 'complete_request', 'apply_request',
110 'clear_request']:
110 'clear_request']:
111 self.shell_handlers[msg_type] = getattr(self, msg_type)
111 self.shell_handlers[msg_type] = getattr(self, msg_type)
112
112
113 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
113 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
114 self.control_handlers[msg_type] = getattr(self, msg_type)
114 self.control_handlers[msg_type] = getattr(self, msg_type)
115
115
116 self._initial_exec_lines()
116 self._initial_exec_lines()
117
117
118 def _wrap_exception(self, method=None):
118 def _wrap_exception(self, method=None):
119 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
119 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
120 content=wrap_exception(e_info)
120 content=wrap_exception(e_info)
121 return content
121 return content
122
122
123 def _initial_exec_lines(self):
123 def _initial_exec_lines(self):
124 s = _Passer()
124 s = _Passer()
125 content = dict(silent=True, user_variable=[],user_expressions=[])
125 content = dict(silent=True, user_variable=[],user_expressions=[])
126 for line in self.exec_lines:
126 for line in self.exec_lines:
127 self.log.debug("executing initialization: %s"%line)
127 self.log.debug("executing initialization: %s"%line)
128 content.update({'code':line})
128 content.update({'code':line})
129 msg = self.session.msg('execute_request', content)
129 msg = self.session.msg('execute_request', content)
130 self.execute_request(s, [], msg)
130 self.execute_request(s, [], msg)
131
131
132
132
133 #-------------------- control handlers -----------------------------
133 #-------------------- control handlers -----------------------------
134 def abort_queues(self):
134 def abort_queues(self):
135 for stream in self.shell_streams:
135 for stream in self.shell_streams:
136 if stream:
136 if stream:
137 self.abort_queue(stream)
137 self.abort_queue(stream)
138
138
139 def abort_queue(self, stream):
139 def abort_queue(self, stream):
140 while True:
140 while True:
141 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
141 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
142 if msg is None:
142 if msg is None:
143 return
143 return
144
144
145 self.log.info("Aborting:")
145 self.log.info("Aborting:")
146 self.log.info(str(msg))
146 self.log.info(str(msg))
147 msg_type = msg['msg_type']
147 msg_type = msg['msg_type']
148 reply_type = msg_type.split('_')[0] + '_reply'
148 reply_type = msg_type.split('_')[0] + '_reply'
149 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
149 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
150 # self.reply_socket.send(ident,zmq.SNDMORE)
150 # self.reply_socket.send(ident,zmq.SNDMORE)
151 # self.reply_socket.send_json(reply_msg)
151 # self.reply_socket.send_json(reply_msg)
152 reply_msg = self.session.send(stream, reply_type,
152 reply_msg = self.session.send(stream, reply_type,
153 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
153 content={'status' : 'aborted'}, parent=msg, ident=idents)
154 self.log.debug(str(reply_msg))
154 self.log.debug(str(reply_msg))
155 # We need to wait a bit for requests to come in. This can probably
155 # We need to wait a bit for requests to come in. This can probably
156 # be set shorter for true asynchronous clients.
156 # be set shorter for true asynchronous clients.
157 time.sleep(0.05)
157 time.sleep(0.05)
158
158
159 def abort_request(self, stream, ident, parent):
159 def abort_request(self, stream, ident, parent):
160 """abort a specifig msg by id"""
160 """abort a specifig msg by id"""
161 msg_ids = parent['content'].get('msg_ids', None)
161 msg_ids = parent['content'].get('msg_ids', None)
162 if isinstance(msg_ids, basestring):
162 if isinstance(msg_ids, basestring):
163 msg_ids = [msg_ids]
163 msg_ids = [msg_ids]
164 if not msg_ids:
164 if not msg_ids:
165 self.abort_queues()
165 self.abort_queues()
166 for mid in msg_ids:
166 for mid in msg_ids:
167 self.aborted.add(str(mid))
167 self.aborted.add(str(mid))
168
168
169 content = dict(status='ok')
169 content = dict(status='ok')
170 reply_msg = self.session.send(stream, 'abort_reply', content=content,
170 reply_msg = self.session.send(stream, 'abort_reply', content=content,
171 parent=parent, ident=ident)
171 parent=parent, ident=ident)
172 self.log.debug(str(reply_msg))
172 self.log.debug(str(reply_msg))
173
173
174 def shutdown_request(self, stream, ident, parent):
174 def shutdown_request(self, stream, ident, parent):
175 """kill ourself. This should really be handled in an external process"""
175 """kill ourself. This should really be handled in an external process"""
176 try:
176 try:
177 self.abort_queues()
177 self.abort_queues()
178 except:
178 except:
179 content = self._wrap_exception('shutdown')
179 content = self._wrap_exception('shutdown')
180 else:
180 else:
181 content = dict(parent['content'])
181 content = dict(parent['content'])
182 content['status'] = 'ok'
182 content['status'] = 'ok'
183 msg = self.session.send(stream, 'shutdown_reply',
183 msg = self.session.send(stream, 'shutdown_reply',
184 content=content, parent=parent, ident=ident)
184 content=content, parent=parent, ident=ident)
185 self.log.debug(str(msg))
185 self.log.debug(str(msg))
186 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
186 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
187 dc.start()
187 dc.start()
188
188
189 def dispatch_control(self, msg):
189 def dispatch_control(self, msg):
190 idents,msg = self.session.feed_identities(msg, copy=False)
190 idents,msg = self.session.feed_identities(msg, copy=False)
191 try:
191 try:
192 msg = self.session.unpack_message(msg, content=True, copy=False)
192 msg = self.session.unpack_message(msg, content=True, copy=False)
193 except:
193 except:
194 self.log.error("Invalid Message", exc_info=True)
194 self.log.error("Invalid Message", exc_info=True)
195 return
195 return
196
196
197 header = msg['header']
197 header = msg['header']
198 msg_id = header['msg_id']
198 msg_id = header['msg_id']
199
199
200 handler = self.control_handlers.get(msg['msg_type'], None)
200 handler = self.control_handlers.get(msg['msg_type'], None)
201 if handler is None:
201 if handler is None:
202 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
202 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
203 else:
203 else:
204 handler(self.control_stream, idents, msg)
204 handler(self.control_stream, idents, msg)
205
205
206
206
207 #-------------------- queue helpers ------------------------------
207 #-------------------- queue helpers ------------------------------
208
208
209 def check_dependencies(self, dependencies):
209 def check_dependencies(self, dependencies):
210 if not dependencies:
210 if not dependencies:
211 return True
211 return True
212 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
212 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
213 anyorall = dependencies[0]
213 anyorall = dependencies[0]
214 dependencies = dependencies[1]
214 dependencies = dependencies[1]
215 else:
215 else:
216 anyorall = 'all'
216 anyorall = 'all'
217 results = self.client.get_results(dependencies,status_only=True)
217 results = self.client.get_results(dependencies,status_only=True)
218 if results['status'] != 'ok':
218 if results['status'] != 'ok':
219 return False
219 return False
220
220
221 if anyorall == 'any':
221 if anyorall == 'any':
222 if not results['completed']:
222 if not results['completed']:
223 return False
223 return False
224 else:
224 else:
225 if results['pending']:
225 if results['pending']:
226 return False
226 return False
227
227
228 return True
228 return True
229
229
230 def check_aborted(self, msg_id):
230 def check_aborted(self, msg_id):
231 return msg_id in self.aborted
231 return msg_id in self.aborted
232
232
233 #-------------------- queue handlers -----------------------------
233 #-------------------- queue handlers -----------------------------
234
234
235 def clear_request(self, stream, idents, parent):
235 def clear_request(self, stream, idents, parent):
236 """Clear our namespace."""
236 """Clear our namespace."""
237 self.user_ns = {}
237 self.user_ns = {}
238 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
238 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
239 content = dict(status='ok'))
239 content = dict(status='ok'))
240 self._initial_exec_lines()
240 self._initial_exec_lines()
241
241
242 def execute_request(self, stream, ident, parent):
242 def execute_request(self, stream, ident, parent):
243 self.log.debug('execute request %s'%parent)
243 self.log.debug('execute request %s'%parent)
244 try:
244 try:
245 code = parent[u'content'][u'code']
245 code = parent[u'content'][u'code']
246 except:
246 except:
247 self.log.error("Got bad msg: %s"%parent, exc_info=True)
247 self.log.error("Got bad msg: %s"%parent, exc_info=True)
248 return
248 return
249 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
249 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
250 ident='%s.pyin'%self.prefix)
250 ident='%s.pyin'%self.prefix)
251 started = datetime.now()
251 started = datetime.now()
252 try:
252 try:
253 comp_code = self.compiler(code, '<zmq-kernel>')
253 comp_code = self.compiler(code, '<zmq-kernel>')
254 # allow for not overriding displayhook
254 # allow for not overriding displayhook
255 if hasattr(sys.displayhook, 'set_parent'):
255 if hasattr(sys.displayhook, 'set_parent'):
256 sys.displayhook.set_parent(parent)
256 sys.displayhook.set_parent(parent)
257 sys.stdout.set_parent(parent)
257 sys.stdout.set_parent(parent)
258 sys.stderr.set_parent(parent)
258 sys.stderr.set_parent(parent)
259 exec comp_code in self.user_ns, self.user_ns
259 exec comp_code in self.user_ns, self.user_ns
260 except:
260 except:
261 exc_content = self._wrap_exception('execute')
261 exc_content = self._wrap_exception('execute')
262 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
262 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
263 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
263 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
264 ident='%s.pyerr'%self.prefix)
264 ident='%s.pyerr'%self.prefix)
265 reply_content = exc_content
265 reply_content = exc_content
266 else:
266 else:
267 reply_content = {'status' : 'ok'}
267 reply_content = {'status' : 'ok'}
268
268
269 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
269 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
270 ident=ident, subheader = dict(started=started))
270 ident=ident, subheader = dict(started=started))
271 self.log.debug(str(reply_msg))
271 self.log.debug(str(reply_msg))
272 if reply_msg['content']['status'] == u'error':
272 if reply_msg['content']['status'] == u'error':
273 self.abort_queues()
273 self.abort_queues()
274
274
275 def complete_request(self, stream, ident, parent):
275 def complete_request(self, stream, ident, parent):
276 matches = {'matches' : self.complete(parent),
276 matches = {'matches' : self.complete(parent),
277 'status' : 'ok'}
277 'status' : 'ok'}
278 completion_msg = self.session.send(stream, 'complete_reply',
278 completion_msg = self.session.send(stream, 'complete_reply',
279 matches, parent, ident)
279 matches, parent, ident)
280 # print >> sys.__stdout__, completion_msg
280 # print >> sys.__stdout__, completion_msg
281
281
282 def complete(self, msg):
282 def complete(self, msg):
283 return self.completer.complete(msg.content.line, msg.content.text)
283 return self.completer.complete(msg.content.line, msg.content.text)
284
284
285 def apply_request(self, stream, ident, parent):
285 def apply_request(self, stream, ident, parent):
286 # flush previous reply, so this request won't block it
286 # flush previous reply, so this request won't block it
287 stream.flush(zmq.POLLOUT)
287 stream.flush(zmq.POLLOUT)
288
288
289 try:
289 try:
290 content = parent[u'content']
290 content = parent[u'content']
291 bufs = parent[u'buffers']
291 bufs = parent[u'buffers']
292 msg_id = parent['header']['msg_id']
292 msg_id = parent['header']['msg_id']
293 # bound = parent['header'].get('bound', False)
293 # bound = parent['header'].get('bound', False)
294 except:
294 except:
295 self.log.error("Got bad msg: %s"%parent, exc_info=True)
295 self.log.error("Got bad msg: %s"%parent, exc_info=True)
296 return
296 return
297 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
297 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
298 # self.iopub_stream.send(pyin_msg)
298 # self.iopub_stream.send(pyin_msg)
299 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
299 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
300 sub = {'dependencies_met' : True, 'engine' : self.ident,
300 sub = {'dependencies_met' : True, 'engine' : self.ident,
301 'started': datetime.now()}
301 'started': datetime.now()}
302 try:
302 try:
303 # allow for not overriding displayhook
303 # allow for not overriding displayhook
304 if hasattr(sys.displayhook, 'set_parent'):
304 if hasattr(sys.displayhook, 'set_parent'):
305 sys.displayhook.set_parent(parent)
305 sys.displayhook.set_parent(parent)
306 sys.stdout.set_parent(parent)
306 sys.stdout.set_parent(parent)
307 sys.stderr.set_parent(parent)
307 sys.stderr.set_parent(parent)
308 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
308 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
309 working = self.user_ns
309 working = self.user_ns
310 # suffix =
310 # suffix =
311 prefix = "_"+str(msg_id).replace("-","")+"_"
311 prefix = "_"+str(msg_id).replace("-","")+"_"
312
312
313 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
313 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
314 # if bound:
314 # if bound:
315 # bound_ns = Namespace(working)
315 # bound_ns = Namespace(working)
316 # args = [bound_ns]+list(args)
316 # args = [bound_ns]+list(args)
317
317
318 fname = getattr(f, '__name__', 'f')
318 fname = getattr(f, '__name__', 'f')
319
319
320 fname = prefix+"f"
320 fname = prefix+"f"
321 argname = prefix+"args"
321 argname = prefix+"args"
322 kwargname = prefix+"kwargs"
322 kwargname = prefix+"kwargs"
323 resultname = prefix+"result"
323 resultname = prefix+"result"
324
324
325 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
325 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
326 # print ns
326 # print ns
327 working.update(ns)
327 working.update(ns)
328 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
328 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
329 try:
329 try:
330 exec code in working,working
330 exec code in working,working
331 result = working.get(resultname)
331 result = working.get(resultname)
332 finally:
332 finally:
333 for key in ns.iterkeys():
333 for key in ns.iterkeys():
334 working.pop(key)
334 working.pop(key)
335 # if bound:
335 # if bound:
336 # working.update(bound_ns)
336 # working.update(bound_ns)
337
337
338 packed_result,buf = serialize_object(result)
338 packed_result,buf = serialize_object(result)
339 result_buf = [packed_result]+buf
339 result_buf = [packed_result]+buf
340 except:
340 except:
341 exc_content = self._wrap_exception('apply')
341 exc_content = self._wrap_exception('apply')
342 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
342 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
343 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
343 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
344 ident='%s.pyerr'%self.prefix)
344 ident='%s.pyerr'%self.prefix)
345 reply_content = exc_content
345 reply_content = exc_content
346 result_buf = []
346 result_buf = []
347
347
348 if exc_content['ename'] == 'UnmetDependency':
348 if exc_content['ename'] == 'UnmetDependency':
349 sub['dependencies_met'] = False
349 sub['dependencies_met'] = False
350 else:
350 else:
351 reply_content = {'status' : 'ok'}
351 reply_content = {'status' : 'ok'}
352
352
353 # put 'ok'/'error' status in header, for scheduler introspection:
353 # put 'ok'/'error' status in header, for scheduler introspection:
354 sub['status'] = reply_content['status']
354 sub['status'] = reply_content['status']
355
355
356 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
356 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
357 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
357 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
358
358
359 # flush i/o
359 # flush i/o
360 # should this be before reply_msg is sent, like in the single-kernel code,
360 # should this be before reply_msg is sent, like in the single-kernel code,
361 # or should nothing get in the way of real results?
361 # or should nothing get in the way of real results?
362 sys.stdout.flush()
362 sys.stdout.flush()
363 sys.stderr.flush()
363 sys.stderr.flush()
364
364
365 def dispatch_queue(self, stream, msg):
365 def dispatch_queue(self, stream, msg):
366 self.control_stream.flush()
366 self.control_stream.flush()
367 idents,msg = self.session.feed_identities(msg, copy=False)
367 idents,msg = self.session.feed_identities(msg, copy=False)
368 try:
368 try:
369 msg = self.session.unpack_message(msg, content=True, copy=False)
369 msg = self.session.unpack_message(msg, content=True, copy=False)
370 except:
370 except:
371 self.log.error("Invalid Message", exc_info=True)
371 self.log.error("Invalid Message", exc_info=True)
372 return
372 return
373
373
374
374
375 header = msg['header']
375 header = msg['header']
376 msg_id = header['msg_id']
376 msg_id = header['msg_id']
377 if self.check_aborted(msg_id):
377 if self.check_aborted(msg_id):
378 self.aborted.remove(msg_id)
378 self.aborted.remove(msg_id)
379 # is it safe to assume a msg_id will not be resubmitted?
379 # is it safe to assume a msg_id will not be resubmitted?
380 reply_type = msg['msg_type'].split('_')[0] + '_reply'
380 reply_type = msg['msg_type'].split('_')[0] + '_reply'
381 status = {'status' : 'aborted'}
381 status = {'status' : 'aborted'}
382 reply_msg = self.session.send(stream, reply_type, subheader=status,
382 reply_msg = self.session.send(stream, reply_type, subheader=status,
383 content=status, parent=msg, ident=idents)
383 content=status, parent=msg, ident=idents)
384 return
384 return
385 handler = self.shell_handlers.get(msg['msg_type'], None)
385 handler = self.shell_handlers.get(msg['msg_type'], None)
386 if handler is None:
386 if handler is None:
387 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
387 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
388 else:
388 else:
389 handler(stream, idents, msg)
389 handler(stream, idents, msg)
390
390
391 def start(self):
391 def start(self):
392 #### stream mode:
392 #### stream mode:
393 if self.control_stream:
393 if self.control_stream:
394 self.control_stream.on_recv(self.dispatch_control, copy=False)
394 self.control_stream.on_recv(self.dispatch_control, copy=False)
395 self.control_stream.on_err(printer)
395 self.control_stream.on_err(printer)
396
396
397 def make_dispatcher(stream):
397 def make_dispatcher(stream):
398 def dispatcher(msg):
398 def dispatcher(msg):
399 return self.dispatch_queue(stream, msg)
399 return self.dispatch_queue(stream, msg)
400 return dispatcher
400 return dispatcher
401
401
402 for s in self.shell_streams:
402 for s in self.shell_streams:
403 s.on_recv(make_dispatcher(s), copy=False)
403 s.on_recv(make_dispatcher(s), copy=False)
404 s.on_err(printer)
404 s.on_err(printer)
405
405
406 if self.iopub_stream:
406 if self.iopub_stream:
407 self.iopub_stream.on_err(printer)
407 self.iopub_stream.on_err(printer)
408
408
409 #### while True mode:
409 #### while True mode:
410 # while True:
410 # while True:
411 # idle = True
411 # idle = True
412 # try:
412 # try:
413 # msg = self.shell_stream.socket.recv_multipart(
413 # msg = self.shell_stream.socket.recv_multipart(
414 # zmq.NOBLOCK, copy=False)
414 # zmq.NOBLOCK, copy=False)
415 # except zmq.ZMQError, e:
415 # except zmq.ZMQError, e:
416 # if e.errno != zmq.EAGAIN:
416 # if e.errno != zmq.EAGAIN:
417 # raise e
417 # raise e
418 # else:
418 # else:
419 # idle=False
419 # idle=False
420 # self.dispatch_queue(self.shell_stream, msg)
420 # self.dispatch_queue(self.shell_stream, msg)
421 #
421 #
422 # if not self.task_stream.empty():
422 # if not self.task_stream.empty():
423 # idle=False
423 # idle=False
424 # msg = self.task_stream.recv_multipart()
424 # msg = self.task_stream.recv_multipart()
425 # self.dispatch_queue(self.task_stream, msg)
425 # self.dispatch_queue(self.task_stream, msg)
426 # if idle:
426 # if idle:
427 # # don't busywait
427 # # don't busywait
428 # time.sleep(1e-3)
428 # time.sleep(1e-3)
429
429
General Comments 0
You need to be logged in to leave comments. Login now