##// END OF EJS Templates
allow aborting tasks
MinRK -
Show More
@@ -42,7 +42,7 b' from IPython.utils import py3compat'
42 42 from IPython.utils.frame import extract_module_locals
43 43 from IPython.utils.jsonutil import json_clean
44 44 from IPython.utils.traitlets import (
45 Any, Instance, Float, Dict, CaselessStrEnum, List
45 Any, Instance, Float, Dict, CaselessStrEnum, List, Set
46 46 )
47 47
48 48 from entry_point import base_launch_kernel
@@ -110,6 +110,9 b' class Kernel(Configurable):'
110 110 # by record_ports and used by connect_request.
111 111 _recorded_ports = Dict()
112 112
113 # set of aborted msg_ids
114 aborted = Set()
115
113 116
114 117
115 118 def __init__(self, **kwargs):
@@ -179,6 +182,7 b' class Kernel(Configurable):'
179 182
180 183 def dispatch_message(self, socket, idents, msg, handlers):
181 184 msg_type = msg['header']['msg_type']
185 msg_id = msg['header']['msg_id']
182 186
183 187 # This assert will raise in versions of zeromq 2.0.7 and lesser.
184 188 # We now require 2.0.8 or above, so we can uncomment for safety.
@@ -191,6 +195,17 b' class Kernel(Configurable):'
191 195 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
192 196 self.log.debug(' Content: %s\n --->\n ', msg['content'])
193 197
198 # check if request has been aborted
199 if msg_id in self.aborted:
200 self.aborted.remove(msg_id)
201 # is it safe to assume a msg_id will not be resubmitted?
202 reply_type = msg_type.split('_')[0] + '_reply'
203 status = {'status' : 'aborted'}
204 reply_msg = self.session.send(socket, reply_type, subheader=status,
205 content=status, parent=msg, ident=idents)
206 return
207
208
194 209 # Find and call actual handler for message
195 210 handler = handlers.get(msg_type, None)
196 211 if handler is None:
@@ -390,7 +405,7 b' class Kernel(Configurable):'
390 405 self.log.debug("%s", reply_msg)
391 406
392 407 if reply_msg['content']['status'] == u'error':
393 self._abort_queue()
408 self._abort_queues()
394 409
395 410 self.session.send(self.iopub_socket,
396 411 u'status',
@@ -548,31 +563,6 b' class Kernel(Configurable):'
548 563 # Control messages
549 564 #---------------------------------------------------------------------------
550 565
551 def abort_queues(self):
552 for socket in self.shell_sockets:
553 if socket:
554 self.abort_queue(socket)
555
556 def abort_queue(self, socket):
557 while True:
558 idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True)
559 if msg is None:
560 return
561
562 self.log.info("Aborting:")
563 self.log.info("%s", msg)
564 msg_type = msg['header']['msg_type']
565 reply_type = msg_type.split('_')[0] + '_reply'
566 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
567 # self.reply_socket.send(ident,zmq.SNDMORE)
568 # self.reply_socket.send_json(reply_msg)
569 reply_msg = self.session.send(socket, reply_type,
570 content={'status' : 'aborted'}, parent=msg, ident=idents)
571 self.log.debug("%s", reply_msg)
572 # We need to wait a bit for requests to come in. This can probably
573 # be set shorter for true asynchronous clients.
574 time.sleep(0.05)
575
576 566 def abort_request(self, socket, ident, parent):
577 567 """abort a specifig msg by id"""
578 568 msg_ids = parent['content'].get('msg_ids', None)
@@ -600,28 +590,31 b' class Kernel(Configurable):'
600 590 # Protected interface
601 591 #---------------------------------------------------------------------------
602 592
603 def _abort_queue(self):
593 def _abort_queues(self):
594 for socket in self.shell_sockets:
595 if socket:
596 self._abort_queue(socket)
597
598 def _abort_queue(self, socket):
604 599 while True:
605 try:
606 ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
607 except Exception:
608 self.log.warn("Invalid Message:", exc_info=True)
609 continue
600 idents,msg = self.session.recv(socket, zmq.NOBLOCK, content=True)
610 601 if msg is None:
611 break
612 else:
613 assert ident is not None, \
614 "Unexpected missing message part."
602 return
615 603
616 self.log.debug("Aborting:\n%s", msg)
604 self.log.info("Aborting:")
605 self.log.info("%s", msg)
617 606 msg_type = msg['header']['msg_type']
618 607 reply_type = msg_type.split('_')[0] + '_reply'
619 reply_msg = self.session.send(self.shell_socket, reply_type,
620 {'status' : 'aborted'}, msg, ident=ident)
608 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
609 # self.reply_socket.send(ident,zmq.SNDMORE)
610 # self.reply_socket.send_json(reply_msg)
611 reply_msg = self.session.send(socket, reply_type,
612 content={'status' : 'aborted'}, parent=msg, ident=idents)
621 613 self.log.debug("%s", reply_msg)
622 614 # We need to wait a bit for requests to come in. This can probably
623 615 # be set shorter for true asynchronous clients.
624 time.sleep(0.1)
616 time.sleep(0.05)
617
625 618
626 619 def _no_raw_input(self):
627 620 """Raise StdinNotImplentedError if active frontend doesn't support
General Comments 0
You need to be logged in to leave comments. Login now