##// END OF EJS Templates
drop in apply_request handler in zmq.ipkernel...
MinRK -
Show More
@@ -136,14 +136,22 b' class Kernel(Configurable):'
136 # Build dict of handlers for message types
136 # Build dict of handlers for message types
137 msg_types = [ 'execute_request', 'complete_request',
137 msg_types = [ 'execute_request', 'complete_request',
138 'object_info_request', 'history_request',
138 'object_info_request', 'history_request',
139 'connect_request', 'shutdown_request']
139 'connect_request', 'shutdown_request',
140 'apply_request',
141 ]
140 self.handlers = {}
142 self.handlers = {}
141 for msg_type in msg_types:
143 for msg_type in msg_types:
142 self.handlers[msg_type] = getattr(self, msg_type)
144 self.handlers[msg_type] = getattr(self, msg_type)
143
145
146 control_msg_types = [ 'clear_request', 'abort_request' ]
147 self.control_handlers = {}
148 for msg_type in control_msg_types:
149 self.control_handlers[msg_type] = getattr(self, msg_type)
150
144 def do_one_iteration(self):
151 def do_one_iteration(self):
145 """Do one iteration of the kernel's evaluation loop.
152 """Do one iteration of the kernel's evaluation loop.
146 """
153 """
154
147 try:
155 try:
148 ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
156 ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
149 except Exception:
157 except Exception:
@@ -436,6 +444,139 b' class Kernel(Configurable):'
436 sys.exit(0)
444 sys.exit(0)
437
445
438 #---------------------------------------------------------------------------
446 #---------------------------------------------------------------------------
447 # Engine methods
448 #---------------------------------------------------------------------------
449
450 def apply_request(self, stream, ident, parent):
451 # flush previous reply, so this request won't block it
452 stream.flush(zmq.POLLOUT)
453 try:
454 content = parent[u'content']
455 bufs = parent[u'buffers']
456 msg_id = parent['header']['msg_id']
457 # bound = parent['header'].get('bound', False)
458 except:
459 self.log.error("Got bad msg: %s"%parent, exc_info=True)
460 return
461 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
462 # self.iopub_stream.send(pyin_msg)
463 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
464 sub = {'dependencies_met' : True, 'engine' : self.ident,
465 'started': datetime.now()}
466 try:
467 # allow for not overriding displayhook
468 if hasattr(sys.displayhook, 'set_parent'):
469 sys.displayhook.set_parent(parent)
470 sys.stdout.set_parent(parent)
471 sys.stderr.set_parent(parent)
472 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
473 working = self.user_ns
474 # suffix =
475 prefix = "_"+str(msg_id).replace("-","")+"_"
476
477 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
478 # if bound:
479 # bound_ns = Namespace(working)
480 # args = [bound_ns]+list(args)
481
482 fname = getattr(f, '__name__', 'f')
483
484 fname = prefix+"f"
485 argname = prefix+"args"
486 kwargname = prefix+"kwargs"
487 resultname = prefix+"result"
488
489 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
490 # print ns
491 working.update(ns)
492 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
493 try:
494 exec code in working,working
495 result = working.get(resultname)
496 finally:
497 for key in ns.iterkeys():
498 working.pop(key)
499 # if bound:
500 # working.update(bound_ns)
501
502 packed_result,buf = serialize_object(result)
503 result_buf = [packed_result]+buf
504 except:
505 exc_content = self._wrap_exception('apply')
506 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
507 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
508 ident=asbytes('%s.pyerr'%self.prefix))
509 reply_content = exc_content
510 result_buf = []
511
512 if exc_content['ename'] == 'UnmetDependency':
513 sub['dependencies_met'] = False
514 else:
515 reply_content = {'status' : 'ok'}
516
517 # put 'ok'/'error' status in header, for scheduler introspection:
518 sub['status'] = reply_content['status']
519
520 # flush i/o
521 sys.stdout.flush()
522 sys.stderr.flush()
523
524 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
525 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
526
527 #---------------------------------------------------------------------------
528 # Control messages
529 #---------------------------------------------------------------------------
530
531 def abort_queues(self):
532 for stream in self.shell_streams:
533 if stream:
534 self.abort_queue(stream)
535
536 def abort_queue(self, stream):
537 while True:
538 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
539 if msg is None:
540 return
541
542 self.log.info("Aborting:")
543 self.log.info(str(msg))
544 msg_type = msg['header']['msg_type']
545 reply_type = msg_type.split('_')[0] + '_reply'
546 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
547 # self.reply_socket.send(ident,zmq.SNDMORE)
548 # self.reply_socket.send_json(reply_msg)
549 reply_msg = self.session.send(stream, reply_type,
550 content={'status' : 'aborted'}, parent=msg, ident=idents)
551 self.log.debug(str(reply_msg))
552 # We need to wait a bit for requests to come in. This can probably
553 # be set shorter for true asynchronous clients.
554 time.sleep(0.05)
555
556 def abort_request(self, stream, ident, parent):
557 """abort a specifig msg by id"""
558 msg_ids = parent['content'].get('msg_ids', None)
559 if isinstance(msg_ids, basestring):
560 msg_ids = [msg_ids]
561 if not msg_ids:
562 self.abort_queues()
563 for mid in msg_ids:
564 self.aborted.add(str(mid))
565
566 content = dict(status='ok')
567 reply_msg = self.session.send(stream, 'abort_reply', content=content,
568 parent=parent, ident=ident)
569 self.log.debug(str(reply_msg))
570
571 def clear_request(self, stream, idents, parent):
572 """Clear our namespace."""
573 self.user_ns = {}
574 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
575 content = dict(status='ok'))
576 self._initial_exec_lines()
577
578
579 #---------------------------------------------------------------------------
439 # Protected interface
580 # Protected interface
440 #---------------------------------------------------------------------------
581 #---------------------------------------------------------------------------
441
582
General Comments 0
You need to be logged in to leave comments. Login now