##// END OF EJS Templates
SUB channel _handle_recv is now greedy....
Brian Granger -
Show More
@@ -1,458 +1,466 b''
1 1 """Kernel frontend classes.
2 2
3 3 TODO: Create logger to handle debugging and console messages.
4 4
5 5 """
6 6
7 7 # Standard library imports.
8 8 from Queue import Queue, Empty
9 9 from subprocess import Popen
10 10 from threading import Thread
11 11 import time
12 12 import traceback
13 13
14 14 # System library imports.
15 15 import zmq
16 16 from zmq import POLLIN, POLLOUT, POLLERR
17 17 from zmq.eventloop import ioloop
18 18
19 19 # Local imports.
20 20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
21 21 Type
22 22 from kernel import launch_kernel
23 23 from session import Session
24 24
25 25 # Constants.
26 26 LOCALHOST = '127.0.0.1'
27 27
28 28
29 29 class MissingHandlerError(Exception):
30 30 pass
31 31
32 32
33 33 class ZmqSocketChannel(Thread):
34 34 """ The base class for the channels that use ZMQ sockets.
35 35 """
36 36
37 37 def __init__(self, context, session, address=None):
38 38 super(ZmqSocketChannel, self).__init__()
39 39 self.daemon = True
40 40
41 41 self.context = context
42 42 self.session = session
43 43 self.address = address
44 44 self.socket = None
45 45
46 46 def stop(self):
47 47 """Stop the thread's activity. Returns when the thread terminates.
48 48
49 49 The thread will raise :class:`RuntimeError` if :method:`self.start`
50 50 is called again.
51 51 """
52 52 self.join()
53 53
54 54
55 55 def get_address(self):
56 56 """ Get the channel's address. By the default, a channel is on
57 57 localhost with no port specified (a negative port number).
58 58 """
59 59 return self._address
60 60
61 61 def set_adresss(self, address):
62 62 """ Set the channel's address. Should be a tuple of form:
63 63 (ip address [str], port [int]).
64 64 or None, in which case the address is reset to its default value.
65 65 """
66 66 # FIXME: Validate address.
67 67 if self.is_alive(): # This is Thread.is_alive
68 68 raise RuntimeError("Cannot set address on a running channel!")
69 69 else:
70 70 if address is None:
71 71 address = (LOCALHOST, 0)
72 72 self._address = address
73 73
74 74 address = property(get_address, set_adresss)
75 75
76 76
77 77 class SubSocketChannel(ZmqSocketChannel):
78 78
79 79 def __init__(self, context, session, address=None):
80 80 super(SubSocketChannel, self).__init__(context, session, address)
81 81
82 82 def run(self):
83 83 self.socket = self.context.socket(zmq.SUB)
84 84 self.socket.setsockopt(zmq.SUBSCRIBE,'')
85 85 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
86 86 self.socket.connect('tcp://%s:%i' % self.address)
87 87 self.ioloop = ioloop.IOLoop()
88 88 self.ioloop.add_handler(self.socket, self._handle_events,
89 89 POLLIN|POLLERR)
90 90 self.ioloop.start()
91 91
92 92 def stop(self):
93 93 self.ioloop.stop()
94 94 super(SubSocketChannel, self).stop()
95 95
96 96 def _handle_events(self, socket, events):
97 97 # Turn on and off POLLOUT depending on if we have made a request
98 98 if events & POLLERR:
99 99 self._handle_err()
100 100 if events & POLLIN:
101 101 self._handle_recv()
102 102
103 103 def _handle_err(self):
104 104 # We don't want to let this go silently, so eventually we should log.
105 raise zmq.ZmqError()
105 raise zmq.ZMQError()
106 106
107 107 def _handle_recv(self):
108 msg = self.socket.recv_json()
109 self.call_handlers(msg)
108 # Get all of the messages we can
109 while True:
110 try:
111 msg = self.socket.recv_json(zmq.NOBLOCK)
112 except zmq.ZMQError:
113 # Check the errno?
114 # Will this tigger POLLERR?
115 break
116 else:
117 self.call_handlers(msg)
110 118
111 119 def call_handlers(self, msg):
112 120 """This method is called in the ioloop thread when a message arrives.
113 121
114 122 Subclasses should override this method to handle incoming messages.
115 123 It is important to remember that this method is called in the thread
116 124 so that some logic must be done to ensure that the application leve
117 125 handlers are called in the application thread.
118 126 """
119 127 raise NotImplementedError('call_handlers must be defined in a subclass.')
120 128
121 129 def flush(self, timeout=1.0):
122 130 """Immediately processes all pending messages on the SUB channel.
123 131
124 132 This method is thread safe.
125 133
126 134 Parameters
127 135 ----------
128 136 timeout : float, optional
129 137 The maximum amount of time to spend flushing, in seconds. The
130 138 default is one second.
131 139 """
132 140 # We do the IOLoop callback process twice to ensure that the IOLoop
133 141 # gets to perform at least one full poll.
134 142 stop_time = time.time() + timeout
135 143 for i in xrange(2):
136 144 self._flushed = False
137 145 self.ioloop.add_callback(self._flush)
138 146 while not self._flushed and time.time() < stop_time:
139 147 time.sleep(0.01)
140 148
141 149 def _flush(self):
142 150 """Called in this thread by the IOLoop to indicate that all events have
143 151 been processed.
144 152 """
145 153 self._flushed = True
146 154
147 155
148 156 class XReqSocketChannel(ZmqSocketChannel):
149 157
150 158 handler_queue = None
151 159 command_queue = None
152 160 handlers = None
153 161 _overriden_call_handler = None
154 162
155 163 def __init__(self, context, session, address=None):
156 164 self.handlers = {}
157 165 self.handler_queue = Queue()
158 166 self.command_queue = Queue()
159 167 super(XReqSocketChannel, self).__init__(context, session, address)
160 168
161 169 def run(self):
162 170 self.socket = self.context.socket(zmq.XREQ)
163 171 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
164 172 self.socket.connect('tcp://%s:%i' % self.address)
165 173 self.ioloop = ioloop.IOLoop()
166 174 self.ioloop.add_handler(self.socket, self._handle_events,
167 175 POLLIN|POLLOUT|POLLERR)
168 176 self.ioloop.start()
169 177
170 178 def stop(self):
171 179 self.ioloop.stop()
172 180 super(XReqSocketChannel, self).stop()
173 181
174 182 def _handle_events(self, socket, events):
175 183 # Turn on and off POLLOUT depending on if we have made a request
176 184 if events & POLLERR:
177 185 self._handle_err()
178 186 if events & POLLOUT:
179 187 self._handle_send()
180 188 if events & POLLIN:
181 189 self._handle_recv()
182 190
183 191 def _handle_recv(self):
184 192 msg = self.socket.recv_json()
185 193 self.call_handlers(msg)
186 194
187 195 def _handle_send(self):
188 196 try:
189 197 msg = self.command_queue.get(False)
190 198 except Empty:
191 199 pass
192 200 else:
193 201 self.socket.send_json(msg)
194 202
195 203 def _handle_err(self):
196 204 # We don't want to let this go silently, so eventually we should log.
197 raise zmq.ZmqError()
205 raise zmq.ZMQError()
198 206
199 207 def _queue_request(self, msg, callback):
200 208 handler = self._find_handler(msg['msg_type'], callback)
201 209 self.handler_queue.put(handler)
202 210 self.command_queue.put(msg)
203 211
204 212 def execute(self, code, callback=None):
205 213 # Create class for content/msg creation. Related to, but possibly
206 214 # not in Session.
207 215 content = dict(code=code)
208 216 msg = self.session.msg('execute_request', content)
209 217 self._queue_request(msg, callback)
210 218 return msg['header']['msg_id']
211 219
212 220 def complete(self, text, line, block=None, callback=None):
213 221 content = dict(text=text, line=line)
214 222 msg = self.session.msg('complete_request', content)
215 223 self._queue_request(msg, callback)
216 224 return msg['header']['msg_id']
217 225
218 226 def object_info(self, oname, callback=None):
219 227 content = dict(oname=oname)
220 228 msg = self.session.msg('object_info_request', content)
221 229 self._queue_request(msg, callback)
222 230 return msg['header']['msg_id']
223 231
224 232 def _find_handler(self, name, callback):
225 233 if callback is not None:
226 234 return callback
227 235 handler = self.handlers.get(name)
228 236 if handler is None:
229 237 raise MissingHandlerError(
230 238 'No handler defined for method: %s' % name)
231 239 return handler
232 240
233 241 def override_call_handler(self, func):
234 242 """Permanently override the call_handler.
235 243
236 244 The function func will be called as::
237 245
238 246 func(handler, msg)
239 247
240 248 And must call::
241 249
242 250 handler(msg)
243 251
244 252 in the main thread.
245 253 """
246 254 assert callable(func), "not a callable: %r" % func
247 255 self._overriden_call_handler = func
248 256
249 257 def call_handlers(self, msg):
250 258 try:
251 259 handler = self.handler_queue.get(False)
252 260 except Empty:
253 261 print "Message received with no handler!!!"
254 262 print msg
255 263 else:
256 264 self.call_handler(handler, msg)
257 265
258 266 def call_handler(self, handler, msg):
259 267 if self._overriden_call_handler is not None:
260 268 self._overriden_call_handler(handler, msg)
261 269 elif hasattr(self, '_call_handler'):
262 270 call_handler = getattr(self, '_call_handler')
263 271 call_handler(handler, msg)
264 272 else:
265 273 raise RuntimeError('no handler!')
266 274
267 275
268 276 class RepSocketChannel(ZmqSocketChannel):
269 277
270 278 def on_raw_input(self):
271 279 pass
272 280
273 281
274 282 class KernelManager(HasTraits):
275 283 """ Manages a kernel for a frontend.
276 284
277 285 The SUB channel is for the frontend to receive messages published by the
278 286 kernel.
279 287
280 288 The REQ channel is for the frontend to make requests of the kernel.
281 289
282 290 The REP channel is for the kernel to request stdin (raw_input) from the
283 291 frontend.
284 292 """
285 293
286 294 # Whether the kernel manager is currently listening on its channels.
287 295 is_listening = Bool(False)
288 296
289 297 # The PyZMQ Context to use for communication with the kernel.
290 298 context = Instance(zmq.Context, ())
291 299
292 300 # The Session to use for communication with the kernel.
293 301 session = Instance(Session, ())
294 302
295 303 # The classes to use for the various channels.
296 304 sub_channel_class = Type(SubSocketChannel)
297 305 xreq_channel_class = Type(XReqSocketChannel)
298 306 rep_channel_class = Type(RepSocketChannel)
299 307
300 308 # Protected traits.
301 309 _kernel = Instance(Popen)
302 310 _sub_channel = Any
303 311 _xreq_channel = Any
304 312 _rep_channel = Any
305 313
306 314 #--------------------------------------------------------------------------
307 315 # Channel management methods:
308 316 #--------------------------------------------------------------------------
309 317
310 318 def start_listening(self):
311 319 """Starts listening on the specified ports. If already listening, raises
312 320 a RuntimeError.
313 321 """
314 322 if self.is_listening:
315 323 raise RuntimeError("Cannot start listening. Already listening!")
316 324 else:
317 325 self.is_listening = True
318 326 self.sub_channel.start()
319 327 self.xreq_channel.start()
320 328 self.rep_channel.start()
321 329
322 330 @property
323 331 def is_alive(self):
324 332 """ Returns whether the kernel is alive. """
325 333 if self.is_listening:
326 334 # TODO: check if alive.
327 335 return True
328 336 else:
329 337 return False
330 338
331 339 def stop_listening(self):
332 340 """Stops listening. If not listening, does nothing. """
333 341 if self.is_listening:
334 342 self.is_listening = False
335 343 self.sub_channel.stop()
336 344 self.xreq_channel.stop()
337 345 self.rep_channel.stop()
338 346
339 347 #--------------------------------------------------------------------------
340 348 # Kernel process management methods:
341 349 #--------------------------------------------------------------------------
342 350
343 351 def start_kernel(self):
344 352 """Starts a kernel process and configures the manager to use it.
345 353
346 354 If ports have been specified via the address attributes, they are used.
347 355 Otherwise, open ports are chosen by the OS and the channel port
348 356 attributes are configured as appropriate.
349 357 """
350 358 xreq, sub = self.xreq_address, self.sub_address
351 359 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
352 360 raise RuntimeError("Can only launch a kernel on localhost."
353 361 "Make sure that the '*_address' attributes are "
354 362 "configured properly.")
355 363
356 364 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
357 365 self.set_kernel(kernel)
358 366 self.xreq_address = (LOCALHOST, xrep)
359 367 self.sub_address = (LOCALHOST, pub)
360 368
361 369 def set_kernel(self, kernel):
362 370 """Sets the kernel manager's kernel to an existing kernel process.
363 371
364 372 It is *not* necessary to a set a kernel to communicate with it via the
365 373 channels, and those objects must be configured separately. It
366 374 *is* necessary to set a kernel if you want to use the manager (or
367 375 frontends that use the manager) to signal and/or kill the kernel.
368 376
369 377 Parameters:
370 378 -----------
371 379 kernel : Popen
372 380 An existing kernel process.
373 381 """
374 382 self._kernel = kernel
375 383
376 384 @property
377 385 def has_kernel(self):
378 386 """Returns whether a kernel process has been specified for the kernel
379 387 manager.
380 388
381 389 A kernel process can be set via 'start_kernel' or 'set_kernel'.
382 390 """
383 391 return self._kernel is not None
384 392
385 393 def kill_kernel(self):
386 394 """ Kill the running kernel. """
387 395 if self._kernel:
388 396 self._kernel.kill()
389 397 self._kernel = None
390 398 else:
391 399 raise RuntimeError("Cannot kill kernel. No kernel is running!")
392 400
393 401 def signal_kernel(self, signum):
394 402 """ Sends a signal to the kernel. """
395 403 if self._kernel:
396 404 self._kernel.send_signal(signum)
397 405 else:
398 406 raise RuntimeError("Cannot signal kernel. No kernel is running!")
399 407
400 408 #--------------------------------------------------------------------------
401 409 # Channels used for communication with the kernel:
402 410 #--------------------------------------------------------------------------
403 411
404 412 @property
405 413 def sub_channel(self):
406 414 """Get the SUB socket channel object."""
407 415 if self._sub_channel is None:
408 416 self._sub_channel = self.sub_channel_class(self.context,
409 417 self.session)
410 418 return self._sub_channel
411 419
412 420 @property
413 421 def xreq_channel(self):
414 422 """Get the REQ socket channel object to make requests of the kernel."""
415 423 if self._xreq_channel is None:
416 424 self._xreq_channel = self.xreq_channel_class(self.context,
417 425 self.session)
418 426 return self._xreq_channel
419 427
420 428 @property
421 429 def rep_channel(self):
422 430 """Get the REP socket channel object to handle stdin (raw_input)."""
423 431 if self._rep_channel is None:
424 432 self._rep_channel = self.rep_channel_class(self.context,
425 433 self.session)
426 434 return self._rep_channel
427 435
428 436 #--------------------------------------------------------------------------
429 437 # Delegates for the Channel address attributes:
430 438 #--------------------------------------------------------------------------
431 439
432 440 def get_sub_address(self):
433 441 return self.sub_channel.address
434 442
435 443 def set_sub_address(self, address):
436 444 self.sub_channel.address = address
437 445
438 446 sub_address = property(get_sub_address, set_sub_address,
439 447 doc="The address used by SUB socket channel.")
440 448
441 449 def get_xreq_address(self):
442 450 return self.xreq_channel.address
443 451
444 452 def set_xreq_address(self, address):
445 453 self.xreq_channel.address = address
446 454
447 455 xreq_address = property(get_xreq_address, set_xreq_address,
448 456 doc="The address used by XREQ socket channel.")
449 457
450 458 def get_rep_address(self):
451 459 return self.rep_channel.address
452 460
453 461 def set_rep_address(self, address):
454 462 self.rep_channel.address = address
455 463
456 464 rep_address = property(get_rep_address, set_rep_address,
457 465 doc="The address used by REP socket channel.")
458 466
General Comments 0
You need to be logged in to leave comments. Login now