##// END OF EJS Templates
Merge pull request #1374 from minrk/on_err...
Brian E. Granger -
r6079:64c7b29c merge
parent child Browse files
Show More
@@ -1,439 +1,414
1 1 """
2 2 Kernel adapted from kernel.py to use ZMQ Streams
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 * Brian Granger
8 8 * Fernando Perez
9 9 * Evan Patterson
10 10 """
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2010-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 # Standard library imports.
23 23 from __future__ import print_function
24 24
25 25 import sys
26 26 import time
27 27
28 28 from code import CommandCompiler
29 29 from datetime import datetime
30 30 from pprint import pprint
31 31
32 32 # System library imports.
33 33 import zmq
34 34 from zmq.eventloop import ioloop, zmqstream
35 35
36 36 # Local imports.
37 37 from IPython.utils.traitlets import Instance, List, Integer, Dict, Set, Unicode, CBytes
38 38 from IPython.zmq.completer import KernelCompleter
39 39
40 40 from IPython.parallel.error import wrap_exception
41 41 from IPython.parallel.factory import SessionFactory
42 42 from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes
43 43
44 44 def printer(*args):
45 45 pprint(args, stream=sys.__stdout__)
46 46
47 47
48 48 class _Passer(zmqstream.ZMQStream):
49 49 """Empty class that implements `send()` that does nothing.
50 50
51 51 Subclass ZMQStream for Session typechecking
52 52
53 53 """
54 54 def __init__(self, *args, **kwargs):
55 55 pass
56 56
57 57 def send(self, *args, **kwargs):
58 58 pass
59 59 send_multipart = send
60 60
61 61
62 62 #-----------------------------------------------------------------------------
63 63 # Main kernel class
64 64 #-----------------------------------------------------------------------------
65 65
66 66 class Kernel(SessionFactory):
67 67
68 68 #---------------------------------------------------------------------------
69 69 # Kernel interface
70 70 #---------------------------------------------------------------------------
71 71
72 72 # kwargs:
73 73 exec_lines = List(Unicode, config=True,
74 74 help="List of lines to execute")
75 75
76 76 # identities:
77 77 int_id = Integer(-1)
78 78 bident = CBytes()
79 79 ident = Unicode()
80 80 def _ident_changed(self, name, old, new):
81 81 self.bident = asbytes(new)
82 82
83 83 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
84 84
85 85 control_stream = Instance(zmqstream.ZMQStream)
86 86 task_stream = Instance(zmqstream.ZMQStream)
87 87 iopub_stream = Instance(zmqstream.ZMQStream)
88 88 client = Instance('IPython.parallel.Client')
89 89
90 90 # internals
91 91 shell_streams = List()
92 92 compiler = Instance(CommandCompiler, (), {})
93 93 completer = Instance(KernelCompleter)
94 94
95 95 aborted = Set()
96 96 shell_handlers = Dict()
97 97 control_handlers = Dict()
98 98
99 99 def _set_prefix(self):
100 100 self.prefix = "engine.%s"%self.int_id
101 101
102 102 def _connect_completer(self):
103 103 self.completer = KernelCompleter(self.user_ns)
104 104
105 105 def __init__(self, **kwargs):
106 106 super(Kernel, self).__init__(**kwargs)
107 107 self._set_prefix()
108 108 self._connect_completer()
109 109
110 110 self.on_trait_change(self._set_prefix, 'id')
111 111 self.on_trait_change(self._connect_completer, 'user_ns')
112 112
113 113 # Build dict of handlers for message types
114 114 for msg_type in ['execute_request', 'complete_request', 'apply_request',
115 115 'clear_request']:
116 116 self.shell_handlers[msg_type] = getattr(self, msg_type)
117 117
118 118 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
119 119 self.control_handlers[msg_type] = getattr(self, msg_type)
120 120
121 121 self._initial_exec_lines()
122 122
123 123 def _wrap_exception(self, method=None):
124 124 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
125 125 content=wrap_exception(e_info)
126 126 return content
127 127
128 128 def _initial_exec_lines(self):
129 129 s = _Passer()
130 130 content = dict(silent=True, user_variable=[],user_expressions=[])
131 131 for line in self.exec_lines:
132 132 self.log.debug("executing initialization: %s"%line)
133 133 content.update({'code':line})
134 134 msg = self.session.msg('execute_request', content)
135 135 self.execute_request(s, [], msg)
136 136
137 137
138 138 #-------------------- control handlers -----------------------------
139 139 def abort_queues(self):
140 140 for stream in self.shell_streams:
141 141 if stream:
142 142 self.abort_queue(stream)
143 143
144 144 def abort_queue(self, stream):
145 145 while True:
146 146 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
147 147 if msg is None:
148 148 return
149 149
150 150 self.log.info("Aborting:")
151 151 self.log.info(str(msg))
152 152 msg_type = msg['header']['msg_type']
153 153 reply_type = msg_type.split('_')[0] + '_reply'
154 154 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
155 155 # self.reply_socket.send(ident,zmq.SNDMORE)
156 156 # self.reply_socket.send_json(reply_msg)
157 157 reply_msg = self.session.send(stream, reply_type,
158 158 content={'status' : 'aborted'}, parent=msg, ident=idents)
159 159 self.log.debug(str(reply_msg))
160 160 # We need to wait a bit for requests to come in. This can probably
161 161 # be set shorter for true asynchronous clients.
162 162 time.sleep(0.05)
163 163
164 164 def abort_request(self, stream, ident, parent):
165 165 """abort a specifig msg by id"""
166 166 msg_ids = parent['content'].get('msg_ids', None)
167 167 if isinstance(msg_ids, basestring):
168 168 msg_ids = [msg_ids]
169 169 if not msg_ids:
170 170 self.abort_queues()
171 171 for mid in msg_ids:
172 172 self.aborted.add(str(mid))
173 173
174 174 content = dict(status='ok')
175 175 reply_msg = self.session.send(stream, 'abort_reply', content=content,
176 176 parent=parent, ident=ident)
177 177 self.log.debug(str(reply_msg))
178 178
179 179 def shutdown_request(self, stream, ident, parent):
180 180 """kill ourself. This should really be handled in an external process"""
181 181 try:
182 182 self.abort_queues()
183 183 except:
184 184 content = self._wrap_exception('shutdown')
185 185 else:
186 186 content = dict(parent['content'])
187 187 content['status'] = 'ok'
188 188 msg = self.session.send(stream, 'shutdown_reply',
189 189 content=content, parent=parent, ident=ident)
190 190 self.log.debug(str(msg))
191 191 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
192 192 dc.start()
193 193
194 194 def dispatch_control(self, msg):
195 195 idents,msg = self.session.feed_identities(msg, copy=False)
196 196 try:
197 197 msg = self.session.unserialize(msg, content=True, copy=False)
198 198 except:
199 199 self.log.error("Invalid Message", exc_info=True)
200 200 return
201 201 else:
202 202 self.log.debug("Control received, %s", msg)
203 203
204 204 header = msg['header']
205 205 msg_id = header['msg_id']
206 206 msg_type = header['msg_type']
207 207
208 208 handler = self.control_handlers.get(msg_type, None)
209 209 if handler is None:
210 210 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg_type)
211 211 else:
212 212 handler(self.control_stream, idents, msg)
213 213
214 214
215 215 #-------------------- queue helpers ------------------------------
216 216
217 217 def check_dependencies(self, dependencies):
218 218 if not dependencies:
219 219 return True
220 220 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
221 221 anyorall = dependencies[0]
222 222 dependencies = dependencies[1]
223 223 else:
224 224 anyorall = 'all'
225 225 results = self.client.get_results(dependencies,status_only=True)
226 226 if results['status'] != 'ok':
227 227 return False
228 228
229 229 if anyorall == 'any':
230 230 if not results['completed']:
231 231 return False
232 232 else:
233 233 if results['pending']:
234 234 return False
235 235
236 236 return True
237 237
238 238 def check_aborted(self, msg_id):
239 239 return msg_id in self.aborted
240 240
241 241 #-------------------- queue handlers -----------------------------
242 242
243 243 def clear_request(self, stream, idents, parent):
244 244 """Clear our namespace."""
245 245 self.user_ns = {}
246 246 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
247 247 content = dict(status='ok'))
248 248 self._initial_exec_lines()
249 249
250 250 def execute_request(self, stream, ident, parent):
251 251 self.log.debug('execute request %s'%parent)
252 252 try:
253 253 code = parent[u'content'][u'code']
254 254 except:
255 255 self.log.error("Got bad msg: %s"%parent, exc_info=True)
256 256 return
257 257 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
258 258 ident=asbytes('%s.pyin'%self.prefix))
259 259 started = datetime.now()
260 260 try:
261 261 comp_code = self.compiler(code, '<zmq-kernel>')
262 262 # allow for not overriding displayhook
263 263 if hasattr(sys.displayhook, 'set_parent'):
264 264 sys.displayhook.set_parent(parent)
265 265 sys.stdout.set_parent(parent)
266 266 sys.stderr.set_parent(parent)
267 267 exec comp_code in self.user_ns, self.user_ns
268 268 except:
269 269 exc_content = self._wrap_exception('execute')
270 270 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
271 271 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
272 272 ident=asbytes('%s.pyerr'%self.prefix))
273 273 reply_content = exc_content
274 274 else:
275 275 reply_content = {'status' : 'ok'}
276 276
277 277 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
278 278 ident=ident, subheader = dict(started=started))
279 279 self.log.debug(str(reply_msg))
280 280 if reply_msg['content']['status'] == u'error':
281 281 self.abort_queues()
282 282
283 283 def complete_request(self, stream, ident, parent):
284 284 matches = {'matches' : self.complete(parent),
285 285 'status' : 'ok'}
286 286 completion_msg = self.session.send(stream, 'complete_reply',
287 287 matches, parent, ident)
288 288 # print >> sys.__stdout__, completion_msg
289 289
290 290 def complete(self, msg):
291 291 return self.completer.complete(msg.content.line, msg.content.text)
292 292
293 293 def apply_request(self, stream, ident, parent):
294 294 # flush previous reply, so this request won't block it
295 295 stream.flush(zmq.POLLOUT)
296 296 try:
297 297 content = parent[u'content']
298 298 bufs = parent[u'buffers']
299 299 msg_id = parent['header']['msg_id']
300 300 # bound = parent['header'].get('bound', False)
301 301 except:
302 302 self.log.error("Got bad msg: %s"%parent, exc_info=True)
303 303 return
304 304 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
305 305 # self.iopub_stream.send(pyin_msg)
306 306 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
307 307 sub = {'dependencies_met' : True, 'engine' : self.ident,
308 308 'started': datetime.now()}
309 309 try:
310 310 # allow for not overriding displayhook
311 311 if hasattr(sys.displayhook, 'set_parent'):
312 312 sys.displayhook.set_parent(parent)
313 313 sys.stdout.set_parent(parent)
314 314 sys.stderr.set_parent(parent)
315 315 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
316 316 working = self.user_ns
317 317 # suffix =
318 318 prefix = "_"+str(msg_id).replace("-","")+"_"
319 319
320 320 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
321 321 # if bound:
322 322 # bound_ns = Namespace(working)
323 323 # args = [bound_ns]+list(args)
324 324
325 325 fname = getattr(f, '__name__', 'f')
326 326
327 327 fname = prefix+"f"
328 328 argname = prefix+"args"
329 329 kwargname = prefix+"kwargs"
330 330 resultname = prefix+"result"
331 331
332 332 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
333 333 # print ns
334 334 working.update(ns)
335 335 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
336 336 try:
337 337 exec code in working,working
338 338 result = working.get(resultname)
339 339 finally:
340 340 for key in ns.iterkeys():
341 341 working.pop(key)
342 342 # if bound:
343 343 # working.update(bound_ns)
344 344
345 345 packed_result,buf = serialize_object(result)
346 346 result_buf = [packed_result]+buf
347 347 except:
348 348 exc_content = self._wrap_exception('apply')
349 349 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
350 350 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
351 351 ident=asbytes('%s.pyerr'%self.prefix))
352 352 reply_content = exc_content
353 353 result_buf = []
354 354
355 355 if exc_content['ename'] == 'UnmetDependency':
356 356 sub['dependencies_met'] = False
357 357 else:
358 358 reply_content = {'status' : 'ok'}
359 359
360 360 # put 'ok'/'error' status in header, for scheduler introspection:
361 361 sub['status'] = reply_content['status']
362 362
363 363 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
364 364 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
365 365
366 366 # flush i/o
367 367 # should this be before reply_msg is sent, like in the single-kernel code,
368 368 # or should nothing get in the way of real results?
369 369 sys.stdout.flush()
370 370 sys.stderr.flush()
371 371
372 372 def dispatch_queue(self, stream, msg):
373 373 self.control_stream.flush()
374 374 idents,msg = self.session.feed_identities(msg, copy=False)
375 375 try:
376 376 msg = self.session.unserialize(msg, content=True, copy=False)
377 377 except:
378 378 self.log.error("Invalid Message", exc_info=True)
379 379 return
380 380 else:
381 381 self.log.debug("Message received, %s", msg)
382 382
383 383
384 384 header = msg['header']
385 385 msg_id = header['msg_id']
386 386 msg_type = msg['header']['msg_type']
387 387 if self.check_aborted(msg_id):
388 388 self.aborted.remove(msg_id)
389 389 # is it safe to assume a msg_id will not be resubmitted?
390 390 reply_type = msg_type.split('_')[0] + '_reply'
391 391 status = {'status' : 'aborted'}
392 392 reply_msg = self.session.send(stream, reply_type, subheader=status,
393 393 content=status, parent=msg, ident=idents)
394 394 return
395 395 handler = self.shell_handlers.get(msg_type, None)
396 396 if handler is None:
397 397 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg_type)
398 398 else:
399 399 handler(stream, idents, msg)
400 400
401 401 def start(self):
402 402 #### stream mode:
403 403 if self.control_stream:
404 404 self.control_stream.on_recv(self.dispatch_control, copy=False)
405 self.control_stream.on_err(printer)
406 405
407 406 def make_dispatcher(stream):
408 407 def dispatcher(msg):
409 408 return self.dispatch_queue(stream, msg)
410 409 return dispatcher
411 410
412 411 for s in self.shell_streams:
413 412 s.on_recv(make_dispatcher(s), copy=False)
414 s.on_err(printer)
415
416 if self.iopub_stream:
417 self.iopub_stream.on_err(printer)
418
419 #### while True mode:
420 # while True:
421 # idle = True
422 # try:
423 # msg = self.shell_stream.socket.recv_multipart(
424 # zmq.NOBLOCK, copy=False)
425 # except zmq.ZMQError, e:
426 # if e.errno != zmq.EAGAIN:
427 # raise e
428 # else:
429 # idle=False
430 # self.dispatch_queue(self.shell_stream, msg)
431 #
432 # if not self.task_stream.empty():
433 # idle=False
434 # msg = self.task_stream.recv_multipart()
435 # self.dispatch_queue(self.task_stream, msg)
436 # if idle:
437 # # don't busywait
438 # time.sleep(1e-3)
413
439 414
General Comments 0
You need to be logged in to leave comments. Login now