Show More
@@ -42,7 +42,7 b' from IPython.utils import py3compat' | |||||
42 | from IPython.utils.frame import extract_module_locals |
|
42 | from IPython.utils.frame import extract_module_locals | |
43 | from IPython.utils.jsonutil import json_clean |
|
43 | from IPython.utils.jsonutil import json_clean | |
44 | from IPython.utils.traitlets import ( |
|
44 | from IPython.utils.traitlets import ( | |
45 | Any, Instance, Float, Dict, CaselessStrEnum, List |
|
45 | Any, Instance, Float, Dict, CaselessStrEnum, List, Set | |
46 | ) |
|
46 | ) | |
47 |
|
47 | |||
48 | from entry_point import base_launch_kernel |
|
48 | from entry_point import base_launch_kernel | |
@@ -84,7 +84,7 b' class Kernel(Configurable):' | |||||
84 | self.shell.init_user_ns() |
|
84 | self.shell.init_user_ns() | |
85 |
|
85 | |||
86 | # Private interface |
|
86 | # Private interface | |
87 |
|
87 | |||
88 | # Time to sleep after flushing the stdout/err buffers in each execute |
|
88 | # Time to sleep after flushing the stdout/err buffers in each execute | |
89 | # cycle. While this introduces a hard limit on the minimal latency of the |
|
89 | # cycle. While this introduces a hard limit on the minimal latency of the | |
90 | # execute cycle, it helps prevent output synchronization problems for |
|
90 | # execute cycle, it helps prevent output synchronization problems for | |
@@ -109,6 +109,9 b' class Kernel(Configurable):' | |||||
109 | # This is a dict of port number that the kernel is listening on. It is set |
|
109 | # This is a dict of port number that the kernel is listening on. It is set | |
110 | # by record_ports and used by connect_request. |
|
110 | # by record_ports and used by connect_request. | |
111 | _recorded_ports = Dict() |
|
111 | _recorded_ports = Dict() | |
|
112 | ||||
|
113 | # set of aborted msg_ids | |||
|
114 | aborted = Set() | |||
112 |
|
115 | |||
113 |
|
116 | |||
114 |
|
117 | |||
@@ -179,6 +182,7 b' class Kernel(Configurable):' | |||||
179 |
|
182 | |||
180 | def dispatch_message(self, socket, idents, msg, handlers): |
|
183 | def dispatch_message(self, socket, idents, msg, handlers): | |
181 | msg_type = msg['header']['msg_type'] |
|
184 | msg_type = msg['header']['msg_type'] | |
|
185 | msg_id = msg['header']['msg_id'] | |||
182 |
|
186 | |||
183 | # This assert will raise in versions of zeromq 2.0.7 and lesser. |
|
187 | # This assert will raise in versions of zeromq 2.0.7 and lesser. | |
184 | # We now require 2.0.8 or above, so we can uncomment for safety. |
|
188 | # We now require 2.0.8 or above, so we can uncomment for safety. | |
@@ -191,6 +195,17 b' class Kernel(Configurable):' | |||||
191 | self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type) |
|
195 | self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type) | |
192 | self.log.debug(' Content: %s\n --->\n ', msg['content']) |
|
196 | self.log.debug(' Content: %s\n --->\n ', msg['content']) | |
193 |
|
197 | |||
|
198 | # check if request has been aborted | |||
|
199 | if msg_id in self.aborted: | |||
|
200 | self.aborted.remove(msg_id) | |||
|
201 | # is it safe to assume a msg_id will not be resubmitted? | |||
|
202 | reply_type = msg_type.split('_')[0] + '_reply' | |||
|
203 | status = {'status' : 'aborted'} | |||
|
204 | reply_msg = self.session.send(socket, reply_type, subheader=status, | |||
|
205 | content=status, parent=msg, ident=idents) | |||
|
206 | return | |||
|
207 | ||||
|
208 | ||||
194 | # Find and call actual handler for message |
|
209 | # Find and call actual handler for message | |
195 | handler = handlers.get(msg_type, None) |
|
210 | handler = handlers.get(msg_type, None) | |
196 | if handler is None: |
|
211 | if handler is None: | |
@@ -390,7 +405,7 b' class Kernel(Configurable):' | |||||
390 | self.log.debug("%s", reply_msg) |
|
405 | self.log.debug("%s", reply_msg) | |
391 |
|
406 | |||
392 | if reply_msg['content']['status'] == u'error': |
|
407 | if reply_msg['content']['status'] == u'error': | |
393 | self._abort_queue() |
|
408 | self._abort_queues() | |
394 |
|
409 | |||
395 | self.session.send(self.iopub_socket, |
|
410 | self.session.send(self.iopub_socket, | |
396 | u'status', |
|
411 | u'status', | |
@@ -548,31 +563,6 b' class Kernel(Configurable):' | |||||
548 | # Control messages |
|
563 | # Control messages | |
549 | #--------------------------------------------------------------------------- |
|
564 | #--------------------------------------------------------------------------- | |
550 |
|
565 | |||
551 | def abort_queues(self): |
|
|||
552 | for socket in self.shell_sockets: |
|
|||
553 | if socket: |
|
|||
554 | self.abort_queue(socket) |
|
|||
555 |
|
||||
556 | def abort_queue(self, socket): |
|
|||
557 | while True: |
|
|||
558 | idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True) |
|
|||
559 | if msg is None: |
|
|||
560 | return |
|
|||
561 |
|
||||
562 | self.log.info("Aborting:") |
|
|||
563 | self.log.info("%s", msg) |
|
|||
564 | msg_type = msg['header']['msg_type'] |
|
|||
565 | reply_type = msg_type.split('_')[0] + '_reply' |
|
|||
566 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) |
|
|||
567 | # self.reply_socket.send(ident,zmq.SNDMORE) |
|
|||
568 | # self.reply_socket.send_json(reply_msg) |
|
|||
569 | reply_msg = self.session.send(socket, reply_type, |
|
|||
570 | content={'status' : 'aborted'}, parent=msg, ident=idents) |
|
|||
571 | self.log.debug("%s", reply_msg) |
|
|||
572 | # We need to wait a bit for requests to come in. This can probably |
|
|||
573 | # be set shorter for true asynchronous clients. |
|
|||
574 | time.sleep(0.05) |
|
|||
575 |
|
||||
576 | def abort_request(self, socket, ident, parent): |
|
566 | def abort_request(self, socket, ident, parent): | |
577 | """abort a specifig msg by id""" |
|
567 | """abort a specifig msg by id""" | |
578 | msg_ids = parent['content'].get('msg_ids', None) |
|
568 | msg_ids = parent['content'].get('msg_ids', None) | |
@@ -600,28 +590,31 b' class Kernel(Configurable):' | |||||
600 | # Protected interface |
|
590 | # Protected interface | |
601 | #--------------------------------------------------------------------------- |
|
591 | #--------------------------------------------------------------------------- | |
602 |
|
592 | |||
603 | def _abort_queue(self): |
|
593 | def _abort_queues(self): | |
|
594 | for socket in self.shell_sockets: | |||
|
595 | if socket: | |||
|
596 | self._abort_queue(socket) | |||
|
597 | ||||
|
598 | def _abort_queue(self, socket): | |||
604 | while True: |
|
599 | while True: | |
605 | try: |
|
600 | idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True) | |
606 | ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK) |
|
|||
607 | except Exception: |
|
|||
608 | self.log.warn("Invalid Message:", exc_info=True) |
|
|||
609 | continue |
|
|||
610 | if msg is None: |
|
601 | if msg is None: | |
611 |
|
|
602 | return | |
612 | else: |
|
|||
613 | assert ident is not None, \ |
|
|||
614 | "Unexpected missing message part." |
|
|||
615 |
|
603 | |||
616 |
self.log. |
|
604 | self.log.info("Aborting:") | |
|
605 | self.log.info("%s", msg) | |||
617 | msg_type = msg['header']['msg_type'] |
|
606 | msg_type = msg['header']['msg_type'] | |
618 | reply_type = msg_type.split('_')[0] + '_reply' |
|
607 | reply_type = msg_type.split('_')[0] + '_reply' | |
619 |
reply_msg = self.session. |
|
608 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) | |
620 | {'status' : 'aborted'}, msg, ident=ident) |
|
609 | # self.reply_socket.send(ident,zmq.SNDMORE) | |
|
610 | # self.reply_socket.send_json(reply_msg) | |||
|
611 | reply_msg = self.session.send(socket, reply_type, | |||
|
612 | content={'status' : 'aborted'}, parent=msg, ident=idents) | |||
621 | self.log.debug("%s", reply_msg) |
|
613 | self.log.debug("%s", reply_msg) | |
622 | # We need to wait a bit for requests to come in. This can probably |
|
614 | # We need to wait a bit for requests to come in. This can probably | |
623 | # be set shorter for true asynchronous clients. |
|
615 | # be set shorter for true asynchronous clients. | |
624 |
time.sleep(0. |
|
616 | time.sleep(0.05) | |
|
617 | ||||
625 |
|
618 | |||
626 | def _no_raw_input(self): |
|
619 | def _no_raw_input(self): | |
627 | """Raise StdinNotImplentedError if active frontend doesn't support |
|
620 | """Raise StdinNotImplentedError if active frontend doesn't support |
General Comments 0
You need to be logged in to leave comments.
Login now