##// END OF EJS Templates
* Implemented a proper main() function for kernel.py that reads command line input....
epatters -
Show More
@@ -1,96 +1,96 b''
1 # System library imports
1 # System library imports
2 from PyQt4 import QtCore, QtGui
2 from PyQt4 import QtCore, QtGui
3
3
4 # Local imports
4 # Local imports
5 from frontend_widget import FrontendWidget
5 from frontend_widget import FrontendWidget
6
6
7
7
8 class IPythonWidget(FrontendWidget):
8 class IPythonWidget(FrontendWidget):
9 """ A FrontendWidget for an IPython kernel.
9 """ A FrontendWidget for an IPython kernel.
10 """
10 """
11
11
12 #---------------------------------------------------------------------------
12 #---------------------------------------------------------------------------
13 # 'FrontendWidget' interface
13 # 'FrontendWidget' interface
14 #---------------------------------------------------------------------------
14 #---------------------------------------------------------------------------
15
15
16 def __init__(self, parent=None):
16 def __init__(self, parent=None):
17 super(IPythonWidget, self).__init__(parent)
17 super(IPythonWidget, self).__init__(parent)
18
18
19 self._magic_overrides = {}
19 self._magic_overrides = {}
20
20
21 def execute_source(self, source, hidden=False, interactive=False):
21 def execute_source(self, source, hidden=False, interactive=False):
22 """ Reimplemented to override magic commands.
22 """ Reimplemented to override magic commands.
23 """
23 """
24 magic_source = source.strip()
24 magic_source = source.strip()
25 if magic_source.startswith('%'):
25 if magic_source.startswith('%'):
26 magic_source = magic_source[1:]
26 magic_source = magic_source[1:]
27 magic, sep, arguments = magic_source.partition(' ')
27 magic, sep, arguments = magic_source.partition(' ')
28 if not magic:
28 if not magic:
29 magic = magic_source
29 magic = magic_source
30
30
31 callback = self._magic_overrides.get(magic)
31 callback = self._magic_overrides.get(magic)
32 if callback:
32 if callback:
33 output = callback(arguments)
33 output = callback(arguments)
34 if output:
34 if output:
35 self.appendPlainText(output)
35 self.appendPlainText(output)
36 self._show_prompt('>>> ')
36 self._show_prompt('>>> ')
37 return True
37 return True
38 else:
38 else:
39 return super(IPythonWidget, self).execute_source(source, hidden,
39 return super(IPythonWidget, self).execute_source(source, hidden,
40 interactive)
40 interactive)
41
41
42 #---------------------------------------------------------------------------
42 #---------------------------------------------------------------------------
43 # 'IPythonWidget' interface
43 # 'IPythonWidget' interface
44 #---------------------------------------------------------------------------
44 #---------------------------------------------------------------------------
45
45
46 def set_magic_override(self, magic, callback):
46 def set_magic_override(self, magic, callback):
47 """ Overrides an IPython magic command. This magic will be intercepted
47 """ Overrides an IPython magic command. This magic will be intercepted
48 by the frontend rather than passed on to the kernel and 'callback'
48 by the frontend rather than passed on to the kernel and 'callback'
49 will be called with a single argument: a string of argument(s) for
49 will be called with a single argument: a string of argument(s) for
50 the magic. The callback can (optionally) return text to print to the
50 the magic. The callback can (optionally) return text to print to the
51 console.
51 console.
52 """
52 """
53 self._magic_overrides[magic] = callback
53 self._magic_overrides[magic] = callback
54
54
55 def remove_magic_override(self, magic):
55 def remove_magic_override(self, magic):
56 """ Removes the override for the specified magic, if there is one.
56 """ Removes the override for the specified magic, if there is one.
57 """
57 """
58 try:
58 try:
59 del self._magic_overrides[magic]
59 del self._magic_overrides[magic]
60 except KeyError:
60 except KeyError:
61 pass
61 pass
62
62
63
63
64 if __name__ == '__main__':
64 if __name__ == '__main__':
65 from IPython.external.argparse import ArgumentParser
66 from IPython.frontend.qt.kernelmanager import QtKernelManager
67
68 # Don't let Qt swallow KeyboardInterupts.
69 import signal
65 import signal
70 signal.signal(signal.SIGINT, signal.SIG_DFL)
66 from IPython.frontend.qt.kernelmanager import QtKernelManager
71
67
72 # Parse command line arguments.
68 # Create a KernelManager.
73 parser = ArgumentParser()
69 kernel_manager = QtKernelManager()
74 parser.add_argument('--ip', type=str, default='127.0.0.1',
70 kernel_manager.start_kernel()
75 help='set the kernel\'s IP address [default localhost]')
76 parser.add_argument('--xreq', type=int, metavar='PORT', default=5575,
77 help='set the XREQ Channel port [default %(default)i]')
78 parser.add_argument('--sub', type=int, metavar='PORT', default=5576,
79 help='set the SUB Channel port [default %(default)i]')
80 namespace = parser.parse_args()
81
82 # Create KernelManager
83 ip = namespace.ip
84 kernel_manager = QtKernelManager(xreq_address = (ip, namespace.xreq),
85 sub_address = (ip, namespace.sub))
86 kernel_manager.start_listening()
71 kernel_manager.start_listening()
87
72
88 # Launch application
73 # Don't let Qt or ZMQ swallow KeyboardInterupts.
74 # FIXME: Gah, ZMQ swallows even custom signal handlers. So for now we leave
75 # behind a kernel process when Ctrl-C is pressed.
76 #def sigint_hook(signum, frame):
77 # QtGui.qApp.quit()
78 #signal.signal(signal.SIGINT, sigint_hook)
79 signal.signal(signal.SIGINT, signal.SIG_DFL)
80
81 # Create the application, making sure to clean up nicely when we exit.
89 app = QtGui.QApplication([])
82 app = QtGui.QApplication([])
83 def quit_hook():
84 kernel_manager.stop_listening()
85 kernel_manager.kill_kernel()
86 app.aboutToQuit.connect(quit_hook)
87
88 # Launch the application.
90 widget = IPythonWidget()
89 widget = IPythonWidget()
91 widget.kernel_manager = kernel_manager
90 widget.kernel_manager = kernel_manager
92 widget.setWindowTitle('Python')
91 widget.setWindowTitle('Python')
93 widget.resize(640, 480)
92 widget.resize(640, 480)
94 widget.show()
93 widget.show()
95 app.exec_()
94 app.exec_()
95
96
96
@@ -1,349 +1,361 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Finish implementing `raw_input`.
6 * Finish implementing `raw_input`.
7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
8 call set_parent on all the PUB objects with the message about to be executed.
8 call set_parent on all the PUB objects with the message about to be executed.
9 * Implement random port and security key logic.
9 * Implement random port and security key logic.
10 * Implement control messages.
10 * Implement control messages.
11 * Implement event loop and poll version.
11 * Implement event loop and poll version.
12 """
12 """
13
13
14 # Standard library imports.
14 # Standard library imports.
15 import __builtin__
15 import __builtin__
16 import sys
16 import sys
17 import time
17 import time
18 import traceback
18 import traceback
19 from code import CommandCompiler
19 from code import CommandCompiler
20
20
21 # System library imports.
21 # System library imports.
22 import zmq
22 import zmq
23
23
24 # Local imports.
24 # Local imports.
25 from IPython.external.argparse import ArgumentParser
25 from session import Session, Message, extract_header
26 from session import Session, Message, extract_header
26 from completer import KernelCompleter
27 from completer import KernelCompleter
27
28
28
29
29 class OutStream(object):
30 class OutStream(object):
30 """A file like object that publishes the stream to a 0MQ PUB socket."""
31 """A file like object that publishes the stream to a 0MQ PUB socket."""
31
32
32 def __init__(self, session, pub_socket, name, max_buffer=200):
33 def __init__(self, session, pub_socket, name, max_buffer=200):
33 self.session = session
34 self.session = session
34 self.pub_socket = pub_socket
35 self.pub_socket = pub_socket
35 self.name = name
36 self.name = name
36 self._buffer = []
37 self._buffer = []
37 self._buffer_len = 0
38 self._buffer_len = 0
38 self.max_buffer = max_buffer
39 self.max_buffer = max_buffer
39 self.parent_header = {}
40 self.parent_header = {}
40
41
41 def set_parent(self, parent):
42 def set_parent(self, parent):
42 self.parent_header = extract_header(parent)
43 self.parent_header = extract_header(parent)
43
44
44 def close(self):
45 def close(self):
45 self.pub_socket = None
46 self.pub_socket = None
46
47
47 def flush(self):
48 def flush(self):
48 if self.pub_socket is None:
49 if self.pub_socket is None:
49 raise ValueError(u'I/O operation on closed file')
50 raise ValueError(u'I/O operation on closed file')
50 else:
51 else:
51 if self._buffer:
52 if self._buffer:
52 data = ''.join(self._buffer)
53 data = ''.join(self._buffer)
53 content = {u'name':self.name, u'data':data}
54 content = {u'name':self.name, u'data':data}
54 msg = self.session.msg(u'stream', content=content,
55 msg = self.session.msg(u'stream', content=content,
55 parent=self.parent_header)
56 parent=self.parent_header)
56 print>>sys.__stdout__, Message(msg)
57 print>>sys.__stdout__, Message(msg)
57 self.pub_socket.send_json(msg)
58 self.pub_socket.send_json(msg)
58 self._buffer_len = 0
59 self._buffer_len = 0
59 self._buffer = []
60 self._buffer = []
60
61
61 def isattr(self):
62 def isattr(self):
62 return False
63 return False
63
64
64 def next(self):
65 def next(self):
65 raise IOError('Read not supported on a write only stream.')
66 raise IOError('Read not supported on a write only stream.')
66
67
67 def read(self, size=None):
68 def read(self, size=None):
68 raise IOError('Read not supported on a write only stream.')
69 raise IOError('Read not supported on a write only stream.')
69
70
70 readline=read
71 readline=read
71
72
72 def write(self, s):
73 def write(self, s):
73 if self.pub_socket is None:
74 if self.pub_socket is None:
74 raise ValueError('I/O operation on closed file')
75 raise ValueError('I/O operation on closed file')
75 else:
76 else:
76 self._buffer.append(s)
77 self._buffer.append(s)
77 self._buffer_len += len(s)
78 self._buffer_len += len(s)
78 self._maybe_send()
79 self._maybe_send()
79
80
80 def _maybe_send(self):
81 def _maybe_send(self):
81 if '\n' in self._buffer[-1]:
82 if '\n' in self._buffer[-1]:
82 self.flush()
83 self.flush()
83 if self._buffer_len > self.max_buffer:
84 if self._buffer_len > self.max_buffer:
84 self.flush()
85 self.flush()
85
86
86 def writelines(self, sequence):
87 def writelines(self, sequence):
87 if self.pub_socket is None:
88 if self.pub_socket is None:
88 raise ValueError('I/O operation on closed file')
89 raise ValueError('I/O operation on closed file')
89 else:
90 else:
90 for s in sequence:
91 for s in sequence:
91 self.write(s)
92 self.write(s)
92
93
93
94
94 class DisplayHook(object):
95 class DisplayHook(object):
95
96
96 def __init__(self, session, pub_socket):
97 def __init__(self, session, pub_socket):
97 self.session = session
98 self.session = session
98 self.pub_socket = pub_socket
99 self.pub_socket = pub_socket
99 self.parent_header = {}
100 self.parent_header = {}
100
101
101 def __call__(self, obj):
102 def __call__(self, obj):
102 if obj is None:
103 if obj is None:
103 return
104 return
104
105
105 __builtin__._ = obj
106 __builtin__._ = obj
106 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
107 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
107 parent=self.parent_header)
108 parent=self.parent_header)
108 self.pub_socket.send_json(msg)
109 self.pub_socket.send_json(msg)
109
110
110 def set_parent(self, parent):
111 def set_parent(self, parent):
111 self.parent_header = extract_header(parent)
112 self.parent_header = extract_header(parent)
112
113
113
114
114 class RawInput(object):
115 class RawInput(object):
115
116
116 def __init__(self, session, socket):
117 def __init__(self, session, socket):
117 self.session = session
118 self.session = session
118 self.socket = socket
119 self.socket = socket
119
120
120 def __call__(self, prompt=None):
121 def __call__(self, prompt=None):
121 msg = self.session.msg(u'raw_input')
122 msg = self.session.msg(u'raw_input')
122 self.socket.send_json(msg)
123 self.socket.send_json(msg)
123 while True:
124 while True:
124 try:
125 try:
125 reply = self.socket.recv_json(zmq.NOBLOCK)
126 reply = self.socket.recv_json(zmq.NOBLOCK)
126 except zmq.ZMQError, e:
127 except zmq.ZMQError, e:
127 if e.errno == zmq.EAGAIN:
128 if e.errno == zmq.EAGAIN:
128 pass
129 pass
129 else:
130 else:
130 raise
131 raise
131 else:
132 else:
132 break
133 break
133 return reply[u'content'][u'data']
134 return reply[u'content'][u'data']
134
135
135
136
136 class Kernel(object):
137 class Kernel(object):
137
138
138 def __init__(self, session, reply_socket, pub_socket):
139 def __init__(self, session, reply_socket, pub_socket):
139 self.session = session
140 self.session = session
140 self.reply_socket = reply_socket
141 self.reply_socket = reply_socket
141 self.pub_socket = pub_socket
142 self.pub_socket = pub_socket
142 self.user_ns = {}
143 self.user_ns = {}
143 self.history = []
144 self.history = []
144 self.compiler = CommandCompiler()
145 self.compiler = CommandCompiler()
145 self.completer = KernelCompleter(self.user_ns)
146 self.completer = KernelCompleter(self.user_ns)
146
147
147 # Build dict of handlers for message types
148 # Build dict of handlers for message types
148 msg_types = [ 'execute_request', 'complete_request',
149 msg_types = [ 'execute_request', 'complete_request',
149 'object_info_request' ]
150 'object_info_request' ]
150 self.handlers = {}
151 self.handlers = {}
151 for msg_type in msg_types:
152 for msg_type in msg_types:
152 self.handlers[msg_type] = getattr(self, msg_type)
153 self.handlers[msg_type] = getattr(self, msg_type)
153
154
154 def abort_queue(self):
155 def abort_queue(self):
155 while True:
156 while True:
156 try:
157 try:
157 ident = self.reply_socket.recv(zmq.NOBLOCK)
158 ident = self.reply_socket.recv(zmq.NOBLOCK)
158 except zmq.ZMQError, e:
159 except zmq.ZMQError, e:
159 if e.errno == zmq.EAGAIN:
160 if e.errno == zmq.EAGAIN:
160 break
161 break
161 else:
162 else:
162 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
163 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
163 msg = self.reply_socket.recv_json()
164 msg = self.reply_socket.recv_json()
164 print>>sys.__stdout__, "Aborting:"
165 print>>sys.__stdout__, "Aborting:"
165 print>>sys.__stdout__, Message(msg)
166 print>>sys.__stdout__, Message(msg)
166 msg_type = msg['msg_type']
167 msg_type = msg['msg_type']
167 reply_type = msg_type.split('_')[0] + '_reply'
168 reply_type = msg_type.split('_')[0] + '_reply'
168 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
169 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
169 print>>sys.__stdout__, Message(reply_msg)
170 print>>sys.__stdout__, Message(reply_msg)
170 self.reply_socket.send(ident,zmq.SNDMORE)
171 self.reply_socket.send(ident,zmq.SNDMORE)
171 self.reply_socket.send_json(reply_msg)
172 self.reply_socket.send_json(reply_msg)
172 # We need to wait a bit for requests to come in. This can probably
173 # We need to wait a bit for requests to come in. This can probably
173 # be set shorter for true asynchronous clients.
174 # be set shorter for true asynchronous clients.
174 time.sleep(0.1)
175 time.sleep(0.1)
175
176
176 def execute_request(self, ident, parent):
177 def execute_request(self, ident, parent):
177 try:
178 try:
178 code = parent[u'content'][u'code']
179 code = parent[u'content'][u'code']
179 except:
180 except:
180 print>>sys.__stderr__, "Got bad msg: "
181 print>>sys.__stderr__, "Got bad msg: "
181 print>>sys.__stderr__, Message(parent)
182 print>>sys.__stderr__, Message(parent)
182 return
183 return
183 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
184 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
184 self.pub_socket.send_json(pyin_msg)
185 self.pub_socket.send_json(pyin_msg)
185 try:
186 try:
186 comp_code = self.compiler(code, '<zmq-kernel>')
187 comp_code = self.compiler(code, '<zmq-kernel>')
187 sys.displayhook.set_parent(parent)
188 sys.displayhook.set_parent(parent)
188 exec comp_code in self.user_ns, self.user_ns
189 exec comp_code in self.user_ns, self.user_ns
189 except:
190 except:
190 result = u'error'
191 result = u'error'
191 etype, evalue, tb = sys.exc_info()
192 etype, evalue, tb = sys.exc_info()
192 tb = traceback.format_exception(etype, evalue, tb)
193 tb = traceback.format_exception(etype, evalue, tb)
193 exc_content = {
194 exc_content = {
194 u'status' : u'error',
195 u'status' : u'error',
195 u'traceback' : tb,
196 u'traceback' : tb,
196 u'etype' : unicode(etype),
197 u'etype' : unicode(etype),
197 u'evalue' : unicode(evalue)
198 u'evalue' : unicode(evalue)
198 }
199 }
199 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
200 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
200 self.pub_socket.send_json(exc_msg)
201 self.pub_socket.send_json(exc_msg)
201 reply_content = exc_content
202 reply_content = exc_content
202 else:
203 else:
203 reply_content = {'status' : 'ok'}
204 reply_content = {'status' : 'ok'}
204 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
205 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
205 print>>sys.__stdout__, Message(reply_msg)
206 print>>sys.__stdout__, Message(reply_msg)
206 self.reply_socket.send(ident, zmq.SNDMORE)
207 self.reply_socket.send(ident, zmq.SNDMORE)
207 self.reply_socket.send_json(reply_msg)
208 self.reply_socket.send_json(reply_msg)
208 if reply_msg['content']['status'] == u'error':
209 if reply_msg['content']['status'] == u'error':
209 self.abort_queue()
210 self.abort_queue()
210
211
211 def complete_request(self, ident, parent):
212 def complete_request(self, ident, parent):
212 matches = {'matches' : self.complete(parent),
213 matches = {'matches' : self.complete(parent),
213 'status' : 'ok'}
214 'status' : 'ok'}
214 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
215 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
215 matches, parent, ident)
216 matches, parent, ident)
216 print >> sys.__stdout__, completion_msg
217 print >> sys.__stdout__, completion_msg
217
218
218 def complete(self, msg):
219 def complete(self, msg):
219 return self.completer.complete(msg.content.line, msg.content.text)
220 return self.completer.complete(msg.content.line, msg.content.text)
220
221
221 def object_info_request(self, ident, parent):
222 def object_info_request(self, ident, parent):
222 context = parent['content']['oname'].split('.')
223 context = parent['content']['oname'].split('.')
223 object_info = self.object_info(context)
224 object_info = self.object_info(context)
224 msg = self.session.send(self.reply_socket, 'object_info_reply',
225 msg = self.session.send(self.reply_socket, 'object_info_reply',
225 object_info, parent, ident)
226 object_info, parent, ident)
226 print >> sys.__stdout__, msg
227 print >> sys.__stdout__, msg
227
228
228 def object_info(self, context):
229 def object_info(self, context):
229 symbol, leftover = self.symbol_from_context(context)
230 symbol, leftover = self.symbol_from_context(context)
230 if symbol is not None and not leftover:
231 if symbol is not None and not leftover:
231 doc = getattr(symbol, '__doc__', '')
232 doc = getattr(symbol, '__doc__', '')
232 else:
233 else:
233 doc = ''
234 doc = ''
234 object_info = dict(docstring = doc)
235 object_info = dict(docstring = doc)
235 return object_info
236 return object_info
236
237
237 def symbol_from_context(self, context):
238 def symbol_from_context(self, context):
238 if not context:
239 if not context:
239 return None, context
240 return None, context
240
241
241 base_symbol_string = context[0]
242 base_symbol_string = context[0]
242 symbol = self.user_ns.get(base_symbol_string, None)
243 symbol = self.user_ns.get(base_symbol_string, None)
243 if symbol is None:
244 if symbol is None:
244 symbol = __builtin__.__dict__.get(base_symbol_string, None)
245 symbol = __builtin__.__dict__.get(base_symbol_string, None)
245 if symbol is None:
246 if symbol is None:
246 return None, context
247 return None, context
247
248
248 context = context[1:]
249 context = context[1:]
249 for i, name in enumerate(context):
250 for i, name in enumerate(context):
250 new_symbol = getattr(symbol, name, None)
251 new_symbol = getattr(symbol, name, None)
251 if new_symbol is None:
252 if new_symbol is None:
252 return symbol, context[i:]
253 return symbol, context[i:]
253 else:
254 else:
254 symbol = new_symbol
255 symbol = new_symbol
255
256
256 return symbol, []
257 return symbol, []
257
258
258 def start(self):
259 def start(self):
259 while True:
260 while True:
260 ident = self.reply_socket.recv()
261 ident = self.reply_socket.recv()
261 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
262 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
262 msg = self.reply_socket.recv_json()
263 msg = self.reply_socket.recv_json()
263 omsg = Message(msg)
264 omsg = Message(msg)
264 print>>sys.__stdout__
265 print>>sys.__stdout__
265 print>>sys.__stdout__, omsg
266 print>>sys.__stdout__, omsg
266 handler = self.handlers.get(omsg.msg_type, None)
267 handler = self.handlers.get(omsg.msg_type, None)
267 if handler is None:
268 if handler is None:
268 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
269 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
269 else:
270 else:
270 handler(ident, omsg)
271 handler(ident, omsg)
271
272
272
273
273 def bind_port(socket, ip, port):
274 def bind_port(socket, ip, port):
274 """ Binds the specified ZMQ socket. If the port is less than zero, a random
275 """ Binds the specified ZMQ socket. If the port is less than zero, a random
275 port is chosen. Returns the port that was bound.
276 port is chosen. Returns the port that was bound.
276 """
277 """
277 connection = 'tcp://%s' % ip
278 connection = 'tcp://%s' % ip
278 if port < 0:
279 if port < 0:
279 port = socket.bind_to_random_port(connection)
280 port = socket.bind_to_random_port(connection)
280 else:
281 else:
281 connection += ':%i' % port
282 connection += ':%i' % port
282 socket.bind(connection)
283 socket.bind(connection)
283 return port
284 return port
284
285
285 def main(ip='127.0.0.1', rep_port=-1, pub_port=-1):
286 def main():
286 """ Start a kernel on 'ip' (default localhost) at the specified ports. If
287 """ Main entry point for launching a kernel.
287 ports are not specified, they are chosen at random.
288 """
288 """
289 # Parse command line arguments.
290 parser = ArgumentParser()
291 parser.add_argument('--ip', type=str, default='127.0.0.1',
292 help='set the kernel\'s IP address [default: local]')
293 parser.add_argument('--xrep', type=int, metavar='PORT', default=-1,
294 help='set the XREP Channel port [default: random]')
295 parser.add_argument('--pub', type=int, metavar='PORT', default=-1,
296 help='set the PUB Channel port [default: random]')
297 namespace = parser.parse_args()
298
299 # Create context, session, and kernel sockets.
289 print >>sys.__stdout__, "Starting the kernel..."
300 print >>sys.__stdout__, "Starting the kernel..."
290
291 context = zmq.Context()
301 context = zmq.Context()
292 session = Session(username=u'kernel')
302 session = Session(username=u'kernel')
293
303
294 reply_socket = context.socket(zmq.XREP)
304 reply_socket = context.socket(zmq.XREP)
295 rep_port = bind_port(reply_socket, ip, rep_port)
305 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
296 print >>sys.__stdout__, "XREP Channel on port", rep_port
306 print >>sys.__stdout__, "XREP Channel on port", xrep_port
297
307
298 pub_socket = context.socket(zmq.PUB)
308 pub_socket = context.socket(zmq.PUB)
299 pub_port = bind_port(pub_socket, ip, pub_port)
309 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
300 print >>sys.__stdout__, "PUB Channel on port", pub_port
310 print >>sys.__stdout__, "PUB Channel on port", pub_port
301
311
312 # Redirect input streams and set a display hook.
302 sys.stdout = OutStream(session, pub_socket, u'stdout')
313 sys.stdout = OutStream(session, pub_socket, u'stdout')
303 sys.stderr = OutStream(session, pub_socket, u'stderr')
314 sys.stderr = OutStream(session, pub_socket, u'stderr')
304 sys.displayhook = DisplayHook(session, pub_socket)
315 sys.displayhook = DisplayHook(session, pub_socket)
305
316
306 kernel = Kernel(session, reply_socket, pub_socket)
317 kernel = Kernel(session, reply_socket, pub_socket)
307
318
308 # For debugging convenience, put sleep and a string in the namespace, so we
319 # For debugging convenience, put sleep and a string in the namespace, so we
309 # have them every time we start.
320 # have them every time we start.
310 kernel.user_ns['sleep'] = time.sleep
321 kernel.user_ns['sleep'] = time.sleep
311 kernel.user_ns['s'] = 'Test string'
322 kernel.user_ns['s'] = 'Test string'
312
323
313 print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate."
324 print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate."
314 kernel.start()
325 kernel.start()
315
326
316 def launch_kernel():
327 def launch_kernel(xrep_port=-1, pub_port=-1):
317 """ Launches a kernel on this machine and binds its to channels to open
328 """ Launches a localhost kernel, binding to the specified ports. For any
318 ports as it determined by the OS.
329 port that is left unspecified, a port is chosen by the operating system.
319
330
320 Returns a tuple of form:
331 Returns a tuple of form:
321 (kernel_process [Popen], rep_port [int], sub_port [int])
332 (kernel_process [Popen], rep_port [int], sub_port [int])
322 """
333 """
323 import socket
334 import socket
324 from subprocess import Popen
335 from subprocess import Popen
325
336
326 # Find some open ports.
337 # Find open ports as necessary.
327 ports = []
338 ports = []
328 for i in xrange(2):
339 ports_needed = int(xrep_port < 0) + int(pub_port < 0)
340 for i in xrange(ports_needed):
329 sock = socket.socket()
341 sock = socket.socket()
330 sock.bind(('', 0))
342 sock.bind(('', 0))
331 ports.append(sock)
343 ports.append(sock)
332 for i, sock in enumerate(ports):
344 for i, sock in enumerate(ports):
333 port = sock.getsockname()[1]
345 port = sock.getsockname()[1]
334 sock.close()
346 sock.close()
335 ports[i] = port
347 ports[i] = port
336 rep_port, sub_port = ports
348 if xrep_port < 0:
337
349 xrep_port = ports.pop()
350 if pub_port < 0:
351 pub_port = ports.pop()
352
338 # Spawn a kernel.
353 # Spawn a kernel.
339 command = 'from IPython.zmq.kernel import main;' \
354 command = 'from IPython.zmq.kernel import main; main()'
340 'main(rep_port=%i, pub_port=%i)'
355 proc = Popen([ sys.executable, '-c', command,
341 proc = Popen([sys.executable, '-c', command % (rep_port, sub_port)])
356 '--xrep', str(xrep_port), '--pub', str(pub_port) ])
342
357 return proc, xrep_port, pub_port
343 return proc, rep_port, sub_port
358
344
345
359
346 if __name__ == '__main__':
360 if __name__ == '__main__':
347 base_port = 5575
361 main()
348 main(rep_port = base_port,
349 pub_port = base_port + 1)
@@ -1,433 +1,451 b''
1 """Kernel frontend classes.
1 """Kernel frontend classes.
2
2
3 TODO: Create logger to handle debugging and console messages.
3 TODO: Create logger to handle debugging and console messages.
4
4
5 """
5 """
6
6
7 # Standard library imports.
7 # Standard library imports.
8 from Queue import Queue, Empty
8 from Queue import Queue, Empty
9 from subprocess import Popen
9 from threading import Thread
10 from threading import Thread
10 import time
11 import time
11 import traceback
12 import traceback
12
13
13 # System library imports.
14 # System library imports.
14 import zmq
15 import zmq
15 from zmq import POLLIN, POLLOUT, POLLERR
16 from zmq import POLLIN, POLLOUT, POLLERR
16 from zmq.eventloop import ioloop
17 from zmq.eventloop import ioloop
17
18
18 # Local imports.
19 # Local imports.
19 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
20 Type
21 Type
22 from kernel import launch_kernel
21 from session import Session
23 from session import Session
22
24
25 # Constants.
26 LOCALHOST = '127.0.0.1'
27
23
28
24 class MissingHandlerError(Exception):
29 class MissingHandlerError(Exception):
25 pass
30 pass
26
31
27
32
28 class ZmqSocketChannel(Thread):
33 class ZmqSocketChannel(Thread):
29 """ The base class for the channels that use ZMQ sockets.
34 """ The base class for the channels that use ZMQ sockets.
30 """
35 """
31
36
32 def __init__(self, context, session, address=None):
37 def __init__(self, context, session, address=None):
33 super(ZmqSocketChannel, self).__init__()
38 super(ZmqSocketChannel, self).__init__()
34 self.daemon = True
39 self.daemon = True
35
40
36 self.context = context
41 self.context = context
37 self.session = session
42 self.session = session
38 self.address = address
43 self.address = address
39 self.socket = None
44 self.socket = None
40
45
41 def stop(self):
46 def stop(self):
42 """ Stop the thread's activity. Returns when the thread terminates.
47 """ Stop the thread's activity. Returns when the thread terminates.
43 """
48 """
44 self.join()
49 self.join()
45
50
46 # Allow the thread to be started again.
51 # Allow the thread to be started again.
47 # FIXME: Although this works (and there's no reason why it shouldn't),
52 # FIXME: Although this works (and there's no reason why it shouldn't),
48 # it feels wrong. Is there a cleaner way to achieve this?
53 # it feels wrong. Is there a cleaner way to achieve this?
49 Thread.__init__(self)
54 Thread.__init__(self)
50
55
51 def get_address(self):
56 def get_address(self):
52 """ Get the channel's address.
57 """ Get the channel's address. By the default, a channel is on
58 localhost with no port specified (a negative port number).
53 """
59 """
54 return self._address
60 return self._address
55
61
56 def set_adresss(self, address):
62 def set_adresss(self, address):
57 """ Set the channel's address. Should be a tuple of form:
63 """ Set the channel's address. Should be a tuple of form:
58 (ip address [str], port [int])
64 (ip address [str], port [int]).
59 or 'None' to indicate that no address has been specified.
65 or None, in which case the address is reset to its default value.
60 """
66 """
61 # FIXME: Validate address.
67 # FIXME: Validate address.
62 if self.is_alive():
68 if self.is_alive():
63 raise RuntimeError("Cannot set address on a running channel!")
69 raise RuntimeError("Cannot set address on a running channel!")
64 else:
70 else:
71 if address is None:
72 address = (LOCALHOST, -1)
65 self._address = address
73 self._address = address
66
74
67 address = property(get_address, set_adresss)
75 address = property(get_address, set_adresss)
68
76
69
77
70 class SubSocketChannel(ZmqSocketChannel):
78 class SubSocketChannel(ZmqSocketChannel):
71
79
72 handlers = None
80 handlers = None
73 _overriden_call_handler = None
81 _overriden_call_handler = None
74
82
75 def __init__(self, context, session, address=None):
83 def __init__(self, context, session, address=None):
76 self.handlers = {}
84 self.handlers = {}
77 super(SubSocketChannel, self).__init__(context, session, address)
85 super(SubSocketChannel, self).__init__(context, session, address)
78
86
79 def run(self):
87 def run(self):
80 self.socket = self.context.socket(zmq.SUB)
88 self.socket = self.context.socket(zmq.SUB)
81 self.socket.setsockopt(zmq.SUBSCRIBE,'')
89 self.socket.setsockopt(zmq.SUBSCRIBE,'')
82 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
90 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
83 self.socket.connect('tcp://%s:%i' % self.address)
91 self.socket.connect('tcp://%s:%i' % self.address)
84 self.ioloop = ioloop.IOLoop()
92 self.ioloop = ioloop.IOLoop()
85 self.ioloop.add_handler(self.socket, self._handle_events,
93 self.ioloop.add_handler(self.socket, self._handle_events,
86 POLLIN|POLLERR)
94 POLLIN|POLLERR)
87 self.ioloop.start()
95 self.ioloop.start()
88
96
89 def stop(self):
97 def stop(self):
90 self.ioloop.stop()
98 self.ioloop.stop()
91 super(SubSocketChannel, self).stop()
99 super(SubSocketChannel, self).stop()
92
100
93 def _handle_events(self, socket, events):
101 def _handle_events(self, socket, events):
94 # Turn on and off POLLOUT depending on if we have made a request
102 # Turn on and off POLLOUT depending on if we have made a request
95 if events & POLLERR:
103 if events & POLLERR:
96 self._handle_err()
104 self._handle_err()
97 if events & POLLIN:
105 if events & POLLIN:
98 self._handle_recv()
106 self._handle_recv()
99
107
100 def _handle_err(self):
108 def _handle_err(self):
101 raise zmq.ZmqError()
109 raise zmq.ZmqError()
102
110
103 def _handle_recv(self):
111 def _handle_recv(self):
104 msg = self.socket.recv_json()
112 msg = self.socket.recv_json()
105 self.call_handlers(msg)
113 self.call_handlers(msg)
106
114
107 def override_call_handler(self, func):
115 def override_call_handler(self, func):
108 """Permanently override the call_handler.
116 """Permanently override the call_handler.
109
117
110 The function func will be called as::
118 The function func will be called as::
111
119
112 func(handler, msg)
120 func(handler, msg)
113
121
114 And must call::
122 And must call::
115
123
116 handler(msg)
124 handler(msg)
117
125
118 in the main thread.
126 in the main thread.
119 """
127 """
120 assert callable(func), "not a callable: %r" % func
128 assert callable(func), "not a callable: %r" % func
121 self._overriden_call_handler = func
129 self._overriden_call_handler = func
122
130
123 def call_handlers(self, msg):
131 def call_handlers(self, msg):
124 handler = self.handlers.get(msg['msg_type'], None)
132 handler = self.handlers.get(msg['msg_type'], None)
125 if handler is not None:
133 if handler is not None:
126 try:
134 try:
127 self.call_handler(handler, msg)
135 self.call_handler(handler, msg)
128 except:
136 except:
129 # XXX: This should be logged at least
137 # XXX: This should be logged at least
130 traceback.print_last()
138 traceback.print_last()
131
139
132 def call_handler(self, handler, msg):
140 def call_handler(self, handler, msg):
133 if self._overriden_call_handler is not None:
141 if self._overriden_call_handler is not None:
134 self._overriden_call_handler(handler, msg)
142 self._overriden_call_handler(handler, msg)
135 elif hasattr(self, '_call_handler'):
143 elif hasattr(self, '_call_handler'):
136 call_handler = getattr(self, '_call_handler')
144 call_handler = getattr(self, '_call_handler')
137 call_handler(handler, msg)
145 call_handler(handler, msg)
138 else:
146 else:
139 raise RuntimeError('no handler!')
147 raise RuntimeError('no handler!')
140
148
141 def add_handler(self, callback, msg_type):
149 def add_handler(self, callback, msg_type):
142 """Register a callback for msg type."""
150 """Register a callback for msg type."""
143 self.handlers[msg_type] = callback
151 self.handlers[msg_type] = callback
144
152
145 def remove_handler(self, msg_type):
153 def remove_handler(self, msg_type):
146 """Remove the callback for msg type."""
154 """Remove the callback for msg type."""
147 self.handlers.pop(msg_type, None)
155 self.handlers.pop(msg_type, None)
148
156
149 def flush(self):
157 def flush(self):
150 """Immediately processes all pending messages on the SUB channel. This
158 """Immediately processes all pending messages on the SUB channel. This
151 method is thread safe.
159 method is thread safe.
152 """
160 """
153 self._flushed = False
161 self._flushed = False
154 self.ioloop.add_callback(self._flush)
162 self.ioloop.add_callback(self._flush)
155 while not self._flushed:
163 while not self._flushed:
156 time.sleep(0.01)
164 time.sleep(0.01)
157
165
158 def _flush(self):
166 def _flush(self):
159 """Called in this thread by the IOLoop to indicate that all events have
167 """Called in this thread by the IOLoop to indicate that all events have
160 been processed.
168 been processed.
161 """
169 """
162 self._flushed = True
170 self._flushed = True
163
171
164
172
165 class XReqSocketChannel(ZmqSocketChannel):
173 class XReqSocketChannel(ZmqSocketChannel):
166
174
167 handler_queue = None
175 handler_queue = None
168 command_queue = None
176 command_queue = None
169 handlers = None
177 handlers = None
170 _overriden_call_handler = None
178 _overriden_call_handler = None
171
179
172 def __init__(self, context, session, address=None):
180 def __init__(self, context, session, address=None):
173 self.handlers = {}
181 self.handlers = {}
174 self.handler_queue = Queue()
182 self.handler_queue = Queue()
175 self.command_queue = Queue()
183 self.command_queue = Queue()
176 super(XReqSocketChannel, self).__init__(context, session, address)
184 super(XReqSocketChannel, self).__init__(context, session, address)
177
185
178 def run(self):
186 def run(self):
179 self.socket = self.context.socket(zmq.XREQ)
187 self.socket = self.context.socket(zmq.XREQ)
180 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
188 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
181 self.socket.connect('tcp://%s:%i' % self.address)
189 self.socket.connect('tcp://%s:%i' % self.address)
182 self.ioloop = ioloop.IOLoop()
190 self.ioloop = ioloop.IOLoop()
183 self.ioloop.add_handler(self.socket, self._handle_events,
191 self.ioloop.add_handler(self.socket, self._handle_events,
184 POLLIN|POLLOUT|POLLERR)
192 POLLIN|POLLOUT|POLLERR)
185 self.ioloop.start()
193 self.ioloop.start()
186
194
187 def stop(self):
195 def stop(self):
188 self.ioloop.stop()
196 self.ioloop.stop()
189 super(XReqSocketChannel, self).stop()
197 super(XReqSocketChannel, self).stop()
190
198
191 def _handle_events(self, socket, events):
199 def _handle_events(self, socket, events):
192 # Turn on and off POLLOUT depending on if we have made a request
200 # Turn on and off POLLOUT depending on if we have made a request
193 if events & POLLERR:
201 if events & POLLERR:
194 self._handle_err()
202 self._handle_err()
195 if events & POLLOUT:
203 if events & POLLOUT:
196 self._handle_send()
204 self._handle_send()
197 if events & POLLIN:
205 if events & POLLIN:
198 self._handle_recv()
206 self._handle_recv()
199
207
200 def _handle_recv(self):
208 def _handle_recv(self):
201 msg = self.socket.recv_json()
209 msg = self.socket.recv_json()
202 self.call_handlers(msg)
210 self.call_handlers(msg)
203
211
204 def _handle_send(self):
212 def _handle_send(self):
205 try:
213 try:
206 msg = self.command_queue.get(False)
214 msg = self.command_queue.get(False)
207 except Empty:
215 except Empty:
208 pass
216 pass
209 else:
217 else:
210 self.socket.send_json(msg)
218 self.socket.send_json(msg)
211
219
212 def _handle_err(self):
220 def _handle_err(self):
213 raise zmq.ZmqError()
221 raise zmq.ZmqError()
214
222
215 def _queue_request(self, msg, callback):
223 def _queue_request(self, msg, callback):
216 handler = self._find_handler(msg['msg_type'], callback)
224 handler = self._find_handler(msg['msg_type'], callback)
217 self.handler_queue.put(handler)
225 self.handler_queue.put(handler)
218 self.command_queue.put(msg)
226 self.command_queue.put(msg)
219
227
220 def execute(self, code, callback=None):
228 def execute(self, code, callback=None):
221 # Create class for content/msg creation. Related to, but possibly
229 # Create class for content/msg creation. Related to, but possibly
222 # not in Session.
230 # not in Session.
223 content = dict(code=code)
231 content = dict(code=code)
224 msg = self.session.msg('execute_request', content)
232 msg = self.session.msg('execute_request', content)
225 self._queue_request(msg, callback)
233 self._queue_request(msg, callback)
226 return msg['header']['msg_id']
234 return msg['header']['msg_id']
227
235
228 def complete(self, text, line, block=None, callback=None):
236 def complete(self, text, line, block=None, callback=None):
229 content = dict(text=text, line=line)
237 content = dict(text=text, line=line)
230 msg = self.session.msg('complete_request', content)
238 msg = self.session.msg('complete_request', content)
231 self._queue_request(msg, callback)
239 self._queue_request(msg, callback)
232 return msg['header']['msg_id']
240 return msg['header']['msg_id']
233
241
234 def object_info(self, oname, callback=None):
242 def object_info(self, oname, callback=None):
235 content = dict(oname=oname)
243 content = dict(oname=oname)
236 msg = self.session.msg('object_info_request', content)
244 msg = self.session.msg('object_info_request', content)
237 self._queue_request(msg, callback)
245 self._queue_request(msg, callback)
238 return msg['header']['msg_id']
246 return msg['header']['msg_id']
239
247
240 def _find_handler(self, name, callback):
248 def _find_handler(self, name, callback):
241 if callback is not None:
249 if callback is not None:
242 return callback
250 return callback
243 handler = self.handlers.get(name)
251 handler = self.handlers.get(name)
244 if handler is None:
252 if handler is None:
245 raise MissingHandlerError(
253 raise MissingHandlerError(
246 'No handler defined for method: %s' % name)
254 'No handler defined for method: %s' % name)
247 return handler
255 return handler
248
256
249 def override_call_handler(self, func):
257 def override_call_handler(self, func):
250 """Permanently override the call_handler.
258 """Permanently override the call_handler.
251
259
252 The function func will be called as::
260 The function func will be called as::
253
261
254 func(handler, msg)
262 func(handler, msg)
255
263
256 And must call::
264 And must call::
257
265
258 handler(msg)
266 handler(msg)
259
267
260 in the main thread.
268 in the main thread.
261 """
269 """
262 assert callable(func), "not a callable: %r" % func
270 assert callable(func), "not a callable: %r" % func
263 self._overriden_call_handler = func
271 self._overriden_call_handler = func
264
272
265 def call_handlers(self, msg):
273 def call_handlers(self, msg):
266 try:
274 try:
267 handler = self.handler_queue.get(False)
275 handler = self.handler_queue.get(False)
268 except Empty:
276 except Empty:
269 print "Message received with no handler!!!"
277 print "Message received with no handler!!!"
270 print msg
278 print msg
271 else:
279 else:
272 self.call_handler(handler, msg)
280 self.call_handler(handler, msg)
273
281
274 def call_handler(self, handler, msg):
282 def call_handler(self, handler, msg):
275 if self._overriden_call_handler is not None:
283 if self._overriden_call_handler is not None:
276 self._overriden_call_handler(handler, msg)
284 self._overriden_call_handler(handler, msg)
277 elif hasattr(self, '_call_handler'):
285 elif hasattr(self, '_call_handler'):
278 call_handler = getattr(self, '_call_handler')
286 call_handler = getattr(self, '_call_handler')
279 call_handler(handler, msg)
287 call_handler(handler, msg)
280 else:
288 else:
281 raise RuntimeError('no handler!')
289 raise RuntimeError('no handler!')
282
290
283
291
284 class RepSocketChannel(ZmqSocketChannel):
292 class RepSocketChannel(ZmqSocketChannel):
285
293
286 def on_raw_input(self):
294 def on_raw_input(self):
287 pass
295 pass
288
296
289
297
290 class KernelManager(HasTraits):
298 class KernelManager(HasTraits):
291 """ Manages a kernel for a frontend.
299 """ Manages a kernel for a frontend.
292
300
293 The SUB channel is for the frontend to receive messages published by the
301 The SUB channel is for the frontend to receive messages published by the
294 kernel.
302 kernel.
295
303
296 The REQ channel is for the frontend to make requests of the kernel.
304 The REQ channel is for the frontend to make requests of the kernel.
297
305
298 The REP channel is for the kernel to request stdin (raw_input) from the
306 The REP channel is for the kernel to request stdin (raw_input) from the
299 frontend.
307 frontend.
300 """
308 """
301
309
302 # Whether the kernel manager is currently listening on its channels.
310 # Whether the kernel manager is currently listening on its channels.
303 is_listening = Bool(False)
311 is_listening = Bool(False)
304
312
305 # The PyZMQ Context to use for communication with the kernel.
313 # The PyZMQ Context to use for communication with the kernel.
306 context = Instance(zmq.Context, ())
314 context = Instance(zmq.Context, ())
307
315
308 # The Session to use for communication with the kernel.
316 # The Session to use for communication with the kernel.
309 session = Instance(Session, ())
317 session = Instance(Session, ())
310
318
311 # The classes to use for the various channels.
319 # The classes to use for the various channels.
312 sub_channel_class = Type(SubSocketChannel)
320 sub_channel_class = Type(SubSocketChannel)
313 xreq_channel_class = Type(XReqSocketChannel)
321 xreq_channel_class = Type(XReqSocketChannel)
314 rep_channel_class = Type(RepSocketChannel)
322 rep_channel_class = Type(RepSocketChannel)
315
323
316 # Protected traits.
324 # Protected traits.
325 _kernel = Instance(Popen)
317 _sub_channel = Any
326 _sub_channel = Any
318 _xreq_channel = Any
327 _xreq_channel = Any
319 _rep_channel = Any
328 _rep_channel = Any
320
329
321 def __init__(self, **traits):
330 def __init__(self, **traits):
322 super(KernelManager, self).__init__()
331 super(KernelManager, self).__init__()
323
332
324 # FIXME: This should be the business of HasTraits. The convention is:
333 # FIXME: This should be the business of HasTraits. The convention is:
325 # HasTraits.__init__(self, **traits_to_be_initialized.)
334 # HasTraits.__init__(self, **traits_to_be_initialized.)
326 for trait in traits:
335 for trait in traits:
327 setattr(self, trait, traits[trait])
336 setattr(self, trait, traits[trait])
328
337
329 def start_listening(self):
338 def start_listening(self):
330 """Start listening on the specified ports. If already listening, raises
339 """Start listening on the specified ports. If already listening, raises
331 a RuntimeError.
340 a RuntimeError.
332 """
341 """
333 if self.is_listening:
342 if self.is_listening:
334 raise RuntimeError("Cannot start listening. Already listening!")
343 raise RuntimeError("Cannot start listening. Already listening!")
335 else:
344 else:
336 self.is_listening = True
345 self.is_listening = True
337 self.sub_channel.start()
346 self.sub_channel.start()
338 self.xreq_channel.start()
347 self.xreq_channel.start()
339 self.rep_channel.start()
348 self.rep_channel.start()
340
349
341 def stop_listening(self):
350 def stop_listening(self):
342 """Stop listening. If not listening, does nothing. """
351 """Stop listening. If not listening, does nothing. """
343 if self.is_listening:
352 if self.is_listening:
344 self.is_listening = False
353 self.is_listening = False
345 self.sub_channel.stop()
354 self.sub_channel.stop()
346 self.xreq_channel.stop()
355 self.xreq_channel.stop()
347 self.rep_channel.stop()
356 self.rep_channel.stop()
348
357
349 def start_kernel(self):
358 def start_kernel(self):
350 """Start a localhost kernel. If ports have been specified, use them.
359 """Start a localhost kernel. If ports have been specified via the
351 Otherwise, choose an open port at random.
360 address attributes, use them. Otherwise, choose open ports at random.
352 """
361 """
353 # TODO: start a kernel.
362 xreq, sub = self.xreq_address, self.sub_address
354 self.start_listening()
363 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
364 raise RuntimeError("Can only launch a kernel on localhost."
365 "Make sure that the '*_address' attributes are "
366 "configured properly.")
367
368 self._kernel, xrep, pub = launch_kernel(xrep_port=xreq[1],
369 pub_port=sub[1])
370 self.xreq_address = (LOCALHOST, xrep)
371 self.sub_address = (LOCALHOST, pub)
355
372
356 def kill_kernel(self):
373 def kill_kernel(self):
357 """Kill the running kernel.
374 """Kill the running kernel, if there is one.
358 """
375 """
359 # TODO: kill the kernel.
376 if self._kernel:
360 self.stop_listening()
377 self._kernel.kill()
378 self._kernel = None
361
379
362 @property
380 @property
363 def is_alive(self):
381 def is_alive(self):
364 """ Returns whether the kernel is alive. """
382 """ Returns whether the kernel is alive. """
365 if self.is_listening:
383 if self.is_listening:
366 # TODO: check if alive.
384 # TODO: check if alive.
367 return True
385 return True
368 else:
386 else:
369 return False
387 return False
370
388
371 def signal_kernel(self, signum):
389 def signal_kernel(self, signum):
372 """Send signum to the kernel."""
390 """Send signum to the kernel."""
373 # TODO: signal the kernel.
391 # TODO: signal the kernel.
374
392
375 #--------------------------------------------------------------------------
393 #--------------------------------------------------------------------------
376 # Channels used for communication with the kernel:
394 # Channels used for communication with the kernel:
377 #--------------------------------------------------------------------------
395 #--------------------------------------------------------------------------
378
396
379 @property
397 @property
380 def sub_channel(self):
398 def sub_channel(self):
381 """Get the SUB socket channel object."""
399 """Get the SUB socket channel object."""
382 if self._sub_channel is None:
400 if self._sub_channel is None:
383 self._sub_channel = self.sub_channel_class(self.context,
401 self._sub_channel = self.sub_channel_class(self.context,
384 self.session)
402 self.session)
385 return self._sub_channel
403 return self._sub_channel
386
404
387 @property
405 @property
388 def xreq_channel(self):
406 def xreq_channel(self):
389 """Get the REQ socket channel object to make requests of the kernel."""
407 """Get the REQ socket channel object to make requests of the kernel."""
390 if self._xreq_channel is None:
408 if self._xreq_channel is None:
391 self._xreq_channel = self.xreq_channel_class(self.context,
409 self._xreq_channel = self.xreq_channel_class(self.context,
392 self.session)
410 self.session)
393 return self._xreq_channel
411 return self._xreq_channel
394
412
395 @property
413 @property
396 def rep_channel(self):
414 def rep_channel(self):
397 """Get the REP socket channel object to handle stdin (raw_input)."""
415 """Get the REP socket channel object to handle stdin (raw_input)."""
398 if self._rep_channel is None:
416 if self._rep_channel is None:
399 self._rep_channel = self.rep_channel_class(self.context,
417 self._rep_channel = self.rep_channel_class(self.context,
400 self.session)
418 self.session)
401 return self._rep_channel
419 return self._rep_channel
402
420
403 #--------------------------------------------------------------------------
421 #--------------------------------------------------------------------------
404 # Channel address attributes:
422 # Channel address attributes:
405 #--------------------------------------------------------------------------
423 #--------------------------------------------------------------------------
406
424
407 def get_sub_address(self):
425 def get_sub_address(self):
408 return self.sub_channel.address
426 return self.sub_channel.address
409
427
410 def set_sub_address(self, address):
428 def set_sub_address(self, address):
411 self.sub_channel.address = address
429 self.sub_channel.address = address
412
430
413 sub_address = property(get_sub_address, set_sub_address,
431 sub_address = property(get_sub_address, set_sub_address,
414 doc="The address used by SUB socket channel.")
432 doc="The address used by SUB socket channel.")
415
433
416 def get_xreq_address(self):
434 def get_xreq_address(self):
417 return self.xreq_channel.address
435 return self.xreq_channel.address
418
436
419 def set_xreq_address(self, address):
437 def set_xreq_address(self, address):
420 self.xreq_channel.address = address
438 self.xreq_channel.address = address
421
439
422 xreq_address = property(get_xreq_address, set_xreq_address,
440 xreq_address = property(get_xreq_address, set_xreq_address,
423 doc="The address used by XREQ socket channel.")
441 doc="The address used by XREQ socket channel.")
424
442
425 def get_rep_address(self):
443 def get_rep_address(self):
426 return self.rep_channel.address
444 return self.rep_channel.address
427
445
428 def set_rep_address(self, address):
446 def set_rep_address(self, address):
429 self.rep_channel.address = address
447 self.rep_channel.address = address
430
448
431 rep_address = property(get_rep_address, set_rep_address,
449 rep_address = property(get_rep_address, set_rep_address,
432 doc="The address used by REP socket channel.")
450 doc="The address used by REP socket channel.")
433
451
General Comments 0
You need to be logged in to leave comments. Login now