##// END OF EJS Templates
Channels can no longer be restarted.
Brian Granger -
Show More
@@ -1,493 +1,492 b''
1 1 """Kernel frontend classes.
2 2
3 3 TODO: Create logger to handle debugging and console messages.
4 4
5 5 """
6 6
7 7 # Standard library imports.
8 8 from Queue import Queue, Empty
9 9 from subprocess import Popen
10 10 from threading import Thread
11 11 import time
12 12 import traceback
13 13
14 14 # System library imports.
15 15 import zmq
16 16 from zmq import POLLIN, POLLOUT, POLLERR
17 17 from zmq.eventloop import ioloop
18 18
19 19 # Local imports.
20 20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
21 21 Type
22 22 from kernel import launch_kernel
23 23 from session import Session
24 24
25 25 # Constants.
26 26 LOCALHOST = '127.0.0.1'
27 27
28 28
29 29 class MissingHandlerError(Exception):
30 30 pass
31 31
32 32
33 33 class ZmqSocketChannel(Thread):
34 34 """ The base class for the channels that use ZMQ sockets.
35 35 """
36 36
37 37 def __init__(self, context, session, address=None):
38 38 super(ZmqSocketChannel, self).__init__()
39 39 self.daemon = True
40 40
41 41 self.context = context
42 42 self.session = session
43 43 self.address = address
44 44 self.socket = None
45 45
46 46 def stop(self):
47 """ Stop the thread's activity. Returns when the thread terminates.
47 """Stop the thread's activity. Returns when the thread terminates.
48
49 The thread will raise :class:`RuntimeError` if :method:`self.start`
50 is called again.
48 51 """
49 52 self.join()
50 53
51 # Allow the thread to be started again.
52 # FIXME: Although this works (and there's no reason why it shouldn't),
53 # it feels wrong. Is there a cleaner way to achieve this?
54 Thread.__init__(self)
55 54
56 55 def get_address(self):
57 56 """ Get the channel's address. By the default, a channel is on
58 57 localhost with no port specified (a negative port number).
59 58 """
60 59 return self._address
61 60
62 61 def set_adresss(self, address):
63 62 """ Set the channel's address. Should be a tuple of form:
64 63 (ip address [str], port [int]).
65 64 or None, in which case the address is reset to its default value.
66 65 """
67 66 # FIXME: Validate address.
68 if self.is_alive():
67 if self.is_alive(): # This is Thread.is_alive
69 68 raise RuntimeError("Cannot set address on a running channel!")
70 69 else:
71 70 if address is None:
72 71 address = (LOCALHOST, 0)
73 72 self._address = address
74 73
75 74 address = property(get_address, set_adresss)
76 75
77 76
78 77 class SubSocketChannel(ZmqSocketChannel):
79 78
80 79 handlers = None
81 80 _overriden_call_handler = None
82 81
83 82 def __init__(self, context, session, address=None):
84 83 self.handlers = {}
85 84 super(SubSocketChannel, self).__init__(context, session, address)
86 85
87 86 def run(self):
88 87 self.socket = self.context.socket(zmq.SUB)
89 88 self.socket.setsockopt(zmq.SUBSCRIBE,'')
90 89 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
91 90 self.socket.connect('tcp://%s:%i' % self.address)
92 91 self.ioloop = ioloop.IOLoop()
93 92 self.ioloop.add_handler(self.socket, self._handle_events,
94 93 POLLIN|POLLERR)
95 94 self.ioloop.start()
96 95
97 96 def stop(self):
98 97 self.ioloop.stop()
99 98 super(SubSocketChannel, self).stop()
100 99
101 100 def _handle_events(self, socket, events):
102 101 # Turn on and off POLLOUT depending on if we have made a request
103 102 if events & POLLERR:
104 103 self._handle_err()
105 104 if events & POLLIN:
106 105 self._handle_recv()
107 106
108 107 def _handle_err(self):
109 108 raise zmq.ZmqError()
110 109
111 110 def _handle_recv(self):
112 111 msg = self.socket.recv_json()
113 112 self.call_handlers(msg)
114 113
115 114 def override_call_handler(self, func):
116 115 """Permanently override the call_handler.
117 116
118 117 The function func will be called as::
119 118
120 119 func(handler, msg)
121 120
122 121 And must call::
123 122
124 123 handler(msg)
125 124
126 125 in the main thread.
127 126 """
128 127 assert callable(func), "not a callable: %r" % func
129 128 self._overriden_call_handler = func
130 129
131 130 def call_handlers(self, msg):
132 131 handler = self.handlers.get(msg['msg_type'], None)
133 132 if handler is not None:
134 133 try:
135 134 self.call_handler(handler, msg)
136 135 except:
137 136 # XXX: This should be logged at least
138 137 traceback.print_last()
139 138
140 139 def call_handler(self, handler, msg):
141 140 if self._overriden_call_handler is not None:
142 141 self._overriden_call_handler(handler, msg)
143 142 elif hasattr(self, '_call_handler'):
144 143 call_handler = getattr(self, '_call_handler')
145 144 call_handler(handler, msg)
146 145 else:
147 146 raise RuntimeError('no handler!')
148 147
149 148 def add_handler(self, callback, msg_type):
150 149 """Register a callback for msg type."""
151 150 self.handlers[msg_type] = callback
152 151
153 152 def remove_handler(self, msg_type):
154 153 """Remove the callback for msg type."""
155 154 self.handlers.pop(msg_type, None)
156 155
157 156 def flush(self, timeout=1.0):
158 157 """Immediately processes all pending messages on the SUB channel.
159 158
160 159 This method is thread safe.
161 160
162 161 Parameters
163 162 ----------
164 163 timeout : float, optional
165 164 The maximum amount of time to spend flushing, in seconds. The
166 165 default is one second.
167 166 """
168 167 # We do the IOLoop callback process twice to ensure that the IOLoop
169 168 # gets to perform at least one full poll.
170 169 stop_time = time.time() + timeout
171 170 for i in xrange(2):
172 171 self._flushed = False
173 172 self.ioloop.add_callback(self._flush)
174 173 while not self._flushed and time.time() < stop_time:
175 174 time.sleep(0.01)
176 175
177 176 def _flush(self):
178 177 """Called in this thread by the IOLoop to indicate that all events have
179 178 been processed.
180 179 """
181 180 self._flushed = True
182 181
183 182
184 183 class XReqSocketChannel(ZmqSocketChannel):
185 184
186 185 handler_queue = None
187 186 command_queue = None
188 187 handlers = None
189 188 _overriden_call_handler = None
190 189
191 190 def __init__(self, context, session, address=None):
192 191 self.handlers = {}
193 192 self.handler_queue = Queue()
194 193 self.command_queue = Queue()
195 194 super(XReqSocketChannel, self).__init__(context, session, address)
196 195
197 196 def run(self):
198 197 self.socket = self.context.socket(zmq.XREQ)
199 198 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
200 199 self.socket.connect('tcp://%s:%i' % self.address)
201 200 self.ioloop = ioloop.IOLoop()
202 201 self.ioloop.add_handler(self.socket, self._handle_events,
203 202 POLLIN|POLLOUT|POLLERR)
204 203 self.ioloop.start()
205 204
206 205 def stop(self):
207 206 self.ioloop.stop()
208 207 super(XReqSocketChannel, self).stop()
209 208
210 209 def _handle_events(self, socket, events):
211 210 # Turn on and off POLLOUT depending on if we have made a request
212 211 if events & POLLERR:
213 212 self._handle_err()
214 213 if events & POLLOUT:
215 214 self._handle_send()
216 215 if events & POLLIN:
217 216 self._handle_recv()
218 217
219 218 def _handle_recv(self):
220 219 msg = self.socket.recv_json()
221 220 self.call_handlers(msg)
222 221
223 222 def _handle_send(self):
224 223 try:
225 224 msg = self.command_queue.get(False)
226 225 except Empty:
227 226 pass
228 227 else:
229 228 self.socket.send_json(msg)
230 229
231 230 def _handle_err(self):
232 231 raise zmq.ZmqError()
233 232
234 233 def _queue_request(self, msg, callback):
235 234 handler = self._find_handler(msg['msg_type'], callback)
236 235 self.handler_queue.put(handler)
237 236 self.command_queue.put(msg)
238 237
239 238 def execute(self, code, callback=None):
240 239 # Create class for content/msg creation. Related to, but possibly
241 240 # not in Session.
242 241 content = dict(code=code)
243 242 msg = self.session.msg('execute_request', content)
244 243 self._queue_request(msg, callback)
245 244 return msg['header']['msg_id']
246 245
247 246 def complete(self, text, line, block=None, callback=None):
248 247 content = dict(text=text, line=line)
249 248 msg = self.session.msg('complete_request', content)
250 249 self._queue_request(msg, callback)
251 250 return msg['header']['msg_id']
252 251
253 252 def object_info(self, oname, callback=None):
254 253 content = dict(oname=oname)
255 254 msg = self.session.msg('object_info_request', content)
256 255 self._queue_request(msg, callback)
257 256 return msg['header']['msg_id']
258 257
259 258 def _find_handler(self, name, callback):
260 259 if callback is not None:
261 260 return callback
262 261 handler = self.handlers.get(name)
263 262 if handler is None:
264 263 raise MissingHandlerError(
265 264 'No handler defined for method: %s' % name)
266 265 return handler
267 266
268 267 def override_call_handler(self, func):
269 268 """Permanently override the call_handler.
270 269
271 270 The function func will be called as::
272 271
273 272 func(handler, msg)
274 273
275 274 And must call::
276 275
277 276 handler(msg)
278 277
279 278 in the main thread.
280 279 """
281 280 assert callable(func), "not a callable: %r" % func
282 281 self._overriden_call_handler = func
283 282
284 283 def call_handlers(self, msg):
285 284 try:
286 285 handler = self.handler_queue.get(False)
287 286 except Empty:
288 287 print "Message received with no handler!!!"
289 288 print msg
290 289 else:
291 290 self.call_handler(handler, msg)
292 291
293 292 def call_handler(self, handler, msg):
294 293 if self._overriden_call_handler is not None:
295 294 self._overriden_call_handler(handler, msg)
296 295 elif hasattr(self, '_call_handler'):
297 296 call_handler = getattr(self, '_call_handler')
298 297 call_handler(handler, msg)
299 298 else:
300 299 raise RuntimeError('no handler!')
301 300
302 301
303 302 class RepSocketChannel(ZmqSocketChannel):
304 303
305 304 def on_raw_input(self):
306 305 pass
307 306
308 307
309 308 class KernelManager(HasTraits):
310 309 """ Manages a kernel for a frontend.
311 310
312 311 The SUB channel is for the frontend to receive messages published by the
313 312 kernel.
314 313
315 314 The REQ channel is for the frontend to make requests of the kernel.
316 315
317 316 The REP channel is for the kernel to request stdin (raw_input) from the
318 317 frontend.
319 318 """
320 319
321 320 # Whether the kernel manager is currently listening on its channels.
322 321 is_listening = Bool(False)
323 322
324 323 # The PyZMQ Context to use for communication with the kernel.
325 324 context = Instance(zmq.Context, ())
326 325
327 326 # The Session to use for communication with the kernel.
328 327 session = Instance(Session, ())
329 328
330 329 # The classes to use for the various channels.
331 330 sub_channel_class = Type(SubSocketChannel)
332 331 xreq_channel_class = Type(XReqSocketChannel)
333 332 rep_channel_class = Type(RepSocketChannel)
334 333
335 334 # Protected traits.
336 335 _kernel = Instance(Popen)
337 336 _sub_channel = Any
338 337 _xreq_channel = Any
339 338 _rep_channel = Any
340 339
341 340 #--------------------------------------------------------------------------
342 341 # Channel management methods:
343 342 #--------------------------------------------------------------------------
344 343
345 344 def start_listening(self):
346 345 """Starts listening on the specified ports. If already listening, raises
347 346 a RuntimeError.
348 347 """
349 348 if self.is_listening:
350 349 raise RuntimeError("Cannot start listening. Already listening!")
351 350 else:
352 351 self.is_listening = True
353 352 self.sub_channel.start()
354 353 self.xreq_channel.start()
355 354 self.rep_channel.start()
356 355
357 356 @property
358 357 def is_alive(self):
359 358 """ Returns whether the kernel is alive. """
360 359 if self.is_listening:
361 360 # TODO: check if alive.
362 361 return True
363 362 else:
364 363 return False
365 364
366 365 def stop_listening(self):
367 366 """Stops listening. If not listening, does nothing. """
368 367 if self.is_listening:
369 368 self.is_listening = False
370 369 self.sub_channel.stop()
371 370 self.xreq_channel.stop()
372 371 self.rep_channel.stop()
373 372
374 373 #--------------------------------------------------------------------------
375 374 # Kernel process management methods:
376 375 #--------------------------------------------------------------------------
377 376
378 377 def start_kernel(self):
379 378 """Starts a kernel process and configures the manager to use it.
380 379
381 380 If ports have been specified via the address attributes, they are used.
382 381 Otherwise, open ports are chosen by the OS and the channel port
383 382 attributes are configured as appropriate.
384 383 """
385 384 xreq, sub = self.xreq_address, self.sub_address
386 385 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
387 386 raise RuntimeError("Can only launch a kernel on localhost."
388 387 "Make sure that the '*_address' attributes are "
389 388 "configured properly.")
390 389
391 390 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
392 391 self.set_kernel(kernel)
393 392 self.xreq_address = (LOCALHOST, xrep)
394 393 self.sub_address = (LOCALHOST, pub)
395 394
396 395 def set_kernel(self, kernel):
397 396 """Sets the kernel manager's kernel to an existing kernel process.
398 397
399 398 It is *not* necessary to a set a kernel to communicate with it via the
400 399 channels, and those objects must be configured separately. It
401 400 *is* necessary to set a kernel if you want to use the manager (or
402 401 frontends that use the manager) to signal and/or kill the kernel.
403 402
404 403 Parameters:
405 404 -----------
406 405 kernel : Popen
407 406 An existing kernel process.
408 407 """
409 408 self._kernel = kernel
410 409
411 410 @property
412 411 def has_kernel(self):
413 412 """Returns whether a kernel process has been specified for the kernel
414 413 manager.
415 414
416 415 A kernel process can be set via 'start_kernel' or 'set_kernel'.
417 416 """
418 417 return self._kernel is not None
419 418
420 419 def kill_kernel(self):
421 420 """ Kill the running kernel. """
422 421 if self._kernel:
423 422 self._kernel.kill()
424 423 self._kernel = None
425 424 else:
426 425 raise RuntimeError("Cannot kill kernel. No kernel is running!")
427 426
428 427 def signal_kernel(self, signum):
429 428 """ Sends a signal to the kernel. """
430 429 if self._kernel:
431 430 self._kernel.send_signal(signum)
432 431 else:
433 432 raise RuntimeError("Cannot signal kernel. No kernel is running!")
434 433
435 434 #--------------------------------------------------------------------------
436 435 # Channels used for communication with the kernel:
437 436 #--------------------------------------------------------------------------
438 437
439 438 @property
440 439 def sub_channel(self):
441 440 """Get the SUB socket channel object."""
442 441 if self._sub_channel is None:
443 442 self._sub_channel = self.sub_channel_class(self.context,
444 443 self.session)
445 444 return self._sub_channel
446 445
447 446 @property
448 447 def xreq_channel(self):
449 448 """Get the REQ socket channel object to make requests of the kernel."""
450 449 if self._xreq_channel is None:
451 450 self._xreq_channel = self.xreq_channel_class(self.context,
452 451 self.session)
453 452 return self._xreq_channel
454 453
455 454 @property
456 455 def rep_channel(self):
457 456 """Get the REP socket channel object to handle stdin (raw_input)."""
458 457 if self._rep_channel is None:
459 458 self._rep_channel = self.rep_channel_class(self.context,
460 459 self.session)
461 460 return self._rep_channel
462 461
463 462 #--------------------------------------------------------------------------
464 463 # Delegates for the Channel address attributes:
465 464 #--------------------------------------------------------------------------
466 465
467 466 def get_sub_address(self):
468 467 return self.sub_channel.address
469 468
470 469 def set_sub_address(self, address):
471 470 self.sub_channel.address = address
472 471
473 472 sub_address = property(get_sub_address, set_sub_address,
474 473 doc="The address used by SUB socket channel.")
475 474
476 475 def get_xreq_address(self):
477 476 return self.xreq_channel.address
478 477
479 478 def set_xreq_address(self, address):
480 479 self.xreq_channel.address = address
481 480
482 481 xreq_address = property(get_xreq_address, set_xreq_address,
483 482 doc="The address used by XREQ socket channel.")
484 483
485 484 def get_rep_address(self):
486 485 return self.rep_channel.address
487 486
488 487 def set_rep_address(self, address):
489 488 self.rep_channel.address = address
490 489
491 490 rep_address = property(get_rep_address, set_rep_address,
492 491 doc="The address used by REP socket channel.")
493 492
General Comments 0
You need to be logged in to leave comments. Login now