##// END OF EJS Templates
logging implemented kernel's messages
Omar Andres Zapata Mesa -
Show More
@@ -1,85 +1,86 b''
1 import logging
1 2 import sys
2 3 import time
3 4 from cStringIO import StringIO
4 5
5 6 from session import extract_header, Message
6 7
7 8 from IPython.utils import io
8 9
9 10 #-----------------------------------------------------------------------------
10 11 # Stream classes
11 12 #-----------------------------------------------------------------------------
12 13
13 14 class OutStream(object):
14 15 """A file like object that publishes the stream to a 0MQ PUB socket."""
15 16
16 17 # The time interval between automatic flushes, in seconds.
17 18 flush_interval = 0.05
18
19 _logger = logging.getLogger()
19 20 def __init__(self, session, pub_socket, name):
20 21 self.session = session
21 22 self.pub_socket = pub_socket
22 23 self.name = name
23 24 self.parent_header = {}
24 25 self._new_buffer()
25 26
26 27 def set_parent(self, parent):
27 28 self.parent_header = extract_header(parent)
28 29
29 30 def close(self):
30 31 self.pub_socket = None
31 32
32 33 def flush(self):
33 34 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
34 35 if self.pub_socket is None:
35 36 raise ValueError(u'I/O operation on closed file')
36 37 else:
37 38 data = self._buffer.getvalue()
38 39 if data:
39 40 content = {u'name':self.name, u'data':data}
40 41 msg = self.session.msg(u'stream', content=content,
41 42 parent=self.parent_header)
42 io.raw_print(msg)
43 self._logger.debug(msg)
43 44 self.pub_socket.send_json(msg)
44 45
45 46 self._buffer.close()
46 47 self._new_buffer()
47 48
48 49 def isatty(self):
49 50 return False
50 51
51 52 def next(self):
52 53 raise IOError('Read not supported on a write only stream.')
53 54
54 55 def read(self, size=-1):
55 56 raise IOError('Read not supported on a write only stream.')
56 57
57 58 def readline(self, size=-1):
58 59 raise IOError('Read not supported on a write only stream.')
59 60
60 61 def write(self, string):
61 62 if self.pub_socket is None:
62 63 raise ValueError('I/O operation on closed file')
63 64 else:
64 65 # We can only send raw bytes, not unicode objects, so we encode
65 66 # into utf-8 for all frontends if we get unicode inputs.
66 67 if type(string) == unicode:
67 68 string = string.encode('utf-8')
68 69
69 70 self._buffer.write(string)
70 71 current_time = time.time()
71 72 if self._start <= 0:
72 73 self._start = current_time
73 74 elif current_time - self._start > self.flush_interval:
74 75 self.flush()
75 76
76 77 def writelines(self, sequence):
77 78 if self.pub_socket is None:
78 79 raise ValueError('I/O operation on closed file')
79 80 else:
80 81 for string in sequence:
81 82 self.write(string)
82 83
83 84 def _new_buffer(self):
84 85 self._buffer = StringIO()
85 86 self._start = -1
@@ -1,642 +1,644 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports.
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24
24 import logging
25 25 # System library imports.
26 26 import zmq
27 27
28 28 # Local imports.
29 29 from IPython.config.configurable import Configurable
30 30 from IPython.utils import io
31 31 from IPython.utils.jsonutil import json_clean
32 32 from IPython.lib import pylabtools
33 33 from IPython.utils.traitlets import Instance, Float
34 34 from entry_point import (base_launch_kernel, make_argument_parser, make_kernel,
35 35 start_kernel)
36 36 from iostream import OutStream
37 37 from session import Session, Message
38 38 from zmqshell import ZMQInteractiveShell
39 39
40 40 #-----------------------------------------------------------------------------
41 41 # Main kernel class
42 42 #-----------------------------------------------------------------------------
43 43
44 44 class Kernel(Configurable):
45 45
46 46 #---------------------------------------------------------------------------
47 47 # Kernel interface
48 48 #---------------------------------------------------------------------------
49 49
50 50 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
51 51 session = Instance(Session)
52 52 reply_socket = Instance('zmq.Socket')
53 53 pub_socket = Instance('zmq.Socket')
54 54 req_socket = Instance('zmq.Socket')
55 55
56 56 # Private interface
57 57
58 58 # Time to sleep after flushing the stdout/err buffers in each execute
59 59 # cycle. While this introduces a hard limit on the minimal latency of the
60 60 # execute cycle, it helps prevent output synchronization problems for
61 61 # clients.
62 62 # Units are in seconds. The minimum zmq latency on local host is probably
63 63 # ~150 microseconds, set this to 500us for now. We may need to increase it
64 64 # a little if it's not enough after more interactive testing.
65 65 _execute_sleep = Float(0.0005, config=True)
66 66
67 67 # Frequency of the kernel's event loop.
68 68 # Units are in seconds, kernel subclasses for GUI toolkits may need to
69 69 # adapt to milliseconds.
70 70 _poll_interval = Float(0.05, config=True)
71 71
72 72 # If the shutdown was requested over the network, we leave here the
73 73 # necessary reply message so it can be sent by our registered atexit
74 74 # handler. This ensures that the reply is only sent to clients truly at
75 75 # the end of our shutdown process (which happens after the underlying
76 76 # IPython shell's own shutdown).
77 77 _shutdown_message = None
78 78
79 79 # This is a dict of port number that the kernel is listening on. It is set
80 80 # by record_ports and used by connect_request.
81 81 _recorded_ports = None
82
82
83
84 _logger = logging.getLogger()
85
83 86 def __init__(self, **kwargs):
84 87 super(Kernel, self).__init__(**kwargs)
85 88
86 89 # Before we even start up the shell, register *first* our exit handlers
87 90 # so they come before the shell's
88 91 atexit.register(self._at_shutdown)
89 92
90 93 # Initialize the InteractiveShell subclass
91 94 self.shell = ZMQInteractiveShell.instance()
92 95 self.shell.displayhook.session = self.session
93 96 self.shell.displayhook.pub_socket = self.pub_socket
94 97
95 98 # TMP - hack while developing
96 99 self.shell._reply_content = None
97 100
98 101 # Build dict of handlers for message types
99 102 msg_types = [ 'execute_request', 'complete_request',
100 103 'object_info_request', 'history_request',
101 104 'connect_request', 'shutdown_request']
102 105 self.handlers = {}
103 106 for msg_type in msg_types:
104 107 self.handlers[msg_type] = getattr(self, msg_type)
105 108
106 109 def do_one_iteration(self):
107 110 """Do one iteration of the kernel's evaluation loop.
108 111 """
109 112 try:
110 113 ident = self.reply_socket.recv(zmq.NOBLOCK)
111 114 except zmq.ZMQError, e:
112 115 if e.errno == zmq.EAGAIN:
113 116 return
114 117 else:
115 118 raise
116 119 # This assert will raise in versions of zeromq 2.0.7 and lesser.
117 120 # We now require 2.0.8 or above, so we can uncomment for safety.
118 121 assert self.reply_socket.rcvmore(), "Missing message part."
119 122 msg = self.reply_socket.recv_json()
120 123
121 124 # Print some info about this message and leave a '--->' marker, so it's
122 125 # easier to trace visually the message chain when debugging. Each
123 126 # handler prints its message at the end.
124 127 # Eventually we'll move these from stdout to a logger.
125 io.raw_print('\n*** MESSAGE TYPE:', msg['msg_type'], '***')
126 io.raw_print(' Content: ', msg['content'],
127 '\n --->\n ', sep='', end='')
128 self._logger.debug('\n*** MESSAGE TYPE:'+str(msg['msg_type'])+'***')
129 self._logger.debug(' Content: '+str(msg['content'])+'\n --->\n ')
128 130
129 131 # Find and call actual handler for message
130 132 handler = self.handlers.get(msg['msg_type'], None)
131 133 if handler is None:
132 io.raw_print_err("UNKNOWN MESSAGE TYPE:", msg)
134 self._logger.error("UNKNOWN MESSAGE TYPE:" +str(msg))
133 135 else:
134 136 handler(ident, msg)
135 137
136 138 # Check whether we should exit, in case the incoming message set the
137 139 # exit flag on
138 140 if self.shell.exit_now:
139 io.raw_print('\nExiting IPython kernel...')
141 self._logger.debug('\nExiting IPython kernel...')
140 142 # We do a normal, clean exit, which allows any actions registered
141 143 # via atexit (such as history saving) to take place.
142 144 sys.exit(0)
143 145
144 146
145 147 def start(self):
146 148 """ Start the kernel main loop.
147 149 """
148 150 while True:
149 151 time.sleep(self._poll_interval)
150 152 self.do_one_iteration()
151 153
152 154 def record_ports(self, xrep_port, pub_port, req_port, hb_port):
153 155 """Record the ports that this kernel is using.
154 156
155 157 The creator of the Kernel instance must call this methods if they
156 158 want the :meth:`connect_request` method to return the port numbers.
157 159 """
158 160 self._recorded_ports = {
159 161 'xrep_port' : xrep_port,
160 162 'pub_port' : pub_port,
161 163 'req_port' : req_port,
162 164 'hb_port' : hb_port
163 165 }
164 166
165 167 #---------------------------------------------------------------------------
166 168 # Kernel request handlers
167 169 #---------------------------------------------------------------------------
168 170
169 171 def _publish_pyin(self, code, parent):
170 172 """Publish the code request on the pyin stream."""
171 173
172 174 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
173 175 self.pub_socket.send_json(pyin_msg)
174 176
175 177 def execute_request(self, ident, parent):
176 178
177 179 status_msg = self.session.msg(
178 180 u'status',
179 181 {u'execution_state':u'busy'},
180 182 parent=parent
181 183 )
182 184 self.pub_socket.send_json(status_msg)
183 185
184 186 try:
185 187 content = parent[u'content']
186 188 code = content[u'code']
187 189 silent = content[u'silent']
188 190 except:
189 io.raw_print_err("Got bad msg: ")
190 io.raw_print_err(Message(parent))
191 self._logger.error("Got bad msg: ")
192 self._logger.error(str(Message(parent)))
191 193 return
192 194
193 195 shell = self.shell # we'll need this a lot here
194 196
195 197 # Replace raw_input. Note that is not sufficient to replace
196 198 # raw_input in the user namespace.
197 199 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
198 200 __builtin__.raw_input = raw_input
199 201
200 202 # Set the parent message of the display hook and out streams.
201 203 shell.displayhook.set_parent(parent)
202 204 sys.stdout.set_parent(parent)
203 205 sys.stderr.set_parent(parent)
204 206
205 207 # Re-broadcast our input for the benefit of listening clients, and
206 208 # start computing output
207 209 if not silent:
208 210 self._publish_pyin(code, parent)
209 211
210 212 reply_content = {}
211 213 try:
212 214 if silent:
213 215 # run_code uses 'exec' mode, so no displayhook will fire, and it
214 216 # doesn't call logging or history manipulations. Print
215 217 # statements in that code will obviously still execute.
216 218 shell.run_code(code)
217 219 else:
218 220 # FIXME: the shell calls the exception handler itself.
219 221 shell._reply_content = None
220 222 shell.run_cell(code)
221 223 except:
222 224 status = u'error'
223 225 # FIXME: this code right now isn't being used yet by default,
224 226 # because the runlines() call above directly fires off exception
225 227 # reporting. This code, therefore, is only active in the scenario
226 228 # where runlines itself has an unhandled exception. We need to
227 229 # uniformize this, for all exception construction to come from a
228 230 # single location in the codbase.
229 231 etype, evalue, tb = sys.exc_info()
230 232 tb_list = traceback.format_exception(etype, evalue, tb)
231 233 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
232 234 else:
233 235 status = u'ok'
234 236
235 237 reply_content[u'status'] = status
236 238
237 239 # Return the execution counter so clients can display prompts
238 240 reply_content['execution_count'] = shell.execution_count -1
239 241
240 242 # FIXME - fish exception info out of shell, possibly left there by
241 243 # runlines. We'll need to clean up this logic later.
242 244 if shell._reply_content is not None:
243 245 reply_content.update(shell._reply_content)
244 246
245 247 # At this point, we can tell whether the main code execution succeeded
246 248 # or not. If it did, we proceed to evaluate user_variables/expressions
247 249 if reply_content['status'] == 'ok':
248 250 reply_content[u'user_variables'] = \
249 251 shell.user_variables(content[u'user_variables'])
250 252 reply_content[u'user_expressions'] = \
251 253 shell.user_expressions(content[u'user_expressions'])
252 254 else:
253 255 # If there was an error, don't even try to compute variables or
254 256 # expressions
255 257 reply_content[u'user_variables'] = {}
256 258 reply_content[u'user_expressions'] = {}
257 259
258 260 # Payloads should be retrieved regardless of outcome, so we can both
259 261 # recover partial output (that could have been generated early in a
260 262 # block, before an error) and clear the payload system always.
261 263 reply_content[u'payload'] = shell.payload_manager.read_payload()
262 264 # Be agressive about clearing the payload because we don't want
263 265 # it to sit in memory until the next execute_request comes in.
264 266 shell.payload_manager.clear_payload()
265 267
266 268 # Send the reply.
267 269 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
268 io.raw_print(reply_msg)
270 self._logger.debug(str(reply_msg))
269 271
270 272 # Flush output before sending the reply.
271 273 sys.stdout.flush()
272 274 sys.stderr.flush()
273 275 # FIXME: on rare occasions, the flush doesn't seem to make it to the
274 276 # clients... This seems to mitigate the problem, but we definitely need
275 277 # to better understand what's going on.
276 278 if self._execute_sleep:
277 279 time.sleep(self._execute_sleep)
278 280
279 281 self.reply_socket.send(ident, zmq.SNDMORE)
280 282 self.reply_socket.send_json(reply_msg)
281 283 if reply_msg['content']['status'] == u'error':
282 284 self._abort_queue()
283 285
284 286 status_msg = self.session.msg(
285 287 u'status',
286 288 {u'execution_state':u'idle'},
287 289 parent=parent
288 290 )
289 291 self.pub_socket.send_json(status_msg)
290 292
291 293 def complete_request(self, ident, parent):
292 294 txt, matches = self._complete(parent)
293 295 matches = {'matches' : matches,
294 296 'matched_text' : txt,
295 297 'status' : 'ok'}
296 298 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
297 299 matches, parent, ident)
298 io.raw_print(completion_msg)
300 self._logger.debug(str(completion_msg))
299 301
300 302 def object_info_request(self, ident, parent):
301 303 object_info = self.shell.object_inspect(parent['content']['oname'])
302 304 # Before we send this object over, we scrub it for JSON usage
303 305 oinfo = json_clean(object_info)
304 306 msg = self.session.send(self.reply_socket, 'object_info_reply',
305 307 oinfo, parent, ident)
306 io.raw_print(msg)
308 self._logger.debug(msg)
307 309
308 310 def history_request(self, ident, parent):
309 311 output = parent['content']['output']
310 312 index = parent['content']['index']
311 313 raw = parent['content']['raw']
312 314 hist = self.shell.get_history(index=index, raw=raw, output=output)
313 315 content = {'history' : hist}
314 316 msg = self.session.send(self.reply_socket, 'history_reply',
315 317 content, parent, ident)
316 io.raw_print(msg)
318 self._logger.debug(str(msg))
317 319
318 320 def connect_request(self, ident, parent):
319 321 if self._recorded_ports is not None:
320 322 content = self._recorded_ports.copy()
321 323 else:
322 324 content = {}
323 325 msg = self.session.send(self.reply_socket, 'connect_reply',
324 326 content, parent, ident)
325 io.raw_print(msg)
327 self._logger.debug(msg)
326 328
327 329 def shutdown_request(self, ident, parent):
328 330 self.shell.exit_now = True
329 331 self._shutdown_message = self.session.msg(u'shutdown_reply', parent['content'], parent)
330 332 sys.exit(0)
331 333
332 334 #---------------------------------------------------------------------------
333 335 # Protected interface
334 336 #---------------------------------------------------------------------------
335 337
336 338 def _abort_queue(self):
337 339 while True:
338 340 try:
339 341 ident = self.reply_socket.recv(zmq.NOBLOCK)
340 342 except zmq.ZMQError, e:
341 343 if e.errno == zmq.EAGAIN:
342 344 break
343 345 else:
344 346 assert self.reply_socket.rcvmore(), \
345 347 "Unexpected missing message part."
346 348 msg = self.reply_socket.recv_json()
347 io.raw_print("Aborting:\n", Message(msg))
349 self._logger.debug("Aborting:\n"+str(Message(msg)))
348 350 msg_type = msg['msg_type']
349 351 reply_type = msg_type.split('_')[0] + '_reply'
350 352 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
351 io.raw_print(reply_msg)
353 self._logger.debug(reply_msg)
352 354 self.reply_socket.send(ident,zmq.SNDMORE)
353 355 self.reply_socket.send_json(reply_msg)
354 356 # We need to wait a bit for requests to come in. This can probably
355 357 # be set shorter for true asynchronous clients.
356 358 time.sleep(0.1)
357 359
358 360 def _raw_input(self, prompt, ident, parent):
359 361 # Flush output before making the request.
360 362 sys.stderr.flush()
361 363 sys.stdout.flush()
362 364
363 365 # Send the input request.
364 366 content = dict(prompt=prompt)
365 367 msg = self.session.msg(u'input_request', content, parent)
366 368 self.req_socket.send_json(msg)
367 369
368 370 # Await a response.
369 371 reply = self.req_socket.recv_json()
370 372 try:
371 373 value = reply['content']['value']
372 374 except:
373 io.raw_print_err("Got bad raw_input reply: ")
374 io.raw_print_err(Message(parent))
375 self._logger.error("Got bad raw_input reply: ")
376 self._logger.error(str(Message(parent)))
375 377 value = ''
376 378 return value
377 379
378 380 def _complete(self, msg):
379 381 c = msg['content']
380 382 try:
381 383 cpos = int(c['cursor_pos'])
382 384 except:
383 385 # If we don't get something that we can convert to an integer, at
384 386 # least attempt the completion guessing the cursor is at the end of
385 387 # the text, if there's any, and otherwise of the line
386 388 cpos = len(c['text'])
387 389 if cpos==0:
388 390 cpos = len(c['line'])
389 391 return self.shell.complete(c['text'], c['line'], cpos)
390 392
391 393 def _object_info(self, context):
392 394 symbol, leftover = self._symbol_from_context(context)
393 395 if symbol is not None and not leftover:
394 396 doc = getattr(symbol, '__doc__', '')
395 397 else:
396 398 doc = ''
397 399 object_info = dict(docstring = doc)
398 400 return object_info
399 401
400 402 def _symbol_from_context(self, context):
401 403 if not context:
402 404 return None, context
403 405
404 406 base_symbol_string = context[0]
405 407 symbol = self.shell.user_ns.get(base_symbol_string, None)
406 408 if symbol is None:
407 409 symbol = __builtin__.__dict__.get(base_symbol_string, None)
408 410 if symbol is None:
409 411 return None, context
410 412
411 413 context = context[1:]
412 414 for i, name in enumerate(context):
413 415 new_symbol = getattr(symbol, name, None)
414 416 if new_symbol is None:
415 417 return symbol, context[i:]
416 418 else:
417 419 symbol = new_symbol
418 420
419 421 return symbol, []
420 422
421 423 def _at_shutdown(self):
422 424 """Actions taken at shutdown by the kernel, called by python's atexit.
423 425 """
424 426 # io.rprint("Kernel at_shutdown") # dbg
425 427 if self._shutdown_message is not None:
426 428 self.reply_socket.send_json(self._shutdown_message)
427 429 self.pub_socket.send_json(self._shutdown_message)
428 io.raw_print(self._shutdown_message)
430 self._logger.debug(str(self._shutdown_message))
429 431 # A very short sleep to give zmq time to flush its message buffers
430 432 # before Python truly shuts down.
431 433 time.sleep(0.01)
432 434
433 435
434 436 class QtKernel(Kernel):
435 437 """A Kernel subclass with Qt support."""
436 438
437 439 def start(self):
438 440 """Start a kernel with QtPy4 event loop integration."""
439 441
440 442 from PyQt4 import QtCore
441 443 from IPython.lib.guisupport import get_app_qt4, start_event_loop_qt4
442 444
443 445 self.app = get_app_qt4([" "])
444 446 self.app.setQuitOnLastWindowClosed(False)
445 447 self.timer = QtCore.QTimer()
446 448 self.timer.timeout.connect(self.do_one_iteration)
447 449 # Units for the timer are in milliseconds
448 450 self.timer.start(1000*self._poll_interval)
449 451 start_event_loop_qt4(self.app)
450 452
451 453
452 454 class WxKernel(Kernel):
453 455 """A Kernel subclass with Wx support."""
454 456
455 457 def start(self):
456 458 """Start a kernel with wx event loop support."""
457 459
458 460 import wx
459 461 from IPython.lib.guisupport import start_event_loop_wx
460 462
461 463 doi = self.do_one_iteration
462 464 # Wx uses milliseconds
463 465 poll_interval = int(1000*self._poll_interval)
464 466
465 467 # We have to put the wx.Timer in a wx.Frame for it to fire properly.
466 468 # We make the Frame hidden when we create it in the main app below.
467 469 class TimerFrame(wx.Frame):
468 470 def __init__(self, func):
469 471 wx.Frame.__init__(self, None, -1)
470 472 self.timer = wx.Timer(self)
471 473 # Units for the timer are in milliseconds
472 474 self.timer.Start(poll_interval)
473 475 self.Bind(wx.EVT_TIMER, self.on_timer)
474 476 self.func = func
475 477
476 478 def on_timer(self, event):
477 479 self.func()
478 480
479 481 # We need a custom wx.App to create our Frame subclass that has the
480 482 # wx.Timer to drive the ZMQ event loop.
481 483 class IPWxApp(wx.App):
482 484 def OnInit(self):
483 485 self.frame = TimerFrame(doi)
484 486 self.frame.Show(False)
485 487 return True
486 488
487 489 # The redirect=False here makes sure that wx doesn't replace
488 490 # sys.stdout/stderr with its own classes.
489 491 self.app = IPWxApp(redirect=False)
490 492 start_event_loop_wx(self.app)
491 493
492 494
493 495 class TkKernel(Kernel):
494 496 """A Kernel subclass with Tk support."""
495 497
496 498 def start(self):
497 499 """Start a Tk enabled event loop."""
498 500
499 501 import Tkinter
500 502 doi = self.do_one_iteration
501 503 # Tk uses milliseconds
502 504 poll_interval = int(1000*self._poll_interval)
503 505 # For Tkinter, we create a Tk object and call its withdraw method.
504 506 class Timer(object):
505 507 def __init__(self, func):
506 508 self.app = Tkinter.Tk()
507 509 self.app.withdraw()
508 510 self.func = func
509 511
510 512 def on_timer(self):
511 513 self.func()
512 514 self.app.after(poll_interval, self.on_timer)
513 515
514 516 def start(self):
515 517 self.on_timer() # Call it once to get things going.
516 518 self.app.mainloop()
517 519
518 520 self.timer = Timer(doi)
519 521 self.timer.start()
520 522
521 523
522 524 class GTKKernel(Kernel):
523 525 """A Kernel subclass with GTK support."""
524 526
525 527 def start(self):
526 528 """Start the kernel, coordinating with the GTK event loop"""
527 529 from .gui.gtkembed import GTKEmbed
528 530
529 531 gtk_kernel = GTKEmbed(self)
530 532 gtk_kernel.start()
531 533
532 534
533 535 #-----------------------------------------------------------------------------
534 536 # Kernel main and launch functions
535 537 #-----------------------------------------------------------------------------
536 538
537 539 def launch_kernel(ip=None, xrep_port=0, pub_port=0, req_port=0, hb_port=0,
538 540 independent=False, pylab=False, colors=None):
539 541 """Launches a localhost kernel, binding to the specified ports.
540 542
541 543 Parameters
542 544 ----------
543 545 ip : str, optional
544 546 The ip address the kernel will bind to.
545 547
546 548 xrep_port : int, optional
547 549 The port to use for XREP channel.
548 550
549 551 pub_port : int, optional
550 552 The port to use for the SUB channel.
551 553
552 554 req_port : int, optional
553 555 The port to use for the REQ (raw input) channel.
554 556
555 557 hb_port : int, optional
556 558 The port to use for the hearbeat REP channel.
557 559
558 560 independent : bool, optional (default False)
559 561 If set, the kernel process is guaranteed to survive if this process
560 562 dies. If not set, an effort is made to ensure that the kernel is killed
561 563 when this process dies. Note that in this case it is still good practice
562 564 to kill kernels manually before exiting.
563 565
564 566 pylab : bool or string, optional (default False)
565 567 If not False, the kernel will be launched with pylab enabled. If a
566 568 string is passed, matplotlib will use the specified backend. Otherwise,
567 569 matplotlib's default backend will be used.
568 570
569 571 colors : None or string, optional (default None)
570 572 If not None, specify the color scheme. One of (NoColor, LightBG, Linux)
571 573
572 574 Returns
573 575 -------
574 576 A tuple of form:
575 577 (kernel_process, xrep_port, pub_port, req_port)
576 578 where kernel_process is a Popen object and the ports are integers.
577 579 """
578 580 extra_arguments = []
579 581 if pylab:
580 582 extra_arguments.append('--pylab')
581 583 if isinstance(pylab, basestring):
582 584 extra_arguments.append(pylab)
583 585 if ip is not None:
584 586 extra_arguments.append('--ip')
585 587 if isinstance(ip, basestring):
586 588 extra_arguments.append(ip)
587 589 if colors is not None:
588 590 extra_arguments.append('--colors')
589 591 extra_arguments.append(colors)
590 592 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
591 593 xrep_port, pub_port, req_port, hb_port,
592 594 independent, extra_arguments)
593 595
594 596
595 597 def main():
596 598 """ The IPython kernel main entry point.
597 599 """
598 600 parser = make_argument_parser()
599 601 parser.add_argument('--pylab', type=str, metavar='GUI', nargs='?',
600 602 const='auto', help = \
601 603 "Pre-load matplotlib and numpy for interactive use. If GUI is not \
602 604 given, the GUI backend is matplotlib's, otherwise use one of: \
603 605 ['tk', 'gtk', 'qt', 'wx', 'inline'].")
604 606 parser.add_argument('--colors',
605 607 type=str, dest='colors',
606 608 help="Set the color scheme (NoColor, Linux, and LightBG).",
607 609 metavar='ZMQInteractiveShell.colors')
608 610 namespace = parser.parse_args()
609 611
610 612 kernel_class = Kernel
611 613
612 614 kernel_classes = {
613 615 'qt' : QtKernel,
614 616 'qt4': QtKernel,
615 617 'inline': Kernel,
616 618 'wx' : WxKernel,
617 619 'tk' : TkKernel,
618 620 'gtk': GTKKernel,
619 621 }
620 622 if namespace.pylab:
621 623 if namespace.pylab == 'auto':
622 624 gui, backend = pylabtools.find_gui_and_backend()
623 625 else:
624 626 gui, backend = pylabtools.find_gui_and_backend(namespace.pylab)
625 627 kernel_class = kernel_classes.get(gui)
626 628 if kernel_class is None:
627 629 raise ValueError('GUI is not supported: %r' % gui)
628 630 pylabtools.activate_matplotlib(backend)
629 631 if namespace.colors:
630 632 ZMQInteractiveShell.colors=namespace.colors
631 633
632 634 kernel = make_kernel(namespace, kernel_class, OutStream)
633 635
634 636 if namespace.pylab:
635 637 pylabtools.import_pylab(kernel.shell.user_ns, backend,
636 638 shell=kernel.shell)
637 639
638 640 start_kernel(namespace, kernel)
639 641
640 642
641 643 if __name__ == '__main__':
642 644 main()
@@ -1,906 +1,907 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2010 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 from Queue import Queue, Empty
21 21 from subprocess import Popen
22 22 import signal
23 23 import sys
24 24 from threading import Thread
25 25 import time
26 import logging
26 27
27 28 # System library imports.
28 29 import zmq
29 30 from zmq import POLLIN, POLLOUT, POLLERR
30 31 from zmq.eventloop import ioloop
31 32
32 33 # Local imports.
33 34 from IPython.utils import io
34 35 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
35 36 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
36 37 from session import Session
37 38
38 39 #-----------------------------------------------------------------------------
39 40 # Constants and exceptions
40 41 #-----------------------------------------------------------------------------
41 42
42 43 class InvalidPortNumber(Exception):
43 44 pass
44 45
45 46 #-----------------------------------------------------------------------------
46 47 # Utility functions
47 48 #-----------------------------------------------------------------------------
48 49
49 50 # some utilities to validate message structure, these might get moved elsewhere
50 51 # if they prove to have more generic utility
51 52
52 53 def validate_string_list(lst):
53 54 """Validate that the input is a list of strings.
54 55
55 56 Raises ValueError if not."""
56 57 if not isinstance(lst, list):
57 58 raise ValueError('input %r must be a list' % lst)
58 59 for x in lst:
59 60 if not isinstance(x, basestring):
60 61 raise ValueError('element %r in list must be a string' % x)
61 62
62 63
63 64 def validate_string_dict(dct):
64 65 """Validate that the input is a dict with string keys and values.
65 66
66 67 Raises ValueError if not."""
67 68 for k,v in dct.iteritems():
68 69 if not isinstance(k, basestring):
69 70 raise ValueError('key %r in dict must be a string' % k)
70 71 if not isinstance(v, basestring):
71 72 raise ValueError('value %r in dict must be a string' % v)
72 73
73 74
74 75 #-----------------------------------------------------------------------------
75 76 # ZMQ Socket Channel classes
76 77 #-----------------------------------------------------------------------------
77 78
78 79 class ZmqSocketChannel(Thread):
79 80 """The base class for the channels that use ZMQ sockets.
80 81 """
81 82 context = None
82 83 session = None
83 84 socket = None
84 85 ioloop = None
85 86 iostate = None
86 87 _address = None
87 88
88 89 def __init__(self, context, session, address):
89 90 """Create a channel
90 91
91 92 Parameters
92 93 ----------
93 94 context : :class:`zmq.Context`
94 95 The ZMQ context to use.
95 96 session : :class:`session.Session`
96 97 The session to use.
97 98 address : tuple
98 99 Standard (ip, port) tuple that the kernel is listening on.
99 100 """
100 101 super(ZmqSocketChannel, self).__init__()
101 102 self.daemon = True
102 103
103 104 self.context = context
104 105 self.session = session
105 106 if address[1] == 0:
106 107 message = 'The port number for a channel cannot be 0.'
107 108 raise InvalidPortNumber(message)
108 109 self._address = address
109 110
110 111 def stop(self):
111 112 """Stop the channel's activity.
112 113
113 114 This calls :method:`Thread.join` and returns when the thread
114 115 terminates. :class:`RuntimeError` will be raised if
115 116 :method:`self.start` is called again.
116 117 """
117 118 self.join()
118 119
119 120 @property
120 121 def address(self):
121 122 """Get the channel's address as an (ip, port) tuple.
122 123
123 124 By the default, the address is (localhost, 0), where 0 means a random
124 125 port.
125 126 """
126 127 return self._address
127 128
128 129 def add_io_state(self, state):
129 130 """Add IO state to the eventloop.
130 131
131 132 Parameters
132 133 ----------
133 134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
134 135 The IO state flag to set.
135 136
136 137 This is thread safe as it uses the thread safe IOLoop.add_callback.
137 138 """
138 139 def add_io_state_callback():
139 140 if not self.iostate & state:
140 141 self.iostate = self.iostate | state
141 142 self.ioloop.update_handler(self.socket, self.iostate)
142 143 self.ioloop.add_callback(add_io_state_callback)
143 144
144 145 def drop_io_state(self, state):
145 146 """Drop IO state from the eventloop.
146 147
147 148 Parameters
148 149 ----------
149 150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 151 The IO state flag to set.
151 152
152 153 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 154 """
154 155 def drop_io_state_callback():
155 156 if self.iostate & state:
156 157 self.iostate = self.iostate & (~state)
157 158 self.ioloop.update_handler(self.socket, self.iostate)
158 159 self.ioloop.add_callback(drop_io_state_callback)
159 160
160 161
161 162 class XReqSocketChannel(ZmqSocketChannel):
162 163 """The XREQ channel for issues request/replies to the kernel.
163 164 """
164 165
165 166 command_queue = None
166 167
167 168 def __init__(self, context, session, address):
168 169 super(XReqSocketChannel, self).__init__(context, session, address)
169 170 self.command_queue = Queue()
170 171 self.ioloop = ioloop.IOLoop()
171 172
172 173 def run(self):
173 174 """The thread's main activity. Call start() instead."""
174 175 self.socket = self.context.socket(zmq.XREQ)
175 176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
176 177 self.socket.connect('tcp://%s:%i' % self.address)
177 178 self.iostate = POLLERR|POLLIN
178 179 self.ioloop.add_handler(self.socket, self._handle_events,
179 180 self.iostate)
180 181 self.ioloop.start()
181 182
182 183 def stop(self):
183 184 self.ioloop.stop()
184 185 super(XReqSocketChannel, self).stop()
185 186
186 187 def call_handlers(self, msg):
187 188 """This method is called in the ioloop thread when a message arrives.
188 189
189 190 Subclasses should override this method to handle incoming messages.
190 191 It is important to remember that this method is called in the thread
191 192 so that some logic must be done to ensure that the application leve
192 193 handlers are called in the application thread.
193 194 """
194 195 raise NotImplementedError('call_handlers must be defined in a subclass.')
195 196
196 197 def execute(self, code, silent=False,
197 198 user_variables=None, user_expressions=None):
198 199 """Execute code in the kernel.
199 200
200 201 Parameters
201 202 ----------
202 203 code : str
203 204 A string of Python code.
204 205
205 206 silent : bool, optional (default False)
206 207 If set, the kernel will execute the code as quietly possible.
207 208
208 209 user_variables : list, optional
209 210 A list of variable names to pull from the user's namespace. They
210 211 will come back as a dict with these names as keys and their
211 212 :func:`repr` as values.
212 213
213 214 user_expressions : dict, optional
214 215 A dict with string keys and to pull from the user's
215 216 namespace. They will come back as a dict with these names as keys
216 217 and their :func:`repr` as values.
217 218
218 219 Returns
219 220 -------
220 221 The msg_id of the message sent.
221 222 """
222 223 if user_variables is None:
223 224 user_variables = []
224 225 if user_expressions is None:
225 226 user_expressions = {}
226 227
227 228 # Don't waste network traffic if inputs are invalid
228 229 if not isinstance(code, basestring):
229 230 raise ValueError('code %r must be a string' % code)
230 231 validate_string_list(user_variables)
231 232 validate_string_dict(user_expressions)
232 233
233 234 # Create class for content/msg creation. Related to, but possibly
234 235 # not in Session.
235 236 content = dict(code=code, silent=silent,
236 237 user_variables=user_variables,
237 238 user_expressions=user_expressions)
238 239 msg = self.session.msg('execute_request', content)
239 240 self._queue_request(msg)
240 241 return msg['header']['msg_id']
241 242
242 243 def complete(self, text, line, cursor_pos, block=None):
243 244 """Tab complete text in the kernel's namespace.
244 245
245 246 Parameters
246 247 ----------
247 248 text : str
248 249 The text to complete.
249 250 line : str
250 251 The full line of text that is the surrounding context for the
251 252 text to complete.
252 253 cursor_pos : int
253 254 The position of the cursor in the line where the completion was
254 255 requested.
255 256 block : str, optional
256 257 The full block of code in which the completion is being requested.
257 258
258 259 Returns
259 260 -------
260 261 The msg_id of the message sent.
261 262 """
262 263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
263 264 msg = self.session.msg('complete_request', content)
264 265 self._queue_request(msg)
265 266 return msg['header']['msg_id']
266 267
267 268 def object_info(self, oname):
268 269 """Get metadata information about an object.
269 270
270 271 Parameters
271 272 ----------
272 273 oname : str
273 274 A string specifying the object name.
274 275
275 276 Returns
276 277 -------
277 278 The msg_id of the message sent.
278 279 """
279 280 content = dict(oname=oname)
280 281 msg = self.session.msg('object_info_request', content)
281 282 self._queue_request(msg)
282 283 return msg['header']['msg_id']
283 284
284 285 def history(self, index=None, raw=False, output=True):
285 286 """Get the history list.
286 287
287 288 Parameters
288 289 ----------
289 290 index : n or (n1, n2) or None
290 291 If n, then the last entries. If a tuple, then all in
291 292 range(n1, n2). If None, then all entries. Raises IndexError if
292 293 the format of index is incorrect.
293 294 raw : bool
294 295 If True, return the raw input.
295 296 output : bool
296 297 If True, then return the output as well.
297 298
298 299 Returns
299 300 -------
300 301 The msg_id of the message sent.
301 302 """
302 303 content = dict(index=index, raw=raw, output=output)
303 304 msg = self.session.msg('history_request', content)
304 305 self._queue_request(msg)
305 306 return msg['header']['msg_id']
306 307
307 308 def shutdown(self, restart=False):
308 309 """Request an immediate kernel shutdown.
309 310
310 311 Upon receipt of the (empty) reply, client code can safely assume that
311 312 the kernel has shut down and it's safe to forcefully terminate it if
312 313 it's still alive.
313 314
314 315 The kernel will send the reply via a function registered with Python's
315 316 atexit module, ensuring it's truly done as the kernel is done with all
316 317 normal operation.
317 318 """
318 319 # Send quit message to kernel. Once we implement kernel-side setattr,
319 320 # this should probably be done that way, but for now this will do.
320 321 msg = self.session.msg('shutdown_request', {'restart':restart})
321 322 self._queue_request(msg)
322 323 return msg['header']['msg_id']
323 324
324 325 def _handle_events(self, socket, events):
325 326 if events & POLLERR:
326 327 self._handle_err()
327 328 if events & POLLOUT:
328 329 self._handle_send()
329 330 if events & POLLIN:
330 331 self._handle_recv()
331 332
332 333 def _handle_recv(self):
333 334 msg = self.socket.recv_json()
334 335 self.call_handlers(msg)
335 336
336 337 def _handle_send(self):
337 338 try:
338 339 msg = self.command_queue.get(False)
339 340 except Empty:
340 341 pass
341 342 else:
342 343 self.socket.send_json(msg)
343 344 if self.command_queue.empty():
344 345 self.drop_io_state(POLLOUT)
345 346
346 347 def _handle_err(self):
347 348 # We don't want to let this go silently, so eventually we should log.
348 349 raise zmq.ZMQError()
349 350
350 351 def _queue_request(self, msg):
351 352 self.command_queue.put(msg)
352 353 self.add_io_state(POLLOUT)
353 354
354 355
355 356 class SubSocketChannel(ZmqSocketChannel):
356 357 """The SUB channel which listens for messages that the kernel publishes.
357 358 """
358 359
359 360 def __init__(self, context, session, address):
360 361 super(SubSocketChannel, self).__init__(context, session, address)
361 362 self.ioloop = ioloop.IOLoop()
362 363
363 364 def run(self):
364 365 """The thread's main activity. Call start() instead."""
365 366 self.socket = self.context.socket(zmq.SUB)
366 367 self.socket.setsockopt(zmq.SUBSCRIBE,'')
367 368 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
368 369 self.socket.connect('tcp://%s:%i' % self.address)
369 370 self.iostate = POLLIN|POLLERR
370 371 self.ioloop.add_handler(self.socket, self._handle_events,
371 372 self.iostate)
372 373 self.ioloop.start()
373 374
374 375 def stop(self):
375 376 self.ioloop.stop()
376 377 super(SubSocketChannel, self).stop()
377 378
378 379 def call_handlers(self, msg):
379 380 """This method is called in the ioloop thread when a message arrives.
380 381
381 382 Subclasses should override this method to handle incoming messages.
382 383 It is important to remember that this method is called in the thread
383 384 so that some logic must be done to ensure that the application leve
384 385 handlers are called in the application thread.
385 386 """
386 387 raise NotImplementedError('call_handlers must be defined in a subclass.')
387 388
388 389 def flush(self, timeout=1.0):
389 390 """Immediately processes all pending messages on the SUB channel.
390 391
391 392 Callers should use this method to ensure that :method:`call_handlers`
392 393 has been called for all messages that have been received on the
393 394 0MQ SUB socket of this channel.
394 395
395 396 This method is thread safe.
396 397
397 398 Parameters
398 399 ----------
399 400 timeout : float, optional
400 401 The maximum amount of time to spend flushing, in seconds. The
401 402 default is one second.
402 403 """
403 404 # We do the IOLoop callback process twice to ensure that the IOLoop
404 405 # gets to perform at least one full poll.
405 406 stop_time = time.time() + timeout
406 407 for i in xrange(2):
407 408 self._flushed = False
408 409 self.ioloop.add_callback(self._flush)
409 410 while not self._flushed and time.time() < stop_time:
410 411 time.sleep(0.01)
411 412
412 413 def _handle_events(self, socket, events):
413 414 # Turn on and off POLLOUT depending on if we have made a request
414 415 if events & POLLERR:
415 416 self._handle_err()
416 417 if events & POLLIN:
417 418 self._handle_recv()
418 419
419 420 def _handle_err(self):
420 421 # We don't want to let this go silently, so eventually we should log.
421 422 raise zmq.ZMQError()
422 423
423 424 def _handle_recv(self):
424 425 # Get all of the messages we can
425 426 while True:
426 427 try:
427 428 msg = self.socket.recv_json(zmq.NOBLOCK)
428 429 except zmq.ZMQError:
429 430 # Check the errno?
430 431 # Will this trigger POLLERR?
431 432 break
432 433 else:
433 434 self.call_handlers(msg)
434 435
435 436 def _flush(self):
436 437 """Callback for :method:`self.flush`."""
437 438 self._flushed = True
438 439
439 440
440 441 class RepSocketChannel(ZmqSocketChannel):
441 442 """A reply channel to handle raw_input requests that the kernel makes."""
442 443
443 444 msg_queue = None
444 445
445 446 def __init__(self, context, session, address):
446 447 super(RepSocketChannel, self).__init__(context, session, address)
447 448 self.ioloop = ioloop.IOLoop()
448 449 self.msg_queue = Queue()
449 450
450 451 def run(self):
451 452 """The thread's main activity. Call start() instead."""
452 453 self.socket = self.context.socket(zmq.XREQ)
453 454 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
454 455 self.socket.connect('tcp://%s:%i' % self.address)
455 456 self.iostate = POLLERR|POLLIN
456 457 self.ioloop.add_handler(self.socket, self._handle_events,
457 458 self.iostate)
458 459 self.ioloop.start()
459 460
460 461 def stop(self):
461 462 self.ioloop.stop()
462 463 super(RepSocketChannel, self).stop()
463 464
464 465 def call_handlers(self, msg):
465 466 """This method is called in the ioloop thread when a message arrives.
466 467
467 468 Subclasses should override this method to handle incoming messages.
468 469 It is important to remember that this method is called in the thread
469 470 so that some logic must be done to ensure that the application leve
470 471 handlers are called in the application thread.
471 472 """
472 473 raise NotImplementedError('call_handlers must be defined in a subclass.')
473 474
474 475 def input(self, string):
475 476 """Send a string of raw input to the kernel."""
476 477 content = dict(value=string)
477 478 msg = self.session.msg('input_reply', content)
478 479 self._queue_reply(msg)
479 480
480 481 def _handle_events(self, socket, events):
481 482 if events & POLLERR:
482 483 self._handle_err()
483 484 if events & POLLOUT:
484 485 self._handle_send()
485 486 if events & POLLIN:
486 487 self._handle_recv()
487 488
488 489 def _handle_recv(self):
489 490 msg = self.socket.recv_json()
490 491 self.call_handlers(msg)
491 492
492 493 def _handle_send(self):
493 494 try:
494 495 msg = self.msg_queue.get(False)
495 496 except Empty:
496 497 pass
497 498 else:
498 499 self.socket.send_json(msg)
499 500 if self.msg_queue.empty():
500 501 self.drop_io_state(POLLOUT)
501 502
502 503 def _handle_err(self):
503 504 # We don't want to let this go silently, so eventually we should log.
504 505 raise zmq.ZMQError()
505 506
506 507 def _queue_reply(self, msg):
507 508 self.msg_queue.put(msg)
508 509 self.add_io_state(POLLOUT)
509 510
510 511
511 512 class HBSocketChannel(ZmqSocketChannel):
512 513 """The heartbeat channel which monitors the kernel heartbeat.
513 514
514 515 Note that the heartbeat channel is paused by default. As long as you start
515 516 this channel, the kernel manager will ensure that it is paused and un-paused
516 517 as appropriate.
517 518 """
518 519
519 520 time_to_dead = 3.0
520 521 socket = None
521 522 poller = None
522 523 _running = None
523 524 _pause = None
524 525
525 526 def __init__(self, context, session, address):
526 527 super(HBSocketChannel, self).__init__(context, session, address)
527 528 self._running = False
528 529 self._pause = True
529 530
530 531 def _create_socket(self):
531 532 self.socket = self.context.socket(zmq.REQ)
532 533 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
533 534 self.socket.connect('tcp://%s:%i' % self.address)
534 535 self.poller = zmq.Poller()
535 536 self.poller.register(self.socket, zmq.POLLIN)
536 537
537 538 def run(self):
538 539 """The thread's main activity. Call start() instead."""
539 540 self._create_socket()
540 541 self._running = True
541 542 while self._running:
542 543 if self._pause:
543 544 time.sleep(self.time_to_dead)
544 545 else:
545 546 since_last_heartbeat = 0.0
546 547 request_time = time.time()
547 548 try:
548 549 #io.rprint('Ping from HB channel') # dbg
549 550 self.socket.send_json('ping')
550 551 except zmq.ZMQError, e:
551 552 #io.rprint('*** HB Error:', e) # dbg
552 553 if e.errno == zmq.EFSM:
553 554 #io.rprint('sleep...', self.time_to_dead) # dbg
554 555 time.sleep(self.time_to_dead)
555 556 self._create_socket()
556 557 else:
557 558 raise
558 559 else:
559 560 while True:
560 561 try:
561 562 self.socket.recv_json(zmq.NOBLOCK)
562 563 except zmq.ZMQError, e:
563 564 #io.rprint('*** HB Error 2:', e) # dbg
564 565 if e.errno == zmq.EAGAIN:
565 566 before_poll = time.time()
566 567 until_dead = self.time_to_dead - (before_poll -
567 568 request_time)
568 569
569 570 # When the return value of poll() is an empty
570 571 # list, that is when things have gone wrong
571 572 # (zeromq bug). As long as it is not an empty
572 573 # list, poll is working correctly even if it
573 574 # returns quickly. Note: poll timeout is in
574 575 # milliseconds.
575 576 self.poller.poll(1000*until_dead)
576 577
577 578 since_last_heartbeat = time.time()-request_time
578 579 if since_last_heartbeat > self.time_to_dead:
579 580 self.call_handlers(since_last_heartbeat)
580 581 break
581 582 else:
582 583 # FIXME: We should probably log this instead.
583 584 raise
584 585 else:
585 586 until_dead = self.time_to_dead - (time.time() -
586 587 request_time)
587 588 if until_dead > 0.0:
588 589 #io.rprint('sleep...', self.time_to_dead) # dbg
589 590 time.sleep(until_dead)
590 591 break
591 592
592 593 def pause(self):
593 594 """Pause the heartbeat."""
594 595 self._pause = True
595 596
596 597 def unpause(self):
597 598 """Unpause the heartbeat."""
598 599 self._pause = False
599 600
600 601 def is_beating(self):
601 602 """Is the heartbeat running and not paused."""
602 603 if self.is_alive() and not self._pause:
603 604 return True
604 605 else:
605 606 return False
606 607
607 608 def stop(self):
608 609 self._running = False
609 610 super(HBSocketChannel, self).stop()
610 611
611 612 def call_handlers(self, since_last_heartbeat):
612 613 """This method is called in the ioloop thread when a message arrives.
613 614
614 615 Subclasses should override this method to handle incoming messages.
615 616 It is important to remember that this method is called in the thread
616 617 so that some logic must be done to ensure that the application leve
617 618 handlers are called in the application thread.
618 619 """
619 620 raise NotImplementedError('call_handlers must be defined in a subclass.')
620 621
621 622
622 623 #-----------------------------------------------------------------------------
623 624 # Main kernel manager class
624 625 #-----------------------------------------------------------------------------
625 626
626 627 class KernelManager(HasTraits):
627 628 """ Manages a kernel for a frontend.
628 629
629 630 The SUB channel is for the frontend to receive messages published by the
630 631 kernel.
631 632
632 633 The REQ channel is for the frontend to make requests of the kernel.
633 634
634 635 The REP channel is for the kernel to request stdin (raw_input) from the
635 636 frontend.
636 637 """
637 638 # The PyZMQ Context to use for communication with the kernel.
638 639 context = Instance(zmq.Context,(),{})
639 640
640 641 # The Session to use for communication with the kernel.
641 642 session = Instance(Session,(),{})
642 643
643 644 # The kernel process with which the KernelManager is communicating.
644 645 kernel = Instance(Popen)
645 646
646 647 # The addresses for the communication channels.
647 648 xreq_address = TCPAddress((LOCALHOST, 0))
648 649 sub_address = TCPAddress((LOCALHOST, 0))
649 650 rep_address = TCPAddress((LOCALHOST, 0))
650 651 hb_address = TCPAddress((LOCALHOST, 0))
651 652
652 653 # The classes to use for the various channels.
653 654 xreq_channel_class = Type(XReqSocketChannel)
654 655 sub_channel_class = Type(SubSocketChannel)
655 656 rep_channel_class = Type(RepSocketChannel)
656 657 hb_channel_class = Type(HBSocketChannel)
657 658
658 659 # Protected traits.
659 660 _launch_args = Any
660 661 _xreq_channel = Any
661 662 _sub_channel = Any
662 663 _rep_channel = Any
663 664 _hb_channel = Any
664 665
665 666 def __init__(self, **kwargs):
666 667 super(KernelManager, self).__init__(**kwargs)
667 668 # Uncomment this to try closing the context.
668 669 # atexit.register(self.context.close)
669 670
670 671 #--------------------------------------------------------------------------
671 672 # Channel management methods:
672 673 #--------------------------------------------------------------------------
673 674
674 675 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
675 676 """Starts the channels for this kernel.
676 677
677 678 This will create the channels if they do not exist and then start
678 679 them. If port numbers of 0 are being used (random ports) then you
679 680 must first call :method:`start_kernel`. If the channels have been
680 681 stopped and you call this, :class:`RuntimeError` will be raised.
681 682 """
682 683 if xreq:
683 684 self.xreq_channel.start()
684 685 if sub:
685 686 self.sub_channel.start()
686 687 if rep:
687 688 self.rep_channel.start()
688 689 if hb:
689 690 self.hb_channel.start()
690 691
691 692 def stop_channels(self):
692 693 """Stops all the running channels for this kernel.
693 694 """
694 695 if self.xreq_channel.is_alive():
695 696 self.xreq_channel.stop()
696 697 if self.sub_channel.is_alive():
697 698 self.sub_channel.stop()
698 699 if self.rep_channel.is_alive():
699 700 self.rep_channel.stop()
700 701 if self.hb_channel.is_alive():
701 702 self.hb_channel.stop()
702 703
703 704 @property
704 705 def channels_running(self):
705 706 """Are any of the channels created and running?"""
706 707 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
707 708 self.rep_channel.is_alive() or self.hb_channel.is_alive())
708 709
709 710 #--------------------------------------------------------------------------
710 711 # Kernel process management methods:
711 712 #--------------------------------------------------------------------------
712 713
713 714 def start_kernel(self, **kw):
714 715 """Starts a kernel process and configures the manager to use it.
715 716
716 717 If random ports (port=0) are being used, this method must be called
717 718 before the channels are created.
718 719
719 720 Parameters:
720 721 -----------
721 722 ipython : bool, optional (default True)
722 723 Whether to use an IPython kernel instead of a plain Python kernel.
723 724 """
724 725 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
725 726 self.rep_address, self.hb_address
726 727 if xreq[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
727 728 rep[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
728 729 raise RuntimeError("Can only launch a kernel on a local interface. "
729 730 "Make sure that the '*_address' attributes are "
730 731 "configured properly. "
731 732 "Currently valid addresses are: %s"%LOCAL_IPS
732 733 )
733 734
734 735 self._launch_args = kw.copy()
735 736 if kw.pop('ipython', True):
736 737 from ipkernel import launch_kernel
737 738 else:
738 739 from pykernel import launch_kernel
739 740 self.kernel, xrep, pub, req, _hb = launch_kernel(
740 741 xrep_port=xreq[1], pub_port=sub[1],
741 742 req_port=rep[1], hb_port=hb[1], **kw)
742 743 self.xreq_address = (xreq[0], xrep)
743 744 self.sub_address = (sub[0], pub)
744 745 self.rep_address = (rep[0], req)
745 746 self.hb_address = (hb[0], _hb)
746 747
747 748 def shutdown_kernel(self, restart=False):
748 749 """ Attempts to the stop the kernel process cleanly. If the kernel
749 750 cannot be stopped, it is killed, if possible.
750 751 """
751 752 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
752 753 if sys.platform == 'win32':
753 754 self.kill_kernel()
754 755 return
755 756
756 757 # Pause the heart beat channel if it exists.
757 758 if self._hb_channel is not None:
758 759 self._hb_channel.pause()
759 760
760 761 # Don't send any additional kernel kill messages immediately, to give
761 762 # the kernel a chance to properly execute shutdown actions. Wait for at
762 763 # most 1s, checking every 0.1s.
763 764 self.xreq_channel.shutdown(restart=restart)
764 765 for i in range(10):
765 766 if self.is_alive:
766 767 time.sleep(0.1)
767 768 else:
768 769 break
769 770 else:
770 771 # OK, we've waited long enough.
771 772 if self.has_kernel:
772 773 self.kill_kernel()
773 774
774 775 def restart_kernel(self, now=False):
775 776 """Restarts a kernel with the same arguments that were used to launch
776 777 it. If the old kernel was launched with random ports, the same ports
777 778 will be used for the new kernel.
778 779
779 780 Parameters
780 781 ----------
781 782 now : bool, optional
782 783 If True, the kernel is forcefully restarted *immediately*, without
783 784 having a chance to do any cleanup action. Otherwise the kernel is
784 785 given 1s to clean up before a forceful restart is issued.
785 786
786 787 In all cases the kernel is restarted, the only difference is whether
787 788 it is given a chance to perform a clean shutdown or not.
788 789 """
789 790 if self._launch_args is None:
790 791 raise RuntimeError("Cannot restart the kernel. "
791 792 "No previous call to 'start_kernel'.")
792 793 else:
793 794 if self.has_kernel:
794 795 if now:
795 796 self.kill_kernel()
796 797 else:
797 798 self.shutdown_kernel(restart=True)
798 799 self.start_kernel(**self._launch_args)
799 800
800 801 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
801 802 # unless there is some delay here.
802 803 if sys.platform == 'win32':
803 804 time.sleep(0.2)
804 805
805 806 @property
806 807 def has_kernel(self):
807 808 """Returns whether a kernel process has been specified for the kernel
808 809 manager.
809 810 """
810 811 return self.kernel is not None
811 812
812 813 def kill_kernel(self):
813 814 """ Kill the running kernel. """
814 815 if self.has_kernel:
815 816 # Pause the heart beat channel if it exists.
816 817 if self._hb_channel is not None:
817 818 self._hb_channel.pause()
818 819
819 820 # Attempt to kill the kernel.
820 821 try:
821 822 self.kernel.kill()
822 823 except OSError, e:
823 824 # In Windows, we will get an Access Denied error if the process
824 825 # has already terminated. Ignore it.
825 826 if not (sys.platform == 'win32' and e.winerror == 5):
826 827 raise
827 828 self.kernel = None
828 829 else:
829 830 raise RuntimeError("Cannot kill kernel. No kernel is running!")
830 831
831 832 def interrupt_kernel(self):
832 833 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
833 834 well supported on all platforms.
834 835 """
835 836 if self.has_kernel:
836 837 if sys.platform == 'win32':
837 838 from parentpoller import ParentPollerWindows as Poller
838 839 Poller.send_interrupt(self.kernel.win32_interrupt_event)
839 840 else:
840 841 self.kernel.send_signal(signal.SIGINT)
841 842 else:
842 843 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
843 844
844 845 def signal_kernel(self, signum):
845 846 """ Sends a signal to the kernel. Note that since only SIGTERM is
846 847 supported on Windows, this function is only useful on Unix systems.
847 848 """
848 849 if self.has_kernel:
849 850 self.kernel.send_signal(signum)
850 851 else:
851 852 raise RuntimeError("Cannot signal kernel. No kernel is running!")
852 853
853 854 @property
854 855 def is_alive(self):
855 856 """Is the kernel process still running?"""
856 857 # FIXME: not using a heartbeat means this method is broken for any
857 858 # remote kernel, it's only capable of handling local kernels.
858 859 if self.has_kernel:
859 860 if self.kernel.poll() is None:
860 861 return True
861 862 else:
862 863 return False
863 864 else:
864 865 # We didn't start the kernel with this KernelManager so we don't
865 866 # know if it is running. We should use a heartbeat for this case.
866 867 return True
867 868
868 869 #--------------------------------------------------------------------------
869 870 # Channels used for communication with the kernel:
870 871 #--------------------------------------------------------------------------
871 872
872 873 @property
873 874 def xreq_channel(self):
874 875 """Get the REQ socket channel object to make requests of the kernel."""
875 876 if self._xreq_channel is None:
876 877 self._xreq_channel = self.xreq_channel_class(self.context,
877 878 self.session,
878 879 self.xreq_address)
879 880 return self._xreq_channel
880 881
881 882 @property
882 883 def sub_channel(self):
883 884 """Get the SUB socket channel object."""
884 885 if self._sub_channel is None:
885 886 self._sub_channel = self.sub_channel_class(self.context,
886 887 self.session,
887 888 self.sub_address)
888 889 return self._sub_channel
889 890
890 891 @property
891 892 def rep_channel(self):
892 893 """Get the REP socket channel object to handle stdin (raw_input)."""
893 894 if self._rep_channel is None:
894 895 self._rep_channel = self.rep_channel_class(self.context,
895 896 self.session,
896 897 self.rep_address)
897 898 return self._rep_channel
898 899
899 900 @property
900 901 def hb_channel(self):
901 902 """Get the REP socket channel object to handle stdin (raw_input)."""
902 903 if self._hb_channel is None:
903 904 self._hb_channel = self.hb_channel_class(self.context,
904 905 self.session,
905 906 self.hb_address)
906 907 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now