##// END OF EJS Templates
Fixed high CPU usage of XREQ channel....
Brian Granger -
Show More
@@ -1,144 +1,147
1 """ Defines a KernelManager that provides signals and slots.
1 """ Defines a KernelManager that provides signals and slots.
2 """
2 """
3
3
4 # System library imports.
4 # System library imports.
5 from PyQt4 import QtCore
5 from PyQt4 import QtCore
6 import zmq
6
7
7 # IPython imports.
8 # IPython imports.
8 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
9 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
9 XReqSocketChannel, RepSocketChannel
10 XReqSocketChannel, RepSocketChannel
10 from util import MetaQObjectHasTraits
11 from util import MetaQObjectHasTraits
11
12
12
13
14
13 class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):
15 class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):
14
16
15 # Emitted when any message is received.
17 # Emitted when any message is received.
16 message_received = QtCore.pyqtSignal(object)
18 message_received = QtCore.pyqtSignal(object)
17
19
18 # Emitted when a message of type 'pyout' or 'stdout' is received.
20 # Emitted when a message of type 'pyout' or 'stdout' is received.
19 output_received = QtCore.pyqtSignal(object)
21 output_received = QtCore.pyqtSignal(object)
20
22
21 # Emitted when a message of type 'pyerr' or 'stderr' is received.
23 # Emitted when a message of type 'pyerr' or 'stderr' is received.
22 error_received = QtCore.pyqtSignal(object)
24 error_received = QtCore.pyqtSignal(object)
23
25
24 #---------------------------------------------------------------------------
26 #---------------------------------------------------------------------------
25 # 'object' interface
27 # 'object' interface
26 #---------------------------------------------------------------------------
28 #---------------------------------------------------------------------------
27
29
28 def __init__(self, *args, **kw):
30 def __init__(self, *args, **kw):
29 """ Reimplemented to ensure that QtCore.QObject is initialized first.
31 """ Reimplemented to ensure that QtCore.QObject is initialized first.
30 """
32 """
31 QtCore.QObject.__init__(self)
33 QtCore.QObject.__init__(self)
32 SubSocketChannel.__init__(self, *args, **kw)
34 SubSocketChannel.__init__(self, *args, **kw)
33
35
34 #---------------------------------------------------------------------------
36 #---------------------------------------------------------------------------
35 # 'SubSocketChannel' interface
37 # 'SubSocketChannel' interface
36 #---------------------------------------------------------------------------
38 #---------------------------------------------------------------------------
37
39
38 def call_handlers(self, msg):
40 def call_handlers(self, msg):
39 """ Reimplemented to emit signals instead of making callbacks.
41 """ Reimplemented to emit signals instead of making callbacks.
40 """
42 """
41 # Emit the generic signal.
43 # Emit the generic signal.
42 self.message_received.emit(msg)
44 self.message_received.emit(msg)
43
45
44 # Emit signals for specialized message types.
46 # Emit signals for specialized message types.
45 msg_type = msg['msg_type']
47 msg_type = msg['msg_type']
46 if msg_type in ('pyout', 'stdout'):
48 if msg_type in ('pyout', 'stdout'):
47 self.output_received.emit(msg)
49 self.output_received.emit(msg)
48 elif msg_type in ('pyerr', 'stderr'):
50 elif msg_type in ('pyerr', 'stderr'):
49 self.error_received.emit(msg)
51 self.error_received.emit(msg)
50
52
51 def flush(self):
53 def flush(self):
52 """ Reimplemented to ensure that signals are dispatched immediately.
54 """ Reimplemented to ensure that signals are dispatched immediately.
53 """
55 """
54 super(QtSubSocketChannel, self).flush()
56 super(QtSubSocketChannel, self).flush()
55 QtCore.QCoreApplication.instance().processEvents()
57 QtCore.QCoreApplication.instance().processEvents()
56
58
57
59
58 class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):
60 class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):
59
61
60 # Emitted when any message is received.
62 # Emitted when any message is received.
61 message_received = QtCore.pyqtSignal(object)
63 message_received = QtCore.pyqtSignal(object)
62
64
63 # Emitted when a reply has been received for the corresponding request type.
65 # Emitted when a reply has been received for the corresponding request type.
64 execute_reply = QtCore.pyqtSignal(object)
66 execute_reply = QtCore.pyqtSignal(object)
65 complete_reply = QtCore.pyqtSignal(object)
67 complete_reply = QtCore.pyqtSignal(object)
66 object_info_reply = QtCore.pyqtSignal(object)
68 object_info_reply = QtCore.pyqtSignal(object)
67
69
68 #---------------------------------------------------------------------------
70 #---------------------------------------------------------------------------
69 # 'object' interface
71 # 'object' interface
70 #---------------------------------------------------------------------------
72 #---------------------------------------------------------------------------
71
73
72 def __init__(self, *args, **kw):
74 def __init__(self, *args, **kw):
73 """ Reimplemented to ensure that QtCore.QObject is initialized first.
75 """ Reimplemented to ensure that QtCore.QObject is initialized first.
74 """
76 """
75 QtCore.QObject.__init__(self)
77 QtCore.QObject.__init__(self)
76 XReqSocketChannel.__init__(self, *args, **kw)
78 XReqSocketChannel.__init__(self, *args, **kw)
77
79
78 #---------------------------------------------------------------------------
80 #---------------------------------------------------------------------------
79 # 'XReqSocketChannel' interface
81 # 'XReqSocketChannel' interface
80 #---------------------------------------------------------------------------
82 #---------------------------------------------------------------------------
81
83
82 def call_handlers(self, msg):
84 def call_handlers(self, msg):
83 """ Reimplemented to emit signals instead of making callbacks.
85 """ Reimplemented to emit signals instead of making callbacks.
84 """
86 """
85 # Emit the generic signal.
87 # Emit the generic signal.
86 self.message_received.emit(msg)
88 self.message_received.emit(msg)
87
89
88 # Emit signals for specialized message types.
90 # Emit signals for specialized message types.
89 msg_type = msg['msg_type']
91 msg_type = msg['msg_type']
90 signal = getattr(self, msg_type, None)
92 signal = getattr(self, msg_type, None)
91 if signal:
93 if signal:
92 signal.emit(msg)
94 signal.emit(msg)
93
95
94 def _queue_request(self, msg, callback):
96 def _queue_request(self, msg, callback):
95 """ Reimplemented to skip callback handling.
97 """ Reimplemented to skip callback handling.
96 """
98 """
97 self.command_queue.put(msg)
99 self.command_queue.put(msg)
100 self.add_io_state(zmq.POLLOUT)
98
101
99
102
100 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
103 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
101
104
102 #---------------------------------------------------------------------------
105 #---------------------------------------------------------------------------
103 # 'object' interface
106 # 'object' interface
104 #---------------------------------------------------------------------------
107 #---------------------------------------------------------------------------
105
108
106 def __init__(self, *args, **kw):
109 def __init__(self, *args, **kw):
107 """ Reimplemented to ensure that QtCore.QObject is initialized first.
110 """ Reimplemented to ensure that QtCore.QObject is initialized first.
108 """
111 """
109 QtCore.QObject.__init__(self)
112 QtCore.QObject.__init__(self)
110 RepSocketChannel.__init__(self, *args, **kw)
113 RepSocketChannel.__init__(self, *args, **kw)
111
114
112
115
113 class QtKernelManager(KernelManager, QtCore.QObject):
116 class QtKernelManager(KernelManager, QtCore.QObject):
114 """ A KernelManager that provides signals and slots.
117 """ A KernelManager that provides signals and slots.
115 """
118 """
116
119
117 __metaclass__ = MetaQObjectHasTraits
120 __metaclass__ = MetaQObjectHasTraits
118
121
119 # Emitted when the kernel manager has started listening.
122 # Emitted when the kernel manager has started listening.
120 started_listening = QtCore.pyqtSignal()
123 started_listening = QtCore.pyqtSignal()
121
124
122 # Emitted when the kernel manager has stopped listening.
125 # Emitted when the kernel manager has stopped listening.
123 stopped_listening = QtCore.pyqtSignal()
126 stopped_listening = QtCore.pyqtSignal()
124
127
125 # Use Qt-specific channel classes that emit signals.
128 # Use Qt-specific channel classes that emit signals.
126 sub_channel_class = QtSubSocketChannel
129 sub_channel_class = QtSubSocketChannel
127 xreq_channel_class = QtXReqSocketChannel
130 xreq_channel_class = QtXReqSocketChannel
128 rep_channel_class = QtRepSocketChannel
131 rep_channel_class = QtRepSocketChannel
129
132
130 #---------------------------------------------------------------------------
133 #---------------------------------------------------------------------------
131 # 'KernelManager' interface
134 # 'KernelManager' interface
132 #---------------------------------------------------------------------------
135 #---------------------------------------------------------------------------
133
136
134 def start_listening(self):
137 def start_listening(self):
135 """ Reimplemented to emit signal.
138 """ Reimplemented to emit signal.
136 """
139 """
137 super(QtKernelManager, self).start_listening()
140 super(QtKernelManager, self).start_listening()
138 self.started_listening.emit()
141 self.started_listening.emit()
139
142
140 def stop_listening(self):
143 def stop_listening(self):
141 """ Reimplemented to emit signal.
144 """ Reimplemented to emit signal.
142 """
145 """
143 super(QtKernelManager, self).stop_listening()
146 super(QtKernelManager, self).stop_listening()
144 self.stopped_listening.emit()
147 self.stopped_listening.emit()
@@ -1,466 +1,497
1 """Kernel frontend classes.
1 """Kernel frontend classes.
2
2
3 TODO: Create logger to handle debugging and console messages.
3 TODO: Create logger to handle debugging and console messages.
4
4
5 """
5 """
6
6
7 # Standard library imports.
7 # Standard library imports.
8 from Queue import Queue, Empty
8 from Queue import Queue, Empty
9 from subprocess import Popen
9 from subprocess import Popen
10 from threading import Thread
10 from threading import Thread
11 import time
11 import time
12 import traceback
12 import traceback
13
13
14 # System library imports.
14 # System library imports.
15 import zmq
15 import zmq
16 from zmq import POLLIN, POLLOUT, POLLERR
16 from zmq import POLLIN, POLLOUT, POLLERR
17 from zmq.eventloop import ioloop
17 from zmq.eventloop import ioloop
18
18
19 # Local imports.
19 # Local imports.
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
21 Type
21 Type
22 from kernel import launch_kernel
22 from kernel import launch_kernel
23 from session import Session
23 from session import Session
24
24
25 # Constants.
25 # Constants.
26 LOCALHOST = '127.0.0.1'
26 LOCALHOST = '127.0.0.1'
27
27
28
28
29 class MissingHandlerError(Exception):
29 class MissingHandlerError(Exception):
30 pass
30 pass
31
31
32
32
33 class ZmqSocketChannel(Thread):
33 class ZmqSocketChannel(Thread):
34 """ The base class for the channels that use ZMQ sockets.
34 """ The base class for the channels that use ZMQ sockets.
35 """
35 """
36
36
37 context = None
38 session = None
39 socket = None
40 ioloop = None
41 iostate = None
42
37 def __init__(self, context, session, address=None):
43 def __init__(self, context, session, address=None):
38 super(ZmqSocketChannel, self).__init__()
44 super(ZmqSocketChannel, self).__init__()
39 self.daemon = True
45 self.daemon = True
40
46
41 self.context = context
47 self.context = context
42 self.session = session
48 self.session = session
43 self.address = address
49 self.address = address
44 self.socket = None
45
50
46 def stop(self):
51 def stop(self):
47 """Stop the thread's activity. Returns when the thread terminates.
52 """Stop the thread's activity. Returns when the thread terminates.
48
53
49 The thread will raise :class:`RuntimeError` if :method:`self.start`
54 The thread will raise :class:`RuntimeError` if :method:`self.start`
50 is called again.
55 is called again.
51 """
56 """
52 self.join()
57 self.join()
53
58
54
59
55 def get_address(self):
60 def get_address(self):
56 """ Get the channel's address. By the default, a channel is on
61 """ Get the channel's address. By the default, a channel is on
57 localhost with no port specified (a negative port number).
62 localhost with no port specified (a negative port number).
58 """
63 """
59 return self._address
64 return self._address
60
65
61 def set_adresss(self, address):
66 def set_adresss(self, address):
62 """ Set the channel's address. Should be a tuple of form:
67 """ Set the channel's address. Should be a tuple of form:
63 (ip address [str], port [int]).
68 (ip address [str], port [int]).
64 or None, in which case the address is reset to its default value.
69 or None, in which case the address is reset to its default value.
65 """
70 """
66 # FIXME: Validate address.
71 # FIXME: Validate address.
67 if self.is_alive(): # This is Thread.is_alive
72 if self.is_alive(): # This is Thread.is_alive
68 raise RuntimeError("Cannot set address on a running channel!")
73 raise RuntimeError("Cannot set address on a running channel!")
69 else:
74 else:
70 if address is None:
75 if address is None:
71 address = (LOCALHOST, 0)
76 address = (LOCALHOST, 0)
72 self._address = address
77 self._address = address
73
78
74 address = property(get_address, set_adresss)
79 address = property(get_address, set_adresss)
75
80
81 def add_io_state(self, state):
82 """Add IO state to the eventloop.
83
84 This is thread safe as it uses the thread safe IOLoop.add_callback.
85 """
86 def add_io_state_callback():
87 if not self.iostate & state:
88 self.iostate = self.iostate | state
89 self.ioloop.update_handler(self.socket, self.iostate)
90 self.ioloop.add_callback(add_io_state_callback)
91
92 def drop_io_state(self, state):
93 """Drop IO state from the eventloop.
94
95 This is thread safe as it uses the thread safe IOLoop.add_callback.
96 """
97 def drop_io_state_callback():
98 if self.iostate & state:
99 self.iostate = self.iostate & (~state)
100 self.ioloop.update_handler(self.socket, self.iostate)
101 self.ioloop.add_callback(drop_io_state_callback)
102
76
103
77 class SubSocketChannel(ZmqSocketChannel):
104 class SubSocketChannel(ZmqSocketChannel):
78
105
79 def __init__(self, context, session, address=None):
106 def __init__(self, context, session, address=None):
80 super(SubSocketChannel, self).__init__(context, session, address)
107 super(SubSocketChannel, self).__init__(context, session, address)
81
108
82 def run(self):
109 def run(self):
83 self.socket = self.context.socket(zmq.SUB)
110 self.socket = self.context.socket(zmq.SUB)
84 self.socket.setsockopt(zmq.SUBSCRIBE,'')
111 self.socket.setsockopt(zmq.SUBSCRIBE,'')
85 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
112 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
86 self.socket.connect('tcp://%s:%i' % self.address)
113 self.socket.connect('tcp://%s:%i' % self.address)
87 self.ioloop = ioloop.IOLoop()
114 self.ioloop = ioloop.IOLoop()
115 self.iostate = POLLIN|POLLERR
88 self.ioloop.add_handler(self.socket, self._handle_events,
116 self.ioloop.add_handler(self.socket, self._handle_events,
89 POLLIN|POLLERR)
117 self.iostate)
90 self.ioloop.start()
118 self.ioloop.start()
91
119
92 def stop(self):
120 def stop(self):
93 self.ioloop.stop()
121 self.ioloop.stop()
94 super(SubSocketChannel, self).stop()
122 super(SubSocketChannel, self).stop()
95
123
96 def _handle_events(self, socket, events):
124 def _handle_events(self, socket, events):
97 # Turn on and off POLLOUT depending on if we have made a request
125 # Turn on and off POLLOUT depending on if we have made a request
98 if events & POLLERR:
126 if events & POLLERR:
99 self._handle_err()
127 self._handle_err()
100 if events & POLLIN:
128 if events & POLLIN:
101 self._handle_recv()
129 self._handle_recv()
102
130
103 def _handle_err(self):
131 def _handle_err(self):
104 # We don't want to let this go silently, so eventually we should log.
132 # We don't want to let this go silently, so eventually we should log.
105 raise zmq.ZMQError()
133 raise zmq.ZMQError()
106
134
107 def _handle_recv(self):
135 def _handle_recv(self):
108 # Get all of the messages we can
136 # Get all of the messages we can
109 while True:
137 while True:
110 try:
138 try:
111 msg = self.socket.recv_json(zmq.NOBLOCK)
139 msg = self.socket.recv_json(zmq.NOBLOCK)
112 except zmq.ZMQError:
140 except zmq.ZMQError:
113 # Check the errno?
141 # Check the errno?
114 # Will this tigger POLLERR?
142 # Will this tigger POLLERR?
115 break
143 break
116 else:
144 else:
117 self.call_handlers(msg)
145 self.call_handlers(msg)
118
146
119 def call_handlers(self, msg):
147 def call_handlers(self, msg):
120 """This method is called in the ioloop thread when a message arrives.
148 """This method is called in the ioloop thread when a message arrives.
121
149
122 Subclasses should override this method to handle incoming messages.
150 Subclasses should override this method to handle incoming messages.
123 It is important to remember that this method is called in the thread
151 It is important to remember that this method is called in the thread
124 so that some logic must be done to ensure that the application leve
152 so that some logic must be done to ensure that the application leve
125 handlers are called in the application thread.
153 handlers are called in the application thread.
126 """
154 """
127 raise NotImplementedError('call_handlers must be defined in a subclass.')
155 raise NotImplementedError('call_handlers must be defined in a subclass.')
128
156
129 def flush(self, timeout=1.0):
157 def flush(self, timeout=1.0):
130 """Immediately processes all pending messages on the SUB channel.
158 """Immediately processes all pending messages on the SUB channel.
131
159
132 This method is thread safe.
160 This method is thread safe.
133
161
134 Parameters
162 Parameters
135 ----------
163 ----------
136 timeout : float, optional
164 timeout : float, optional
137 The maximum amount of time to spend flushing, in seconds. The
165 The maximum amount of time to spend flushing, in seconds. The
138 default is one second.
166 default is one second.
139 """
167 """
140 # We do the IOLoop callback process twice to ensure that the IOLoop
168 # We do the IOLoop callback process twice to ensure that the IOLoop
141 # gets to perform at least one full poll.
169 # gets to perform at least one full poll.
142 stop_time = time.time() + timeout
170 stop_time = time.time() + timeout
143 for i in xrange(2):
171 for i in xrange(2):
144 self._flushed = False
172 self._flushed = False
145 self.ioloop.add_callback(self._flush)
173 self.ioloop.add_callback(self._flush)
146 while not self._flushed and time.time() < stop_time:
174 while not self._flushed and time.time() < stop_time:
147 time.sleep(0.01)
175 time.sleep(0.01)
148
176
149 def _flush(self):
177 def _flush(self):
150 """Called in this thread by the IOLoop to indicate that all events have
178 """Called in this thread by the IOLoop to indicate that all events have
151 been processed.
179 been processed.
152 """
180 """
153 self._flushed = True
181 self._flushed = True
154
182
155
183
156 class XReqSocketChannel(ZmqSocketChannel):
184 class XReqSocketChannel(ZmqSocketChannel):
157
185
158 handler_queue = None
186 handler_queue = None
159 command_queue = None
187 command_queue = None
160 handlers = None
188 handlers = None
161 _overriden_call_handler = None
189 _overriden_call_handler = None
162
190
163 def __init__(self, context, session, address=None):
191 def __init__(self, context, session, address=None):
164 self.handlers = {}
192 self.handlers = {}
165 self.handler_queue = Queue()
193 self.handler_queue = Queue()
166 self.command_queue = Queue()
194 self.command_queue = Queue()
167 super(XReqSocketChannel, self).__init__(context, session, address)
195 super(XReqSocketChannel, self).__init__(context, session, address)
168
196
169 def run(self):
197 def run(self):
170 self.socket = self.context.socket(zmq.XREQ)
198 self.socket = self.context.socket(zmq.XREQ)
171 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
199 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
172 self.socket.connect('tcp://%s:%i' % self.address)
200 self.socket.connect('tcp://%s:%i' % self.address)
173 self.ioloop = ioloop.IOLoop()
201 self.ioloop = ioloop.IOLoop()
202 self.iostate = POLLERR|POLLIN
174 self.ioloop.add_handler(self.socket, self._handle_events,
203 self.ioloop.add_handler(self.socket, self._handle_events,
175 POLLIN|POLLOUT|POLLERR)
204 self.iostate)
176 self.ioloop.start()
205 self.ioloop.start()
177
206
178 def stop(self):
207 def stop(self):
179 self.ioloop.stop()
208 self.ioloop.stop()
180 super(XReqSocketChannel, self).stop()
209 super(XReqSocketChannel, self).stop()
181
210
182 def _handle_events(self, socket, events):
211 def _handle_events(self, socket, events):
183 # Turn on and off POLLOUT depending on if we have made a request
184 if events & POLLERR:
212 if events & POLLERR:
185 self._handle_err()
213 self._handle_err()
186 if events & POLLOUT:
214 if events & POLLOUT:
187 self._handle_send()
215 self._handle_send()
188 if events & POLLIN:
216 if events & POLLIN:
189 self._handle_recv()
217 self._handle_recv()
190
218
191 def _handle_recv(self):
219 def _handle_recv(self):
192 msg = self.socket.recv_json()
220 msg = self.socket.recv_json()
193 self.call_handlers(msg)
221 self.call_handlers(msg)
194
222
195 def _handle_send(self):
223 def _handle_send(self):
196 try:
224 try:
197 msg = self.command_queue.get(False)
225 msg = self.command_queue.get(False)
198 except Empty:
226 except Empty:
199 pass
227 pass
200 else:
228 else:
201 self.socket.send_json(msg)
229 self.socket.send_json(msg)
230 if self.command_queue.empty():
231 self.drop_io_state(POLLOUT)
202
232
203 def _handle_err(self):
233 def _handle_err(self):
204 # We don't want to let this go silently, so eventually we should log.
234 # We don't want to let this go silently, so eventually we should log.
205 raise zmq.ZMQError()
235 raise zmq.ZMQError()
206
236
207 def _queue_request(self, msg, callback):
237 def _queue_request(self, msg, callback):
208 handler = self._find_handler(msg['msg_type'], callback)
238 handler = self._find_handler(msg['msg_type'], callback)
209 self.handler_queue.put(handler)
239 self.handler_queue.put(handler)
210 self.command_queue.put(msg)
240 self.command_queue.put(msg)
241 self.add_io_state(POLLOUT)
211
242
212 def execute(self, code, callback=None):
243 def execute(self, code, callback=None):
213 # Create class for content/msg creation. Related to, but possibly
244 # Create class for content/msg creation. Related to, but possibly
214 # not in Session.
245 # not in Session.
215 content = dict(code=code)
246 content = dict(code=code)
216 msg = self.session.msg('execute_request', content)
247 msg = self.session.msg('execute_request', content)
217 self._queue_request(msg, callback)
248 self._queue_request(msg, callback)
218 return msg['header']['msg_id']
249 return msg['header']['msg_id']
219
250
220 def complete(self, text, line, block=None, callback=None):
251 def complete(self, text, line, block=None, callback=None):
221 content = dict(text=text, line=line)
252 content = dict(text=text, line=line)
222 msg = self.session.msg('complete_request', content)
253 msg = self.session.msg('complete_request', content)
223 self._queue_request(msg, callback)
254 self._queue_request(msg, callback)
224 return msg['header']['msg_id']
255 return msg['header']['msg_id']
225
256
226 def object_info(self, oname, callback=None):
257 def object_info(self, oname, callback=None):
227 content = dict(oname=oname)
258 content = dict(oname=oname)
228 msg = self.session.msg('object_info_request', content)
259 msg = self.session.msg('object_info_request', content)
229 self._queue_request(msg, callback)
260 self._queue_request(msg, callback)
230 return msg['header']['msg_id']
261 return msg['header']['msg_id']
231
262
232 def _find_handler(self, name, callback):
263 def _find_handler(self, name, callback):
233 if callback is not None:
264 if callback is not None:
234 return callback
265 return callback
235 handler = self.handlers.get(name)
266 handler = self.handlers.get(name)
236 if handler is None:
267 if handler is None:
237 raise MissingHandlerError(
268 raise MissingHandlerError(
238 'No handler defined for method: %s' % name)
269 'No handler defined for method: %s' % name)
239 return handler
270 return handler
240
271
241 def override_call_handler(self, func):
272 def override_call_handler(self, func):
242 """Permanently override the call_handler.
273 """Permanently override the call_handler.
243
274
244 The function func will be called as::
275 The function func will be called as::
245
276
246 func(handler, msg)
277 func(handler, msg)
247
278
248 And must call::
279 And must call::
249
280
250 handler(msg)
281 handler(msg)
251
282
252 in the main thread.
283 in the main thread.
253 """
284 """
254 assert callable(func), "not a callable: %r" % func
285 assert callable(func), "not a callable: %r" % func
255 self._overriden_call_handler = func
286 self._overriden_call_handler = func
256
287
257 def call_handlers(self, msg):
288 def call_handlers(self, msg):
258 try:
289 try:
259 handler = self.handler_queue.get(False)
290 handler = self.handler_queue.get(False)
260 except Empty:
291 except Empty:
261 print "Message received with no handler!!!"
292 print "Message received with no handler!!!"
262 print msg
293 print msg
263 else:
294 else:
264 self.call_handler(handler, msg)
295 self.call_handler(handler, msg)
265
296
266 def call_handler(self, handler, msg):
297 def call_handler(self, handler, msg):
267 if self._overriden_call_handler is not None:
298 if self._overriden_call_handler is not None:
268 self._overriden_call_handler(handler, msg)
299 self._overriden_call_handler(handler, msg)
269 elif hasattr(self, '_call_handler'):
300 elif hasattr(self, '_call_handler'):
270 call_handler = getattr(self, '_call_handler')
301 call_handler = getattr(self, '_call_handler')
271 call_handler(handler, msg)
302 call_handler(handler, msg)
272 else:
303 else:
273 raise RuntimeError('no handler!')
304 raise RuntimeError('no handler!')
274
305
275
306
276 class RepSocketChannel(ZmqSocketChannel):
307 class RepSocketChannel(ZmqSocketChannel):
277
308
278 def on_raw_input(self):
309 def on_raw_input(self):
279 pass
310 pass
280
311
281
312
282 class KernelManager(HasTraits):
313 class KernelManager(HasTraits):
283 """ Manages a kernel for a frontend.
314 """ Manages a kernel for a frontend.
284
315
285 The SUB channel is for the frontend to receive messages published by the
316 The SUB channel is for the frontend to receive messages published by the
286 kernel.
317 kernel.
287
318
288 The REQ channel is for the frontend to make requests of the kernel.
319 The REQ channel is for the frontend to make requests of the kernel.
289
320
290 The REP channel is for the kernel to request stdin (raw_input) from the
321 The REP channel is for the kernel to request stdin (raw_input) from the
291 frontend.
322 frontend.
292 """
323 """
293
324
294 # Whether the kernel manager is currently listening on its channels.
325 # Whether the kernel manager is currently listening on its channels.
295 is_listening = Bool(False)
326 is_listening = Bool(False)
296
327
297 # The PyZMQ Context to use for communication with the kernel.
328 # The PyZMQ Context to use for communication with the kernel.
298 context = Instance(zmq.Context, ())
329 context = Instance(zmq.Context, ())
299
330
300 # The Session to use for communication with the kernel.
331 # The Session to use for communication with the kernel.
301 session = Instance(Session, ())
332 session = Instance(Session, ())
302
333
303 # The classes to use for the various channels.
334 # The classes to use for the various channels.
304 sub_channel_class = Type(SubSocketChannel)
335 sub_channel_class = Type(SubSocketChannel)
305 xreq_channel_class = Type(XReqSocketChannel)
336 xreq_channel_class = Type(XReqSocketChannel)
306 rep_channel_class = Type(RepSocketChannel)
337 rep_channel_class = Type(RepSocketChannel)
307
338
308 # Protected traits.
339 # Protected traits.
309 _kernel = Instance(Popen)
340 _kernel = Instance(Popen)
310 _sub_channel = Any
341 _sub_channel = Any
311 _xreq_channel = Any
342 _xreq_channel = Any
312 _rep_channel = Any
343 _rep_channel = Any
313
344
314 #--------------------------------------------------------------------------
345 #--------------------------------------------------------------------------
315 # Channel management methods:
346 # Channel management methods:
316 #--------------------------------------------------------------------------
347 #--------------------------------------------------------------------------
317
348
318 def start_listening(self):
349 def start_listening(self):
319 """Starts listening on the specified ports. If already listening, raises
350 """Starts listening on the specified ports. If already listening, raises
320 a RuntimeError.
351 a RuntimeError.
321 """
352 """
322 if self.is_listening:
353 if self.is_listening:
323 raise RuntimeError("Cannot start listening. Already listening!")
354 raise RuntimeError("Cannot start listening. Already listening!")
324 else:
355 else:
325 self.is_listening = True
356 self.is_listening = True
326 self.sub_channel.start()
357 self.sub_channel.start()
327 self.xreq_channel.start()
358 self.xreq_channel.start()
328 self.rep_channel.start()
359 self.rep_channel.start()
329
360
330 @property
361 @property
331 def is_alive(self):
362 def is_alive(self):
332 """ Returns whether the kernel is alive. """
363 """ Returns whether the kernel is alive. """
333 if self.is_listening:
364 if self.is_listening:
334 # TODO: check if alive.
365 # TODO: check if alive.
335 return True
366 return True
336 else:
367 else:
337 return False
368 return False
338
369
339 def stop_listening(self):
370 def stop_listening(self):
340 """Stops listening. If not listening, does nothing. """
371 """Stops listening. If not listening, does nothing. """
341 if self.is_listening:
372 if self.is_listening:
342 self.is_listening = False
373 self.is_listening = False
343 self.sub_channel.stop()
374 self.sub_channel.stop()
344 self.xreq_channel.stop()
375 self.xreq_channel.stop()
345 self.rep_channel.stop()
376 self.rep_channel.stop()
346
377
347 #--------------------------------------------------------------------------
378 #--------------------------------------------------------------------------
348 # Kernel process management methods:
379 # Kernel process management methods:
349 #--------------------------------------------------------------------------
380 #--------------------------------------------------------------------------
350
381
351 def start_kernel(self):
382 def start_kernel(self):
352 """Starts a kernel process and configures the manager to use it.
383 """Starts a kernel process and configures the manager to use it.
353
384
354 If ports have been specified via the address attributes, they are used.
385 If ports have been specified via the address attributes, they are used.
355 Otherwise, open ports are chosen by the OS and the channel port
386 Otherwise, open ports are chosen by the OS and the channel port
356 attributes are configured as appropriate.
387 attributes are configured as appropriate.
357 """
388 """
358 xreq, sub = self.xreq_address, self.sub_address
389 xreq, sub = self.xreq_address, self.sub_address
359 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
390 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
360 raise RuntimeError("Can only launch a kernel on localhost."
391 raise RuntimeError("Can only launch a kernel on localhost."
361 "Make sure that the '*_address' attributes are "
392 "Make sure that the '*_address' attributes are "
362 "configured properly.")
393 "configured properly.")
363
394
364 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
395 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
365 self.set_kernel(kernel)
396 self.set_kernel(kernel)
366 self.xreq_address = (LOCALHOST, xrep)
397 self.xreq_address = (LOCALHOST, xrep)
367 self.sub_address = (LOCALHOST, pub)
398 self.sub_address = (LOCALHOST, pub)
368
399
369 def set_kernel(self, kernel):
400 def set_kernel(self, kernel):
370 """Sets the kernel manager's kernel to an existing kernel process.
401 """Sets the kernel manager's kernel to an existing kernel process.
371
402
372 It is *not* necessary to a set a kernel to communicate with it via the
403 It is *not* necessary to a set a kernel to communicate with it via the
373 channels, and those objects must be configured separately. It
404 channels, and those objects must be configured separately. It
374 *is* necessary to set a kernel if you want to use the manager (or
405 *is* necessary to set a kernel if you want to use the manager (or
375 frontends that use the manager) to signal and/or kill the kernel.
406 frontends that use the manager) to signal and/or kill the kernel.
376
407
377 Parameters:
408 Parameters:
378 -----------
409 -----------
379 kernel : Popen
410 kernel : Popen
380 An existing kernel process.
411 An existing kernel process.
381 """
412 """
382 self._kernel = kernel
413 self._kernel = kernel
383
414
384 @property
415 @property
385 def has_kernel(self):
416 def has_kernel(self):
386 """Returns whether a kernel process has been specified for the kernel
417 """Returns whether a kernel process has been specified for the kernel
387 manager.
418 manager.
388
419
389 A kernel process can be set via 'start_kernel' or 'set_kernel'.
420 A kernel process can be set via 'start_kernel' or 'set_kernel'.
390 """
421 """
391 return self._kernel is not None
422 return self._kernel is not None
392
423
393 def kill_kernel(self):
424 def kill_kernel(self):
394 """ Kill the running kernel. """
425 """ Kill the running kernel. """
395 if self._kernel:
426 if self._kernel:
396 self._kernel.kill()
427 self._kernel.kill()
397 self._kernel = None
428 self._kernel = None
398 else:
429 else:
399 raise RuntimeError("Cannot kill kernel. No kernel is running!")
430 raise RuntimeError("Cannot kill kernel. No kernel is running!")
400
431
401 def signal_kernel(self, signum):
432 def signal_kernel(self, signum):
402 """ Sends a signal to the kernel. """
433 """ Sends a signal to the kernel. """
403 if self._kernel:
434 if self._kernel:
404 self._kernel.send_signal(signum)
435 self._kernel.send_signal(signum)
405 else:
436 else:
406 raise RuntimeError("Cannot signal kernel. No kernel is running!")
437 raise RuntimeError("Cannot signal kernel. No kernel is running!")
407
438
408 #--------------------------------------------------------------------------
439 #--------------------------------------------------------------------------
409 # Channels used for communication with the kernel:
440 # Channels used for communication with the kernel:
410 #--------------------------------------------------------------------------
441 #--------------------------------------------------------------------------
411
442
412 @property
443 @property
413 def sub_channel(self):
444 def sub_channel(self):
414 """Get the SUB socket channel object."""
445 """Get the SUB socket channel object."""
415 if self._sub_channel is None:
446 if self._sub_channel is None:
416 self._sub_channel = self.sub_channel_class(self.context,
447 self._sub_channel = self.sub_channel_class(self.context,
417 self.session)
448 self.session)
418 return self._sub_channel
449 return self._sub_channel
419
450
420 @property
451 @property
421 def xreq_channel(self):
452 def xreq_channel(self):
422 """Get the REQ socket channel object to make requests of the kernel."""
453 """Get the REQ socket channel object to make requests of the kernel."""
423 if self._xreq_channel is None:
454 if self._xreq_channel is None:
424 self._xreq_channel = self.xreq_channel_class(self.context,
455 self._xreq_channel = self.xreq_channel_class(self.context,
425 self.session)
456 self.session)
426 return self._xreq_channel
457 return self._xreq_channel
427
458
428 @property
459 @property
429 def rep_channel(self):
460 def rep_channel(self):
430 """Get the REP socket channel object to handle stdin (raw_input)."""
461 """Get the REP socket channel object to handle stdin (raw_input)."""
431 if self._rep_channel is None:
462 if self._rep_channel is None:
432 self._rep_channel = self.rep_channel_class(self.context,
463 self._rep_channel = self.rep_channel_class(self.context,
433 self.session)
464 self.session)
434 return self._rep_channel
465 return self._rep_channel
435
466
436 #--------------------------------------------------------------------------
467 #--------------------------------------------------------------------------
437 # Delegates for the Channel address attributes:
468 # Delegates for the Channel address attributes:
438 #--------------------------------------------------------------------------
469 #--------------------------------------------------------------------------
439
470
440 def get_sub_address(self):
471 def get_sub_address(self):
441 return self.sub_channel.address
472 return self.sub_channel.address
442
473
443 def set_sub_address(self, address):
474 def set_sub_address(self, address):
444 self.sub_channel.address = address
475 self.sub_channel.address = address
445
476
446 sub_address = property(get_sub_address, set_sub_address,
477 sub_address = property(get_sub_address, set_sub_address,
447 doc="The address used by SUB socket channel.")
478 doc="The address used by SUB socket channel.")
448
479
449 def get_xreq_address(self):
480 def get_xreq_address(self):
450 return self.xreq_channel.address
481 return self.xreq_channel.address
451
482
452 def set_xreq_address(self, address):
483 def set_xreq_address(self, address):
453 self.xreq_channel.address = address
484 self.xreq_channel.address = address
454
485
455 xreq_address = property(get_xreq_address, set_xreq_address,
486 xreq_address = property(get_xreq_address, set_xreq_address,
456 doc="The address used by XREQ socket channel.")
487 doc="The address used by XREQ socket channel.")
457
488
458 def get_rep_address(self):
489 def get_rep_address(self):
459 return self.rep_channel.address
490 return self.rep_channel.address
460
491
461 def set_rep_address(self, address):
492 def set_rep_address(self, address):
462 self.rep_channel.address = address
493 self.rep_channel.address = address
463
494
464 rep_address = property(get_rep_address, set_rep_address,
495 rep_address = property(get_rep_address, set_rep_address,
465 doc="The address used by REP socket channel.")
496 doc="The address used by REP socket channel.")
466
497
General Comments 0
You need to be logged in to leave comments. Login now