Show More
@@ -136,14 +136,22 b' class Kernel(Configurable):' | |||||
136 | # Build dict of handlers for message types |
|
136 | # Build dict of handlers for message types | |
137 | msg_types = [ 'execute_request', 'complete_request', |
|
137 | msg_types = [ 'execute_request', 'complete_request', | |
138 | 'object_info_request', 'history_request', |
|
138 | 'object_info_request', 'history_request', | |
139 |
'connect_request', 'shutdown_request' |
|
139 | 'connect_request', 'shutdown_request', | |
|
140 | 'apply_request', | |||
|
141 | ] | |||
140 | self.handlers = {} |
|
142 | self.handlers = {} | |
141 | for msg_type in msg_types: |
|
143 | for msg_type in msg_types: | |
142 | self.handlers[msg_type] = getattr(self, msg_type) |
|
144 | self.handlers[msg_type] = getattr(self, msg_type) | |
143 |
|
145 | |||
|
146 | control_msg_types = [ 'clear_request', 'abort_request' ] | |||
|
147 | self.control_handlers = {} | |||
|
148 | for msg_type in control_msg_types: | |||
|
149 | self.control_handlers[msg_type] = getattr(self, msg_type) | |||
|
150 | ||||
144 | def do_one_iteration(self): |
|
151 | def do_one_iteration(self): | |
145 | """Do one iteration of the kernel's evaluation loop. |
|
152 | """Do one iteration of the kernel's evaluation loop. | |
146 | """ |
|
153 | """ | |
|
154 | ||||
147 | try: |
|
155 | try: | |
148 | ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK) |
|
156 | ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK) | |
149 | except Exception: |
|
157 | except Exception: | |
@@ -436,6 +444,139 b' class Kernel(Configurable):' | |||||
436 | sys.exit(0) |
|
444 | sys.exit(0) | |
437 |
|
445 | |||
438 | #--------------------------------------------------------------------------- |
|
446 | #--------------------------------------------------------------------------- | |
|
447 | # Engine methods | |||
|
448 | #--------------------------------------------------------------------------- | |||
|
449 | ||||
|
450 | def apply_request(self, stream, ident, parent): | |||
|
451 | # flush previous reply, so this request won't block it | |||
|
452 | stream.flush(zmq.POLLOUT) | |||
|
453 | try: | |||
|
454 | content = parent[u'content'] | |||
|
455 | bufs = parent[u'buffers'] | |||
|
456 | msg_id = parent['header']['msg_id'] | |||
|
457 | # bound = parent['header'].get('bound', False) | |||
|
458 | except: | |||
|
459 | self.log.error("Got bad msg: %s"%parent, exc_info=True) | |||
|
460 | return | |||
|
461 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |||
|
462 | # self.iopub_stream.send(pyin_msg) | |||
|
463 | # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) | |||
|
464 | sub = {'dependencies_met' : True, 'engine' : self.ident, | |||
|
465 | 'started': datetime.now()} | |||
|
466 | try: | |||
|
467 | # allow for not overriding displayhook | |||
|
468 | if hasattr(sys.displayhook, 'set_parent'): | |||
|
469 | sys.displayhook.set_parent(parent) | |||
|
470 | sys.stdout.set_parent(parent) | |||
|
471 | sys.stderr.set_parent(parent) | |||
|
472 | # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns | |||
|
473 | working = self.user_ns | |||
|
474 | # suffix = | |||
|
475 | prefix = "_"+str(msg_id).replace("-","")+"_" | |||
|
476 | ||||
|
477 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) | |||
|
478 | # if bound: | |||
|
479 | # bound_ns = Namespace(working) | |||
|
480 | # args = [bound_ns]+list(args) | |||
|
481 | ||||
|
482 | fname = getattr(f, '__name__', 'f') | |||
|
483 | ||||
|
484 | fname = prefix+"f" | |||
|
485 | argname = prefix+"args" | |||
|
486 | kwargname = prefix+"kwargs" | |||
|
487 | resultname = prefix+"result" | |||
|
488 | ||||
|
489 | ns = { fname : f, argname : args, kwargname : kwargs , resultname : None } | |||
|
490 | # print ns | |||
|
491 | working.update(ns) | |||
|
492 | code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) | |||
|
493 | try: | |||
|
494 | exec code in working,working | |||
|
495 | result = working.get(resultname) | |||
|
496 | finally: | |||
|
497 | for key in ns.iterkeys(): | |||
|
498 | working.pop(key) | |||
|
499 | # if bound: | |||
|
500 | # working.update(bound_ns) | |||
|
501 | ||||
|
502 | packed_result,buf = serialize_object(result) | |||
|
503 | result_buf = [packed_result]+buf | |||
|
504 | except: | |||
|
505 | exc_content = self._wrap_exception('apply') | |||
|
506 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |||
|
507 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, | |||
|
508 | ident=asbytes('%s.pyerr'%self.prefix)) | |||
|
509 | reply_content = exc_content | |||
|
510 | result_buf = [] | |||
|
511 | ||||
|
512 | if exc_content['ename'] == 'UnmetDependency': | |||
|
513 | sub['dependencies_met'] = False | |||
|
514 | else: | |||
|
515 | reply_content = {'status' : 'ok'} | |||
|
516 | ||||
|
517 | # put 'ok'/'error' status in header, for scheduler introspection: | |||
|
518 | sub['status'] = reply_content['status'] | |||
|
519 | ||||
|
520 | # flush i/o | |||
|
521 | sys.stdout.flush() | |||
|
522 | sys.stderr.flush() | |||
|
523 | ||||
|
524 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, | |||
|
525 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) | |||
|
526 | ||||
|
527 | #--------------------------------------------------------------------------- | |||
|
528 | # Control messages | |||
|
529 | #--------------------------------------------------------------------------- | |||
|
530 | ||||
|
531 | def abort_queues(self): | |||
|
532 | for stream in self.shell_streams: | |||
|
533 | if stream: | |||
|
534 | self.abort_queue(stream) | |||
|
535 | ||||
|
536 | def abort_queue(self, stream): | |||
|
537 | while True: | |||
|
538 | idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True) | |||
|
539 | if msg is None: | |||
|
540 | return | |||
|
541 | ||||
|
542 | self.log.info("Aborting:") | |||
|
543 | self.log.info(str(msg)) | |||
|
544 | msg_type = msg['header']['msg_type'] | |||
|
545 | reply_type = msg_type.split('_')[0] + '_reply' | |||
|
546 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) | |||
|
547 | # self.reply_socket.send(ident,zmq.SNDMORE) | |||
|
548 | # self.reply_socket.send_json(reply_msg) | |||
|
549 | reply_msg = self.session.send(stream, reply_type, | |||
|
550 | content={'status' : 'aborted'}, parent=msg, ident=idents) | |||
|
551 | self.log.debug(str(reply_msg)) | |||
|
552 | # We need to wait a bit for requests to come in. This can probably | |||
|
553 | # be set shorter for true asynchronous clients. | |||
|
554 | time.sleep(0.05) | |||
|
555 | ||||
|
556 | def abort_request(self, stream, ident, parent): | |||
|
557 | """abort a specifig msg by id""" | |||
|
558 | msg_ids = parent['content'].get('msg_ids', None) | |||
|
559 | if isinstance(msg_ids, basestring): | |||
|
560 | msg_ids = [msg_ids] | |||
|
561 | if not msg_ids: | |||
|
562 | self.abort_queues() | |||
|
563 | for mid in msg_ids: | |||
|
564 | self.aborted.add(str(mid)) | |||
|
565 | ||||
|
566 | content = dict(status='ok') | |||
|
567 | reply_msg = self.session.send(stream, 'abort_reply', content=content, | |||
|
568 | parent=parent, ident=ident) | |||
|
569 | self.log.debug(str(reply_msg)) | |||
|
570 | ||||
|
571 | def clear_request(self, stream, idents, parent): | |||
|
572 | """Clear our namespace.""" | |||
|
573 | self.user_ns = {} | |||
|
574 | msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, | |||
|
575 | content = dict(status='ok')) | |||
|
576 | self._initial_exec_lines() | |||
|
577 | ||||
|
578 | ||||
|
579 | #--------------------------------------------------------------------------- | |||
439 | # Protected interface |
|
580 | # Protected interface | |
440 | #--------------------------------------------------------------------------- |
|
581 | #--------------------------------------------------------------------------- | |
441 |
|
582 |
General Comments 0
You need to be logged in to leave comments.
Login now