##// END OF EJS Templates
allow aborting tasks
MinRK -
Show More
@@ -42,7 +42,7 b' from IPython.utils import py3compat'
42 from IPython.utils.frame import extract_module_locals
42 from IPython.utils.frame import extract_module_locals
43 from IPython.utils.jsonutil import json_clean
43 from IPython.utils.jsonutil import json_clean
44 from IPython.utils.traitlets import (
44 from IPython.utils.traitlets import (
45 Any, Instance, Float, Dict, CaselessStrEnum, List
45 Any, Instance, Float, Dict, CaselessStrEnum, List, Set
46 )
46 )
47
47
48 from entry_point import base_launch_kernel
48 from entry_point import base_launch_kernel
@@ -84,7 +84,7 b' class Kernel(Configurable):'
84 self.shell.init_user_ns()
84 self.shell.init_user_ns()
85
85
86 # Private interface
86 # Private interface
87
87
88 # Time to sleep after flushing the stdout/err buffers in each execute
88 # Time to sleep after flushing the stdout/err buffers in each execute
89 # cycle. While this introduces a hard limit on the minimal latency of the
89 # cycle. While this introduces a hard limit on the minimal latency of the
90 # execute cycle, it helps prevent output synchronization problems for
90 # execute cycle, it helps prevent output synchronization problems for
@@ -109,6 +109,9 b' class Kernel(Configurable):'
109 # This is a dict of port number that the kernel is listening on. It is set
109 # This is a dict of port number that the kernel is listening on. It is set
110 # by record_ports and used by connect_request.
110 # by record_ports and used by connect_request.
111 _recorded_ports = Dict()
111 _recorded_ports = Dict()
112
113 # set of aborted msg_ids
114 aborted = Set()
112
115
113
116
114
117
@@ -179,6 +182,7 b' class Kernel(Configurable):'
179
182
180 def dispatch_message(self, socket, idents, msg, handlers):
183 def dispatch_message(self, socket, idents, msg, handlers):
181 msg_type = msg['header']['msg_type']
184 msg_type = msg['header']['msg_type']
185 msg_id = msg['header']['msg_id']
182
186
183 # This assert will raise in versions of zeromq 2.0.7 and lesser.
187 # This assert will raise in versions of zeromq 2.0.7 and lesser.
184 # We now require 2.0.8 or above, so we can uncomment for safety.
188 # We now require 2.0.8 or above, so we can uncomment for safety.
@@ -191,6 +195,17 b' class Kernel(Configurable):'
191 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
195 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
192 self.log.debug(' Content: %s\n --->\n ', msg['content'])
196 self.log.debug(' Content: %s\n --->\n ', msg['content'])
193
197
198 # check if request has been aborted
199 if msg_id in self.aborted:
200 self.aborted.remove(msg_id)
201 # is it safe to assume a msg_id will not be resubmitted?
202 reply_type = msg_type.split('_')[0] + '_reply'
203 status = {'status' : 'aborted'}
204 reply_msg = self.session.send(socket, reply_type, subheader=status,
205 content=status, parent=msg, ident=idents)
206 return
207
208
194 # Find and call actual handler for message
209 # Find and call actual handler for message
195 handler = handlers.get(msg_type, None)
210 handler = handlers.get(msg_type, None)
196 if handler is None:
211 if handler is None:
@@ -390,7 +405,7 b' class Kernel(Configurable):'
390 self.log.debug("%s", reply_msg)
405 self.log.debug("%s", reply_msg)
391
406
392 if reply_msg['content']['status'] == u'error':
407 if reply_msg['content']['status'] == u'error':
393 self._abort_queue()
408 self._abort_queues()
394
409
395 self.session.send(self.iopub_socket,
410 self.session.send(self.iopub_socket,
396 u'status',
411 u'status',
@@ -548,31 +563,6 b' class Kernel(Configurable):'
548 # Control messages
563 # Control messages
549 #---------------------------------------------------------------------------
564 #---------------------------------------------------------------------------
550
565
551 def abort_queues(self):
552 for socket in self.shell_sockets:
553 if socket:
554 self.abort_queue(socket)
555
556 def abort_queue(self, socket):
557 while True:
558 idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True)
559 if msg is None:
560 return
561
562 self.log.info("Aborting:")
563 self.log.info("%s", msg)
564 msg_type = msg['header']['msg_type']
565 reply_type = msg_type.split('_')[0] + '_reply'
566 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
567 # self.reply_socket.send(ident,zmq.SNDMORE)
568 # self.reply_socket.send_json(reply_msg)
569 reply_msg = self.session.send(socket, reply_type,
570 content={'status' : 'aborted'}, parent=msg, ident=idents)
571 self.log.debug("%s", reply_msg)
572 # We need to wait a bit for requests to come in. This can probably
573 # be set shorter for true asynchronous clients.
574 time.sleep(0.05)
575
576 def abort_request(self, socket, ident, parent):
566 def abort_request(self, socket, ident, parent):
577 """abort a specifig msg by id"""
567 """abort a specifig msg by id"""
578 msg_ids = parent['content'].get('msg_ids', None)
568 msg_ids = parent['content'].get('msg_ids', None)
@@ -600,28 +590,31 b' class Kernel(Configurable):'
600 # Protected interface
590 # Protected interface
601 #---------------------------------------------------------------------------
591 #---------------------------------------------------------------------------
602
592
603 def _abort_queue(self):
593 def _abort_queues(self):
594 for socket in self.shell_sockets:
595 if socket:
596 self._abort_queue(socket)
597
598 def _abort_queue(self, socket):
604 while True:
599 while True:
605 try:
600 idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True)
606 ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
607 except Exception:
608 self.log.warn("Invalid Message:", exc_info=True)
609 continue
610 if msg is None:
601 if msg is None:
611 break
602 return
612 else:
613 assert ident is not None, \
614 "Unexpected missing message part."
615
603
616 self.log.debug("Aborting:\n%s", msg)
604 self.log.info("Aborting:")
605 self.log.info("%s", msg)
617 msg_type = msg['header']['msg_type']
606 msg_type = msg['header']['msg_type']
618 reply_type = msg_type.split('_')[0] + '_reply'
607 reply_type = msg_type.split('_')[0] + '_reply'
619 reply_msg = self.session.send(self.shell_socket, reply_type,
608 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
620 {'status' : 'aborted'}, msg, ident=ident)
609 # self.reply_socket.send(ident,zmq.SNDMORE)
610 # self.reply_socket.send_json(reply_msg)
611 reply_msg = self.session.send(socket, reply_type,
612 content={'status' : 'aborted'}, parent=msg, ident=idents)
621 self.log.debug("%s", reply_msg)
613 self.log.debug("%s", reply_msg)
622 # We need to wait a bit for requests to come in. This can probably
614 # We need to wait a bit for requests to come in. This can probably
623 # be set shorter for true asynchronous clients.
615 # be set shorter for true asynchronous clients.
624 time.sleep(0.1)
616 time.sleep(0.05)
617
625
618
626 def _no_raw_input(self):
619 def _no_raw_input(self):
627 """Raise StdinNotImplentedError if active frontend doesn't support
620 """Raise StdinNotImplentedError if active frontend doesn't support
General Comments 0
You need to be logged in to leave comments. Login now