##// END OF EJS Templates
Clients can now shutdown the controller.
MinRK -
Show More
@@ -554,20 +554,32 b' class Client(object):'
554
554
555 @spinfirst
555 @spinfirst
556 @defaultblock
556 @defaultblock
557 def shutdown(self, targets=None, restart=False, block=None):
557 def shutdown(self, targets=None, restart=False, controller=False, block=None):
558 """Terminates one or more engine processes."""
558 """Terminates one or more engine processes, optionally including the controller."""
559 if controller:
560 targets = 'all'
559 targets = self._build_targets(targets)[0]
561 targets = self._build_targets(targets)[0]
560 for t in targets:
562 for t in targets:
561 self.session.send(self._control_socket, 'shutdown_request',
563 self.session.send(self._control_socket, 'shutdown_request',
562 content={'restart':restart},ident=t)
564 content={'restart':restart},ident=t)
563 error = False
565 error = False
564 if self.block:
566 if block or controller:
565 for i in range(len(targets)):
567 for i in range(len(targets)):
566 idents,msg = self.session.recv(self._control_socket,0)
568 idents,msg = self.session.recv(self._control_socket,0)
567 if self.debug:
569 if self.debug:
568 pprint(msg)
570 pprint(msg)
569 if msg['content']['status'] != 'ok':
571 if msg['content']['status'] != 'ok':
570 error = ss.unwrap_exception(msg['content'])
572 error = ss.unwrap_exception(msg['content'])
573
574 if controller:
575 time.sleep(0.25)
576 self.session.send(self._query_socket, 'shutdown_request')
577 idents,msg = self.session.recv(self._query_socket, 0)
578 if self.debug:
579 pprint(msg)
580 if msg['content']['status'] != 'ok':
581 error = ss.unwrap_exception(msg['content'])
582
571 if error:
583 if error:
572 return error
584 return error
573
585
@@ -15,13 +15,15 b' and monitors traffic through the various queues.'
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import os
19 import os
19 from datetime import datetime
20 from datetime import datetime
20 import logging
21 import logging
22 import time
23 import uuid
21
24
22 import zmq
25 import zmq
23 from zmq.eventloop import zmqstream, ioloop
26 from zmq.eventloop import zmqstream, ioloop
24 import uuid
25
27
26 # internal:
28 # internal:
27 from IPython.zmq.log import logger # a Logger object
29 from IPython.zmq.log import logger # a Logger object
@@ -232,6 +234,7 b' class Controller(object):'
232 'purge_request': self.purge_results,
234 'purge_request': self.purge_results,
233 'load_request': self.check_load,
235 'load_request': self.check_load,
234 'resubmit_request': self.resubmit_task,
236 'resubmit_request': self.resubmit_task,
237 'shutdown_request': self.shutdown_request,
235 }
238 }
236
239
237 self.registrar_handlers = {'registration_request' : self.register_engine,
240 self.registrar_handlers = {'registration_request' : self.register_engine,
@@ -716,6 +719,24 b' class Controller(object):'
716 # Client Requests
719 # Client Requests
717 #-------------------------------------------------------------------------
720 #-------------------------------------------------------------------------
718
721
722 def shutdown_request(self, client_id, msg):
723 """handle shutdown request."""
724 # s = self.context.socket(zmq.XREQ)
725 # s.connect(self.client_connections['mux'])
726 # time.sleep(0.1)
727 # for eid,ec in self.engines.iteritems():
728 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
729 # time.sleep(1)
730 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
731 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
732 dc.start()
733
734 def _shutdown(self):
735 logger.info("controller::controller shutting down.")
736 time.sleep(0.1)
737 sys.exit(0)
738
739
719 def check_load(self, client_id, msg):
740 def check_load(self, client_id, msg):
720 content = msg['content']
741 content = msg['content']
721 try:
742 try:
@@ -1018,6 +1039,7 b' def main(argv=None):'
1018 signal_children(children)
1039 signal_children(children)
1019 con = Controller(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
1040 con = Controller(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
1020 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
1041 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
1042 dc.start()
1021 loop.start()
1043 loop.start()
1022
1044
1023
1045
@@ -29,7 +29,7 b' from IPython.zmq.completer import KernelCompleter'
29 from IPython.zmq.log import logger # a Logger object
29 from IPython.zmq.log import logger # a Logger object
30
30
31 from streamsession import StreamSession, Message, extract_header, serialize_object,\
31 from streamsession import StreamSession, Message, extract_header, serialize_object,\
32 unpack_apply_message, ISO8601
32 unpack_apply_message, ISO8601, wrap_exception
33 from dependency import UnmetDependency
33 from dependency import UnmetDependency
34 import heartmonitor
34 import heartmonitor
35 from client import Client
35 from client import Client
@@ -53,6 +53,7 b' class Kernel(HasTraits):'
53 task_stream = Instance(zmqstream.ZMQStream)
53 task_stream = Instance(zmqstream.ZMQStream)
54 iopub_stream = Instance(zmqstream.ZMQStream)
54 iopub_stream = Instance(zmqstream.ZMQStream)
55 client = Instance(Client)
55 client = Instance(Client)
56 loop = Instance(ioloop.IOLoop)
56
57
57 def __init__(self, **kwargs):
58 def __init__(self, **kwargs):
58 super(Kernel, self).__init__(**kwargs)
59 super(Kernel, self).__init__(**kwargs)
@@ -127,15 +128,21 b' class Kernel(HasTraits):'
127
128
128 def shutdown_request(self, stream, ident, parent):
129 def shutdown_request(self, stream, ident, parent):
129 """kill ourself. This should really be handled in an external process"""
130 """kill ourself. This should really be handled in an external process"""
130 self.abort_queues()
131 try:
131 content = dict(parent['content'])
132 self.abort_queues()
133 except:
134 content = wrap_exception()
135 else:
136 content = dict(parent['content'])
137 content['status'] = 'ok'
132 msg = self.session.send(stream, 'shutdown_reply',
138 msg = self.session.send(stream, 'shutdown_reply',
133 content=content, parent=parent, ident=ident)
139 content=content, parent=parent, ident=ident)
134 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
140 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
135 # content, parent, ident)
141 # content, parent, ident)
136 # print >> sys.__stdout__, msg
142 # print >> sys.__stdout__, msg
137 time.sleep(0.1)
143 # time.sleep(0.2)
138 sys.exit(0)
144 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
145 dc.start()
139
146
140 def dispatch_control(self, msg):
147 def dispatch_control(self, msg):
141 idents,msg = self.session.feed_identities(msg, copy=False)
148 idents,msg = self.session.feed_identities(msg, copy=False)
@@ -207,15 +214,7 b' class Kernel(HasTraits):'
207 sys.displayhook.set_parent(parent)
214 sys.displayhook.set_parent(parent)
208 exec comp_code in self.user_ns, self.user_ns
215 exec comp_code in self.user_ns, self.user_ns
209 except:
216 except:
210 # result = u'error'
217 exc_content = wrap_exception()
211 etype, evalue, tb = sys.exc_info()
212 tb = traceback.format_exception(etype, evalue, tb)
213 exc_content = {
214 u'status' : u'error',
215 u'traceback' : tb,
216 u'etype' : unicode(etype),
217 u'evalue' : unicode(evalue)
218 }
219 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
218 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
220 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
219 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
221 reply_content = exc_content
220 reply_content = exc_content
@@ -292,15 +291,7 b' class Kernel(HasTraits):'
292 packed_result,buf = serialize_object(result)
291 packed_result,buf = serialize_object(result)
293 result_buf = [packed_result]+buf
292 result_buf = [packed_result]+buf
294 except:
293 except:
295 result = u'error'
294 exc_content = wrap_exception()
296 etype, evalue, tb = sys.exc_info()
297 tb = traceback.format_exception(etype, evalue, tb)
298 exc_content = {
299 u'status' : u'error',
300 u'traceback' : tb,
301 u'etype' : unicode(etype),
302 u'evalue' : unicode(evalue)
303 }
304 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
295 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
305 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
296 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
306 reply_content = exc_content
297 reply_content = exc_content
@@ -426,7 +417,7 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,'
426
417
427 kernel = Kernel(session=session, control_stream=control_stream,
418 kernel = Kernel(session=session, control_stream=control_stream,
428 shell_streams=shell_streams, iopub_stream=iopub_stream,
419 shell_streams=shell_streams, iopub_stream=iopub_stream,
429 client=client)
420 client=client, loop=loop)
430 kernel.start()
421 kernel.start()
431 return loop, c, kernel
422 return loop, c, kernel
432
423
@@ -65,9 +65,9 b' def wrap_exception():'
65 tb = traceback.format_exception(etype, evalue, tb)
65 tb = traceback.format_exception(etype, evalue, tb)
66 exc_content = {
66 exc_content = {
67 'status' : 'error',
67 'status' : 'error',
68 'traceback' : str(tb),
68 'traceback' : tb.encode('utf8'),
69 'etype' : str(etype),
69 'etype' : etype.encode('utf8'),
70 'evalue' : str(evalue)
70 'evalue' : evalue.encode('utf8')
71 }
71 }
72 return exc_content
72 return exc_content
73
73
General Comments 0
You need to be logged in to leave comments. Login now