##// END OF EJS Templates
properly handle nothing to recv in StreamKernel.abort
MinRK -
Show More
@@ -1,433 +1,422 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010-2011 The IPython Development Team
6 # Copyright (C) 2010-2011 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING, distributed as part of this software.
9 # the file COPYING, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 # Standard library imports.
16 # Standard library imports.
17 from __future__ import print_function
17 from __future__ import print_function
18
18
19 import sys
19 import sys
20 import time
20 import time
21
21
22 from code import CommandCompiler
22 from code import CommandCompiler
23 from datetime import datetime
23 from datetime import datetime
24 from pprint import pprint
24 from pprint import pprint
25
25
26 # System library imports.
26 # System library imports.
27 import zmq
27 import zmq
28 from zmq.eventloop import ioloop, zmqstream
28 from zmq.eventloop import ioloop, zmqstream
29
29
30 # Local imports.
30 # Local imports.
31 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode
31 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode
32 from IPython.zmq.completer import KernelCompleter
32 from IPython.zmq.completer import KernelCompleter
33
33
34 from IPython.parallel.error import wrap_exception
34 from IPython.parallel.error import wrap_exception
35 from IPython.parallel.factory import SessionFactory
35 from IPython.parallel.factory import SessionFactory
36 from IPython.parallel.util import serialize_object, unpack_apply_message
36 from IPython.parallel.util import serialize_object, unpack_apply_message
37
37
38 def printer(*args):
38 def printer(*args):
39 pprint(args, stream=sys.__stdout__)
39 pprint(args, stream=sys.__stdout__)
40
40
41
41
42 class _Passer(zmqstream.ZMQStream):
42 class _Passer(zmqstream.ZMQStream):
43 """Empty class that implements `send()` that does nothing.
43 """Empty class that implements `send()` that does nothing.
44
44
45 Subclass ZMQStream for Session typechecking
45 Subclass ZMQStream for Session typechecking
46
46
47 """
47 """
48 def __init__(self, *args, **kwargs):
48 def __init__(self, *args, **kwargs):
49 pass
49 pass
50
50
51 def send(self, *args, **kwargs):
51 def send(self, *args, **kwargs):
52 pass
52 pass
53 send_multipart = send
53 send_multipart = send
54
54
55
55
56 #-----------------------------------------------------------------------------
56 #-----------------------------------------------------------------------------
57 # Main kernel class
57 # Main kernel class
58 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
59
59
60 class Kernel(SessionFactory):
60 class Kernel(SessionFactory):
61
61
62 #---------------------------------------------------------------------------
62 #---------------------------------------------------------------------------
63 # Kernel interface
63 # Kernel interface
64 #---------------------------------------------------------------------------
64 #---------------------------------------------------------------------------
65
65
66 # kwargs:
66 # kwargs:
67 exec_lines = List(Unicode, config=True,
67 exec_lines = List(Unicode, config=True,
68 help="List of lines to execute")
68 help="List of lines to execute")
69
69
70 int_id = Int(-1)
70 int_id = Int(-1)
71 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
71 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
72
72
73 control_stream = Instance(zmqstream.ZMQStream)
73 control_stream = Instance(zmqstream.ZMQStream)
74 task_stream = Instance(zmqstream.ZMQStream)
74 task_stream = Instance(zmqstream.ZMQStream)
75 iopub_stream = Instance(zmqstream.ZMQStream)
75 iopub_stream = Instance(zmqstream.ZMQStream)
76 client = Instance('IPython.parallel.Client')
76 client = Instance('IPython.parallel.Client')
77
77
78 # internals
78 # internals
79 shell_streams = List()
79 shell_streams = List()
80 compiler = Instance(CommandCompiler, (), {})
80 compiler = Instance(CommandCompiler, (), {})
81 completer = Instance(KernelCompleter)
81 completer = Instance(KernelCompleter)
82
82
83 aborted = Set()
83 aborted = Set()
84 shell_handlers = Dict()
84 shell_handlers = Dict()
85 control_handlers = Dict()
85 control_handlers = Dict()
86
86
87 def _set_prefix(self):
87 def _set_prefix(self):
88 self.prefix = "engine.%s"%self.int_id
88 self.prefix = "engine.%s"%self.int_id
89
89
90 def _connect_completer(self):
90 def _connect_completer(self):
91 self.completer = KernelCompleter(self.user_ns)
91 self.completer = KernelCompleter(self.user_ns)
92
92
93 def __init__(self, **kwargs):
93 def __init__(self, **kwargs):
94 super(Kernel, self).__init__(**kwargs)
94 super(Kernel, self).__init__(**kwargs)
95 self._set_prefix()
95 self._set_prefix()
96 self._connect_completer()
96 self._connect_completer()
97
97
98 self.on_trait_change(self._set_prefix, 'id')
98 self.on_trait_change(self._set_prefix, 'id')
99 self.on_trait_change(self._connect_completer, 'user_ns')
99 self.on_trait_change(self._connect_completer, 'user_ns')
100
100
101 # Build dict of handlers for message types
101 # Build dict of handlers for message types
102 for msg_type in ['execute_request', 'complete_request', 'apply_request',
102 for msg_type in ['execute_request', 'complete_request', 'apply_request',
103 'clear_request']:
103 'clear_request']:
104 self.shell_handlers[msg_type] = getattr(self, msg_type)
104 self.shell_handlers[msg_type] = getattr(self, msg_type)
105
105
106 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
106 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
107 self.control_handlers[msg_type] = getattr(self, msg_type)
107 self.control_handlers[msg_type] = getattr(self, msg_type)
108
108
109 self._initial_exec_lines()
109 self._initial_exec_lines()
110
110
111 def _wrap_exception(self, method=None):
111 def _wrap_exception(self, method=None):
112 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
112 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
113 content=wrap_exception(e_info)
113 content=wrap_exception(e_info)
114 return content
114 return content
115
115
116 def _initial_exec_lines(self):
116 def _initial_exec_lines(self):
117 s = _Passer()
117 s = _Passer()
118 content = dict(silent=True, user_variable=[],user_expressions=[])
118 content = dict(silent=True, user_variable=[],user_expressions=[])
119 for line in self.exec_lines:
119 for line in self.exec_lines:
120 self.log.debug("executing initialization: %s"%line)
120 self.log.debug("executing initialization: %s"%line)
121 content.update({'code':line})
121 content.update({'code':line})
122 msg = self.session.msg('execute_request', content)
122 msg = self.session.msg('execute_request', content)
123 self.execute_request(s, [], msg)
123 self.execute_request(s, [], msg)
124
124
125
125
126 #-------------------- control handlers -----------------------------
126 #-------------------- control handlers -----------------------------
127 def abort_queues(self):
127 def abort_queues(self):
128 for stream in self.shell_streams:
128 for stream in self.shell_streams:
129 if stream:
129 if stream:
130 self.abort_queue(stream)
130 self.abort_queue(stream)
131
131
132 def abort_queue(self, stream):
132 def abort_queue(self, stream):
133 while True:
133 while True:
134 try:
134 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
135 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
135 if msg is None:
136 except zmq.ZMQError as e:
136 return
137 if e.errno == zmq.EAGAIN:
138 break
139 else:
140 return
141 else:
142 if msg is None:
143 return
144 else:
145 idents,msg = msg
146
137
147 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
148 # msg = self.reply_socket.recv_json()
149 self.log.info("Aborting:")
138 self.log.info("Aborting:")
150 self.log.info(str(msg))
139 self.log.info(str(msg))
151 msg_type = msg['msg_type']
140 msg_type = msg['msg_type']
152 reply_type = msg_type.split('_')[0] + '_reply'
141 reply_type = msg_type.split('_')[0] + '_reply'
153 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
142 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
154 # self.reply_socket.send(ident,zmq.SNDMORE)
143 # self.reply_socket.send(ident,zmq.SNDMORE)
155 # self.reply_socket.send_json(reply_msg)
144 # self.reply_socket.send_json(reply_msg)
156 reply_msg = self.session.send(stream, reply_type,
145 reply_msg = self.session.send(stream, reply_type,
157 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
146 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
158 self.log.debug(str(reply_msg))
147 self.log.debug(str(reply_msg))
159 # We need to wait a bit for requests to come in. This can probably
148 # We need to wait a bit for requests to come in. This can probably
160 # be set shorter for true asynchronous clients.
149 # be set shorter for true asynchronous clients.
161 time.sleep(0.05)
150 time.sleep(0.05)
162
151
163 def abort_request(self, stream, ident, parent):
152 def abort_request(self, stream, ident, parent):
164 """abort a specifig msg by id"""
153 """abort a specifig msg by id"""
165 msg_ids = parent['content'].get('msg_ids', None)
154 msg_ids = parent['content'].get('msg_ids', None)
166 if isinstance(msg_ids, basestring):
155 if isinstance(msg_ids, basestring):
167 msg_ids = [msg_ids]
156 msg_ids = [msg_ids]
168 if not msg_ids:
157 if not msg_ids:
169 self.abort_queues()
158 self.abort_queues()
170 for mid in msg_ids:
159 for mid in msg_ids:
171 self.aborted.add(str(mid))
160 self.aborted.add(str(mid))
172
161
173 content = dict(status='ok')
162 content = dict(status='ok')
174 reply_msg = self.session.send(stream, 'abort_reply', content=content,
163 reply_msg = self.session.send(stream, 'abort_reply', content=content,
175 parent=parent, ident=ident)
164 parent=parent, ident=ident)
176 self.log.debug(str(reply_msg))
165 self.log.debug(str(reply_msg))
177
166
178 def shutdown_request(self, stream, ident, parent):
167 def shutdown_request(self, stream, ident, parent):
179 """kill ourself. This should really be handled in an external process"""
168 """kill ourself. This should really be handled in an external process"""
180 try:
169 try:
181 self.abort_queues()
170 self.abort_queues()
182 except:
171 except:
183 content = self._wrap_exception('shutdown')
172 content = self._wrap_exception('shutdown')
184 else:
173 else:
185 content = dict(parent['content'])
174 content = dict(parent['content'])
186 content['status'] = 'ok'
175 content['status'] = 'ok'
187 msg = self.session.send(stream, 'shutdown_reply',
176 msg = self.session.send(stream, 'shutdown_reply',
188 content=content, parent=parent, ident=ident)
177 content=content, parent=parent, ident=ident)
189 self.log.debug(str(msg))
178 self.log.debug(str(msg))
190 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
179 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
191 dc.start()
180 dc.start()
192
181
193 def dispatch_control(self, msg):
182 def dispatch_control(self, msg):
194 idents,msg = self.session.feed_identities(msg, copy=False)
183 idents,msg = self.session.feed_identities(msg, copy=False)
195 try:
184 try:
196 msg = self.session.unpack_message(msg, content=True, copy=False)
185 msg = self.session.unpack_message(msg, content=True, copy=False)
197 except:
186 except:
198 self.log.error("Invalid Message", exc_info=True)
187 self.log.error("Invalid Message", exc_info=True)
199 return
188 return
200
189
201 header = msg['header']
190 header = msg['header']
202 msg_id = header['msg_id']
191 msg_id = header['msg_id']
203
192
204 handler = self.control_handlers.get(msg['msg_type'], None)
193 handler = self.control_handlers.get(msg['msg_type'], None)
205 if handler is None:
194 if handler is None:
206 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
195 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
207 else:
196 else:
208 handler(self.control_stream, idents, msg)
197 handler(self.control_stream, idents, msg)
209
198
210
199
211 #-------------------- queue helpers ------------------------------
200 #-------------------- queue helpers ------------------------------
212
201
213 def check_dependencies(self, dependencies):
202 def check_dependencies(self, dependencies):
214 if not dependencies:
203 if not dependencies:
215 return True
204 return True
216 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
205 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
217 anyorall = dependencies[0]
206 anyorall = dependencies[0]
218 dependencies = dependencies[1]
207 dependencies = dependencies[1]
219 else:
208 else:
220 anyorall = 'all'
209 anyorall = 'all'
221 results = self.client.get_results(dependencies,status_only=True)
210 results = self.client.get_results(dependencies,status_only=True)
222 if results['status'] != 'ok':
211 if results['status'] != 'ok':
223 return False
212 return False
224
213
225 if anyorall == 'any':
214 if anyorall == 'any':
226 if not results['completed']:
215 if not results['completed']:
227 return False
216 return False
228 else:
217 else:
229 if results['pending']:
218 if results['pending']:
230 return False
219 return False
231
220
232 return True
221 return True
233
222
234 def check_aborted(self, msg_id):
223 def check_aborted(self, msg_id):
235 return msg_id in self.aborted
224 return msg_id in self.aborted
236
225
237 #-------------------- queue handlers -----------------------------
226 #-------------------- queue handlers -----------------------------
238
227
239 def clear_request(self, stream, idents, parent):
228 def clear_request(self, stream, idents, parent):
240 """Clear our namespace."""
229 """Clear our namespace."""
241 self.user_ns = {}
230 self.user_ns = {}
242 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
231 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
243 content = dict(status='ok'))
232 content = dict(status='ok'))
244 self._initial_exec_lines()
233 self._initial_exec_lines()
245
234
246 def execute_request(self, stream, ident, parent):
235 def execute_request(self, stream, ident, parent):
247 self.log.debug('execute request %s'%parent)
236 self.log.debug('execute request %s'%parent)
248 try:
237 try:
249 code = parent[u'content'][u'code']
238 code = parent[u'content'][u'code']
250 except:
239 except:
251 self.log.error("Got bad msg: %s"%parent, exc_info=True)
240 self.log.error("Got bad msg: %s"%parent, exc_info=True)
252 return
241 return
253 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
242 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
254 ident='%s.pyin'%self.prefix)
243 ident='%s.pyin'%self.prefix)
255 started = datetime.now()
244 started = datetime.now()
256 try:
245 try:
257 comp_code = self.compiler(code, '<zmq-kernel>')
246 comp_code = self.compiler(code, '<zmq-kernel>')
258 # allow for not overriding displayhook
247 # allow for not overriding displayhook
259 if hasattr(sys.displayhook, 'set_parent'):
248 if hasattr(sys.displayhook, 'set_parent'):
260 sys.displayhook.set_parent(parent)
249 sys.displayhook.set_parent(parent)
261 sys.stdout.set_parent(parent)
250 sys.stdout.set_parent(parent)
262 sys.stderr.set_parent(parent)
251 sys.stderr.set_parent(parent)
263 exec comp_code in self.user_ns, self.user_ns
252 exec comp_code in self.user_ns, self.user_ns
264 except:
253 except:
265 exc_content = self._wrap_exception('execute')
254 exc_content = self._wrap_exception('execute')
266 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
255 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
267 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
256 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
268 ident='%s.pyerr'%self.prefix)
257 ident='%s.pyerr'%self.prefix)
269 reply_content = exc_content
258 reply_content = exc_content
270 else:
259 else:
271 reply_content = {'status' : 'ok'}
260 reply_content = {'status' : 'ok'}
272
261
273 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
262 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
274 ident=ident, subheader = dict(started=started))
263 ident=ident, subheader = dict(started=started))
275 self.log.debug(str(reply_msg))
264 self.log.debug(str(reply_msg))
276 if reply_msg['content']['status'] == u'error':
265 if reply_msg['content']['status'] == u'error':
277 self.abort_queues()
266 self.abort_queues()
278
267
279 def complete_request(self, stream, ident, parent):
268 def complete_request(self, stream, ident, parent):
280 matches = {'matches' : self.complete(parent),
269 matches = {'matches' : self.complete(parent),
281 'status' : 'ok'}
270 'status' : 'ok'}
282 completion_msg = self.session.send(stream, 'complete_reply',
271 completion_msg = self.session.send(stream, 'complete_reply',
283 matches, parent, ident)
272 matches, parent, ident)
284 # print >> sys.__stdout__, completion_msg
273 # print >> sys.__stdout__, completion_msg
285
274
286 def complete(self, msg):
275 def complete(self, msg):
287 return self.completer.complete(msg.content.line, msg.content.text)
276 return self.completer.complete(msg.content.line, msg.content.text)
288
277
289 def apply_request(self, stream, ident, parent):
278 def apply_request(self, stream, ident, parent):
290 # flush previous reply, so this request won't block it
279 # flush previous reply, so this request won't block it
291 stream.flush(zmq.POLLOUT)
280 stream.flush(zmq.POLLOUT)
292
281
293 try:
282 try:
294 content = parent[u'content']
283 content = parent[u'content']
295 bufs = parent[u'buffers']
284 bufs = parent[u'buffers']
296 msg_id = parent['header']['msg_id']
285 msg_id = parent['header']['msg_id']
297 # bound = parent['header'].get('bound', False)
286 # bound = parent['header'].get('bound', False)
298 except:
287 except:
299 self.log.error("Got bad msg: %s"%parent, exc_info=True)
288 self.log.error("Got bad msg: %s"%parent, exc_info=True)
300 return
289 return
301 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
290 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
302 # self.iopub_stream.send(pyin_msg)
291 # self.iopub_stream.send(pyin_msg)
303 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
292 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
304 sub = {'dependencies_met' : True, 'engine' : self.ident,
293 sub = {'dependencies_met' : True, 'engine' : self.ident,
305 'started': datetime.now()}
294 'started': datetime.now()}
306 try:
295 try:
307 # allow for not overriding displayhook
296 # allow for not overriding displayhook
308 if hasattr(sys.displayhook, 'set_parent'):
297 if hasattr(sys.displayhook, 'set_parent'):
309 sys.displayhook.set_parent(parent)
298 sys.displayhook.set_parent(parent)
310 sys.stdout.set_parent(parent)
299 sys.stdout.set_parent(parent)
311 sys.stderr.set_parent(parent)
300 sys.stderr.set_parent(parent)
312 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
301 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
313 working = self.user_ns
302 working = self.user_ns
314 # suffix =
303 # suffix =
315 prefix = "_"+str(msg_id).replace("-","")+"_"
304 prefix = "_"+str(msg_id).replace("-","")+"_"
316
305
317 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
306 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
318 # if bound:
307 # if bound:
319 # bound_ns = Namespace(working)
308 # bound_ns = Namespace(working)
320 # args = [bound_ns]+list(args)
309 # args = [bound_ns]+list(args)
321
310
322 fname = getattr(f, '__name__', 'f')
311 fname = getattr(f, '__name__', 'f')
323
312
324 fname = prefix+"f"
313 fname = prefix+"f"
325 argname = prefix+"args"
314 argname = prefix+"args"
326 kwargname = prefix+"kwargs"
315 kwargname = prefix+"kwargs"
327 resultname = prefix+"result"
316 resultname = prefix+"result"
328
317
329 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
318 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
330 # print ns
319 # print ns
331 working.update(ns)
320 working.update(ns)
332 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
321 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
333 try:
322 try:
334 exec code in working,working
323 exec code in working,working
335 result = working.get(resultname)
324 result = working.get(resultname)
336 finally:
325 finally:
337 for key in ns.iterkeys():
326 for key in ns.iterkeys():
338 working.pop(key)
327 working.pop(key)
339 # if bound:
328 # if bound:
340 # working.update(bound_ns)
329 # working.update(bound_ns)
341
330
342 packed_result,buf = serialize_object(result)
331 packed_result,buf = serialize_object(result)
343 result_buf = [packed_result]+buf
332 result_buf = [packed_result]+buf
344 except:
333 except:
345 exc_content = self._wrap_exception('apply')
334 exc_content = self._wrap_exception('apply')
346 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
335 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
347 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
336 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
348 ident='%s.pyerr'%self.prefix)
337 ident='%s.pyerr'%self.prefix)
349 reply_content = exc_content
338 reply_content = exc_content
350 result_buf = []
339 result_buf = []
351
340
352 if exc_content['ename'] == 'UnmetDependency':
341 if exc_content['ename'] == 'UnmetDependency':
353 sub['dependencies_met'] = False
342 sub['dependencies_met'] = False
354 else:
343 else:
355 reply_content = {'status' : 'ok'}
344 reply_content = {'status' : 'ok'}
356
345
357 # put 'ok'/'error' status in header, for scheduler introspection:
346 # put 'ok'/'error' status in header, for scheduler introspection:
358 sub['status'] = reply_content['status']
347 sub['status'] = reply_content['status']
359
348
360 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
349 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
361 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
350 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
362
351
363 # flush i/o
352 # flush i/o
364 # should this be before reply_msg is sent, like in the single-kernel code,
353 # should this be before reply_msg is sent, like in the single-kernel code,
365 # or should nothing get in the way of real results?
354 # or should nothing get in the way of real results?
366 sys.stdout.flush()
355 sys.stdout.flush()
367 sys.stderr.flush()
356 sys.stderr.flush()
368
357
369 def dispatch_queue(self, stream, msg):
358 def dispatch_queue(self, stream, msg):
370 self.control_stream.flush()
359 self.control_stream.flush()
371 idents,msg = self.session.feed_identities(msg, copy=False)
360 idents,msg = self.session.feed_identities(msg, copy=False)
372 try:
361 try:
373 msg = self.session.unpack_message(msg, content=True, copy=False)
362 msg = self.session.unpack_message(msg, content=True, copy=False)
374 except:
363 except:
375 self.log.error("Invalid Message", exc_info=True)
364 self.log.error("Invalid Message", exc_info=True)
376 return
365 return
377
366
378
367
379 header = msg['header']
368 header = msg['header']
380 msg_id = header['msg_id']
369 msg_id = header['msg_id']
381 if self.check_aborted(msg_id):
370 if self.check_aborted(msg_id):
382 self.aborted.remove(msg_id)
371 self.aborted.remove(msg_id)
383 # is it safe to assume a msg_id will not be resubmitted?
372 # is it safe to assume a msg_id will not be resubmitted?
384 reply_type = msg['msg_type'].split('_')[0] + '_reply'
373 reply_type = msg['msg_type'].split('_')[0] + '_reply'
385 status = {'status' : 'aborted'}
374 status = {'status' : 'aborted'}
386 reply_msg = self.session.send(stream, reply_type, subheader=status,
375 reply_msg = self.session.send(stream, reply_type, subheader=status,
387 content=status, parent=msg, ident=idents)
376 content=status, parent=msg, ident=idents)
388 return
377 return
389 handler = self.shell_handlers.get(msg['msg_type'], None)
378 handler = self.shell_handlers.get(msg['msg_type'], None)
390 if handler is None:
379 if handler is None:
391 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
380 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
392 else:
381 else:
393 handler(stream, idents, msg)
382 handler(stream, idents, msg)
394
383
395 def start(self):
384 def start(self):
396 #### stream mode:
385 #### stream mode:
397 if self.control_stream:
386 if self.control_stream:
398 self.control_stream.on_recv(self.dispatch_control, copy=False)
387 self.control_stream.on_recv(self.dispatch_control, copy=False)
399 self.control_stream.on_err(printer)
388 self.control_stream.on_err(printer)
400
389
401 def make_dispatcher(stream):
390 def make_dispatcher(stream):
402 def dispatcher(msg):
391 def dispatcher(msg):
403 return self.dispatch_queue(stream, msg)
392 return self.dispatch_queue(stream, msg)
404 return dispatcher
393 return dispatcher
405
394
406 for s in self.shell_streams:
395 for s in self.shell_streams:
407 s.on_recv(make_dispatcher(s), copy=False)
396 s.on_recv(make_dispatcher(s), copy=False)
408 s.on_err(printer)
397 s.on_err(printer)
409
398
410 if self.iopub_stream:
399 if self.iopub_stream:
411 self.iopub_stream.on_err(printer)
400 self.iopub_stream.on_err(printer)
412
401
413 #### while True mode:
402 #### while True mode:
414 # while True:
403 # while True:
415 # idle = True
404 # idle = True
416 # try:
405 # try:
417 # msg = self.shell_stream.socket.recv_multipart(
406 # msg = self.shell_stream.socket.recv_multipart(
418 # zmq.NOBLOCK, copy=False)
407 # zmq.NOBLOCK, copy=False)
419 # except zmq.ZMQError, e:
408 # except zmq.ZMQError, e:
420 # if e.errno != zmq.EAGAIN:
409 # if e.errno != zmq.EAGAIN:
421 # raise e
410 # raise e
422 # else:
411 # else:
423 # idle=False
412 # idle=False
424 # self.dispatch_queue(self.shell_stream, msg)
413 # self.dispatch_queue(self.shell_stream, msg)
425 #
414 #
426 # if not self.task_stream.empty():
415 # if not self.task_stream.empty():
427 # idle=False
416 # idle=False
428 # msg = self.task_stream.recv_multipart()
417 # msg = self.task_stream.recv_multipart()
429 # self.dispatch_queue(self.task_stream, msg)
418 # self.dispatch_queue(self.task_stream, msg)
430 # if idle:
419 # if idle:
431 # # don't busywait
420 # # don't busywait
432 # time.sleep(1e-3)
421 # time.sleep(1e-3)
433
422
General Comments 0
You need to be logged in to leave comments. Login now