##// END OF EJS Templates
Changing -1=>0 for bind to random ports.
Brian Granger -
Show More
@@ -1,361 +1,361 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Finish implementing `raw_input`.
7 7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
8 8 call set_parent on all the PUB objects with the message about to be executed.
9 9 * Implement random port and security key logic.
10 10 * Implement control messages.
11 11 * Implement event loop and poll version.
12 12 """
13 13
14 14 # Standard library imports.
15 15 import __builtin__
16 16 import sys
17 17 import time
18 18 import traceback
19 19 from code import CommandCompiler
20 20
21 21 # System library imports.
22 22 import zmq
23 23
24 24 # Local imports.
25 25 from IPython.external.argparse import ArgumentParser
26 26 from session import Session, Message, extract_header
27 27 from completer import KernelCompleter
28 28
29 29
30 30 class OutStream(object):
31 31 """A file like object that publishes the stream to a 0MQ PUB socket."""
32 32
33 33 def __init__(self, session, pub_socket, name, max_buffer=200):
34 34 self.session = session
35 35 self.pub_socket = pub_socket
36 36 self.name = name
37 37 self._buffer = []
38 38 self._buffer_len = 0
39 39 self.max_buffer = max_buffer
40 40 self.parent_header = {}
41 41
42 42 def set_parent(self, parent):
43 43 self.parent_header = extract_header(parent)
44 44
45 45 def close(self):
46 46 self.pub_socket = None
47 47
48 48 def flush(self):
49 49 if self.pub_socket is None:
50 50 raise ValueError(u'I/O operation on closed file')
51 51 else:
52 52 if self._buffer:
53 53 data = ''.join(self._buffer)
54 54 content = {u'name':self.name, u'data':data}
55 55 msg = self.session.msg(u'stream', content=content,
56 56 parent=self.parent_header)
57 57 print>>sys.__stdout__, Message(msg)
58 58 self.pub_socket.send_json(msg)
59 59 self._buffer_len = 0
60 60 self._buffer = []
61 61
62 62 def isattr(self):
63 63 return False
64 64
65 65 def next(self):
66 66 raise IOError('Read not supported on a write only stream.')
67 67
68 68 def read(self, size=None):
69 69 raise IOError('Read not supported on a write only stream.')
70 70
71 71 readline=read
72 72
73 73 def write(self, s):
74 74 if self.pub_socket is None:
75 75 raise ValueError('I/O operation on closed file')
76 76 else:
77 77 self._buffer.append(s)
78 78 self._buffer_len += len(s)
79 79 self._maybe_send()
80 80
81 81 def _maybe_send(self):
82 82 if '\n' in self._buffer[-1]:
83 83 self.flush()
84 84 if self._buffer_len > self.max_buffer:
85 85 self.flush()
86 86
87 87 def writelines(self, sequence):
88 88 if self.pub_socket is None:
89 89 raise ValueError('I/O operation on closed file')
90 90 else:
91 91 for s in sequence:
92 92 self.write(s)
93 93
94 94
95 95 class DisplayHook(object):
96 96
97 97 def __init__(self, session, pub_socket):
98 98 self.session = session
99 99 self.pub_socket = pub_socket
100 100 self.parent_header = {}
101 101
102 102 def __call__(self, obj):
103 103 if obj is None:
104 104 return
105 105
106 106 __builtin__._ = obj
107 107 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
108 108 parent=self.parent_header)
109 109 self.pub_socket.send_json(msg)
110 110
111 111 def set_parent(self, parent):
112 112 self.parent_header = extract_header(parent)
113 113
114 114
115 115 class RawInput(object):
116 116
117 117 def __init__(self, session, socket):
118 118 self.session = session
119 119 self.socket = socket
120 120
121 121 def __call__(self, prompt=None):
122 122 msg = self.session.msg(u'raw_input')
123 123 self.socket.send_json(msg)
124 124 while True:
125 125 try:
126 126 reply = self.socket.recv_json(zmq.NOBLOCK)
127 127 except zmq.ZMQError, e:
128 128 if e.errno == zmq.EAGAIN:
129 129 pass
130 130 else:
131 131 raise
132 132 else:
133 133 break
134 134 return reply[u'content'][u'data']
135 135
136 136
137 137 class Kernel(object):
138 138
139 139 def __init__(self, session, reply_socket, pub_socket):
140 140 self.session = session
141 141 self.reply_socket = reply_socket
142 142 self.pub_socket = pub_socket
143 143 self.user_ns = {}
144 144 self.history = []
145 145 self.compiler = CommandCompiler()
146 146 self.completer = KernelCompleter(self.user_ns)
147 147
148 148 # Build dict of handlers for message types
149 149 msg_types = [ 'execute_request', 'complete_request',
150 150 'object_info_request' ]
151 151 self.handlers = {}
152 152 for msg_type in msg_types:
153 153 self.handlers[msg_type] = getattr(self, msg_type)
154 154
155 155 def abort_queue(self):
156 156 while True:
157 157 try:
158 158 ident = self.reply_socket.recv(zmq.NOBLOCK)
159 159 except zmq.ZMQError, e:
160 160 if e.errno == zmq.EAGAIN:
161 161 break
162 162 else:
163 163 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
164 164 msg = self.reply_socket.recv_json()
165 165 print>>sys.__stdout__, "Aborting:"
166 166 print>>sys.__stdout__, Message(msg)
167 167 msg_type = msg['msg_type']
168 168 reply_type = msg_type.split('_')[0] + '_reply'
169 169 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
170 170 print>>sys.__stdout__, Message(reply_msg)
171 171 self.reply_socket.send(ident,zmq.SNDMORE)
172 172 self.reply_socket.send_json(reply_msg)
173 173 # We need to wait a bit for requests to come in. This can probably
174 174 # be set shorter for true asynchronous clients.
175 175 time.sleep(0.1)
176 176
177 177 def execute_request(self, ident, parent):
178 178 try:
179 179 code = parent[u'content'][u'code']
180 180 except:
181 181 print>>sys.__stderr__, "Got bad msg: "
182 182 print>>sys.__stderr__, Message(parent)
183 183 return
184 184 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
185 185 self.pub_socket.send_json(pyin_msg)
186 186 try:
187 187 comp_code = self.compiler(code, '<zmq-kernel>')
188 188 sys.displayhook.set_parent(parent)
189 189 exec comp_code in self.user_ns, self.user_ns
190 190 except:
191 191 result = u'error'
192 192 etype, evalue, tb = sys.exc_info()
193 193 tb = traceback.format_exception(etype, evalue, tb)
194 194 exc_content = {
195 195 u'status' : u'error',
196 196 u'traceback' : tb,
197 197 u'etype' : unicode(etype),
198 198 u'evalue' : unicode(evalue)
199 199 }
200 200 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
201 201 self.pub_socket.send_json(exc_msg)
202 202 reply_content = exc_content
203 203 else:
204 204 reply_content = {'status' : 'ok'}
205 205 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
206 206 print>>sys.__stdout__, Message(reply_msg)
207 207 self.reply_socket.send(ident, zmq.SNDMORE)
208 208 self.reply_socket.send_json(reply_msg)
209 209 if reply_msg['content']['status'] == u'error':
210 210 self.abort_queue()
211 211
212 212 def complete_request(self, ident, parent):
213 213 matches = {'matches' : self.complete(parent),
214 214 'status' : 'ok'}
215 215 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
216 216 matches, parent, ident)
217 217 print >> sys.__stdout__, completion_msg
218 218
219 219 def complete(self, msg):
220 220 return self.completer.complete(msg.content.line, msg.content.text)
221 221
222 222 def object_info_request(self, ident, parent):
223 223 context = parent['content']['oname'].split('.')
224 224 object_info = self.object_info(context)
225 225 msg = self.session.send(self.reply_socket, 'object_info_reply',
226 226 object_info, parent, ident)
227 227 print >> sys.__stdout__, msg
228 228
229 229 def object_info(self, context):
230 230 symbol, leftover = self.symbol_from_context(context)
231 231 if symbol is not None and not leftover:
232 232 doc = getattr(symbol, '__doc__', '')
233 233 else:
234 234 doc = ''
235 235 object_info = dict(docstring = doc)
236 236 return object_info
237 237
238 238 def symbol_from_context(self, context):
239 239 if not context:
240 240 return None, context
241 241
242 242 base_symbol_string = context[0]
243 243 symbol = self.user_ns.get(base_symbol_string, None)
244 244 if symbol is None:
245 245 symbol = __builtin__.__dict__.get(base_symbol_string, None)
246 246 if symbol is None:
247 247 return None, context
248 248
249 249 context = context[1:]
250 250 for i, name in enumerate(context):
251 251 new_symbol = getattr(symbol, name, None)
252 252 if new_symbol is None:
253 253 return symbol, context[i:]
254 254 else:
255 255 symbol = new_symbol
256 256
257 257 return symbol, []
258 258
259 259 def start(self):
260 260 while True:
261 261 ident = self.reply_socket.recv()
262 262 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
263 263 msg = self.reply_socket.recv_json()
264 264 omsg = Message(msg)
265 265 print>>sys.__stdout__
266 266 print>>sys.__stdout__, omsg
267 267 handler = self.handlers.get(omsg.msg_type, None)
268 268 if handler is None:
269 269 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
270 270 else:
271 271 handler(ident, omsg)
272 272
273 273
274 274 def bind_port(socket, ip, port):
275 275 """ Binds the specified ZMQ socket. If the port is less than zero, a random
276 276 port is chosen. Returns the port that was bound.
277 277 """
278 278 connection = 'tcp://%s' % ip
279 279 if port < 0:
280 280 port = socket.bind_to_random_port(connection)
281 281 else:
282 282 connection += ':%i' % port
283 283 socket.bind(connection)
284 284 return port
285 285
286 286 def main():
287 287 """ Main entry point for launching a kernel.
288 288 """
289 289 # Parse command line arguments.
290 290 parser = ArgumentParser()
291 291 parser.add_argument('--ip', type=str, default='127.0.0.1',
292 292 help='set the kernel\'s IP address [default: local]')
293 parser.add_argument('--xrep', type=int, metavar='PORT', default=-1,
293 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
294 294 help='set the XREP Channel port [default: random]')
295 parser.add_argument('--pub', type=int, metavar='PORT', default=-1,
295 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
296 296 help='set the PUB Channel port [default: random]')
297 297 namespace = parser.parse_args()
298 298
299 299 # Create context, session, and kernel sockets.
300 300 print >>sys.__stdout__, "Starting the kernel..."
301 301 context = zmq.Context()
302 302 session = Session(username=u'kernel')
303 303
304 304 reply_socket = context.socket(zmq.XREP)
305 305 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
306 306 print >>sys.__stdout__, "XREP Channel on port", xrep_port
307 307
308 308 pub_socket = context.socket(zmq.PUB)
309 309 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
310 310 print >>sys.__stdout__, "PUB Channel on port", pub_port
311 311
312 312 # Redirect input streams and set a display hook.
313 313 sys.stdout = OutStream(session, pub_socket, u'stdout')
314 314 sys.stderr = OutStream(session, pub_socket, u'stderr')
315 315 sys.displayhook = DisplayHook(session, pub_socket)
316 316
317 317 kernel = Kernel(session, reply_socket, pub_socket)
318 318
319 319 # For debugging convenience, put sleep and a string in the namespace, so we
320 320 # have them every time we start.
321 321 kernel.user_ns['sleep'] = time.sleep
322 322 kernel.user_ns['s'] = 'Test string'
323 323
324 324 print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate."
325 325 kernel.start()
326 326
327 def launch_kernel(xrep_port=-1, pub_port=-1):
327 def launch_kernel(xrep_port=0, pub_port=0):
328 328 """ Launches a localhost kernel, binding to the specified ports. For any
329 329 port that is left unspecified, a port is chosen by the operating system.
330 330
331 331 Returns a tuple of form:
332 332 (kernel_process [Popen], rep_port [int], sub_port [int])
333 333 """
334 334 import socket
335 335 from subprocess import Popen
336 336
337 337 # Find open ports as necessary.
338 338 ports = []
339 ports_needed = int(xrep_port < 0) + int(pub_port < 0)
339 ports_needed = int(xrep_port == 0) + int(pub_port == 0)
340 340 for i in xrange(ports_needed):
341 341 sock = socket.socket()
342 342 sock.bind(('', 0))
343 343 ports.append(sock)
344 344 for i, sock in enumerate(ports):
345 345 port = sock.getsockname()[1]
346 346 sock.close()
347 347 ports[i] = port
348 if xrep_port < 0:
348 if xrep_port == 0:
349 349 xrep_port = ports.pop()
350 if pub_port < 0:
350 if pub_port == 0:
351 351 pub_port = ports.pop()
352 352
353 353 # Spawn a kernel.
354 354 command = 'from IPython.zmq.kernel import main; main()'
355 355 proc = Popen([ sys.executable, '-c', command,
356 356 '--xrep', str(xrep_port), '--pub', str(pub_port) ])
357 357 return proc, xrep_port, pub_port
358 358
359 359
360 360 if __name__ == '__main__':
361 361 main()
@@ -1,493 +1,493 b''
1 1 """Kernel frontend classes.
2 2
3 3 TODO: Create logger to handle debugging and console messages.
4 4
5 5 """
6 6
7 7 # Standard library imports.
8 8 from Queue import Queue, Empty
9 9 from subprocess import Popen
10 10 from threading import Thread
11 11 import time
12 12 import traceback
13 13
14 14 # System library imports.
15 15 import zmq
16 16 from zmq import POLLIN, POLLOUT, POLLERR
17 17 from zmq.eventloop import ioloop
18 18
19 19 # Local imports.
20 20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
21 21 Type
22 22 from kernel import launch_kernel
23 23 from session import Session
24 24
25 25 # Constants.
26 26 LOCALHOST = '127.0.0.1'
27 27
28 28
29 29 class MissingHandlerError(Exception):
30 30 pass
31 31
32 32
33 33 class ZmqSocketChannel(Thread):
34 34 """ The base class for the channels that use ZMQ sockets.
35 35 """
36 36
37 37 def __init__(self, context, session, address=None):
38 38 super(ZmqSocketChannel, self).__init__()
39 39 self.daemon = True
40 40
41 41 self.context = context
42 42 self.session = session
43 43 self.address = address
44 44 self.socket = None
45 45
46 46 def stop(self):
47 47 """ Stop the thread's activity. Returns when the thread terminates.
48 48 """
49 49 self.join()
50 50
51 51 # Allow the thread to be started again.
52 52 # FIXME: Although this works (and there's no reason why it shouldn't),
53 53 # it feels wrong. Is there a cleaner way to achieve this?
54 54 Thread.__init__(self)
55 55
56 56 def get_address(self):
57 57 """ Get the channel's address. By the default, a channel is on
58 58 localhost with no port specified (a negative port number).
59 59 """
60 60 return self._address
61 61
62 62 def set_adresss(self, address):
63 63 """ Set the channel's address. Should be a tuple of form:
64 64 (ip address [str], port [int]).
65 65 or None, in which case the address is reset to its default value.
66 66 """
67 67 # FIXME: Validate address.
68 68 if self.is_alive():
69 69 raise RuntimeError("Cannot set address on a running channel!")
70 70 else:
71 71 if address is None:
72 address = (LOCALHOST, -1)
72 address = (LOCALHOST, 0)
73 73 self._address = address
74 74
75 75 address = property(get_address, set_adresss)
76 76
77 77
78 78 class SubSocketChannel(ZmqSocketChannel):
79 79
80 80 handlers = None
81 81 _overriden_call_handler = None
82 82
83 83 def __init__(self, context, session, address=None):
84 84 self.handlers = {}
85 85 super(SubSocketChannel, self).__init__(context, session, address)
86 86
87 87 def run(self):
88 88 self.socket = self.context.socket(zmq.SUB)
89 89 self.socket.setsockopt(zmq.SUBSCRIBE,'')
90 90 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
91 91 self.socket.connect('tcp://%s:%i' % self.address)
92 92 self.ioloop = ioloop.IOLoop()
93 93 self.ioloop.add_handler(self.socket, self._handle_events,
94 94 POLLIN|POLLERR)
95 95 self.ioloop.start()
96 96
97 97 def stop(self):
98 98 self.ioloop.stop()
99 99 super(SubSocketChannel, self).stop()
100 100
101 101 def _handle_events(self, socket, events):
102 102 # Turn on and off POLLOUT depending on if we have made a request
103 103 if events & POLLERR:
104 104 self._handle_err()
105 105 if events & POLLIN:
106 106 self._handle_recv()
107 107
108 108 def _handle_err(self):
109 109 raise zmq.ZmqError()
110 110
111 111 def _handle_recv(self):
112 112 msg = self.socket.recv_json()
113 113 self.call_handlers(msg)
114 114
115 115 def override_call_handler(self, func):
116 116 """Permanently override the call_handler.
117 117
118 118 The function func will be called as::
119 119
120 120 func(handler, msg)
121 121
122 122 And must call::
123 123
124 124 handler(msg)
125 125
126 126 in the main thread.
127 127 """
128 128 assert callable(func), "not a callable: %r" % func
129 129 self._overriden_call_handler = func
130 130
131 131 def call_handlers(self, msg):
132 132 handler = self.handlers.get(msg['msg_type'], None)
133 133 if handler is not None:
134 134 try:
135 135 self.call_handler(handler, msg)
136 136 except:
137 137 # XXX: This should be logged at least
138 138 traceback.print_last()
139 139
140 140 def call_handler(self, handler, msg):
141 141 if self._overriden_call_handler is not None:
142 142 self._overriden_call_handler(handler, msg)
143 143 elif hasattr(self, '_call_handler'):
144 144 call_handler = getattr(self, '_call_handler')
145 145 call_handler(handler, msg)
146 146 else:
147 147 raise RuntimeError('no handler!')
148 148
149 149 def add_handler(self, callback, msg_type):
150 150 """Register a callback for msg type."""
151 151 self.handlers[msg_type] = callback
152 152
153 153 def remove_handler(self, msg_type):
154 154 """Remove the callback for msg type."""
155 155 self.handlers.pop(msg_type, None)
156 156
157 157 def flush(self, timeout=1.0):
158 158 """Immediately processes all pending messages on the SUB channel.
159 159
160 160 This method is thread safe.
161 161
162 162 Parameters
163 163 ----------
164 164 timeout : float, optional
165 165 The maximum amount of time to spend flushing, in seconds. The
166 166 default is one second.
167 167 """
168 168 # We do the IOLoop callback process twice to ensure that the IOLoop
169 169 # gets to perform at least one full poll.
170 170 stop_time = time.time() + timeout
171 171 for i in xrange(2):
172 172 self._flushed = False
173 173 self.ioloop.add_callback(self._flush)
174 174 while not self._flushed and time.time() < stop_time:
175 175 time.sleep(0.01)
176 176
177 177 def _flush(self):
178 178 """Called in this thread by the IOLoop to indicate that all events have
179 179 been processed.
180 180 """
181 181 self._flushed = True
182 182
183 183
184 184 class XReqSocketChannel(ZmqSocketChannel):
185 185
186 186 handler_queue = None
187 187 command_queue = None
188 188 handlers = None
189 189 _overriden_call_handler = None
190 190
191 191 def __init__(self, context, session, address=None):
192 192 self.handlers = {}
193 193 self.handler_queue = Queue()
194 194 self.command_queue = Queue()
195 195 super(XReqSocketChannel, self).__init__(context, session, address)
196 196
197 197 def run(self):
198 198 self.socket = self.context.socket(zmq.XREQ)
199 199 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
200 200 self.socket.connect('tcp://%s:%i' % self.address)
201 201 self.ioloop = ioloop.IOLoop()
202 202 self.ioloop.add_handler(self.socket, self._handle_events,
203 203 POLLIN|POLLOUT|POLLERR)
204 204 self.ioloop.start()
205 205
206 206 def stop(self):
207 207 self.ioloop.stop()
208 208 super(XReqSocketChannel, self).stop()
209 209
210 210 def _handle_events(self, socket, events):
211 211 # Turn on and off POLLOUT depending on if we have made a request
212 212 if events & POLLERR:
213 213 self._handle_err()
214 214 if events & POLLOUT:
215 215 self._handle_send()
216 216 if events & POLLIN:
217 217 self._handle_recv()
218 218
219 219 def _handle_recv(self):
220 220 msg = self.socket.recv_json()
221 221 self.call_handlers(msg)
222 222
223 223 def _handle_send(self):
224 224 try:
225 225 msg = self.command_queue.get(False)
226 226 except Empty:
227 227 pass
228 228 else:
229 229 self.socket.send_json(msg)
230 230
231 231 def _handle_err(self):
232 232 raise zmq.ZmqError()
233 233
234 234 def _queue_request(self, msg, callback):
235 235 handler = self._find_handler(msg['msg_type'], callback)
236 236 self.handler_queue.put(handler)
237 237 self.command_queue.put(msg)
238 238
239 239 def execute(self, code, callback=None):
240 240 # Create class for content/msg creation. Related to, but possibly
241 241 # not in Session.
242 242 content = dict(code=code)
243 243 msg = self.session.msg('execute_request', content)
244 244 self._queue_request(msg, callback)
245 245 return msg['header']['msg_id']
246 246
247 247 def complete(self, text, line, block=None, callback=None):
248 248 content = dict(text=text, line=line)
249 249 msg = self.session.msg('complete_request', content)
250 250 self._queue_request(msg, callback)
251 251 return msg['header']['msg_id']
252 252
253 253 def object_info(self, oname, callback=None):
254 254 content = dict(oname=oname)
255 255 msg = self.session.msg('object_info_request', content)
256 256 self._queue_request(msg, callback)
257 257 return msg['header']['msg_id']
258 258
259 259 def _find_handler(self, name, callback):
260 260 if callback is not None:
261 261 return callback
262 262 handler = self.handlers.get(name)
263 263 if handler is None:
264 264 raise MissingHandlerError(
265 265 'No handler defined for method: %s' % name)
266 266 return handler
267 267
268 268 def override_call_handler(self, func):
269 269 """Permanently override the call_handler.
270 270
271 271 The function func will be called as::
272 272
273 273 func(handler, msg)
274 274
275 275 And must call::
276 276
277 277 handler(msg)
278 278
279 279 in the main thread.
280 280 """
281 281 assert callable(func), "not a callable: %r" % func
282 282 self._overriden_call_handler = func
283 283
284 284 def call_handlers(self, msg):
285 285 try:
286 286 handler = self.handler_queue.get(False)
287 287 except Empty:
288 288 print "Message received with no handler!!!"
289 289 print msg
290 290 else:
291 291 self.call_handler(handler, msg)
292 292
293 293 def call_handler(self, handler, msg):
294 294 if self._overriden_call_handler is not None:
295 295 self._overriden_call_handler(handler, msg)
296 296 elif hasattr(self, '_call_handler'):
297 297 call_handler = getattr(self, '_call_handler')
298 298 call_handler(handler, msg)
299 299 else:
300 300 raise RuntimeError('no handler!')
301 301
302 302
303 303 class RepSocketChannel(ZmqSocketChannel):
304 304
305 305 def on_raw_input(self):
306 306 pass
307 307
308 308
309 309 class KernelManager(HasTraits):
310 310 """ Manages a kernel for a frontend.
311 311
312 312 The SUB channel is for the frontend to receive messages published by the
313 313 kernel.
314 314
315 315 The REQ channel is for the frontend to make requests of the kernel.
316 316
317 317 The REP channel is for the kernel to request stdin (raw_input) from the
318 318 frontend.
319 319 """
320 320
321 321 # Whether the kernel manager is currently listening on its channels.
322 322 is_listening = Bool(False)
323 323
324 324 # The PyZMQ Context to use for communication with the kernel.
325 325 context = Instance(zmq.Context, ())
326 326
327 327 # The Session to use for communication with the kernel.
328 328 session = Instance(Session, ())
329 329
330 330 # The classes to use for the various channels.
331 331 sub_channel_class = Type(SubSocketChannel)
332 332 xreq_channel_class = Type(XReqSocketChannel)
333 333 rep_channel_class = Type(RepSocketChannel)
334 334
335 335 # Protected traits.
336 336 _kernel = Instance(Popen)
337 337 _sub_channel = Any
338 338 _xreq_channel = Any
339 339 _rep_channel = Any
340 340
341 341 #--------------------------------------------------------------------------
342 342 # Channel management methods:
343 343 #--------------------------------------------------------------------------
344 344
345 345 def start_listening(self):
346 346 """Starts listening on the specified ports. If already listening, raises
347 347 a RuntimeError.
348 348 """
349 349 if self.is_listening:
350 350 raise RuntimeError("Cannot start listening. Already listening!")
351 351 else:
352 352 self.is_listening = True
353 353 self.sub_channel.start()
354 354 self.xreq_channel.start()
355 355 self.rep_channel.start()
356 356
357 357 @property
358 358 def is_alive(self):
359 359 """ Returns whether the kernel is alive. """
360 360 if self.is_listening:
361 361 # TODO: check if alive.
362 362 return True
363 363 else:
364 364 return False
365 365
366 366 def stop_listening(self):
367 367 """Stops listening. If not listening, does nothing. """
368 368 if self.is_listening:
369 369 self.is_listening = False
370 370 self.sub_channel.stop()
371 371 self.xreq_channel.stop()
372 372 self.rep_channel.stop()
373 373
374 374 #--------------------------------------------------------------------------
375 375 # Kernel process management methods:
376 376 #--------------------------------------------------------------------------
377 377
378 378 def start_kernel(self):
379 379 """Starts a kernel process and configures the manager to use it.
380 380
381 381 If ports have been specified via the address attributes, they are used.
382 382 Otherwise, open ports are chosen by the OS and the channel port
383 383 attributes are configured as appropriate.
384 384 """
385 385 xreq, sub = self.xreq_address, self.sub_address
386 386 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
387 387 raise RuntimeError("Can only launch a kernel on localhost."
388 388 "Make sure that the '*_address' attributes are "
389 389 "configured properly.")
390 390
391 391 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
392 392 self.set_kernel(kernel)
393 393 self.xreq_address = (LOCALHOST, xrep)
394 394 self.sub_address = (LOCALHOST, pub)
395 395
396 396 def set_kernel(self, kernel):
397 397 """Sets the kernel manager's kernel to an existing kernel process.
398 398
399 399 It is *not* necessary to a set a kernel to communicate with it via the
400 400 channels, and those objects must be configured separately. It
401 401 *is* necessary to set a kernel if you want to use the manager (or
402 402 frontends that use the manager) to signal and/or kill the kernel.
403 403
404 404 Parameters:
405 405 -----------
406 406 kernel : Popen
407 407 An existing kernel process.
408 408 """
409 409 self._kernel = kernel
410 410
411 411 @property
412 412 def has_kernel(self):
413 413 """Returns whether a kernel process has been specified for the kernel
414 414 manager.
415 415
416 416 A kernel process can be set via 'start_kernel' or 'set_kernel'.
417 417 """
418 418 return self._kernel is not None
419 419
420 420 def kill_kernel(self):
421 421 """ Kill the running kernel. """
422 422 if self._kernel:
423 423 self._kernel.kill()
424 424 self._kernel = None
425 425 else:
426 426 raise RuntimeError("Cannot kill kernel. No kernel is running!")
427 427
428 428 def signal_kernel(self, signum):
429 429 """ Sends a signal to the kernel. """
430 430 if self._kernel:
431 431 self._kernel.send_signal(signum)
432 432 else:
433 433 raise RuntimeError("Cannot signal kernel. No kernel is running!")
434 434
435 435 #--------------------------------------------------------------------------
436 436 # Channels used for communication with the kernel:
437 437 #--------------------------------------------------------------------------
438 438
439 439 @property
440 440 def sub_channel(self):
441 441 """Get the SUB socket channel object."""
442 442 if self._sub_channel is None:
443 443 self._sub_channel = self.sub_channel_class(self.context,
444 444 self.session)
445 445 return self._sub_channel
446 446
447 447 @property
448 448 def xreq_channel(self):
449 449 """Get the REQ socket channel object to make requests of the kernel."""
450 450 if self._xreq_channel is None:
451 451 self._xreq_channel = self.xreq_channel_class(self.context,
452 452 self.session)
453 453 return self._xreq_channel
454 454
455 455 @property
456 456 def rep_channel(self):
457 457 """Get the REP socket channel object to handle stdin (raw_input)."""
458 458 if self._rep_channel is None:
459 459 self._rep_channel = self.rep_channel_class(self.context,
460 460 self.session)
461 461 return self._rep_channel
462 462
463 463 #--------------------------------------------------------------------------
464 464 # Delegates for the Channel address attributes:
465 465 #--------------------------------------------------------------------------
466 466
467 467 def get_sub_address(self):
468 468 return self.sub_channel.address
469 469
470 470 def set_sub_address(self, address):
471 471 self.sub_channel.address = address
472 472
473 473 sub_address = property(get_sub_address, set_sub_address,
474 474 doc="The address used by SUB socket channel.")
475 475
476 476 def get_xreq_address(self):
477 477 return self.xreq_channel.address
478 478
479 479 def set_xreq_address(self, address):
480 480 self.xreq_channel.address = address
481 481
482 482 xreq_address = property(get_xreq_address, set_xreq_address,
483 483 doc="The address used by XREQ socket channel.")
484 484
485 485 def get_rep_address(self):
486 486 return self.rep_channel.address
487 487
488 488 def set_rep_address(self, address):
489 489 self.rep_channel.address = address
490 490
491 491 rep_address = property(get_rep_address, set_rep_address,
492 492 doc="The address used by REP socket channel.")
493 493
General Comments 0
You need to be logged in to leave comments. Login now