##// END OF EJS Templates
Draft of debugging the multiple inheritance problems in km....
Brian Granger -
Show More
@@ -1,141 +1,150 b''
1 """ Defines a KernelManager that provides signals and slots.
1 """ Defines a KernelManager that provides signals and slots.
2 """
2 """
3
3
4 # System library imports.
4 # System library imports.
5 from PyQt4 import QtCore
5 from PyQt4 import QtCore
6 import zmq
6 import zmq
7
7
8 # IPython imports.
8 # IPython imports.
9 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
9 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
10 XReqSocketChannel, RepSocketChannel
10 XReqSocketChannel, RepSocketChannel
11 from util import MetaQObjectHasTraits
11 from util import MetaQObjectHasTraits
12
12
13
13 # When doing multiple inheritance from QtCore.QObject and other classes
14 # the calling of the parent __init__'s is a subtle issue:
15 # * QtCore.QObject does not call super so you can't use super and put
16 # QObject first in the inheritance list.
17 # * QtCore.QObject.__init__ takes 1 argument, the parent. So if you are going
18 # to use super, any class that comes before QObject must pass it something
19 # reasonable.
14
20
15 class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):
21 class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):
16
22
17 # Emitted when any message is received.
23 # Emitted when any message is received.
18 message_received = QtCore.pyqtSignal(object)
24 message_received = QtCore.pyqtSignal(object)
19
25
20 # Emitted when a message of type 'pyout' or 'stdout' is received.
26 # Emitted when a message of type 'pyout' or 'stdout' is received.
21 output_received = QtCore.pyqtSignal(object)
27 output_received = QtCore.pyqtSignal(object)
22
28
23 # Emitted when a message of type 'pyerr' or 'stderr' is received.
29 # Emitted when a message of type 'pyerr' or 'stderr' is received.
24 error_received = QtCore.pyqtSignal(object)
30 error_received = QtCore.pyqtSignal(object)
25
31
26 #---------------------------------------------------------------------------
32 #---------------------------------------------------------------------------
27 # 'object' interface
33 # 'object' interface
28 #---------------------------------------------------------------------------
34 #---------------------------------------------------------------------------
29
35
30 def __init__(self, *args, **kw):
36 def __init__(self, *args, **kw):
31 """ Reimplemented to ensure that QtCore.QObject is initialized first.
37 """ Reimplemented to ensure that QtCore.QObject is initialized first.
32 """
38 """
33 QtCore.QObject.__init__(self)
39 QtCore.QObject.__init__(self)
34 SubSocketChannel.__init__(self, *args, **kw)
40 SubSocketChannel.__init__(self, *args, **kw)
35
41
36 #---------------------------------------------------------------------------
42 #---------------------------------------------------------------------------
37 # 'SubSocketChannel' interface
43 # 'SubSocketChannel' interface
38 #---------------------------------------------------------------------------
44 #---------------------------------------------------------------------------
39
45
40 def call_handlers(self, msg):
46 def call_handlers(self, msg):
41 """ Reimplemented to emit signals instead of making callbacks.
47 """ Reimplemented to emit signals instead of making callbacks.
42 """
48 """
43 # Emit the generic signal.
49 # Emit the generic signal.
44 self.message_received.emit(msg)
50 self.message_received.emit(msg)
45
51
46 # Emit signals for specialized message types.
52 # Emit signals for specialized message types.
47 msg_type = msg['msg_type']
53 msg_type = msg['msg_type']
48 if msg_type in ('pyout', 'stdout'):
54 if msg_type in ('pyout', 'stdout'):
49 self.output_received.emit(msg)
55 self.output_received.emit(msg)
50 elif msg_type in ('pyerr', 'stderr'):
56 elif msg_type in ('pyerr', 'stderr'):
51 self.error_received.emit(msg)
57 self.error_received.emit(msg)
52
58
53 def flush(self):
59 def flush(self):
54 """ Reimplemented to ensure that signals are dispatched immediately.
60 """ Reimplemented to ensure that signals are dispatched immediately.
55 """
61 """
56 super(QtSubSocketChannel, self).flush()
62 super(QtSubSocketChannel, self).flush()
57 QtCore.QCoreApplication.instance().processEvents()
63 QtCore.QCoreApplication.instance().processEvents()
58
64
59
65
60 class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):
66 class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):
61
67
62 # Emitted when any message is received.
68 # Emitted when any message is received.
63 message_received = QtCore.pyqtSignal(object)
69 message_received = QtCore.pyqtSignal(object)
64
70
65 # Emitted when a reply has been received for the corresponding request type.
71 # Emitted when a reply has been received for the corresponding request type.
66 execute_reply = QtCore.pyqtSignal(object)
72 execute_reply = QtCore.pyqtSignal(object)
67 complete_reply = QtCore.pyqtSignal(object)
73 complete_reply = QtCore.pyqtSignal(object)
68 object_info_reply = QtCore.pyqtSignal(object)
74 object_info_reply = QtCore.pyqtSignal(object)
69
75
70 #---------------------------------------------------------------------------
76 #---------------------------------------------------------------------------
71 # 'object' interface
77 # 'object' interface
72 #---------------------------------------------------------------------------
78 #---------------------------------------------------------------------------
73
79
74 def __init__(self, *args, **kw):
80 def __init__(self, *args, **kw):
75 """ Reimplemented to ensure that QtCore.QObject is initialized first.
81 """ Reimplemented to ensure that QtCore.QObject is initialized first.
76 """
82 """
77 QtCore.QObject.__init__(self)
83 QtCore.QObject.__init__(self)
78 XReqSocketChannel.__init__(self, *args, **kw)
84 XReqSocketChannel.__init__(self, *args, **kw)
79
85
80 #---------------------------------------------------------------------------
86 #---------------------------------------------------------------------------
81 # 'XReqSocketChannel' interface
87 # 'XReqSocketChannel' interface
82 #---------------------------------------------------------------------------
88 #---------------------------------------------------------------------------
83
89
84 def call_handlers(self, msg):
90 def call_handlers(self, msg):
85 """ Reimplemented to emit signals instead of making callbacks.
91 """ Reimplemented to emit signals instead of making callbacks.
86 """
92 """
87 # Emit the generic signal.
93 # Emit the generic signal.
88 self.message_received.emit(msg)
94 self.message_received.emit(msg)
89
95
90 # Emit signals for specialized message types.
96 # Emit signals for specialized message types.
91 msg_type = msg['msg_type']
97 msg_type = msg['msg_type']
92 signal = getattr(self, msg_type, None)
98 signal = getattr(self, msg_type, None)
93 if signal:
99 if signal:
94 signal.emit(msg)
100 signal.emit(msg)
95
101
96
102
97 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
103 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
98
104
99 #---------------------------------------------------------------------------
105 #---------------------------------------------------------------------------
100 # 'object' interface
106 # 'object' interface
101 #---------------------------------------------------------------------------
107 #---------------------------------------------------------------------------
102
108
103 def __init__(self, *args, **kw):
109 def __init__(self, *args, **kw):
104 """ Reimplemented to ensure that QtCore.QObject is initialized first.
110 """ Reimplemented to ensure that QtCore.QObject is initialized first.
105 """
111 """
106 QtCore.QObject.__init__(self)
112 QtCore.QObject.__init__(self)
107 RepSocketChannel.__init__(self, *args, **kw)
113 RepSocketChannel.__init__(self, *args, **kw)
108
114
109
110 class QtKernelManager(KernelManager, QtCore.QObject):
115 class QtKernelManager(KernelManager, QtCore.QObject):
111 """ A KernelManager that provides signals and slots.
116 """ A KernelManager that provides signals and slots.
112 """
117 """
113
118
114 __metaclass__ = MetaQObjectHasTraits
119 __metaclass__ = MetaQObjectHasTraits
115
120
116 # Emitted when the kernel manager has started listening.
121 # Emitted when the kernel manager has started listening.
117 started_listening = QtCore.pyqtSignal()
122 started_listening = QtCore.pyqtSignal()
118
123
119 # Emitted when the kernel manager has stopped listening.
124 # Emitted when the kernel manager has stopped listening.
120 stopped_listening = QtCore.pyqtSignal()
125 stopped_listening = QtCore.pyqtSignal()
121
126
122 # Use Qt-specific channel classes that emit signals.
127 # Use Qt-specific channel classes that emit signals.
123 sub_channel_class = QtSubSocketChannel
128 sub_channel_class = QtSubSocketChannel
124 xreq_channel_class = QtXReqSocketChannel
129 xreq_channel_class = QtXReqSocketChannel
125 rep_channel_class = QtRepSocketChannel
130 rep_channel_class = QtRepSocketChannel
126
131
132 def __init__(self, *args, **kw):
133 QtCore.QObject.__init__(self)
134 KernelManager.__init__(self, *args, **kw)
135
127 #---------------------------------------------------------------------------
136 #---------------------------------------------------------------------------
128 # 'KernelManager' interface
137 # 'KernelManager' interface
129 #---------------------------------------------------------------------------
138 #---------------------------------------------------------------------------
130
139
131 def start_channels(self):
140 def start_channels(self):
132 """ Reimplemented to emit signal.
141 """ Reimplemented to emit signal.
133 """
142 """
134 super(QtKernelManager, self).start_channels()
143 super(QtKernelManager, self).start_channels()
135 self.started_listening.emit()
144 self.started_listening.emit()
136
145
137 def stop_channels(self):
146 def stop_channels(self):
138 """ Reimplemented to emit signal.
147 """ Reimplemented to emit signal.
139 """
148 """
140 super(QtKernelManager, self).stop_channels()
149 super(QtKernelManager, self).stop_channels()
141 self.stopped_listening.emit()
150 self.stopped_listening.emit()
@@ -1,25 +1,27 b''
1 """ Defines miscellaneous Qt-related helper classes and functions.
1 """ Defines miscellaneous Qt-related helper classes and functions.
2 """
2 """
3
3
4 # System library imports.
4 # System library imports.
5 from PyQt4 import QtCore
5 from PyQt4 import QtCore
6
6
7 # IPython imports.
7 # IPython imports.
8 from IPython.utils.traitlets import HasTraits
8 from IPython.utils.traitlets import HasTraits
9
9
10
10
11 MetaHasTraits = type(HasTraits)
11 MetaHasTraits = type(HasTraits)
12 MetaQObject = type(QtCore.QObject)
12 MetaQObject = type(QtCore.QObject)
13
13
14 # You can switch the order of the parents here.
14 class MetaQObjectHasTraits(MetaQObject, MetaHasTraits):
15 class MetaQObjectHasTraits(MetaQObject, MetaHasTraits):
15 """ A metaclass that inherits from the metaclasses of both HasTraits and
16 """ A metaclass that inherits from the metaclasses of both HasTraits and
16 QObject.
17 QObject.
17
18
18 Using this metaclass allows a class to inherit from both HasTraits and
19 Using this metaclass allows a class to inherit from both HasTraits and
19 QObject. See QtKernelManager for an example.
20 QObject. See QtKernelManager for an example.
20 """
21 """
21
22 # pass
22 def __init__(cls, name, bases, dct):
23 # ???You can get rid of this, but only if the order above is MetaQObject, MetaHasTraits
23 MetaQObject.__init__(cls, name, bases, dct)
24 # def __init__(cls, name, bases, dct):
24 MetaHasTraits.__init__(cls, name, bases, dct)
25 # MetaQObject.__init__(cls, name, bases, dct)
26 # MetaHasTraits.__init__(cls, name, bases, dct)
25
27
@@ -1,528 +1,529 b''
1 """Classes to manage the interaction with a running kernel.
1 """Classes to manage the interaction with a running kernel.
2
2
3 Todo
3 Todo
4 ====
4 ====
5
5
6 * Create logger to handle debugging and console messages.
6 * Create logger to handle debugging and console messages.
7 """
7 """
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2008-2010 The IPython Development Team
10 # Copyright (C) 2008-2010 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 # Standard library imports.
20 # Standard library imports.
21 from Queue import Queue, Empty
21 from Queue import Queue, Empty
22 from subprocess import Popen
22 from subprocess import Popen
23 from threading import Thread
23 from threading import Thread
24 import time
24 import time
25
25
26 # System library imports.
26 # System library imports.
27 import zmq
27 import zmq
28 from zmq import POLLIN, POLLOUT, POLLERR
28 from zmq import POLLIN, POLLOUT, POLLERR
29 from zmq.eventloop import ioloop
29 from zmq.eventloop import ioloop
30
30
31 # Local imports.
31 # Local imports.
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type
33 from kernel import launch_kernel
33 from kernel import launch_kernel
34 from session import Session
34 from session import Session
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Constants and exceptions
37 # Constants and exceptions
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 LOCALHOST = '127.0.0.1'
40 LOCALHOST = '127.0.0.1'
41
41
42 class InvalidPortNumber(Exception):
42 class InvalidPortNumber(Exception):
43 pass
43 pass
44
44
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46 # ZMQ Socket Channel classes
46 # ZMQ Socket Channel classes
47 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
48
48
49 class ZmqSocketChannel(Thread):
49 class ZmqSocketChannel(Thread):
50 """The base class for the channels that use ZMQ sockets.
50 """The base class for the channels that use ZMQ sockets.
51 """
51 """
52 context = None
52 context = None
53 session = None
53 session = None
54 socket = None
54 socket = None
55 ioloop = None
55 ioloop = None
56 iostate = None
56 iostate = None
57 _address = None
57 _address = None
58
58
59 def __init__(self, context, session, address):
59 def __init__(self, context, session, address):
60 """Create a channel
60 """Create a channel
61
61
62 Parameters
62 Parameters
63 ----------
63 ----------
64 context : zmq.Context
64 context : zmq.Context
65 The ZMQ context to use.
65 The ZMQ context to use.
66 session : session.Session
66 session : session.Session
67 The session to use.
67 The session to use.
68 address : tuple
68 address : tuple
69 Standard (ip, port) tuple that the kernel is listening on.
69 Standard (ip, port) tuple that the kernel is listening on.
70 """
70 """
71 super(ZmqSocketChannel, self).__init__()
71 super(ZmqSocketChannel, self).__init__()
72 self.daemon = True
72 self.daemon = True
73
73
74 self.context = context
74 self.context = context
75 self.session = session
75 self.session = session
76 if address[1] == 0:
76 if address[1] == 0:
77 raise InvalidPortNumber('The port number for a channel cannot be 0.')
77 raise InvalidPortNumber('The port number for a channel cannot be 0.')
78 self._address = address
78 self._address = address
79
79
80 def stop(self):
80 def stop(self):
81 """Stop the channel's activity.
81 """Stop the channel's activity.
82
82
83 This calls :method:`Thread.join` and returns when the thread
83 This calls :method:`Thread.join` and returns when the thread
84 terminates. :class:`RuntimeError` will be raised if
84 terminates. :class:`RuntimeError` will be raised if
85 :method:`self.start` is called again.
85 :method:`self.start` is called again.
86 """
86 """
87 self.join()
87 self.join()
88
88
89 @property
89 @property
90 def address(self):
90 def address(self):
91 """Get the channel's address as an (ip, port) tuple.
91 """Get the channel's address as an (ip, port) tuple.
92
92
93 By the default, the address is (localhost, 0), where 0 means a random
93 By the default, the address is (localhost, 0), where 0 means a random
94 port.
94 port.
95 """
95 """
96 return self._address
96 return self._address
97
97
98 def add_io_state(self, state):
98 def add_io_state(self, state):
99 """Add IO state to the eventloop.
99 """Add IO state to the eventloop.
100
100
101 Parameters
101 Parameters
102 ----------
102 ----------
103 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
103 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
104 The IO state flag to set.
104 The IO state flag to set.
105
105
106 This is thread safe as it uses the thread safe IOLoop.add_callback.
106 This is thread safe as it uses the thread safe IOLoop.add_callback.
107 """
107 """
108 def add_io_state_callback():
108 def add_io_state_callback():
109 if not self.iostate & state:
109 if not self.iostate & state:
110 self.iostate = self.iostate | state
110 self.iostate = self.iostate | state
111 self.ioloop.update_handler(self.socket, self.iostate)
111 self.ioloop.update_handler(self.socket, self.iostate)
112 self.ioloop.add_callback(add_io_state_callback)
112 self.ioloop.add_callback(add_io_state_callback)
113
113
114 def drop_io_state(self, state):
114 def drop_io_state(self, state):
115 """Drop IO state from the eventloop.
115 """Drop IO state from the eventloop.
116
116
117 Parameters
117 Parameters
118 ----------
118 ----------
119 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
119 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
120 The IO state flag to set.
120 The IO state flag to set.
121
121
122 This is thread safe as it uses the thread safe IOLoop.add_callback.
122 This is thread safe as it uses the thread safe IOLoop.add_callback.
123 """
123 """
124 def drop_io_state_callback():
124 def drop_io_state_callback():
125 if self.iostate & state:
125 if self.iostate & state:
126 self.iostate = self.iostate & (~state)
126 self.iostate = self.iostate & (~state)
127 self.ioloop.update_handler(self.socket, self.iostate)
127 self.ioloop.update_handler(self.socket, self.iostate)
128 self.ioloop.add_callback(drop_io_state_callback)
128 self.ioloop.add_callback(drop_io_state_callback)
129
129
130
130
131 class XReqSocketChannel(ZmqSocketChannel):
131 class XReqSocketChannel(ZmqSocketChannel):
132 """The XREQ channel for issues request/replies to the kernel.
132 """The XREQ channel for issues request/replies to the kernel.
133 """
133 """
134
134
135 command_queue = None
135 command_queue = None
136
136
137 def __init__(self, context, session, address):
137 def __init__(self, context, session, address):
138 self.command_queue = Queue()
138 self.command_queue = Queue()
139 super(XReqSocketChannel, self).__init__(context, session, address)
139 super(XReqSocketChannel, self).__init__(context, session, address)
140
140
141 def run(self):
141 def run(self):
142 """The thread's main activity. Call start() instead."""
142 """The thread's main activity. Call start() instead."""
143 self.socket = self.context.socket(zmq.XREQ)
143 self.socket = self.context.socket(zmq.XREQ)
144 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
144 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
145 self.socket.connect('tcp://%s:%i' % self.address)
145 self.socket.connect('tcp://%s:%i' % self.address)
146 self.ioloop = ioloop.IOLoop()
146 self.ioloop = ioloop.IOLoop()
147 self.iostate = POLLERR|POLLIN
147 self.iostate = POLLERR|POLLIN
148 self.ioloop.add_handler(self.socket, self._handle_events,
148 self.ioloop.add_handler(self.socket, self._handle_events,
149 self.iostate)
149 self.iostate)
150 self.ioloop.start()
150 self.ioloop.start()
151
151
152 def stop(self):
152 def stop(self):
153 self.ioloop.stop()
153 self.ioloop.stop()
154 super(XReqSocketChannel, self).stop()
154 super(XReqSocketChannel, self).stop()
155
155
156 def call_handlers(self, msg):
156 def call_handlers(self, msg):
157 """This method is called in the ioloop thread when a message arrives.
157 """This method is called in the ioloop thread when a message arrives.
158
158
159 Subclasses should override this method to handle incoming messages.
159 Subclasses should override this method to handle incoming messages.
160 It is important to remember that this method is called in the thread
160 It is important to remember that this method is called in the thread
161 so that some logic must be done to ensure that the application leve
161 so that some logic must be done to ensure that the application leve
162 handlers are called in the application thread.
162 handlers are called in the application thread.
163 """
163 """
164 raise NotImplementedError('call_handlers must be defined in a subclass.')
164 raise NotImplementedError('call_handlers must be defined in a subclass.')
165
165
166 def execute(self, code):
166 def execute(self, code):
167 """Execute code in the kernel.
167 """Execute code in the kernel.
168
168
169 Parameters
169 Parameters
170 ----------
170 ----------
171 code : str
171 code : str
172 A string of Python code.
172 A string of Python code.
173
173
174 Returns
174 Returns
175 -------
175 -------
176 The msg_id of the message sent.
176 The msg_id of the message sent.
177 """
177 """
178 # Create class for content/msg creation. Related to, but possibly
178 # Create class for content/msg creation. Related to, but possibly
179 # not in Session.
179 # not in Session.
180 content = dict(code=code)
180 content = dict(code=code)
181 msg = self.session.msg('execute_request', content)
181 msg = self.session.msg('execute_request', content)
182 self._queue_request(msg)
182 self._queue_request(msg)
183 return msg['header']['msg_id']
183 return msg['header']['msg_id']
184
184
185 def complete(self, text, line, block=None):
185 def complete(self, text, line, block=None):
186 """Tab complete text, line, block in the kernel's namespace.
186 """Tab complete text, line, block in the kernel's namespace.
187
187
188 Parameters
188 Parameters
189 ----------
189 ----------
190 text : str
190 text : str
191 The text to complete.
191 The text to complete.
192 line : str
192 line : str
193 The full line of text that is the surrounding context for the
193 The full line of text that is the surrounding context for the
194 text to complete.
194 text to complete.
195 block : str
195 block : str
196 The full block of code in which the completion is being requested.
196 The full block of code in which the completion is being requested.
197
197
198 Returns
198 Returns
199 -------
199 -------
200 The msg_id of the message sent.
200 The msg_id of the message sent.
201
201
202 """
202 """
203 content = dict(text=text, line=line)
203 content = dict(text=text, line=line)
204 msg = self.session.msg('complete_request', content)
204 msg = self.session.msg('complete_request', content)
205 self._queue_request(msg)
205 self._queue_request(msg)
206 return msg['header']['msg_id']
206 return msg['header']['msg_id']
207
207
208 def object_info(self, oname):
208 def object_info(self, oname):
209 """Get metadata information about an object.
209 """Get metadata information about an object.
210
210
211 Parameters
211 Parameters
212 ----------
212 ----------
213 oname : str
213 oname : str
214 A string specifying the object name.
214 A string specifying the object name.
215
215
216 Returns
216 Returns
217 -------
217 -------
218 The msg_id of the message sent.
218 The msg_id of the message sent.
219 """
219 """
220 print oname
220 print oname
221 content = dict(oname=oname)
221 content = dict(oname=oname)
222 msg = self.session.msg('object_info_request', content)
222 msg = self.session.msg('object_info_request', content)
223 self._queue_request(msg)
223 self._queue_request(msg)
224 return msg['header']['msg_id']
224 return msg['header']['msg_id']
225
225
226 def _handle_events(self, socket, events):
226 def _handle_events(self, socket, events):
227 if events & POLLERR:
227 if events & POLLERR:
228 self._handle_err()
228 self._handle_err()
229 if events & POLLOUT:
229 if events & POLLOUT:
230 self._handle_send()
230 self._handle_send()
231 if events & POLLIN:
231 if events & POLLIN:
232 self._handle_recv()
232 self._handle_recv()
233
233
234 def _handle_recv(self):
234 def _handle_recv(self):
235 msg = self.socket.recv_json()
235 msg = self.socket.recv_json()
236 self.call_handlers(msg)
236 self.call_handlers(msg)
237
237
238 def _handle_send(self):
238 def _handle_send(self):
239 try:
239 try:
240 msg = self.command_queue.get(False)
240 msg = self.command_queue.get(False)
241 except Empty:
241 except Empty:
242 pass
242 pass
243 else:
243 else:
244 self.socket.send_json(msg)
244 self.socket.send_json(msg)
245 if self.command_queue.empty():
245 if self.command_queue.empty():
246 self.drop_io_state(POLLOUT)
246 self.drop_io_state(POLLOUT)
247
247
248 def _handle_err(self):
248 def _handle_err(self):
249 # We don't want to let this go silently, so eventually we should log.
249 # We don't want to let this go silently, so eventually we should log.
250 raise zmq.ZMQError()
250 raise zmq.ZMQError()
251
251
252 def _queue_request(self, msg):
252 def _queue_request(self, msg):
253 self.command_queue.put(msg)
253 self.command_queue.put(msg)
254 self.add_io_state(POLLOUT)
254 self.add_io_state(POLLOUT)
255
255
256
256
257 class SubSocketChannel(ZmqSocketChannel):
257 class SubSocketChannel(ZmqSocketChannel):
258 """The SUB channel which listens for messages that the kernel publishes.
258 """The SUB channel which listens for messages that the kernel publishes.
259 """
259 """
260
260
261 def __init__(self, context, session, address):
261 def __init__(self, context, session, address):
262 super(SubSocketChannel, self).__init__(context, session, address)
262 super(SubSocketChannel, self).__init__(context, session, address)
263
263
264 def run(self):
264 def run(self):
265 """The thread's main activity. Call start() instead."""
265 """The thread's main activity. Call start() instead."""
266 self.socket = self.context.socket(zmq.SUB)
266 self.socket = self.context.socket(zmq.SUB)
267 self.socket.setsockopt(zmq.SUBSCRIBE,'')
267 self.socket.setsockopt(zmq.SUBSCRIBE,'')
268 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
268 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
269 self.socket.connect('tcp://%s:%i' % self.address)
269 self.socket.connect('tcp://%s:%i' % self.address)
270 self.ioloop = ioloop.IOLoop()
270 self.ioloop = ioloop.IOLoop()
271 self.iostate = POLLIN|POLLERR
271 self.iostate = POLLIN|POLLERR
272 self.ioloop.add_handler(self.socket, self._handle_events,
272 self.ioloop.add_handler(self.socket, self._handle_events,
273 self.iostate)
273 self.iostate)
274 self.ioloop.start()
274 self.ioloop.start()
275
275
276 def stop(self):
276 def stop(self):
277 self.ioloop.stop()
277 self.ioloop.stop()
278 super(SubSocketChannel, self).stop()
278 super(SubSocketChannel, self).stop()
279
279
280 def call_handlers(self, msg):
280 def call_handlers(self, msg):
281 """This method is called in the ioloop thread when a message arrives.
281 """This method is called in the ioloop thread when a message arrives.
282
282
283 Subclasses should override this method to handle incoming messages.
283 Subclasses should override this method to handle incoming messages.
284 It is important to remember that this method is called in the thread
284 It is important to remember that this method is called in the thread
285 so that some logic must be done to ensure that the application leve
285 so that some logic must be done to ensure that the application leve
286 handlers are called in the application thread.
286 handlers are called in the application thread.
287 """
287 """
288 raise NotImplementedError('call_handlers must be defined in a subclass.')
288 raise NotImplementedError('call_handlers must be defined in a subclass.')
289
289
290 def flush(self, timeout=1.0):
290 def flush(self, timeout=1.0):
291 """Immediately processes all pending messages on the SUB channel.
291 """Immediately processes all pending messages on the SUB channel.
292
292
293 This method is thread safe.
293 This method is thread safe.
294
294
295 Parameters
295 Parameters
296 ----------
296 ----------
297 timeout : float, optional
297 timeout : float, optional
298 The maximum amount of time to spend flushing, in seconds. The
298 The maximum amount of time to spend flushing, in seconds. The
299 default is one second.
299 default is one second.
300 """
300 """
301 # We do the IOLoop callback process twice to ensure that the IOLoop
301 # We do the IOLoop callback process twice to ensure that the IOLoop
302 # gets to perform at least one full poll.
302 # gets to perform at least one full poll.
303 stop_time = time.time() + timeout
303 stop_time = time.time() + timeout
304 for i in xrange(2):
304 for i in xrange(2):
305 self._flushed = False
305 self._flushed = False
306 self.ioloop.add_callback(self._flush)
306 self.ioloop.add_callback(self._flush)
307 while not self._flushed and time.time() < stop_time:
307 while not self._flushed and time.time() < stop_time:
308 time.sleep(0.01)
308 time.sleep(0.01)
309
309
310 def _handle_events(self, socket, events):
310 def _handle_events(self, socket, events):
311 # Turn on and off POLLOUT depending on if we have made a request
311 # Turn on and off POLLOUT depending on if we have made a request
312 if events & POLLERR:
312 if events & POLLERR:
313 self._handle_err()
313 self._handle_err()
314 if events & POLLIN:
314 if events & POLLIN:
315 self._handle_recv()
315 self._handle_recv()
316
316
317 def _handle_err(self):
317 def _handle_err(self):
318 # We don't want to let this go silently, so eventually we should log.
318 # We don't want to let this go silently, so eventually we should log.
319 raise zmq.ZMQError()
319 raise zmq.ZMQError()
320
320
321 def _handle_recv(self):
321 def _handle_recv(self):
322 # Get all of the messages we can
322 # Get all of the messages we can
323 while True:
323 while True:
324 try:
324 try:
325 msg = self.socket.recv_json(zmq.NOBLOCK)
325 msg = self.socket.recv_json(zmq.NOBLOCK)
326 except zmq.ZMQError:
326 except zmq.ZMQError:
327 # Check the errno?
327 # Check the errno?
328 # Will this tigger POLLERR?
328 # Will this tigger POLLERR?
329 break
329 break
330 else:
330 else:
331 self.call_handlers(msg)
331 self.call_handlers(msg)
332
332
333 def _flush(self):
333 def _flush(self):
334 """Callback for :method:`self.flush`."""
334 """Callback for :method:`self.flush`."""
335 self._flushed = True
335 self._flushed = True
336
336
337
337
338 class RepSocketChannel(ZmqSocketChannel):
338 class RepSocketChannel(ZmqSocketChannel):
339 """A reply channel to handle raw_input requests that the kernel makes."""
339 """A reply channel to handle raw_input requests that the kernel makes."""
340
340
341 def on_raw_input(self):
341 def on_raw_input(self):
342 pass
342 pass
343
343
344
344
345 #-----------------------------------------------------------------------------
345 #-----------------------------------------------------------------------------
346 # Main kernel manager class
346 # Main kernel manager class
347 #-----------------------------------------------------------------------------
347 #-----------------------------------------------------------------------------
348
348
349
349
350 class KernelManager(HasTraits):
350 class KernelManager(HasTraits):
351 """ Manages a kernel for a frontend.
351 """ Manages a kernel for a frontend.
352
352
353 The SUB channel is for the frontend to receive messages published by the
353 The SUB channel is for the frontend to receive messages published by the
354 kernel.
354 kernel.
355
355
356 The REQ channel is for the frontend to make requests of the kernel.
356 The REQ channel is for the frontend to make requests of the kernel.
357
357
358 The REP channel is for the kernel to request stdin (raw_input) from the
358 The REP channel is for the kernel to request stdin (raw_input) from the
359 frontend.
359 frontend.
360 """
360 """
361 # The PyZMQ Context to use for communication with the kernel.
361 # The PyZMQ Context to use for communication with the kernel.
362 context = Instance(zmq.Context)
362 context = Instance(zmq.Context)
363
363
364 # The Session to use for communication with the kernel.
364 # The Session to use for communication with the kernel.
365 session = Instance(Session)
365 session = Instance(Session)
366
366
367 # The classes to use for the various channels.
367 # The classes to use for the various channels.
368 xreq_channel_class = Type(XReqSocketChannel)
368 xreq_channel_class = Type(XReqSocketChannel)
369 sub_channel_class = Type(SubSocketChannel)
369 sub_channel_class = Type(SubSocketChannel)
370 rep_channel_class = Type(RepSocketChannel)
370 rep_channel_class = Type(RepSocketChannel)
371
371
372 # Protected traits.
372 # Protected traits.
373 _kernel = Instance(Popen)
373 _kernel = Instance(Popen)
374 _xreq_address = Any
374 _xreq_address = Any
375 _sub_address = Any
375 _sub_address = Any
376 _rep_address = Any
376 _rep_address = Any
377 _xreq_channel = Any
377 _xreq_channel = Any
378 _sub_channel = Any
378 _sub_channel = Any
379 _rep_channel = Any
379 _rep_channel = Any
380
380
381 def __init__(self, xreq_address=None, sub_address=None, rep_address=None,
381 def __init__(self, xreq_address=None, sub_address=None, rep_address=None,
382 context=None, session=None):
382 context=None, session=None):
383 self._xreq_address = (LOCALHOST, 0) if xreq_address is None else xreq_address
383 self._xreq_address = (LOCALHOST, 0) if xreq_address is None else xreq_address
384 self._sub_address = (LOCALHOST, 0) if sub_address is None else sub_address
384 self._sub_address = (LOCALHOST, 0) if sub_address is None else sub_address
385 self._rep_address = (LOCALHOST, 0) if rep_address is None else rep_address
385 self._rep_address = (LOCALHOST, 0) if rep_address is None else rep_address
386 self.context = zmq.Context() if context is None else context
386 self.context = zmq.Context() if context is None else context
387 self.session = Session() if session is None else session
387 self.session = Session() if session is None else session
388 super(KernelManager, self).__init__()
388
389
389 #--------------------------------------------------------------------------
390 #--------------------------------- -----------------------------------------
390 # Channel management methods:
391 # Channel management methods:
391 #--------------------------------------------------------------------------
392 #--------------------------------------------------------------------------
392
393
393 def start_channels(self):
394 def start_channels(self):
394 """Starts the channels for this kernel.
395 """Starts the channels for this kernel.
395
396
396 This will create the channels if they do not exist and then start
397 This will create the channels if they do not exist and then start
397 them. If port numbers of 0 are being used (random ports) then you
398 them. If port numbers of 0 are being used (random ports) then you
398 must first call :method:`start_kernel`. If the channels have been
399 must first call :method:`start_kernel`. If the channels have been
399 stopped and you call this, :class:`RuntimeError` will be raised.
400 stopped and you call this, :class:`RuntimeError` will be raised.
400 """
401 """
401 self.xreq_channel.start()
402 self.xreq_channel.start()
402 self.sub_channel.start()
403 self.sub_channel.start()
403 self.rep_channel.start()
404 self.rep_channel.start()
404
405
405 def stop_channels(self):
406 def stop_channels(self):
406 """Stops the channels for this kernel.
407 """Stops the channels for this kernel.
407
408
408 This stops the channels by joining their threads. If the channels
409 This stops the channels by joining their threads. If the channels
409 were not started, :class:`RuntimeError` will be raised.
410 were not started, :class:`RuntimeError` will be raised.
410 """
411 """
411 self.xreq_channel.stop()
412 self.xreq_channel.stop()
412 self.sub_channel.stop()
413 self.sub_channel.stop()
413 self.rep_channel.stop()
414 self.rep_channel.stop()
414
415
415 @property
416 @property
416 def channels_running(self):
417 def channels_running(self):
417 """Are all of the channels created and running?"""
418 """Are all of the channels created and running?"""
418 return self.xreq_channel.is_alive() \
419 return self.xreq_channel.is_alive() \
419 and self.sub_channel.is_alive() \
420 and self.sub_channel.is_alive() \
420 and self.rep_channel.is_alive()
421 and self.rep_channel.is_alive()
421
422
422 #--------------------------------------------------------------------------
423 #--------------------------------------------------------------------------
423 # Kernel process management methods:
424 # Kernel process management methods:
424 #--------------------------------------------------------------------------
425 #--------------------------------------------------------------------------
425
426
426 def start_kernel(self):
427 def start_kernel(self):
427 """Starts a kernel process and configures the manager to use it.
428 """Starts a kernel process and configures the manager to use it.
428
429
429 If random ports (port=0) are being used, this method must be called
430 If random ports (port=0) are being used, this method must be called
430 before the channels are created.
431 before the channels are created.
431 """
432 """
432 xreq, sub = self.xreq_address, self.sub_address
433 xreq, sub = self.xreq_address, self.sub_address
433 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
434 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
434 raise RuntimeError("Can only launch a kernel on localhost."
435 raise RuntimeError("Can only launch a kernel on localhost."
435 "Make sure that the '*_address' attributes are "
436 "Make sure that the '*_address' attributes are "
436 "configured properly.")
437 "configured properly.")
437
438
438 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
439 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
439 self._kernel = kernel
440 self._kernel = kernel
440 print xrep, pub
441 print xrep, pub
441 self._xreq_address = (LOCALHOST, xrep)
442 self._xreq_address = (LOCALHOST, xrep)
442 self._sub_address = (LOCALHOST, pub)
443 self._sub_address = (LOCALHOST, pub)
443 # The rep channel is not fully working yet, but its base class makes
444 # The rep channel is not fully working yet, but its base class makes
444 # sure the port is not 0. We set to -1 for now until the rep channel
445 # sure the port is not 0. We set to -1 for now until the rep channel
445 # is fully working.
446 # is fully working.
446 self._rep_address = (LOCALHOST, -1)
447 self._rep_address = (LOCALHOST, -1)
447
448
448 @property
449 @property
449 def has_kernel(self):
450 def has_kernel(self):
450 """Returns whether a kernel process has been specified for the kernel
451 """Returns whether a kernel process has been specified for the kernel
451 manager.
452 manager.
452
453
453 A kernel process can be set via 'start_kernel' or 'set_kernel'.
454 A kernel process can be set via 'start_kernel' or 'set_kernel'.
454 """
455 """
455 return self._kernel is not None
456 return self._kernel is not None
456
457
457 def kill_kernel(self):
458 def kill_kernel(self):
458 """ Kill the running kernel. """
459 """ Kill the running kernel. """
459 if self._kernel is not None:
460 if self._kernel is not None:
460 self._kernel.kill()
461 self._kernel.kill()
461 self._kernel = None
462 self._kernel = None
462 else:
463 else:
463 raise RuntimeError("Cannot kill kernel. No kernel is running!")
464 raise RuntimeError("Cannot kill kernel. No kernel is running!")
464
465
465 def signal_kernel(self, signum):
466 def signal_kernel(self, signum):
466 """ Sends a signal to the kernel. """
467 """ Sends a signal to the kernel. """
467 if self._kernel is not None:
468 if self._kernel is not None:
468 self._kernel.send_signal(signum)
469 self._kernel.send_signal(signum)
469 else:
470 else:
470 raise RuntimeError("Cannot signal kernel. No kernel is running!")
471 raise RuntimeError("Cannot signal kernel. No kernel is running!")
471
472
472 @property
473 @property
473 def is_alive(self):
474 def is_alive(self):
474 """Is the kernel process still running?"""
475 """Is the kernel process still running?"""
475 if self._kernel is not None:
476 if self._kernel is not None:
476 if self._kernel.poll() is None:
477 if self._kernel.poll() is None:
477 return True
478 return True
478 else:
479 else:
479 return False
480 return False
480 else:
481 else:
481 # We didn't start the kernel with this KernelManager so we don't
482 # We didn't start the kernel with this KernelManager so we don't
482 # know if it is running. We should use a heartbeat for this case.
483 # know if it is running. We should use a heartbeat for this case.
483 return True
484 return True
484
485
485 #--------------------------------------------------------------------------
486 #--------------------------------------------------------------------------
486 # Channels used for communication with the kernel:
487 # Channels used for communication with the kernel:
487 #--------------------------------------------------------------------------
488 #--------------------------------------------------------------------------
488
489
489 @property
490 @property
490 def xreq_channel(self):
491 def xreq_channel(self):
491 """Get the REQ socket channel object to make requests of the kernel."""
492 """Get the REQ socket channel object to make requests of the kernel."""
492 if self._xreq_channel is None:
493 if self._xreq_channel is None:
493 self._xreq_channel = self.xreq_channel_class(self.context,
494 self._xreq_channel = self.xreq_channel_class(self.context,
494 self.session,
495 self.session,
495 self.xreq_address)
496 self.xreq_address)
496 return self._xreq_channel
497 return self._xreq_channel
497
498
498 @property
499 @property
499 def sub_channel(self):
500 def sub_channel(self):
500 """Get the SUB socket channel object."""
501 """Get the SUB socket channel object."""
501 if self._sub_channel is None:
502 if self._sub_channel is None:
502 self._sub_channel = self.sub_channel_class(self.context,
503 self._sub_channel = self.sub_channel_class(self.context,
503 self.session,
504 self.session,
504 self.sub_address)
505 self.sub_address)
505 return self._sub_channel
506 return self._sub_channel
506
507
507 @property
508 @property
508 def rep_channel(self):
509 def rep_channel(self):
509 """Get the REP socket channel object to handle stdin (raw_input)."""
510 """Get the REP socket channel object to handle stdin (raw_input)."""
510 if self._rep_channel is None:
511 if self._rep_channel is None:
511 self._rep_channel = self.rep_channel_class(self.context,
512 self._rep_channel = self.rep_channel_class(self.context,
512 self.session,
513 self.session,
513 self.rep_address)
514 self.rep_address)
514 return self._rep_channel
515 return self._rep_channel
515
516
516 @property
517 @property
517 def xreq_address(self):
518 def xreq_address(self):
518 return self._xreq_address
519 return self._xreq_address
519
520
520 @property
521 @property
521 def sub_address(self):
522 def sub_address(self):
522 return self._sub_address
523 return self._sub_address
523
524
524 @property
525 @property
525 def rep_address(self):
526 def rep_address(self):
526 return self._rep_address
527 return self._rep_address
527
528
528
529
General Comments 0
You need to be logged in to leave comments. Login now