##// END OF EJS Templates
* Implemented KernelManager's 'signal_kernel' method....
epatters -
Show More
@@ -1,462 +1,493 b''
1 1 """Kernel frontend classes.
2 2
3 3 TODO: Create logger to handle debugging and console messages.
4 4
5 5 """
6 6
7 7 # Standard library imports.
8 8 from Queue import Queue, Empty
9 9 from subprocess import Popen
10 10 from threading import Thread
11 11 import time
12 12 import traceback
13 13
14 14 # System library imports.
15 15 import zmq
16 16 from zmq import POLLIN, POLLOUT, POLLERR
17 17 from zmq.eventloop import ioloop
18 18
19 19 # Local imports.
20 20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
21 21 Type
22 22 from kernel import launch_kernel
23 23 from session import Session
24 24
25 25 # Constants.
26 26 LOCALHOST = '127.0.0.1'
27 27
28 28
29 29 class MissingHandlerError(Exception):
30 30 pass
31 31
32 32
33 33 class ZmqSocketChannel(Thread):
34 34 """ The base class for the channels that use ZMQ sockets.
35 35 """
36 36
37 37 def __init__(self, context, session, address=None):
38 38 super(ZmqSocketChannel, self).__init__()
39 39 self.daemon = True
40 40
41 41 self.context = context
42 42 self.session = session
43 43 self.address = address
44 44 self.socket = None
45 45
46 46 def stop(self):
47 47 """ Stop the thread's activity. Returns when the thread terminates.
48 48 """
49 49 self.join()
50 50
51 51 # Allow the thread to be started again.
52 52 # FIXME: Although this works (and there's no reason why it shouldn't),
53 53 # it feels wrong. Is there a cleaner way to achieve this?
54 54 Thread.__init__(self)
55 55
56 56 def get_address(self):
57 57 """ Get the channel's address. By the default, a channel is on
58 58 localhost with no port specified (a negative port number).
59 59 """
60 60 return self._address
61 61
62 62 def set_adresss(self, address):
63 63 """ Set the channel's address. Should be a tuple of form:
64 64 (ip address [str], port [int]).
65 65 or None, in which case the address is reset to its default value.
66 66 """
67 67 # FIXME: Validate address.
68 68 if self.is_alive():
69 69 raise RuntimeError("Cannot set address on a running channel!")
70 70 else:
71 71 if address is None:
72 72 address = (LOCALHOST, -1)
73 73 self._address = address
74 74
75 75 address = property(get_address, set_adresss)
76 76
77 77
78 78 class SubSocketChannel(ZmqSocketChannel):
79 79
80 80 handlers = None
81 81 _overriden_call_handler = None
82 82
83 83 def __init__(self, context, session, address=None):
84 84 self.handlers = {}
85 85 super(SubSocketChannel, self).__init__(context, session, address)
86 86
87 87 def run(self):
88 88 self.socket = self.context.socket(zmq.SUB)
89 89 self.socket.setsockopt(zmq.SUBSCRIBE,'')
90 90 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
91 91 self.socket.connect('tcp://%s:%i' % self.address)
92 92 self.ioloop = ioloop.IOLoop()
93 93 self.ioloop.add_handler(self.socket, self._handle_events,
94 94 POLLIN|POLLERR)
95 95 self.ioloop.start()
96 96
97 97 def stop(self):
98 98 self.ioloop.stop()
99 99 super(SubSocketChannel, self).stop()
100 100
101 101 def _handle_events(self, socket, events):
102 102 # Turn on and off POLLOUT depending on if we have made a request
103 103 if events & POLLERR:
104 104 self._handle_err()
105 105 if events & POLLIN:
106 106 self._handle_recv()
107 107
108 108 def _handle_err(self):
109 109 raise zmq.ZmqError()
110 110
111 111 def _handle_recv(self):
112 112 msg = self.socket.recv_json()
113 113 self.call_handlers(msg)
114 114
115 115 def override_call_handler(self, func):
116 116 """Permanently override the call_handler.
117 117
118 118 The function func will be called as::
119 119
120 120 func(handler, msg)
121 121
122 122 And must call::
123 123
124 124 handler(msg)
125 125
126 126 in the main thread.
127 127 """
128 128 assert callable(func), "not a callable: %r" % func
129 129 self._overriden_call_handler = func
130 130
131 131 def call_handlers(self, msg):
132 132 handler = self.handlers.get(msg['msg_type'], None)
133 133 if handler is not None:
134 134 try:
135 135 self.call_handler(handler, msg)
136 136 except:
137 137 # XXX: This should be logged at least
138 138 traceback.print_last()
139 139
140 140 def call_handler(self, handler, msg):
141 141 if self._overriden_call_handler is not None:
142 142 self._overriden_call_handler(handler, msg)
143 143 elif hasattr(self, '_call_handler'):
144 144 call_handler = getattr(self, '_call_handler')
145 145 call_handler(handler, msg)
146 146 else:
147 147 raise RuntimeError('no handler!')
148 148
149 149 def add_handler(self, callback, msg_type):
150 150 """Register a callback for msg type."""
151 151 self.handlers[msg_type] = callback
152 152
153 153 def remove_handler(self, msg_type):
154 154 """Remove the callback for msg type."""
155 155 self.handlers.pop(msg_type, None)
156 156
157 157 def flush(self, timeout=1.0):
158 158 """Immediately processes all pending messages on the SUB channel.
159 159
160 160 This method is thread safe.
161 161
162 162 Parameters
163 163 ----------
164 164 timeout : float, optional
165 165 The maximum amount of time to spend flushing, in seconds. The
166 166 default is one second.
167 167 """
168 168 # We do the IOLoop callback process twice to ensure that the IOLoop
169 169 # gets to perform at least one full poll.
170 170 stop_time = time.time() + timeout
171 171 for i in xrange(2):
172 172 self._flushed = False
173 173 self.ioloop.add_callback(self._flush)
174 174 while not self._flushed and time.time() < stop_time:
175 175 time.sleep(0.01)
176 176
177 177 def _flush(self):
178 178 """Called in this thread by the IOLoop to indicate that all events have
179 179 been processed.
180 180 """
181 181 self._flushed = True
182 182
183 183
184 184 class XReqSocketChannel(ZmqSocketChannel):
185 185
186 186 handler_queue = None
187 187 command_queue = None
188 188 handlers = None
189 189 _overriden_call_handler = None
190 190
191 191 def __init__(self, context, session, address=None):
192 192 self.handlers = {}
193 193 self.handler_queue = Queue()
194 194 self.command_queue = Queue()
195 195 super(XReqSocketChannel, self).__init__(context, session, address)
196 196
197 197 def run(self):
198 198 self.socket = self.context.socket(zmq.XREQ)
199 199 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
200 200 self.socket.connect('tcp://%s:%i' % self.address)
201 201 self.ioloop = ioloop.IOLoop()
202 202 self.ioloop.add_handler(self.socket, self._handle_events,
203 203 POLLIN|POLLOUT|POLLERR)
204 204 self.ioloop.start()
205 205
206 206 def stop(self):
207 207 self.ioloop.stop()
208 208 super(XReqSocketChannel, self).stop()
209 209
210 210 def _handle_events(self, socket, events):
211 211 # Turn on and off POLLOUT depending on if we have made a request
212 212 if events & POLLERR:
213 213 self._handle_err()
214 214 if events & POLLOUT:
215 215 self._handle_send()
216 216 if events & POLLIN:
217 217 self._handle_recv()
218 218
219 219 def _handle_recv(self):
220 220 msg = self.socket.recv_json()
221 221 self.call_handlers(msg)
222 222
223 223 def _handle_send(self):
224 224 try:
225 225 msg = self.command_queue.get(False)
226 226 except Empty:
227 227 pass
228 228 else:
229 229 self.socket.send_json(msg)
230 230
231 231 def _handle_err(self):
232 232 raise zmq.ZmqError()
233 233
234 234 def _queue_request(self, msg, callback):
235 235 handler = self._find_handler(msg['msg_type'], callback)
236 236 self.handler_queue.put(handler)
237 237 self.command_queue.put(msg)
238 238
239 239 def execute(self, code, callback=None):
240 240 # Create class for content/msg creation. Related to, but possibly
241 241 # not in Session.
242 242 content = dict(code=code)
243 243 msg = self.session.msg('execute_request', content)
244 244 self._queue_request(msg, callback)
245 245 return msg['header']['msg_id']
246 246
247 247 def complete(self, text, line, block=None, callback=None):
248 248 content = dict(text=text, line=line)
249 249 msg = self.session.msg('complete_request', content)
250 250 self._queue_request(msg, callback)
251 251 return msg['header']['msg_id']
252 252
253 253 def object_info(self, oname, callback=None):
254 254 content = dict(oname=oname)
255 255 msg = self.session.msg('object_info_request', content)
256 256 self._queue_request(msg, callback)
257 257 return msg['header']['msg_id']
258 258
259 259 def _find_handler(self, name, callback):
260 260 if callback is not None:
261 261 return callback
262 262 handler = self.handlers.get(name)
263 263 if handler is None:
264 264 raise MissingHandlerError(
265 265 'No handler defined for method: %s' % name)
266 266 return handler
267 267
268 268 def override_call_handler(self, func):
269 269 """Permanently override the call_handler.
270 270
271 271 The function func will be called as::
272 272
273 273 func(handler, msg)
274 274
275 275 And must call::
276 276
277 277 handler(msg)
278 278
279 279 in the main thread.
280 280 """
281 281 assert callable(func), "not a callable: %r" % func
282 282 self._overriden_call_handler = func
283 283
284 284 def call_handlers(self, msg):
285 285 try:
286 286 handler = self.handler_queue.get(False)
287 287 except Empty:
288 288 print "Message received with no handler!!!"
289 289 print msg
290 290 else:
291 291 self.call_handler(handler, msg)
292 292
293 293 def call_handler(self, handler, msg):
294 294 if self._overriden_call_handler is not None:
295 295 self._overriden_call_handler(handler, msg)
296 296 elif hasattr(self, '_call_handler'):
297 297 call_handler = getattr(self, '_call_handler')
298 298 call_handler(handler, msg)
299 299 else:
300 300 raise RuntimeError('no handler!')
301 301
302 302
303 303 class RepSocketChannel(ZmqSocketChannel):
304 304
305 305 def on_raw_input(self):
306 306 pass
307 307
308 308
309 309 class KernelManager(HasTraits):
310 310 """ Manages a kernel for a frontend.
311 311
312 312 The SUB channel is for the frontend to receive messages published by the
313 313 kernel.
314 314
315 315 The REQ channel is for the frontend to make requests of the kernel.
316 316
317 317 The REP channel is for the kernel to request stdin (raw_input) from the
318 318 frontend.
319 319 """
320 320
321 321 # Whether the kernel manager is currently listening on its channels.
322 322 is_listening = Bool(False)
323 323
324 324 # The PyZMQ Context to use for communication with the kernel.
325 325 context = Instance(zmq.Context, ())
326 326
327 327 # The Session to use for communication with the kernel.
328 328 session = Instance(Session, ())
329 329
330 330 # The classes to use for the various channels.
331 331 sub_channel_class = Type(SubSocketChannel)
332 332 xreq_channel_class = Type(XReqSocketChannel)
333 333 rep_channel_class = Type(RepSocketChannel)
334 334
335 335 # Protected traits.
336 336 _kernel = Instance(Popen)
337 337 _sub_channel = Any
338 338 _xreq_channel = Any
339 339 _rep_channel = Any
340 340
341 def __init__(self, **traits):
342 super(KernelManager, self).__init__()
343
344 # FIXME: This should be the business of HasTraits. The convention is:
345 # HasTraits.__init__(self, **traits_to_be_initialized.)
346 for trait in traits:
347 setattr(self, trait, traits[trait])
341 #--------------------------------------------------------------------------
342 # Channel management methods:
343 #--------------------------------------------------------------------------
348 344
349 345 def start_listening(self):
350 """Start listening on the specified ports. If already listening, raises
346 """Starts listening on the specified ports. If already listening, raises
351 347 a RuntimeError.
352 348 """
353 349 if self.is_listening:
354 350 raise RuntimeError("Cannot start listening. Already listening!")
355 351 else:
356 352 self.is_listening = True
357 353 self.sub_channel.start()
358 354 self.xreq_channel.start()
359 355 self.rep_channel.start()
360 356
357 @property
358 def is_alive(self):
359 """ Returns whether the kernel is alive. """
360 if self.is_listening:
361 # TODO: check if alive.
362 return True
363 else:
364 return False
365
361 366 def stop_listening(self):
362 """Stop listening. If not listening, does nothing. """
367 """Stops listening. If not listening, does nothing. """
363 368 if self.is_listening:
364 369 self.is_listening = False
365 370 self.sub_channel.stop()
366 371 self.xreq_channel.stop()
367 372 self.rep_channel.stop()
368 373
374 #--------------------------------------------------------------------------
375 # Kernel process management methods:
376 #--------------------------------------------------------------------------
377
369 378 def start_kernel(self):
370 """Start a localhost kernel. If ports have been specified via the
371 address attributes, use them. Otherwise, choose open ports at random.
379 """Starts a kernel process and configures the manager to use it.
380
381 If ports have been specified via the address attributes, they are used.
382 Otherwise, open ports are chosen by the OS and the channel port
383 attributes are configured as appropriate.
372 384 """
373 385 xreq, sub = self.xreq_address, self.sub_address
374 386 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
375 387 raise RuntimeError("Can only launch a kernel on localhost."
376 388 "Make sure that the '*_address' attributes are "
377 389 "configured properly.")
378 390
379 self._kernel, xrep, pub = launch_kernel(xrep_port=xreq[1],
380 pub_port=sub[1])
391 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
392 self.set_kernel(kernel)
381 393 self.xreq_address = (LOCALHOST, xrep)
382 394 self.sub_address = (LOCALHOST, pub)
383 395
384 def kill_kernel(self):
385 """Kill the running kernel, if there is one.
396 def set_kernel(self, kernel):
397 """Sets the kernel manager's kernel to an existing kernel process.
398
399 It is *not* necessary to a set a kernel to communicate with it via the
400 channels, and those objects must be configured separately. It
401 *is* necessary to set a kernel if you want to use the manager (or
402 frontends that use the manager) to signal and/or kill the kernel.
403
404 Parameters:
405 -----------
406 kernel : Popen
407 An existing kernel process.
408 """
409 self._kernel = kernel
410
411 @property
412 def has_kernel(self):
413 """Returns whether a kernel process has been specified for the kernel
414 manager.
415
416 A kernel process can be set via 'start_kernel' or 'set_kernel'.
386 417 """
418 return self._kernel is not None
419
420 def kill_kernel(self):
421 """ Kill the running kernel. """
387 422 if self._kernel:
388 423 self._kernel.kill()
389 424 self._kernel = None
390
391 @property
392 def is_alive(self):
393 """ Returns whether the kernel is alive. """
394 if self.is_listening:
395 # TODO: check if alive.
396 return True
397 425 else:
398 return False
426 raise RuntimeError("Cannot kill kernel. No kernel is running!")
399 427
400 428 def signal_kernel(self, signum):
401 """Send signum to the kernel."""
402 # TODO: signal the kernel.
429 """ Sends a signal to the kernel. """
430 if self._kernel:
431 self._kernel.send_signal(signum)
432 else:
433 raise RuntimeError("Cannot signal kernel. No kernel is running!")
403 434
404 435 #--------------------------------------------------------------------------
405 436 # Channels used for communication with the kernel:
406 437 #--------------------------------------------------------------------------
407 438
408 439 @property
409 440 def sub_channel(self):
410 441 """Get the SUB socket channel object."""
411 442 if self._sub_channel is None:
412 443 self._sub_channel = self.sub_channel_class(self.context,
413 444 self.session)
414 445 return self._sub_channel
415 446
416 447 @property
417 448 def xreq_channel(self):
418 449 """Get the REQ socket channel object to make requests of the kernel."""
419 450 if self._xreq_channel is None:
420 451 self._xreq_channel = self.xreq_channel_class(self.context,
421 452 self.session)
422 453 return self._xreq_channel
423 454
424 455 @property
425 456 def rep_channel(self):
426 457 """Get the REP socket channel object to handle stdin (raw_input)."""
427 458 if self._rep_channel is None:
428 459 self._rep_channel = self.rep_channel_class(self.context,
429 460 self.session)
430 461 return self._rep_channel
431 462
432 463 #--------------------------------------------------------------------------
433 # Channel address attributes:
464 # Delegates for the Channel address attributes:
434 465 #--------------------------------------------------------------------------
435 466
436 467 def get_sub_address(self):
437 468 return self.sub_channel.address
438 469
439 470 def set_sub_address(self, address):
440 471 self.sub_channel.address = address
441 472
442 473 sub_address = property(get_sub_address, set_sub_address,
443 474 doc="The address used by SUB socket channel.")
444 475
445 476 def get_xreq_address(self):
446 477 return self.xreq_channel.address
447 478
448 479 def set_xreq_address(self, address):
449 480 self.xreq_channel.address = address
450 481
451 482 xreq_address = property(get_xreq_address, set_xreq_address,
452 483 doc="The address used by XREQ socket channel.")
453 484
454 485 def get_rep_address(self):
455 486 return self.rep_channel.address
456 487
457 488 def set_rep_address(self, address):
458 489 self.rep_channel.address = address
459 490
460 491 rep_address = property(get_rep_address, set_rep_address,
461 492 doc="The address used by REP socket channel.")
462 493
General Comments 0
You need to be logged in to leave comments. Login now