##// END OF EJS Templates
Cleaned up KernelManager interface and clarified documentation.
epatters -
Show More
@@ -1,83 +1,82 b''
1 1 # System library imports
2 2 from PyQt4 import QtCore, QtGui
3 3
4 4 # Local imports
5 5 from frontend_widget import FrontendWidget
6 6
7 7
8 8 class IPythonWidget(FrontendWidget):
9 9 """ A FrontendWidget for an IPython kernel.
10 10 """
11 11
12 12 #---------------------------------------------------------------------------
13 13 # 'FrontendWidget' interface
14 14 #---------------------------------------------------------------------------
15 15
16 16 def __init__(self, kernel_manager, parent=None):
17 17 super(IPythonWidget, self).__init__(kernel_manager, parent)
18 18
19 19 self._magic_overrides = {}
20 20
21 21 def execute_source(self, source, hidden=False, interactive=False):
22 22 """ Reimplemented to override magic commands.
23 23 """
24 24 magic_source = source.strip()
25 25 if magic_source.startswith('%'):
26 26 magic_source = magic_source[1:]
27 27 magic, sep, arguments = magic_source.partition(' ')
28 28 if not magic:
29 29 magic = magic_source
30 30
31 31 callback = self._magic_overrides.get(magic)
32 32 if callback:
33 33 output = callback(arguments)
34 34 if output:
35 35 self.appendPlainText(output)
36 36 self._show_prompt('>>> ')
37 37 return True
38 38 else:
39 39 return super(IPythonWidget, self).execute_source(source, hidden,
40 40 interactive)
41 41
42 42 #---------------------------------------------------------------------------
43 43 # 'IPythonWidget' interface
44 44 #---------------------------------------------------------------------------
45 45
46 46 def set_magic_override(self, magic, callback):
47 47 """ Overrides an IPython magic command. This magic will be intercepted
48 48 by the frontend rather than passed on to the kernel and 'callback'
49 49 will be called with a single argument: a string of argument(s) for
50 50 the magic. The callback can (optionally) return text to print to the
51 51 console.
52 52 """
53 53 self._magic_overrides[magic] = callback
54 54
55 55 def remove_magic_override(self, magic):
56 56 """ Removes the override for the specified magic, if there is one.
57 57 """
58 58 try:
59 59 del self._magic_overrides[magic]
60 60 except KeyError:
61 61 pass
62 62
63 63
64 64 if __name__ == '__main__':
65 65 import sys
66 66 from IPython.frontend.qt.kernelmanager import QtKernelManager
67 67
68 68 # Create KernelManager
69 xreq_addr = ('127.0.0.1', 5575)
70 sub_addr = ('127.0.0.1', 5576)
71 rep_addr = ('127.0.0.1', 5577)
72 kernel_manager = QtKernelManager(xreq_addr, sub_addr, rep_addr)
69 kernel_manager = QtKernelManager(xreq_address = ('127.0.0.1', 5575),
70 sub_address = ('127.0.0.1', 5576),
71 rep_address = ('127.0.0.1', 5577))
73 72 kernel_manager.sub_channel.start()
74 73 kernel_manager.xreq_channel.start()
75 74
76 75 # Launch application
77 76 app = QtGui.QApplication(sys.argv)
78 77 widget = IPythonWidget(kernel_manager)
79 78 widget.setWindowTitle('Python')
80 79 widget.resize(640, 480)
81 80 widget.show()
82 81 sys.exit(app.exec_())
83 82
@@ -1,335 +1,359 b''
1 1 """Kernel frontend classes.
2 2
3 3 TODO: Create logger to handle debugging and console messages.
4 4
5 5 """
6 6
7 7 # Standard library imports.
8 8 from Queue import Queue, Empty
9 9 from threading import Thread
10 10 import time
11 11 import traceback
12 12
13 13 # System library imports.
14 14 import zmq
15 15 from zmq import POLLIN, POLLOUT, POLLERR
16 16 from zmq.eventloop import ioloop
17 17
18 18 # Local imports.
19 19 from IPython.utils.traitlets import HasTraits, Any, Int, Instance, Str, Type
20 20 from session import Session
21 21
22 22
23 23 class MissingHandlerError(Exception):
24 24 pass
25 25
26 26
27 27 class ZmqSocketChannel(Thread):
28 """ The base class for the channels that use ZMQ sockets.
29 """
28 30
29 socket = None
30
31 def __init__(self, context, session, addr):
31 def __init__(self, context, session, addr=None):
32 32 self.context = context
33 33 self.session = session
34 34 self.addr = addr
35 self.socket = None
36
35 37 super(ZmqSocketChannel, self).__init__()
36 38 self.daemon = True
37 39
38 40
39 41 class SubSocketChannel(ZmqSocketChannel):
40 42
41 43 handlers = None
42 44 _overriden_call_handler = None
43 45
44 def __init__(self, context, session, addr):
46 def __init__(self, context, session, addr=None):
45 47 self.handlers = {}
46 48 super(SubSocketChannel, self).__init__(context, session, addr)
47 49
48 50 def run(self):
49 51 self.socket = self.context.socket(zmq.SUB)
50 52 self.socket.setsockopt(zmq.SUBSCRIBE,'')
51 53 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
52 54 self.socket.connect('tcp://%s:%i' % self.addr)
53 55 self.ioloop = ioloop.IOLoop()
54 56 self.ioloop.add_handler(self.socket, self._handle_events,
55 57 POLLIN|POLLERR)
56 58 self.ioloop.start()
57 59
58 60 def _handle_events(self, socket, events):
59 61 # Turn on and off POLLOUT depending on if we have made a request
60 62 if events & POLLERR:
61 63 self._handle_err()
62 64 if events & POLLIN:
63 65 self._handle_recv()
64 66
65 67 def _handle_err(self):
66 68 raise zmq.ZmqError()
67 69
68 70 def _handle_recv(self):
69 71 msg = self.socket.recv_json()
70 72 self.call_handlers(msg)
71 73
72 74 def override_call_handler(self, func):
73 75 """Permanently override the call_handler.
74 76
75 77 The function func will be called as::
76 78
77 79 func(handler, msg)
78 80
79 81 And must call::
80 82
81 83 handler(msg)
82 84
83 85 in the main thread.
84 86 """
85 87 assert callable(func), "not a callable: %r" % func
86 88 self._overriden_call_handler = func
87 89
88 90 def call_handlers(self, msg):
89 91 handler = self.handlers.get(msg['msg_type'], None)
90 92 if handler is not None:
91 93 try:
92 94 self.call_handler(handler, msg)
93 95 except:
94 96 # XXX: This should be logged at least
95 97 traceback.print_last()
96 98
97 99 def call_handler(self, handler, msg):
98 100 if self._overriden_call_handler is not None:
99 101 self._overriden_call_handler(handler, msg)
100 102 elif hasattr(self, '_call_handler'):
101 103 call_handler = getattr(self, '_call_handler')
102 104 call_handler(handler, msg)
103 105 else:
104 106 raise RuntimeError('no handler!')
105 107
106 108 def add_handler(self, callback, msg_type):
107 109 """Register a callback for msg type."""
108 110 self.handlers[msg_type] = callback
109 111
110 112 def remove_handler(self, msg_type):
111 113 """Remove the callback for msg type."""
112 114 self.handlers.pop(msg_type, None)
113 115
114 116 def flush(self):
115 117 """Immediately processes all pending messages on the SUB channel. This
116 118 method is thread safe.
117 119 """
118 120 self._flushed = False
119 121 self.ioloop.add_callback(self._flush)
120 122 while not self._flushed:
121 123 time.sleep(0.01)
122 124
123 125 def _flush(self):
124 126 """Called in this thread by the IOLoop to indicate that all events have
125 127 been processed.
126 128 """
127 129 self._flushed = True
128 130
129 131
130 132 class XReqSocketChannel(ZmqSocketChannel):
131 133
132 134 handler_queue = None
133 135 command_queue = None
134 136 handlers = None
135 137 _overriden_call_handler = None
136 138
137 def __init__(self, context, session, addr):
139 def __init__(self, context, session, addr=None):
138 140 self.handlers = {}
139 141 self.handler_queue = Queue()
140 142 self.command_queue = Queue()
141 143 super(XReqSocketChannel, self).__init__(context, session, addr)
142 144
143 145 def run(self):
144 146 self.socket = self.context.socket(zmq.XREQ)
145 147 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
146 148 self.socket.connect('tcp://%s:%i' % self.addr)
147 149 self.ioloop = ioloop.IOLoop()
148 150 self.ioloop.add_handler(self.socket, self._handle_events,
149 151 POLLIN|POLLOUT|POLLERR)
150 152 self.ioloop.start()
151 153
152 154 def _handle_events(self, socket, events):
153 155 # Turn on and off POLLOUT depending on if we have made a request
154 156 if events & POLLERR:
155 157 self._handle_err()
156 158 if events & POLLOUT:
157 159 self._handle_send()
158 160 if events & POLLIN:
159 161 self._handle_recv()
160 162
161 163 def _handle_recv(self):
162 164 msg = self.socket.recv_json()
163 165 self.call_handlers(msg)
164 166
165 167 def _handle_send(self):
166 168 try:
167 169 msg = self.command_queue.get(False)
168 170 except Empty:
169 171 pass
170 172 else:
171 173 self.socket.send_json(msg)
172 174
173 175 def _handle_err(self):
174 176 raise zmq.ZmqError()
175 177
176 178 def _queue_request(self, msg, callback):
177 179 handler = self._find_handler(msg['msg_type'], callback)
178 180 self.handler_queue.put(handler)
179 181 self.command_queue.put(msg)
180 182
181 183 def execute(self, code, callback=None):
182 184 # Create class for content/msg creation. Related to, but possibly
183 185 # not in Session.
184 186 content = dict(code=code)
185 187 msg = self.session.msg('execute_request', content)
186 188 self._queue_request(msg, callback)
187 189 return msg['header']['msg_id']
188 190
189 191 def complete(self, text, line, block=None, callback=None):
190 192 content = dict(text=text, line=line)
191 193 msg = self.session.msg('complete_request', content)
192 194 self._queue_request(msg, callback)
193 195 return msg['header']['msg_id']
194 196
195 197 def object_info(self, oname, callback=None):
196 198 content = dict(oname=oname)
197 199 msg = self.session.msg('object_info_request', content)
198 200 self._queue_request(msg, callback)
199 201 return msg['header']['msg_id']
200 202
201 203 def _find_handler(self, name, callback):
202 204 if callback is not None:
203 205 return callback
204 206 handler = self.handlers.get(name)
205 207 if handler is None:
206 208 raise MissingHandlerError(
207 209 'No handler defined for method: %s' % name)
208 210 return handler
209 211
210 212 def override_call_handler(self, func):
211 213 """Permanently override the call_handler.
212 214
213 215 The function func will be called as::
214 216
215 217 func(handler, msg)
216 218
217 219 And must call::
218 220
219 221 handler(msg)
220 222
221 223 in the main thread.
222 224 """
223 225 assert callable(func), "not a callable: %r" % func
224 226 self._overriden_call_handler = func
225 227
226 228 def call_handlers(self, msg):
227 229 try:
228 230 handler = self.handler_queue.get(False)
229 231 except Empty:
230 232 print "Message received with no handler!!!"
231 233 print msg
232 234 else:
233 235 self.call_handler(handler, msg)
234 236
235 237 def call_handler(self, handler, msg):
236 238 if self._overriden_call_handler is not None:
237 239 self._overriden_call_handler(handler, msg)
238 240 elif hasattr(self, '_call_handler'):
239 241 call_handler = getattr(self, '_call_handler')
240 242 call_handler(handler, msg)
241 243 else:
242 244 raise RuntimeError('no handler!')
243 245
244 246
245 247 class RepSocketChannel(ZmqSocketChannel):
246 248
247 249 def on_raw_input():
248 250 pass
249 251
250 252
251 253 class KernelManager(HasTraits):
254 """ Manages a kernel for a frontend.
252 255
253 # The addresses to use for the various channels. Should be tuples of form
254 # (ip_address, port).
255 sub_address = Any
256 xreq_address = Any
257 rep_address = Any
258 # FIXME: Add Tuple to Traitlets.
259 #sub_address = Tuple(Str, Int)
260 #xreq_address = Tuple(Str, Int)
261 #rep_address = Tuple(Str, Int)
256 The SUB channel is for the frontend to receive messages published by the
257 kernel.
258
259 The REQ channel is for the frontend to make requests of the kernel.
260
261 The REP channel is for the kernel to request stdin (raw_input) from the
262 frontend.
263 """
262 264
263 265 # The PyZMQ Context to use for communication with the kernel.
264 266 context = Instance(zmq.Context, ())
265 267
266 268 # The Session to use for communication with the kernel.
267 269 session = Instance(Session, ())
268 270
271 # The channels objects used for communication with the kernel.
272 # FIXME: Add '_traitname_default' instantiation method to Traitlets.
273 #sub_channel = Instance(SubSocketChannel)
274 #xreq_channel = Instance(XReqSocketChannel)
275 #rep_channel = Instance(RepSocketChannel)
276
269 277 # The classes to use for the various channels.
270 278 sub_channel_class = Type(SubSocketChannel)
271 279 xreq_channel_class = Type(XReqSocketChannel)
272 280 rep_channel_class = Type(RepSocketChannel)
273 281
282 # The addresses to use for the various channels. Should be tuples of form
283 # (ip_address, port).
284 #sub_address = DelegatesTo('sub_channel')
285 #xreq_address = DelegatesTo('xreq_channel')
286 #rep_address = DelegatesTo('rep_channel')
287
274 288 # Protected traits.
275 289 _sub_channel = Any
276 290 _xreq_channel = Any
277 291 _rep_channel = Any
278 292
279 def __init__(self, xreq_address, sub_address, rep_address, **traits):
293 def __init__(self, **traits):
280 294 super(KernelManager, self).__init__()
281 295
282 self.xreq_address = xreq_address
283 self.sub_address = sub_address
284 self.rep_address = rep_address
285
286 296 # FIXME: This should be the business of HasTraits. The convention is:
287 297 # HasTraits.__init__(self, **traits_to_be_initialized.)
288 298 for trait in traits:
289 299 setattr(self, trait, traits[trait])
290 300
291 301 def start_kernel(self):
292 """Start a localhost kernel on ip and port.
293
294 The SUB channel is for the frontend to receive messages published by
295 the kernel.
296
297 The REQ channel is for the frontend to make requests of the kernel.
298
299 The REP channel is for the kernel to request stdin (raw_input) from
300 the frontend.
302 """Start a localhost kernel. If ports have been specified, use
303 them. Otherwise, choose an open port at random.
301 304 """
302 305
303 306 def kill_kernel(self):
304 307 """Kill the running kernel"""
305 308
306 309 def is_alive(self):
307 310 """Is the kernel alive?"""
308 311 return True
309 312
310 313 def signal_kernel(self, signum):
311 314 """Send signum to the kernel."""
312 315
313 316 @property
314 317 def sub_channel(self):
315 318 """Get the SUB socket channel object."""
316 319 if self._sub_channel is None:
317 self._sub_channel = self.sub_channel_class(
318 self.context, self.session, self.sub_address)
320 self._sub_channel = self.sub_channel_class(self.context,
321 self.session)
319 322 return self._sub_channel
320 323
321 324 @property
322 325 def xreq_channel(self):
323 326 """Get the REQ socket channel object to make requests of the kernel."""
324 327 if self._xreq_channel is None:
325 self._xreq_channel = self.xreq_channel_class(
326 self.context, self.session, self.xreq_address)
328 self._xreq_channel = self.xreq_channel_class(self.context,
329 self.session)
327 330 return self._xreq_channel
328 331
329 332 @property
330 333 def rep_channel(self):
331 334 """Get the REP socket channel object to handle stdin (raw_input)."""
332 335 if self._rep_channel is None:
333 self._rep_channel = self.rep_channel_class(
334 self.context, self.session, self.rep_address)
336 self._rep_channel = self.rep_channel_class(self.context,
337 self.session)
335 338 return self._rep_channel
339
340 def get_sub_address(self):
341 return self.sub_channel.addr
342 def set_sub_address(self, addr):
343 self.sub_channel.addr = addr
344 sub_address = property(get_sub_address, set_sub_address,
345 doc="The address used by SUB socket channel.")
346
347 def get_xreq_address(self):
348 return self.xreq_channel.addr
349 def set_xreq_address(self, addr):
350 self.xreq_channel.addr = addr
351 xreq_address = property(get_xreq_address, set_xreq_address,
352 doc="The address used by XREQ socket channel.")
353
354 def get_rep_address(self):
355 return self.rep_channel.addr
356 def set_rep_address(self, addr):
357 self.rep_channel.addr = addr
358 rep_address = property(get_rep_address, set_rep_address,
359 doc="The address used by REP socket channel.")
General Comments 0
You need to be logged in to leave comments. Login now