Show More
@@ -42,7 +42,7 b' from IPython.utils import py3compat' | |||
|
42 | 42 | from IPython.utils.frame import extract_module_locals |
|
43 | 43 | from IPython.utils.jsonutil import json_clean |
|
44 | 44 | from IPython.utils.traitlets import ( |
|
45 | Any, Instance, Float, Dict, CaselessStrEnum, List | |
|
45 | Any, Instance, Float, Dict, CaselessStrEnum, List, Set | |
|
46 | 46 | ) |
|
47 | 47 | |
|
48 | 48 | from entry_point import base_launch_kernel |
@@ -110,6 +110,9 b' class Kernel(Configurable):' | |||
|
110 | 110 | # by record_ports and used by connect_request. |
|
111 | 111 | _recorded_ports = Dict() |
|
112 | 112 | |
|
113 | # set of aborted msg_ids | |
|
114 | aborted = Set() | |
|
115 | ||
|
113 | 116 | |
|
114 | 117 | |
|
115 | 118 | def __init__(self, **kwargs): |
@@ -179,6 +182,7 b' class Kernel(Configurable):' | |||
|
179 | 182 | |
|
180 | 183 | def dispatch_message(self, socket, idents, msg, handlers): |
|
181 | 184 | msg_type = msg['header']['msg_type'] |
|
185 | msg_id = msg['header']['msg_id'] | |
|
182 | 186 | |
|
183 | 187 | # This assert will raise in versions of zeromq 2.0.7 and lesser. |
|
184 | 188 | # We now require 2.0.8 or above, so we can uncomment for safety. |
@@ -191,6 +195,17 b' class Kernel(Configurable):' | |||
|
191 | 195 | self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type) |
|
192 | 196 | self.log.debug(' Content: %s\n --->\n ', msg['content']) |
|
193 | 197 | |
|
198 | # check if request has been aborted | |
|
199 | if msg_id in self.aborted: | |
|
200 | self.aborted.remove(msg_id) | |
|
201 | # is it safe to assume a msg_id will not be resubmitted? | |
|
202 | reply_type = msg_type.split('_')[0] + '_reply' | |
|
203 | status = {'status' : 'aborted'} | |
|
204 | reply_msg = self.session.send(socket, reply_type, subheader=status, | |
|
205 | content=status, parent=msg, ident=idents) | |
|
206 | return | |
|
207 | ||
|
208 | ||
|
194 | 209 | # Find and call actual handler for message |
|
195 | 210 | handler = handlers.get(msg_type, None) |
|
196 | 211 | if handler is None: |
@@ -390,7 +405,7 b' class Kernel(Configurable):' | |||
|
390 | 405 | self.log.debug("%s", reply_msg) |
|
391 | 406 | |
|
392 | 407 | if reply_msg['content']['status'] == u'error': |
|
393 | self._abort_queue() | |
|
408 | self._abort_queues() | |
|
394 | 409 | |
|
395 | 410 | self.session.send(self.iopub_socket, |
|
396 | 411 | u'status', |
@@ -548,31 +563,6 b' class Kernel(Configurable):' | |||
|
548 | 563 | # Control messages |
|
549 | 564 | #--------------------------------------------------------------------------- |
|
550 | 565 | |
|
551 | def abort_queues(self): | |
|
552 | for socket in self.shell_sockets: | |
|
553 | if socket: | |
|
554 | self.abort_queue(socket) | |
|
555 | ||
|
556 | def abort_queue(self, socket): | |
|
557 | while True: | |
|
558 | idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True) | |
|
559 | if msg is None: | |
|
560 | return | |
|
561 | ||
|
562 | self.log.info("Aborting:") | |
|
563 | self.log.info("%s", msg) | |
|
564 | msg_type = msg['header']['msg_type'] | |
|
565 | reply_type = msg_type.split('_')[0] + '_reply' | |
|
566 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) | |
|
567 | # self.reply_socket.send(ident,zmq.SNDMORE) | |
|
568 | # self.reply_socket.send_json(reply_msg) | |
|
569 | reply_msg = self.session.send(socket, reply_type, | |
|
570 | content={'status' : 'aborted'}, parent=msg, ident=idents) | |
|
571 | self.log.debug("%s", reply_msg) | |
|
572 | # We need to wait a bit for requests to come in. This can probably | |
|
573 | # be set shorter for true asynchronous clients. | |
|
574 | time.sleep(0.05) | |
|
575 | ||
|
576 | 566 | def abort_request(self, socket, ident, parent): |
|
577 | 567 | """abort a specifig msg by id""" |
|
578 | 568 | msg_ids = parent['content'].get('msg_ids', None) |
@@ -600,28 +590,31 b' class Kernel(Configurable):' | |||
|
600 | 590 | # Protected interface |
|
601 | 591 | #--------------------------------------------------------------------------- |
|
602 | 592 | |
|
603 | def _abort_queue(self): | |
|
593 | def _abort_queues(self): | |
|
594 | for socket in self.shell_sockets: | |
|
595 | if socket: | |
|
596 | self._abort_queue(socket) | |
|
597 | ||
|
598 | def _abort_queue(self, socket): | |
|
604 | 599 | while True: |
|
605 | try: | |
|
606 | ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK) | |
|
607 | except Exception: | |
|
608 | self.log.warn("Invalid Message:", exc_info=True) | |
|
609 | continue | |
|
600 | idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True) | |
|
610 | 601 | if msg is None: |
|
611 |
|
|
|
612 | else: | |
|
613 | assert ident is not None, \ | |
|
614 | "Unexpected missing message part." | |
|
602 | return | |
|
615 | 603 | |
|
616 |
self.log. |
|
|
604 | self.log.info("Aborting:") | |
|
605 | self.log.info("%s", msg) | |
|
617 | 606 | msg_type = msg['header']['msg_type'] |
|
618 | 607 | reply_type = msg_type.split('_')[0] + '_reply' |
|
619 |
reply_msg = self.session. |
|
|
620 | {'status' : 'aborted'}, msg, ident=ident) | |
|
608 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) | |
|
609 | # self.reply_socket.send(ident,zmq.SNDMORE) | |
|
610 | # self.reply_socket.send_json(reply_msg) | |
|
611 | reply_msg = self.session.send(socket, reply_type, | |
|
612 | content={'status' : 'aborted'}, parent=msg, ident=idents) | |
|
621 | 613 | self.log.debug("%s", reply_msg) |
|
622 | 614 | # We need to wait a bit for requests to come in. This can probably |
|
623 | 615 | # be set shorter for true asynchronous clients. |
|
624 |
time.sleep(0. |
|
|
616 | time.sleep(0.05) | |
|
617 | ||
|
625 | 618 | |
|
626 | 619 | def _no_raw_input(self): |
|
627 | 620 | """Raise StdinNotImplentedError if active frontend doesn't support |
General Comments 0
You need to be logged in to leave comments.
Login now