Show More
@@ -554,20 +554,32 b' class Client(object):' | |||||
554 |
|
554 | |||
555 | @spinfirst |
|
555 | @spinfirst | |
556 | @defaultblock |
|
556 | @defaultblock | |
557 | def shutdown(self, targets=None, restart=False, block=None): |
|
557 | def shutdown(self, targets=None, restart=False, controller=False, block=None): | |
558 | """Terminates one or more engine processes.""" |
|
558 | """Terminates one or more engine processes, optionally including the controller.""" | |
|
559 | if controller: | |||
|
560 | targets = 'all' | |||
559 | targets = self._build_targets(targets)[0] |
|
561 | targets = self._build_targets(targets)[0] | |
560 | for t in targets: |
|
562 | for t in targets: | |
561 | self.session.send(self._control_socket, 'shutdown_request', |
|
563 | self.session.send(self._control_socket, 'shutdown_request', | |
562 | content={'restart':restart},ident=t) |
|
564 | content={'restart':restart},ident=t) | |
563 | error = False |
|
565 | error = False | |
564 |
if |
|
566 | if block or controller: | |
565 | for i in range(len(targets)): |
|
567 | for i in range(len(targets)): | |
566 | idents,msg = self.session.recv(self._control_socket,0) |
|
568 | idents,msg = self.session.recv(self._control_socket,0) | |
567 | if self.debug: |
|
569 | if self.debug: | |
568 | pprint(msg) |
|
570 | pprint(msg) | |
569 | if msg['content']['status'] != 'ok': |
|
571 | if msg['content']['status'] != 'ok': | |
570 | error = ss.unwrap_exception(msg['content']) |
|
572 | error = ss.unwrap_exception(msg['content']) | |
|
573 | ||||
|
574 | if controller: | |||
|
575 | time.sleep(0.25) | |||
|
576 | self.session.send(self._query_socket, 'shutdown_request') | |||
|
577 | idents,msg = self.session.recv(self._query_socket, 0) | |||
|
578 | if self.debug: | |||
|
579 | pprint(msg) | |||
|
580 | if msg['content']['status'] != 'ok': | |||
|
581 | error = ss.unwrap_exception(msg['content']) | |||
|
582 | ||||
571 | if error: |
|
583 | if error: | |
572 | return error |
|
584 | return error | |
573 |
|
585 |
@@ -15,13 +15,15 b' and monitors traffic through the various queues.' | |||||
15 | #----------------------------------------------------------------------------- |
|
15 | #----------------------------------------------------------------------------- | |
16 | from __future__ import print_function |
|
16 | from __future__ import print_function | |
17 |
|
17 | |||
|
18 | import sys | |||
18 | import os |
|
19 | import os | |
19 | from datetime import datetime |
|
20 | from datetime import datetime | |
20 | import logging |
|
21 | import logging | |
|
22 | import time | |||
|
23 | import uuid | |||
21 |
|
24 | |||
22 | import zmq |
|
25 | import zmq | |
23 | from zmq.eventloop import zmqstream, ioloop |
|
26 | from zmq.eventloop import zmqstream, ioloop | |
24 | import uuid |
|
|||
25 |
|
27 | |||
26 | # internal: |
|
28 | # internal: | |
27 | from IPython.zmq.log import logger # a Logger object |
|
29 | from IPython.zmq.log import logger # a Logger object | |
@@ -232,6 +234,7 b' class Controller(object):' | |||||
232 | 'purge_request': self.purge_results, |
|
234 | 'purge_request': self.purge_results, | |
233 | 'load_request': self.check_load, |
|
235 | 'load_request': self.check_load, | |
234 | 'resubmit_request': self.resubmit_task, |
|
236 | 'resubmit_request': self.resubmit_task, | |
|
237 | 'shutdown_request': self.shutdown_request, | |||
235 | } |
|
238 | } | |
236 |
|
239 | |||
237 | self.registrar_handlers = {'registration_request' : self.register_engine, |
|
240 | self.registrar_handlers = {'registration_request' : self.register_engine, | |
@@ -716,6 +719,24 b' class Controller(object):' | |||||
716 | # Client Requests |
|
719 | # Client Requests | |
717 | #------------------------------------------------------------------------- |
|
720 | #------------------------------------------------------------------------- | |
718 |
|
721 | |||
|
722 | def shutdown_request(self, client_id, msg): | |||
|
723 | """handle shutdown request.""" | |||
|
724 | # s = self.context.socket(zmq.XREQ) | |||
|
725 | # s.connect(self.client_connections['mux']) | |||
|
726 | # time.sleep(0.1) | |||
|
727 | # for eid,ec in self.engines.iteritems(): | |||
|
728 | # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue) | |||
|
729 | # time.sleep(1) | |||
|
730 | self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id) | |||
|
731 | dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop) | |||
|
732 | dc.start() | |||
|
733 | ||||
|
734 | def _shutdown(self): | |||
|
735 | logger.info("controller::controller shutting down.") | |||
|
736 | time.sleep(0.1) | |||
|
737 | sys.exit(0) | |||
|
738 | ||||
|
739 | ||||
719 | def check_load(self, client_id, msg): |
|
740 | def check_load(self, client_id, msg): | |
720 | content = msg['content'] |
|
741 | content = msg['content'] | |
721 | try: |
|
742 | try: | |
@@ -1018,6 +1039,7 b' def main(argv=None):' | |||||
1018 | signal_children(children) |
|
1039 | signal_children(children) | |
1019 | con = Controller(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs) |
|
1040 | con = Controller(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs) | |
1020 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) |
|
1041 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) | |
|
1042 | dc.start() | |||
1021 | loop.start() |
|
1043 | loop.start() | |
1022 |
|
1044 | |||
1023 |
|
1045 |
@@ -29,7 +29,7 b' from IPython.zmq.completer import KernelCompleter' | |||||
29 | from IPython.zmq.log import logger # a Logger object |
|
29 | from IPython.zmq.log import logger # a Logger object | |
30 |
|
30 | |||
31 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ |
|
31 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ | |
32 | unpack_apply_message, ISO8601 |
|
32 | unpack_apply_message, ISO8601, wrap_exception | |
33 | from dependency import UnmetDependency |
|
33 | from dependency import UnmetDependency | |
34 | import heartmonitor |
|
34 | import heartmonitor | |
35 | from client import Client |
|
35 | from client import Client | |
@@ -53,6 +53,7 b' class Kernel(HasTraits):' | |||||
53 | task_stream = Instance(zmqstream.ZMQStream) |
|
53 | task_stream = Instance(zmqstream.ZMQStream) | |
54 | iopub_stream = Instance(zmqstream.ZMQStream) |
|
54 | iopub_stream = Instance(zmqstream.ZMQStream) | |
55 | client = Instance(Client) |
|
55 | client = Instance(Client) | |
|
56 | loop = Instance(ioloop.IOLoop) | |||
56 |
|
57 | |||
57 | def __init__(self, **kwargs): |
|
58 | def __init__(self, **kwargs): | |
58 | super(Kernel, self).__init__(**kwargs) |
|
59 | super(Kernel, self).__init__(**kwargs) | |
@@ -127,15 +128,21 b' class Kernel(HasTraits):' | |||||
127 |
|
128 | |||
128 | def shutdown_request(self, stream, ident, parent): |
|
129 | def shutdown_request(self, stream, ident, parent): | |
129 | """kill ourself. This should really be handled in an external process""" |
|
130 | """kill ourself. This should really be handled in an external process""" | |
130 | self.abort_queues() |
|
131 | try: | |
131 | content = dict(parent['content']) |
|
132 | self.abort_queues() | |
|
133 | except: | |||
|
134 | content = wrap_exception() | |||
|
135 | else: | |||
|
136 | content = dict(parent['content']) | |||
|
137 | content['status'] = 'ok' | |||
132 | msg = self.session.send(stream, 'shutdown_reply', |
|
138 | msg = self.session.send(stream, 'shutdown_reply', | |
133 | content=content, parent=parent, ident=ident) |
|
139 | content=content, parent=parent, ident=ident) | |
134 | # msg = self.session.send(self.pub_socket, 'shutdown_reply', |
|
140 | # msg = self.session.send(self.pub_socket, 'shutdown_reply', | |
135 | # content, parent, ident) |
|
141 | # content, parent, ident) | |
136 | # print >> sys.__stdout__, msg |
|
142 | # print >> sys.__stdout__, msg | |
137 |
time.sleep(0. |
|
143 | # time.sleep(0.2) | |
138 | sys.exit(0) |
|
144 | dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop) | |
|
145 | dc.start() | |||
139 |
|
146 | |||
140 | def dispatch_control(self, msg): |
|
147 | def dispatch_control(self, msg): | |
141 | idents,msg = self.session.feed_identities(msg, copy=False) |
|
148 | idents,msg = self.session.feed_identities(msg, copy=False) | |
@@ -207,15 +214,7 b' class Kernel(HasTraits):' | |||||
207 | sys.displayhook.set_parent(parent) |
|
214 | sys.displayhook.set_parent(parent) | |
208 | exec comp_code in self.user_ns, self.user_ns |
|
215 | exec comp_code in self.user_ns, self.user_ns | |
209 | except: |
|
216 | except: | |
210 | # result = u'error' |
|
217 | exc_content = wrap_exception() | |
211 | etype, evalue, tb = sys.exc_info() |
|
|||
212 | tb = traceback.format_exception(etype, evalue, tb) |
|
|||
213 | exc_content = { |
|
|||
214 | u'status' : u'error', |
|
|||
215 | u'traceback' : tb, |
|
|||
216 | u'etype' : unicode(etype), |
|
|||
217 | u'evalue' : unicode(evalue) |
|
|||
218 | } |
|
|||
219 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
218 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
220 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) |
|
219 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) | |
221 | reply_content = exc_content |
|
220 | reply_content = exc_content | |
@@ -292,15 +291,7 b' class Kernel(HasTraits):' | |||||
292 | packed_result,buf = serialize_object(result) |
|
291 | packed_result,buf = serialize_object(result) | |
293 | result_buf = [packed_result]+buf |
|
292 | result_buf = [packed_result]+buf | |
294 | except: |
|
293 | except: | |
295 | result = u'error' |
|
294 | exc_content = wrap_exception() | |
296 | etype, evalue, tb = sys.exc_info() |
|
|||
297 | tb = traceback.format_exception(etype, evalue, tb) |
|
|||
298 | exc_content = { |
|
|||
299 | u'status' : u'error', |
|
|||
300 | u'traceback' : tb, |
|
|||
301 | u'etype' : unicode(etype), |
|
|||
302 | u'evalue' : unicode(evalue) |
|
|||
303 | } |
|
|||
304 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
295 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
305 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) |
|
296 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) | |
306 | reply_content = exc_content |
|
297 | reply_content = exc_content | |
@@ -426,7 +417,7 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,' | |||||
426 |
|
417 | |||
427 | kernel = Kernel(session=session, control_stream=control_stream, |
|
418 | kernel = Kernel(session=session, control_stream=control_stream, | |
428 | shell_streams=shell_streams, iopub_stream=iopub_stream, |
|
419 | shell_streams=shell_streams, iopub_stream=iopub_stream, | |
429 | client=client) |
|
420 | client=client, loop=loop) | |
430 | kernel.start() |
|
421 | kernel.start() | |
431 | return loop, c, kernel |
|
422 | return loop, c, kernel | |
432 |
|
423 |
@@ -65,9 +65,9 b' def wrap_exception():' | |||||
65 | tb = traceback.format_exception(etype, evalue, tb) |
|
65 | tb = traceback.format_exception(etype, evalue, tb) | |
66 | exc_content = { |
|
66 | exc_content = { | |
67 | 'status' : 'error', |
|
67 | 'status' : 'error', | |
68 |
'traceback' : |
|
68 | 'traceback' : tb.encode('utf8'), | |
69 |
'etype' : |
|
69 | 'etype' : etype.encode('utf8'), | |
70 |
'evalue' : |
|
70 | 'evalue' : evalue.encode('utf8') | |
71 | } |
|
71 | } | |
72 | return exc_content |
|
72 | return exc_content | |
73 |
|
73 |
General Comments 0
You need to be logged in to leave comments.
Login now