##// END OF EJS Templates
* Implemented a proper main() function for kernel.py that reads command line input....
epatters -
Show More
@@ -1,96 +1,96 b''
1 1 # System library imports
2 2 from PyQt4 import QtCore, QtGui
3 3
4 4 # Local imports
5 5 from frontend_widget import FrontendWidget
6 6
7 7
8 8 class IPythonWidget(FrontendWidget):
9 9 """ A FrontendWidget for an IPython kernel.
10 10 """
11 11
12 12 #---------------------------------------------------------------------------
13 13 # 'FrontendWidget' interface
14 14 #---------------------------------------------------------------------------
15 15
16 16 def __init__(self, parent=None):
17 17 super(IPythonWidget, self).__init__(parent)
18 18
19 19 self._magic_overrides = {}
20 20
21 21 def execute_source(self, source, hidden=False, interactive=False):
22 22 """ Reimplemented to override magic commands.
23 23 """
24 24 magic_source = source.strip()
25 25 if magic_source.startswith('%'):
26 26 magic_source = magic_source[1:]
27 27 magic, sep, arguments = magic_source.partition(' ')
28 28 if not magic:
29 29 magic = magic_source
30 30
31 31 callback = self._magic_overrides.get(magic)
32 32 if callback:
33 33 output = callback(arguments)
34 34 if output:
35 35 self.appendPlainText(output)
36 36 self._show_prompt('>>> ')
37 37 return True
38 38 else:
39 39 return super(IPythonWidget, self).execute_source(source, hidden,
40 40 interactive)
41 41
42 42 #---------------------------------------------------------------------------
43 43 # 'IPythonWidget' interface
44 44 #---------------------------------------------------------------------------
45 45
46 46 def set_magic_override(self, magic, callback):
47 47 """ Overrides an IPython magic command. This magic will be intercepted
48 48 by the frontend rather than passed on to the kernel and 'callback'
49 49 will be called with a single argument: a string of argument(s) for
50 50 the magic. The callback can (optionally) return text to print to the
51 51 console.
52 52 """
53 53 self._magic_overrides[magic] = callback
54 54
55 55 def remove_magic_override(self, magic):
56 56 """ Removes the override for the specified magic, if there is one.
57 57 """
58 58 try:
59 59 del self._magic_overrides[magic]
60 60 except KeyError:
61 61 pass
62 62
63 63
64 64 if __name__ == '__main__':
65 from IPython.external.argparse import ArgumentParser
66 from IPython.frontend.qt.kernelmanager import QtKernelManager
67
68 # Don't let Qt swallow KeyboardInterupts.
69 65 import signal
70 signal.signal(signal.SIGINT, signal.SIG_DFL)
66 from IPython.frontend.qt.kernelmanager import QtKernelManager
71 67
72 # Parse command line arguments.
73 parser = ArgumentParser()
74 parser.add_argument('--ip', type=str, default='127.0.0.1',
75 help='set the kernel\'s IP address [default localhost]')
76 parser.add_argument('--xreq', type=int, metavar='PORT', default=5575,
77 help='set the XREQ Channel port [default %(default)i]')
78 parser.add_argument('--sub', type=int, metavar='PORT', default=5576,
79 help='set the SUB Channel port [default %(default)i]')
80 namespace = parser.parse_args()
81
82 # Create KernelManager
83 ip = namespace.ip
84 kernel_manager = QtKernelManager(xreq_address = (ip, namespace.xreq),
85 sub_address = (ip, namespace.sub))
68 # Create a KernelManager.
69 kernel_manager = QtKernelManager()
70 kernel_manager.start_kernel()
86 71 kernel_manager.start_listening()
87 72
88 # Launch application
73 # Don't let Qt or ZMQ swallow KeyboardInterupts.
74 # FIXME: Gah, ZMQ swallows even custom signal handlers. So for now we leave
75 # behind a kernel process when Ctrl-C is pressed.
76 #def sigint_hook(signum, frame):
77 # QtGui.qApp.quit()
78 #signal.signal(signal.SIGINT, sigint_hook)
79 signal.signal(signal.SIGINT, signal.SIG_DFL)
80
81 # Create the application, making sure to clean up nicely when we exit.
89 82 app = QtGui.QApplication([])
83 def quit_hook():
84 kernel_manager.stop_listening()
85 kernel_manager.kill_kernel()
86 app.aboutToQuit.connect(quit_hook)
87
88 # Launch the application.
90 89 widget = IPythonWidget()
91 90 widget.kernel_manager = kernel_manager
92 91 widget.setWindowTitle('Python')
93 92 widget.resize(640, 480)
94 93 widget.show()
95 94 app.exec_()
95
96 96
@@ -1,349 +1,361 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Finish implementing `raw_input`.
7 7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
8 8 call set_parent on all the PUB objects with the message about to be executed.
9 9 * Implement random port and security key logic.
10 10 * Implement control messages.
11 11 * Implement event loop and poll version.
12 12 """
13 13
14 14 # Standard library imports.
15 15 import __builtin__
16 16 import sys
17 17 import time
18 18 import traceback
19 19 from code import CommandCompiler
20 20
21 21 # System library imports.
22 22 import zmq
23 23
24 24 # Local imports.
25 from IPython.external.argparse import ArgumentParser
25 26 from session import Session, Message, extract_header
26 27 from completer import KernelCompleter
27 28
28 29
29 30 class OutStream(object):
30 31 """A file like object that publishes the stream to a 0MQ PUB socket."""
31 32
32 33 def __init__(self, session, pub_socket, name, max_buffer=200):
33 34 self.session = session
34 35 self.pub_socket = pub_socket
35 36 self.name = name
36 37 self._buffer = []
37 38 self._buffer_len = 0
38 39 self.max_buffer = max_buffer
39 40 self.parent_header = {}
40 41
41 42 def set_parent(self, parent):
42 43 self.parent_header = extract_header(parent)
43 44
44 45 def close(self):
45 46 self.pub_socket = None
46 47
47 48 def flush(self):
48 49 if self.pub_socket is None:
49 50 raise ValueError(u'I/O operation on closed file')
50 51 else:
51 52 if self._buffer:
52 53 data = ''.join(self._buffer)
53 54 content = {u'name':self.name, u'data':data}
54 55 msg = self.session.msg(u'stream', content=content,
55 56 parent=self.parent_header)
56 57 print>>sys.__stdout__, Message(msg)
57 58 self.pub_socket.send_json(msg)
58 59 self._buffer_len = 0
59 60 self._buffer = []
60 61
61 62 def isattr(self):
62 63 return False
63 64
64 65 def next(self):
65 66 raise IOError('Read not supported on a write only stream.')
66 67
67 68 def read(self, size=None):
68 69 raise IOError('Read not supported on a write only stream.')
69 70
70 71 readline=read
71 72
72 73 def write(self, s):
73 74 if self.pub_socket is None:
74 75 raise ValueError('I/O operation on closed file')
75 76 else:
76 77 self._buffer.append(s)
77 78 self._buffer_len += len(s)
78 79 self._maybe_send()
79 80
80 81 def _maybe_send(self):
81 82 if '\n' in self._buffer[-1]:
82 83 self.flush()
83 84 if self._buffer_len > self.max_buffer:
84 85 self.flush()
85 86
86 87 def writelines(self, sequence):
87 88 if self.pub_socket is None:
88 89 raise ValueError('I/O operation on closed file')
89 90 else:
90 91 for s in sequence:
91 92 self.write(s)
92 93
93 94
94 95 class DisplayHook(object):
95 96
96 97 def __init__(self, session, pub_socket):
97 98 self.session = session
98 99 self.pub_socket = pub_socket
99 100 self.parent_header = {}
100 101
101 102 def __call__(self, obj):
102 103 if obj is None:
103 104 return
104 105
105 106 __builtin__._ = obj
106 107 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
107 108 parent=self.parent_header)
108 109 self.pub_socket.send_json(msg)
109 110
110 111 def set_parent(self, parent):
111 112 self.parent_header = extract_header(parent)
112 113
113 114
114 115 class RawInput(object):
115 116
116 117 def __init__(self, session, socket):
117 118 self.session = session
118 119 self.socket = socket
119 120
120 121 def __call__(self, prompt=None):
121 122 msg = self.session.msg(u'raw_input')
122 123 self.socket.send_json(msg)
123 124 while True:
124 125 try:
125 126 reply = self.socket.recv_json(zmq.NOBLOCK)
126 127 except zmq.ZMQError, e:
127 128 if e.errno == zmq.EAGAIN:
128 129 pass
129 130 else:
130 131 raise
131 132 else:
132 133 break
133 134 return reply[u'content'][u'data']
134 135
135 136
136 137 class Kernel(object):
137 138
138 139 def __init__(self, session, reply_socket, pub_socket):
139 140 self.session = session
140 141 self.reply_socket = reply_socket
141 142 self.pub_socket = pub_socket
142 143 self.user_ns = {}
143 144 self.history = []
144 145 self.compiler = CommandCompiler()
145 146 self.completer = KernelCompleter(self.user_ns)
146 147
147 148 # Build dict of handlers for message types
148 149 msg_types = [ 'execute_request', 'complete_request',
149 150 'object_info_request' ]
150 151 self.handlers = {}
151 152 for msg_type in msg_types:
152 153 self.handlers[msg_type] = getattr(self, msg_type)
153 154
154 155 def abort_queue(self):
155 156 while True:
156 157 try:
157 158 ident = self.reply_socket.recv(zmq.NOBLOCK)
158 159 except zmq.ZMQError, e:
159 160 if e.errno == zmq.EAGAIN:
160 161 break
161 162 else:
162 163 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
163 164 msg = self.reply_socket.recv_json()
164 165 print>>sys.__stdout__, "Aborting:"
165 166 print>>sys.__stdout__, Message(msg)
166 167 msg_type = msg['msg_type']
167 168 reply_type = msg_type.split('_')[0] + '_reply'
168 169 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
169 170 print>>sys.__stdout__, Message(reply_msg)
170 171 self.reply_socket.send(ident,zmq.SNDMORE)
171 172 self.reply_socket.send_json(reply_msg)
172 173 # We need to wait a bit for requests to come in. This can probably
173 174 # be set shorter for true asynchronous clients.
174 175 time.sleep(0.1)
175 176
176 177 def execute_request(self, ident, parent):
177 178 try:
178 179 code = parent[u'content'][u'code']
179 180 except:
180 181 print>>sys.__stderr__, "Got bad msg: "
181 182 print>>sys.__stderr__, Message(parent)
182 183 return
183 184 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
184 185 self.pub_socket.send_json(pyin_msg)
185 186 try:
186 187 comp_code = self.compiler(code, '<zmq-kernel>')
187 188 sys.displayhook.set_parent(parent)
188 189 exec comp_code in self.user_ns, self.user_ns
189 190 except:
190 191 result = u'error'
191 192 etype, evalue, tb = sys.exc_info()
192 193 tb = traceback.format_exception(etype, evalue, tb)
193 194 exc_content = {
194 195 u'status' : u'error',
195 196 u'traceback' : tb,
196 197 u'etype' : unicode(etype),
197 198 u'evalue' : unicode(evalue)
198 199 }
199 200 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
200 201 self.pub_socket.send_json(exc_msg)
201 202 reply_content = exc_content
202 203 else:
203 204 reply_content = {'status' : 'ok'}
204 205 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
205 206 print>>sys.__stdout__, Message(reply_msg)
206 207 self.reply_socket.send(ident, zmq.SNDMORE)
207 208 self.reply_socket.send_json(reply_msg)
208 209 if reply_msg['content']['status'] == u'error':
209 210 self.abort_queue()
210 211
211 212 def complete_request(self, ident, parent):
212 213 matches = {'matches' : self.complete(parent),
213 214 'status' : 'ok'}
214 215 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
215 216 matches, parent, ident)
216 217 print >> sys.__stdout__, completion_msg
217 218
218 219 def complete(self, msg):
219 220 return self.completer.complete(msg.content.line, msg.content.text)
220 221
221 222 def object_info_request(self, ident, parent):
222 223 context = parent['content']['oname'].split('.')
223 224 object_info = self.object_info(context)
224 225 msg = self.session.send(self.reply_socket, 'object_info_reply',
225 226 object_info, parent, ident)
226 227 print >> sys.__stdout__, msg
227 228
228 229 def object_info(self, context):
229 230 symbol, leftover = self.symbol_from_context(context)
230 231 if symbol is not None and not leftover:
231 232 doc = getattr(symbol, '__doc__', '')
232 233 else:
233 234 doc = ''
234 235 object_info = dict(docstring = doc)
235 236 return object_info
236 237
237 238 def symbol_from_context(self, context):
238 239 if not context:
239 240 return None, context
240 241
241 242 base_symbol_string = context[0]
242 243 symbol = self.user_ns.get(base_symbol_string, None)
243 244 if symbol is None:
244 245 symbol = __builtin__.__dict__.get(base_symbol_string, None)
245 246 if symbol is None:
246 247 return None, context
247 248
248 249 context = context[1:]
249 250 for i, name in enumerate(context):
250 251 new_symbol = getattr(symbol, name, None)
251 252 if new_symbol is None:
252 253 return symbol, context[i:]
253 254 else:
254 255 symbol = new_symbol
255 256
256 257 return symbol, []
257 258
258 259 def start(self):
259 260 while True:
260 261 ident = self.reply_socket.recv()
261 262 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
262 263 msg = self.reply_socket.recv_json()
263 264 omsg = Message(msg)
264 265 print>>sys.__stdout__
265 266 print>>sys.__stdout__, omsg
266 267 handler = self.handlers.get(omsg.msg_type, None)
267 268 if handler is None:
268 269 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
269 270 else:
270 271 handler(ident, omsg)
271 272
272 273
273 274 def bind_port(socket, ip, port):
274 275 """ Binds the specified ZMQ socket. If the port is less than zero, a random
275 276 port is chosen. Returns the port that was bound.
276 277 """
277 278 connection = 'tcp://%s' % ip
278 279 if port < 0:
279 280 port = socket.bind_to_random_port(connection)
280 281 else:
281 282 connection += ':%i' % port
282 283 socket.bind(connection)
283 284 return port
284 285
285 def main(ip='127.0.0.1', rep_port=-1, pub_port=-1):
286 """ Start a kernel on 'ip' (default localhost) at the specified ports. If
287 ports are not specified, they are chosen at random.
286 def main():
287 """ Main entry point for launching a kernel.
288 288 """
289 # Parse command line arguments.
290 parser = ArgumentParser()
291 parser.add_argument('--ip', type=str, default='127.0.0.1',
292 help='set the kernel\'s IP address [default: local]')
293 parser.add_argument('--xrep', type=int, metavar='PORT', default=-1,
294 help='set the XREP Channel port [default: random]')
295 parser.add_argument('--pub', type=int, metavar='PORT', default=-1,
296 help='set the PUB Channel port [default: random]')
297 namespace = parser.parse_args()
298
299 # Create context, session, and kernel sockets.
289 300 print >>sys.__stdout__, "Starting the kernel..."
290
291 301 context = zmq.Context()
292 302 session = Session(username=u'kernel')
293 303
294 304 reply_socket = context.socket(zmq.XREP)
295 rep_port = bind_port(reply_socket, ip, rep_port)
296 print >>sys.__stdout__, "XREP Channel on port", rep_port
305 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
306 print >>sys.__stdout__, "XREP Channel on port", xrep_port
297 307
298 308 pub_socket = context.socket(zmq.PUB)
299 pub_port = bind_port(pub_socket, ip, pub_port)
309 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
300 310 print >>sys.__stdout__, "PUB Channel on port", pub_port
301 311
312 # Redirect input streams and set a display hook.
302 313 sys.stdout = OutStream(session, pub_socket, u'stdout')
303 314 sys.stderr = OutStream(session, pub_socket, u'stderr')
304 315 sys.displayhook = DisplayHook(session, pub_socket)
305 316
306 317 kernel = Kernel(session, reply_socket, pub_socket)
307 318
308 319 # For debugging convenience, put sleep and a string in the namespace, so we
309 320 # have them every time we start.
310 321 kernel.user_ns['sleep'] = time.sleep
311 322 kernel.user_ns['s'] = 'Test string'
312 323
313 324 print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate."
314 325 kernel.start()
315 326
316 def launch_kernel():
317 """ Launches a kernel on this machine and binds its to channels to open
318 ports as it determined by the OS.
327 def launch_kernel(xrep_port=-1, pub_port=-1):
328 """ Launches a localhost kernel, binding to the specified ports. For any
329 port that is left unspecified, a port is chosen by the operating system.
319 330
320 331 Returns a tuple of form:
321 332 (kernel_process [Popen], rep_port [int], sub_port [int])
322 333 """
323 334 import socket
324 335 from subprocess import Popen
325 336
326 # Find some open ports.
337 # Find open ports as necessary.
327 338 ports = []
328 for i in xrange(2):
339 ports_needed = int(xrep_port < 0) + int(pub_port < 0)
340 for i in xrange(ports_needed):
329 341 sock = socket.socket()
330 342 sock.bind(('', 0))
331 343 ports.append(sock)
332 344 for i, sock in enumerate(ports):
333 345 port = sock.getsockname()[1]
334 346 sock.close()
335 347 ports[i] = port
336 rep_port, sub_port = ports
337
348 if xrep_port < 0:
349 xrep_port = ports.pop()
350 if pub_port < 0:
351 pub_port = ports.pop()
352
338 353 # Spawn a kernel.
339 command = 'from IPython.zmq.kernel import main;' \
340 'main(rep_port=%i, pub_port=%i)'
341 proc = Popen([sys.executable, '-c', command % (rep_port, sub_port)])
342
343 return proc, rep_port, sub_port
344
354 command = 'from IPython.zmq.kernel import main; main()'
355 proc = Popen([ sys.executable, '-c', command,
356 '--xrep', str(xrep_port), '--pub', str(pub_port) ])
357 return proc, xrep_port, pub_port
358
345 359
346 360 if __name__ == '__main__':
347 base_port = 5575
348 main(rep_port = base_port,
349 pub_port = base_port + 1)
361 main()
@@ -1,433 +1,451 b''
1 1 """Kernel frontend classes.
2 2
3 3 TODO: Create logger to handle debugging and console messages.
4 4
5 5 """
6 6
7 7 # Standard library imports.
8 8 from Queue import Queue, Empty
9 from subprocess import Popen
9 10 from threading import Thread
10 11 import time
11 12 import traceback
12 13
13 14 # System library imports.
14 15 import zmq
15 16 from zmq import POLLIN, POLLOUT, POLLERR
16 17 from zmq.eventloop import ioloop
17 18
18 19 # Local imports.
19 20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
20 21 Type
22 from kernel import launch_kernel
21 23 from session import Session
22 24
25 # Constants.
26 LOCALHOST = '127.0.0.1'
27
23 28
24 29 class MissingHandlerError(Exception):
25 30 pass
26 31
27 32
28 33 class ZmqSocketChannel(Thread):
29 34 """ The base class for the channels that use ZMQ sockets.
30 35 """
31 36
32 37 def __init__(self, context, session, address=None):
33 38 super(ZmqSocketChannel, self).__init__()
34 39 self.daemon = True
35 40
36 41 self.context = context
37 42 self.session = session
38 43 self.address = address
39 44 self.socket = None
40 45
41 46 def stop(self):
42 47 """ Stop the thread's activity. Returns when the thread terminates.
43 48 """
44 49 self.join()
45 50
46 51 # Allow the thread to be started again.
47 52 # FIXME: Although this works (and there's no reason why it shouldn't),
48 53 # it feels wrong. Is there a cleaner way to achieve this?
49 54 Thread.__init__(self)
50 55
51 56 def get_address(self):
52 """ Get the channel's address.
57 """ Get the channel's address. By the default, a channel is on
58 localhost with no port specified (a negative port number).
53 59 """
54 60 return self._address
55 61
56 62 def set_adresss(self, address):
57 63 """ Set the channel's address. Should be a tuple of form:
58 (ip address [str], port [int])
59 or 'None' to indicate that no address has been specified.
64 (ip address [str], port [int]).
65 or None, in which case the address is reset to its default value.
60 66 """
61 67 # FIXME: Validate address.
62 68 if self.is_alive():
63 69 raise RuntimeError("Cannot set address on a running channel!")
64 70 else:
71 if address is None:
72 address = (LOCALHOST, -1)
65 73 self._address = address
66 74
67 75 address = property(get_address, set_adresss)
68 76
69 77
70 78 class SubSocketChannel(ZmqSocketChannel):
71 79
72 80 handlers = None
73 81 _overriden_call_handler = None
74 82
75 83 def __init__(self, context, session, address=None):
76 84 self.handlers = {}
77 85 super(SubSocketChannel, self).__init__(context, session, address)
78 86
79 87 def run(self):
80 88 self.socket = self.context.socket(zmq.SUB)
81 89 self.socket.setsockopt(zmq.SUBSCRIBE,'')
82 90 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
83 91 self.socket.connect('tcp://%s:%i' % self.address)
84 92 self.ioloop = ioloop.IOLoop()
85 93 self.ioloop.add_handler(self.socket, self._handle_events,
86 94 POLLIN|POLLERR)
87 95 self.ioloop.start()
88 96
89 97 def stop(self):
90 98 self.ioloop.stop()
91 99 super(SubSocketChannel, self).stop()
92 100
93 101 def _handle_events(self, socket, events):
94 102 # Turn on and off POLLOUT depending on if we have made a request
95 103 if events & POLLERR:
96 104 self._handle_err()
97 105 if events & POLLIN:
98 106 self._handle_recv()
99 107
100 108 def _handle_err(self):
101 109 raise zmq.ZmqError()
102 110
103 111 def _handle_recv(self):
104 112 msg = self.socket.recv_json()
105 113 self.call_handlers(msg)
106 114
107 115 def override_call_handler(self, func):
108 116 """Permanently override the call_handler.
109 117
110 118 The function func will be called as::
111 119
112 120 func(handler, msg)
113 121
114 122 And must call::
115 123
116 124 handler(msg)
117 125
118 126 in the main thread.
119 127 """
120 128 assert callable(func), "not a callable: %r" % func
121 129 self._overriden_call_handler = func
122 130
123 131 def call_handlers(self, msg):
124 132 handler = self.handlers.get(msg['msg_type'], None)
125 133 if handler is not None:
126 134 try:
127 135 self.call_handler(handler, msg)
128 136 except:
129 137 # XXX: This should be logged at least
130 138 traceback.print_last()
131 139
132 140 def call_handler(self, handler, msg):
133 141 if self._overriden_call_handler is not None:
134 142 self._overriden_call_handler(handler, msg)
135 143 elif hasattr(self, '_call_handler'):
136 144 call_handler = getattr(self, '_call_handler')
137 145 call_handler(handler, msg)
138 146 else:
139 147 raise RuntimeError('no handler!')
140 148
141 149 def add_handler(self, callback, msg_type):
142 150 """Register a callback for msg type."""
143 151 self.handlers[msg_type] = callback
144 152
145 153 def remove_handler(self, msg_type):
146 154 """Remove the callback for msg type."""
147 155 self.handlers.pop(msg_type, None)
148 156
149 157 def flush(self):
150 158 """Immediately processes all pending messages on the SUB channel. This
151 159 method is thread safe.
152 160 """
153 161 self._flushed = False
154 162 self.ioloop.add_callback(self._flush)
155 163 while not self._flushed:
156 164 time.sleep(0.01)
157 165
158 166 def _flush(self):
159 167 """Called in this thread by the IOLoop to indicate that all events have
160 168 been processed.
161 169 """
162 170 self._flushed = True
163 171
164 172
165 173 class XReqSocketChannel(ZmqSocketChannel):
166 174
167 175 handler_queue = None
168 176 command_queue = None
169 177 handlers = None
170 178 _overriden_call_handler = None
171 179
172 180 def __init__(self, context, session, address=None):
173 181 self.handlers = {}
174 182 self.handler_queue = Queue()
175 183 self.command_queue = Queue()
176 184 super(XReqSocketChannel, self).__init__(context, session, address)
177 185
178 186 def run(self):
179 187 self.socket = self.context.socket(zmq.XREQ)
180 188 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
181 189 self.socket.connect('tcp://%s:%i' % self.address)
182 190 self.ioloop = ioloop.IOLoop()
183 191 self.ioloop.add_handler(self.socket, self._handle_events,
184 192 POLLIN|POLLOUT|POLLERR)
185 193 self.ioloop.start()
186 194
187 195 def stop(self):
188 196 self.ioloop.stop()
189 197 super(XReqSocketChannel, self).stop()
190 198
191 199 def _handle_events(self, socket, events):
192 200 # Turn on and off POLLOUT depending on if we have made a request
193 201 if events & POLLERR:
194 202 self._handle_err()
195 203 if events & POLLOUT:
196 204 self._handle_send()
197 205 if events & POLLIN:
198 206 self._handle_recv()
199 207
200 208 def _handle_recv(self):
201 209 msg = self.socket.recv_json()
202 210 self.call_handlers(msg)
203 211
204 212 def _handle_send(self):
205 213 try:
206 214 msg = self.command_queue.get(False)
207 215 except Empty:
208 216 pass
209 217 else:
210 218 self.socket.send_json(msg)
211 219
212 220 def _handle_err(self):
213 221 raise zmq.ZmqError()
214 222
215 223 def _queue_request(self, msg, callback):
216 224 handler = self._find_handler(msg['msg_type'], callback)
217 225 self.handler_queue.put(handler)
218 226 self.command_queue.put(msg)
219 227
220 228 def execute(self, code, callback=None):
221 229 # Create class for content/msg creation. Related to, but possibly
222 230 # not in Session.
223 231 content = dict(code=code)
224 232 msg = self.session.msg('execute_request', content)
225 233 self._queue_request(msg, callback)
226 234 return msg['header']['msg_id']
227 235
228 236 def complete(self, text, line, block=None, callback=None):
229 237 content = dict(text=text, line=line)
230 238 msg = self.session.msg('complete_request', content)
231 239 self._queue_request(msg, callback)
232 240 return msg['header']['msg_id']
233 241
234 242 def object_info(self, oname, callback=None):
235 243 content = dict(oname=oname)
236 244 msg = self.session.msg('object_info_request', content)
237 245 self._queue_request(msg, callback)
238 246 return msg['header']['msg_id']
239 247
240 248 def _find_handler(self, name, callback):
241 249 if callback is not None:
242 250 return callback
243 251 handler = self.handlers.get(name)
244 252 if handler is None:
245 253 raise MissingHandlerError(
246 254 'No handler defined for method: %s' % name)
247 255 return handler
248 256
249 257 def override_call_handler(self, func):
250 258 """Permanently override the call_handler.
251 259
252 260 The function func will be called as::
253 261
254 262 func(handler, msg)
255 263
256 264 And must call::
257 265
258 266 handler(msg)
259 267
260 268 in the main thread.
261 269 """
262 270 assert callable(func), "not a callable: %r" % func
263 271 self._overriden_call_handler = func
264 272
265 273 def call_handlers(self, msg):
266 274 try:
267 275 handler = self.handler_queue.get(False)
268 276 except Empty:
269 277 print "Message received with no handler!!!"
270 278 print msg
271 279 else:
272 280 self.call_handler(handler, msg)
273 281
274 282 def call_handler(self, handler, msg):
275 283 if self._overriden_call_handler is not None:
276 284 self._overriden_call_handler(handler, msg)
277 285 elif hasattr(self, '_call_handler'):
278 286 call_handler = getattr(self, '_call_handler')
279 287 call_handler(handler, msg)
280 288 else:
281 289 raise RuntimeError('no handler!')
282 290
283 291
284 292 class RepSocketChannel(ZmqSocketChannel):
285 293
286 294 def on_raw_input(self):
287 295 pass
288 296
289 297
290 298 class KernelManager(HasTraits):
291 299 """ Manages a kernel for a frontend.
292 300
293 301 The SUB channel is for the frontend to receive messages published by the
294 302 kernel.
295 303
296 304 The REQ channel is for the frontend to make requests of the kernel.
297 305
298 306 The REP channel is for the kernel to request stdin (raw_input) from the
299 307 frontend.
300 308 """
301 309
302 310 # Whether the kernel manager is currently listening on its channels.
303 311 is_listening = Bool(False)
304 312
305 313 # The PyZMQ Context to use for communication with the kernel.
306 314 context = Instance(zmq.Context, ())
307 315
308 316 # The Session to use for communication with the kernel.
309 317 session = Instance(Session, ())
310 318
311 319 # The classes to use for the various channels.
312 320 sub_channel_class = Type(SubSocketChannel)
313 321 xreq_channel_class = Type(XReqSocketChannel)
314 322 rep_channel_class = Type(RepSocketChannel)
315 323
316 324 # Protected traits.
325 _kernel = Instance(Popen)
317 326 _sub_channel = Any
318 327 _xreq_channel = Any
319 328 _rep_channel = Any
320 329
321 330 def __init__(self, **traits):
322 331 super(KernelManager, self).__init__()
323 332
324 333 # FIXME: This should be the business of HasTraits. The convention is:
325 334 # HasTraits.__init__(self, **traits_to_be_initialized.)
326 335 for trait in traits:
327 336 setattr(self, trait, traits[trait])
328 337
329 338 def start_listening(self):
330 339 """Start listening on the specified ports. If already listening, raises
331 340 a RuntimeError.
332 341 """
333 342 if self.is_listening:
334 343 raise RuntimeError("Cannot start listening. Already listening!")
335 344 else:
336 345 self.is_listening = True
337 346 self.sub_channel.start()
338 347 self.xreq_channel.start()
339 348 self.rep_channel.start()
340 349
341 350 def stop_listening(self):
342 351 """Stop listening. If not listening, does nothing. """
343 352 if self.is_listening:
344 353 self.is_listening = False
345 354 self.sub_channel.stop()
346 355 self.xreq_channel.stop()
347 356 self.rep_channel.stop()
348 357
349 358 def start_kernel(self):
350 """Start a localhost kernel. If ports have been specified, use them.
351 Otherwise, choose an open port at random.
359 """Start a localhost kernel. If ports have been specified via the
360 address attributes, use them. Otherwise, choose open ports at random.
352 361 """
353 # TODO: start a kernel.
354 self.start_listening()
362 xreq, sub = self.xreq_address, self.sub_address
363 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
364 raise RuntimeError("Can only launch a kernel on localhost."
365 "Make sure that the '*_address' attributes are "
366 "configured properly.")
367
368 self._kernel, xrep, pub = launch_kernel(xrep_port=xreq[1],
369 pub_port=sub[1])
370 self.xreq_address = (LOCALHOST, xrep)
371 self.sub_address = (LOCALHOST, pub)
355 372
356 373 def kill_kernel(self):
357 """Kill the running kernel.
374 """Kill the running kernel, if there is one.
358 375 """
359 # TODO: kill the kernel.
360 self.stop_listening()
376 if self._kernel:
377 self._kernel.kill()
378 self._kernel = None
361 379
362 380 @property
363 381 def is_alive(self):
364 382 """ Returns whether the kernel is alive. """
365 383 if self.is_listening:
366 384 # TODO: check if alive.
367 385 return True
368 386 else:
369 387 return False
370 388
371 389 def signal_kernel(self, signum):
372 390 """Send signum to the kernel."""
373 391 # TODO: signal the kernel.
374 392
375 393 #--------------------------------------------------------------------------
376 394 # Channels used for communication with the kernel:
377 395 #--------------------------------------------------------------------------
378 396
379 397 @property
380 398 def sub_channel(self):
381 399 """Get the SUB socket channel object."""
382 400 if self._sub_channel is None:
383 401 self._sub_channel = self.sub_channel_class(self.context,
384 402 self.session)
385 403 return self._sub_channel
386 404
387 405 @property
388 406 def xreq_channel(self):
389 407 """Get the REQ socket channel object to make requests of the kernel."""
390 408 if self._xreq_channel is None:
391 409 self._xreq_channel = self.xreq_channel_class(self.context,
392 410 self.session)
393 411 return self._xreq_channel
394 412
395 413 @property
396 414 def rep_channel(self):
397 415 """Get the REP socket channel object to handle stdin (raw_input)."""
398 416 if self._rep_channel is None:
399 417 self._rep_channel = self.rep_channel_class(self.context,
400 418 self.session)
401 419 return self._rep_channel
402 420
403 421 #--------------------------------------------------------------------------
404 422 # Channel address attributes:
405 423 #--------------------------------------------------------------------------
406 424
407 425 def get_sub_address(self):
408 426 return self.sub_channel.address
409 427
410 428 def set_sub_address(self, address):
411 429 self.sub_channel.address = address
412 430
413 431 sub_address = property(get_sub_address, set_sub_address,
414 432 doc="The address used by SUB socket channel.")
415 433
416 434 def get_xreq_address(self):
417 435 return self.xreq_channel.address
418 436
419 437 def set_xreq_address(self, address):
420 438 self.xreq_channel.address = address
421 439
422 440 xreq_address = property(get_xreq_address, set_xreq_address,
423 441 doc="The address used by XREQ socket channel.")
424 442
425 443 def get_rep_address(self):
426 444 return self.rep_channel.address
427 445
428 446 def set_rep_address(self, address):
429 447 self.rep_channel.address = address
430 448
431 449 rep_address = property(get_rep_address, set_rep_address,
432 450 doc="The address used by REP socket channel.")
433 451
General Comments 0
You need to be logged in to leave comments. Login now