##// END OF EJS Templates
Merge branch 'kernel-logging' of https://github.com/omazapa/ipython into omazapa-kernel-logging...
Fernando Perez -
r3322:9d65bd5f merge
parent child Browse files
Show More
@@ -1,84 +1,92 b''
1 import logging
1 2 import sys
2 3 import time
3 4 from cStringIO import StringIO
4 5
5 6 from session import extract_header, Message
6 7
7 8 from IPython.utils import io
8 9
9 10 #-----------------------------------------------------------------------------
11 # Globals
12 #-----------------------------------------------------------------------------
13
14 # Module-level logger
15 logger = logging.getLogger(__name__)
16
17 #-----------------------------------------------------------------------------
10 18 # Stream classes
11 19 #-----------------------------------------------------------------------------
12 20
13 21 class OutStream(object):
14 22 """A file like object that publishes the stream to a 0MQ PUB socket."""
15 23
16 24 # The time interval between automatic flushes, in seconds.
17 25 flush_interval = 0.05
18 26
19 27 def __init__(self, session, pub_socket, name):
20 28 self.session = session
21 29 self.pub_socket = pub_socket
22 30 self.name = name
23 31 self.parent_header = {}
24 32 self._new_buffer()
25 33
26 34 def set_parent(self, parent):
27 35 self.parent_header = extract_header(parent)
28 36
29 37 def close(self):
30 38 self.pub_socket = None
31 39
32 40 def flush(self):
33 41 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
34 42 if self.pub_socket is None:
35 43 raise ValueError(u'I/O operation on closed file')
36 44 else:
37 45 data = self._buffer.getvalue()
38 46 if data:
39 47 content = {u'name':self.name, u'data':data}
40 msg = self.session.send(self.pub_socket, u'stream', content=content,
48 msg = self.session.send(self.pub_socket, u'stream',
49 content=content,
41 50 parent=self.parent_header)
42 io.raw_print(msg)
43
51 logger.debug(msg)
44 52 self._buffer.close()
45 53 self._new_buffer()
46 54
47 55 def isatty(self):
48 56 return False
49 57
50 58 def next(self):
51 59 raise IOError('Read not supported on a write only stream.')
52 60
53 61 def read(self, size=-1):
54 62 raise IOError('Read not supported on a write only stream.')
55 63
56 64 def readline(self, size=-1):
57 65 raise IOError('Read not supported on a write only stream.')
58 66
59 67 def write(self, string):
60 68 if self.pub_socket is None:
61 69 raise ValueError('I/O operation on closed file')
62 70 else:
63 71 # We can only send raw bytes, not unicode objects, so we encode
64 72 # into utf-8 for all frontends if we get unicode inputs.
65 73 if type(string) == unicode:
66 74 string = string.encode('utf-8')
67 75
68 76 self._buffer.write(string)
69 77 current_time = time.time()
70 78 if self._start <= 0:
71 79 self._start = current_time
72 80 elif current_time - self._start > self.flush_interval:
73 81 self.flush()
74 82
75 83 def writelines(self, sequence):
76 84 if self.pub_socket is None:
77 85 raise ValueError('I/O operation on closed file')
78 86 else:
79 87 for string in sequence:
80 88 self.write(string)
81 89
82 90 def _new_buffer(self):
83 91 self._buffer = StringIO()
84 92 self._start = -1
@@ -1,632 +1,641 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports.
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24
24 import logging
25 25 # System library imports.
26 26 import zmq
27 27
28 28 # Local imports.
29 29 from IPython.config.configurable import Configurable
30 30 from IPython.utils import io
31 31 from IPython.utils.jsonutil import json_clean
32 32 from IPython.lib import pylabtools
33 33 from IPython.utils.traitlets import Instance, Float
34 34 from entry_point import (base_launch_kernel, make_argument_parser, make_kernel,
35 35 start_kernel)
36 36 from iostream import OutStream
37 37 from session import Session, Message
38 38 from zmqshell import ZMQInteractiveShell
39 39
40 40 #-----------------------------------------------------------------------------
41 # Globals
42 #-----------------------------------------------------------------------------
43
44 # Module-level logger
45 logger = logging.getLogger(__name__)
46
47 #-----------------------------------------------------------------------------
41 48 # Main kernel class
42 49 #-----------------------------------------------------------------------------
43 50
44 51 class Kernel(Configurable):
45 52
46 53 #---------------------------------------------------------------------------
47 54 # Kernel interface
48 55 #---------------------------------------------------------------------------
49 56
50 57 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
51 58 session = Instance(Session)
52 59 reply_socket = Instance('zmq.Socket')
53 60 pub_socket = Instance('zmq.Socket')
54 61 req_socket = Instance('zmq.Socket')
55 62
56 63 # Private interface
57 64
58 65 # Time to sleep after flushing the stdout/err buffers in each execute
59 66 # cycle. While this introduces a hard limit on the minimal latency of the
60 67 # execute cycle, it helps prevent output synchronization problems for
61 68 # clients.
62 69 # Units are in seconds. The minimum zmq latency on local host is probably
63 70 # ~150 microseconds, set this to 500us for now. We may need to increase it
64 71 # a little if it's not enough after more interactive testing.
65 72 _execute_sleep = Float(0.0005, config=True)
66 73
67 74 # Frequency of the kernel's event loop.
68 75 # Units are in seconds, kernel subclasses for GUI toolkits may need to
69 76 # adapt to milliseconds.
70 77 _poll_interval = Float(0.05, config=True)
71 78
72 79 # If the shutdown was requested over the network, we leave here the
73 80 # necessary reply message so it can be sent by our registered atexit
74 81 # handler. This ensures that the reply is only sent to clients truly at
75 82 # the end of our shutdown process (which happens after the underlying
76 83 # IPython shell's own shutdown).
77 84 _shutdown_message = None
78 85
79 86 # This is a dict of port number that the kernel is listening on. It is set
80 87 # by record_ports and used by connect_request.
81 88 _recorded_ports = None
82 89
90
83 91 def __init__(self, **kwargs):
84 92 super(Kernel, self).__init__(**kwargs)
85 93
86 94 # Before we even start up the shell, register *first* our exit handlers
87 95 # so they come before the shell's
88 96 atexit.register(self._at_shutdown)
89 97
90 98 # Initialize the InteractiveShell subclass
91 99 self.shell = ZMQInteractiveShell.instance()
92 100 self.shell.displayhook.session = self.session
93 101 self.shell.displayhook.pub_socket = self.pub_socket
94 102 self.shell.display_pub.session = self.session
95 103 self.shell.display_pub.pub_socket = self.pub_socket
96 104
97 105 # TMP - hack while developing
98 106 self.shell._reply_content = None
99 107
100 108 # Build dict of handlers for message types
101 109 msg_types = [ 'execute_request', 'complete_request',
102 110 'object_info_request', 'history_request',
103 111 'connect_request', 'shutdown_request']
104 112 self.handlers = {}
105 113 for msg_type in msg_types:
106 114 self.handlers[msg_type] = getattr(self, msg_type)
107 115
108 116 def do_one_iteration(self):
109 117 """Do one iteration of the kernel's evaluation loop.
110 118 """
111 119 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
112 120 if msg is None:
113 121 return
114 122
115 123 # This assert will raise in versions of zeromq 2.0.7 and lesser.
116 124 # We now require 2.0.8 or above, so we can uncomment for safety.
117 125 # print(ident,msg, file=sys.__stdout__)
118 126 assert ident is not None, "Missing message part."
119 127
120 128 # Print some info about this message and leave a '--->' marker, so it's
121 129 # easier to trace visually the message chain when debugging. Each
122 130 # handler prints its message at the end.
123 131 # Eventually we'll move these from stdout to a logger.
124 io.raw_print('\n*** MESSAGE TYPE:', msg['msg_type'], '***')
125 io.raw_print(' Content: ', msg['content'],
126 '\n --->\n ', sep='', end='')
132 logger.debug('\n*** MESSAGE TYPE:'+str(msg['msg_type'])+'***')
133 logger.debug(' Content: '+str(msg['content'])+'\n --->\n ')
127 134
128 135 # Find and call actual handler for message
129 136 handler = self.handlers.get(msg['msg_type'], None)
130 137 if handler is None:
131 io.raw_print_err("UNKNOWN MESSAGE TYPE:", msg)
138 logger.error("UNKNOWN MESSAGE TYPE:" +str(msg))
132 139 else:
133 140 handler(ident, msg)
134 141
135 142 # Check whether we should exit, in case the incoming message set the
136 143 # exit flag on
137 144 if self.shell.exit_now:
138 io.raw_print('\nExiting IPython kernel...')
145 logger.debug('\nExiting IPython kernel...')
139 146 # We do a normal, clean exit, which allows any actions registered
140 147 # via atexit (such as history saving) to take place.
141 148 sys.exit(0)
142 149
143 150
144 151 def start(self):
145 152 """ Start the kernel main loop.
146 153 """
147 154 while True:
148 155 time.sleep(self._poll_interval)
149 156 self.do_one_iteration()
150 157
151 158 def record_ports(self, xrep_port, pub_port, req_port, hb_port):
152 159 """Record the ports that this kernel is using.
153 160
154 161 The creator of the Kernel instance must call this methods if they
155 162 want the :meth:`connect_request` method to return the port numbers.
156 163 """
157 164 self._recorded_ports = {
158 165 'xrep_port' : xrep_port,
159 166 'pub_port' : pub_port,
160 167 'req_port' : req_port,
161 168 'hb_port' : hb_port
162 169 }
163 170
164 171 #---------------------------------------------------------------------------
165 172 # Kernel request handlers
166 173 #---------------------------------------------------------------------------
167 174
168 175 def _publish_pyin(self, code, parent):
169 176 """Publish the code request on the pyin stream."""
170 177
171 178 pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent)
172 179
173 180 def execute_request(self, ident, parent):
174 181
175 182 status_msg = self.session.send(self.pub_socket,
176 183 u'status',
177 184 {u'execution_state':u'busy'},
178 185 parent=parent
179 186 )
180 187
181 188 try:
182 189 content = parent[u'content']
183 190 code = content[u'code']
184 191 silent = content[u'silent']
185 192 except:
186 io.raw_print_err("Got bad msg: ")
187 io.raw_print_err(Message(parent))
193 logger.error("Got bad msg: ")
194 logger.error(str(Message(parent)))
188 195 return
189 196
190 197 shell = self.shell # we'll need this a lot here
191 198
192 199 # Replace raw_input. Note that is not sufficient to replace
193 200 # raw_input in the user namespace.
194 201 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
195 202 __builtin__.raw_input = raw_input
196 203
197 204 # Set the parent message of the display hook and out streams.
198 205 shell.displayhook.set_parent(parent)
199 206 shell.display_pub.set_parent(parent)
200 207 sys.stdout.set_parent(parent)
201 208 sys.stderr.set_parent(parent)
202 209
203 210 # Re-broadcast our input for the benefit of listening clients, and
204 211 # start computing output
205 212 if not silent:
206 213 self._publish_pyin(code, parent)
207 214
208 215 reply_content = {}
209 216 try:
210 217 if silent:
211 218 # run_code uses 'exec' mode, so no displayhook will fire, and it
212 219 # doesn't call logging or history manipulations. Print
213 220 # statements in that code will obviously still execute.
214 221 shell.run_code(code)
215 222 else:
216 223 # FIXME: the shell calls the exception handler itself.
217 224 shell._reply_content = None
218 225 shell.run_cell(code)
219 226 except:
220 227 status = u'error'
221 228 # FIXME: this code right now isn't being used yet by default,
222 229 # because the runlines() call above directly fires off exception
223 230 # reporting. This code, therefore, is only active in the scenario
224 231 # where runlines itself has an unhandled exception. We need to
225 232 # uniformize this, for all exception construction to come from a
226 233 # single location in the codbase.
227 234 etype, evalue, tb = sys.exc_info()
228 235 tb_list = traceback.format_exception(etype, evalue, tb)
229 236 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
230 237 else:
231 238 status = u'ok'
232 239
233 240 reply_content[u'status'] = status
234 241
235 242 # Return the execution counter so clients can display prompts
236 243 reply_content['execution_count'] = shell.execution_count -1
237 244
238 245 # FIXME - fish exception info out of shell, possibly left there by
239 246 # runlines. We'll need to clean up this logic later.
240 247 if shell._reply_content is not None:
241 248 reply_content.update(shell._reply_content)
242 249
243 250 # At this point, we can tell whether the main code execution succeeded
244 251 # or not. If it did, we proceed to evaluate user_variables/expressions
245 252 if reply_content['status'] == 'ok':
246 253 reply_content[u'user_variables'] = \
247 254 shell.user_variables(content[u'user_variables'])
248 255 reply_content[u'user_expressions'] = \
249 256 shell.user_expressions(content[u'user_expressions'])
250 257 else:
251 258 # If there was an error, don't even try to compute variables or
252 259 # expressions
253 260 reply_content[u'user_variables'] = {}
254 261 reply_content[u'user_expressions'] = {}
255 262
256 263 # Payloads should be retrieved regardless of outcome, so we can both
257 264 # recover partial output (that could have been generated early in a
258 265 # block, before an error) and clear the payload system always.
259 266 reply_content[u'payload'] = shell.payload_manager.read_payload()
260 267 # Be agressive about clearing the payload because we don't want
261 268 # it to sit in memory until the next execute_request comes in.
262 269 shell.payload_manager.clear_payload()
263 270
264 271 # Send the reply.
265 reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident)
266 io.raw_print(reply_msg)
272 reply_msg = self.session.send(self.reply_socket, u'execute_reply',
273 reply_content, parent, ident=ident)
274 logger.debug(str(reply_msg))
267 275
268 276 # Flush output before sending the reply.
269 277 sys.stdout.flush()
270 278 sys.stderr.flush()
271 279 # FIXME: on rare occasions, the flush doesn't seem to make it to the
272 280 # clients... This seems to mitigate the problem, but we definitely need
273 281 # to better understand what's going on.
274 282 if self._execute_sleep:
275 283 time.sleep(self._execute_sleep)
276 284
277 285 if reply_msg['content']['status'] == u'error':
278 286 self._abort_queue()
279 287
280 288 status_msg = self.session.send(self.pub_socket,
281 289 u'status',
282 290 {u'execution_state':u'idle'},
283 291 parent=parent
284 292 )
285 293
286 294 def complete_request(self, ident, parent):
287 295 txt, matches = self._complete(parent)
288 296 matches = {'matches' : matches,
289 297 'matched_text' : txt,
290 298 'status' : 'ok'}
291 299 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
292 300 matches, parent, ident)
293 io.raw_print(completion_msg)
301 logger.debug(str(completion_msg))
294 302
295 303 def object_info_request(self, ident, parent):
296 304 object_info = self.shell.object_inspect(parent['content']['oname'])
297 305 # Before we send this object over, we scrub it for JSON usage
298 306 oinfo = json_clean(object_info)
299 307 msg = self.session.send(self.reply_socket, 'object_info_reply',
300 308 oinfo, parent, ident)
301 io.raw_print(msg)
309 logger.debug(msg)
302 310
303 311 def history_request(self, ident, parent):
304 312 output = parent['content']['output']
305 313 index = parent['content']['index']
306 314 raw = parent['content']['raw']
307 315 hist = self.shell.get_history(index=index, raw=raw, output=output)
308 316 content = {'history' : hist}
309 317 msg = self.session.send(self.reply_socket, 'history_reply',
310 318 content, parent, ident)
311 io.raw_print(msg)
319 logger.debug(str(msg))
312 320
313 321 def connect_request(self, ident, parent):
314 322 if self._recorded_ports is not None:
315 323 content = self._recorded_ports.copy()
316 324 else:
317 325 content = {}
318 326 msg = self.session.send(self.reply_socket, 'connect_reply',
319 327 content, parent, ident)
320 io.raw_print(msg)
328 logger.debug(msg)
321 329
322 330 def shutdown_request(self, ident, parent):
323 331 self.shell.exit_now = True
324 332 self._shutdown_message = self.session.msg(u'shutdown_reply', parent['content'], parent)
325 333 sys.exit(0)
326 334
327 335 #---------------------------------------------------------------------------
328 336 # Protected interface
329 337 #---------------------------------------------------------------------------
330 338
331 339 def _abort_queue(self):
332 340 while True:
333 341 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
334 342 if msg is None:
335 343 break
336 344 else:
337 345 assert ident is not None, \
338 346 "Unexpected missing message part."
339 io.raw_print("Aborting:\n", Message(msg))
347
348 logger.debug("Aborting:\n"+str(Message(msg)))
340 349 msg_type = msg['msg_type']
341 350 reply_type = msg_type.split('_')[0] + '_reply'
342 351 reply_msg = self.session.send(self.reply_socket, reply_type,
343 352 {'status' : 'aborted'}, msg, ident=ident)
344 io.raw_print(reply_msg)
353 logger.debug(reply_msg)
345 354 # We need to wait a bit for requests to come in. This can probably
346 355 # be set shorter for true asynchronous clients.
347 356 time.sleep(0.1)
348 357
349 358 def _raw_input(self, prompt, ident, parent):
350 359 # Flush output before making the request.
351 360 sys.stderr.flush()
352 361 sys.stdout.flush()
353 362
354 363 # Send the input request.
355 364 content = dict(prompt=prompt)
356 365 msg = self.session.send(self.req_socket, u'input_request', content, parent)
357 366
358 367 # Await a response.
359 368 ident, reply = self.session.recv(self.req_socket, 0)
360 369 try:
361 370 value = reply['content']['value']
362 371 except:
363 io.raw_print_err("Got bad raw_input reply: ")
364 io.raw_print_err(Message(parent))
372 logger.error("Got bad raw_input reply: ")
373 logger.error(str(Message(parent)))
365 374 value = ''
366 375 return value
367 376
368 377 def _complete(self, msg):
369 378 c = msg['content']
370 379 try:
371 380 cpos = int(c['cursor_pos'])
372 381 except:
373 382 # If we don't get something that we can convert to an integer, at
374 383 # least attempt the completion guessing the cursor is at the end of
375 384 # the text, if there's any, and otherwise of the line
376 385 cpos = len(c['text'])
377 386 if cpos==0:
378 387 cpos = len(c['line'])
379 388 return self.shell.complete(c['text'], c['line'], cpos)
380 389
381 390 def _object_info(self, context):
382 391 symbol, leftover = self._symbol_from_context(context)
383 392 if symbol is not None and not leftover:
384 393 doc = getattr(symbol, '__doc__', '')
385 394 else:
386 395 doc = ''
387 396 object_info = dict(docstring = doc)
388 397 return object_info
389 398
390 399 def _symbol_from_context(self, context):
391 400 if not context:
392 401 return None, context
393 402
394 403 base_symbol_string = context[0]
395 404 symbol = self.shell.user_ns.get(base_symbol_string, None)
396 405 if symbol is None:
397 406 symbol = __builtin__.__dict__.get(base_symbol_string, None)
398 407 if symbol is None:
399 408 return None, context
400 409
401 410 context = context[1:]
402 411 for i, name in enumerate(context):
403 412 new_symbol = getattr(symbol, name, None)
404 413 if new_symbol is None:
405 414 return symbol, context[i:]
406 415 else:
407 416 symbol = new_symbol
408 417
409 418 return symbol, []
410 419
411 420 def _at_shutdown(self):
412 421 """Actions taken at shutdown by the kernel, called by python's atexit.
413 422 """
414 423 # io.rprint("Kernel at_shutdown") # dbg
415 424 if self._shutdown_message is not None:
416 425 self.session.send(self.reply_socket, self._shutdown_message)
417 426 self.session.send(self.pub_socket, self._shutdown_message)
418 io.raw_print(self._shutdown_message)
427 logger.debug(str(self._shutdown_message))
419 428 # A very short sleep to give zmq time to flush its message buffers
420 429 # before Python truly shuts down.
421 430 time.sleep(0.01)
422 431
423 432
424 433 class QtKernel(Kernel):
425 434 """A Kernel subclass with Qt support."""
426 435
427 436 def start(self):
428 437 """Start a kernel with QtPy4 event loop integration."""
429 438
430 439 from PyQt4 import QtCore
431 440 from IPython.lib.guisupport import get_app_qt4, start_event_loop_qt4
432 441
433 442 self.app = get_app_qt4([" "])
434 443 self.app.setQuitOnLastWindowClosed(False)
435 444 self.timer = QtCore.QTimer()
436 445 self.timer.timeout.connect(self.do_one_iteration)
437 446 # Units for the timer are in milliseconds
438 447 self.timer.start(1000*self._poll_interval)
439 448 start_event_loop_qt4(self.app)
440 449
441 450
442 451 class WxKernel(Kernel):
443 452 """A Kernel subclass with Wx support."""
444 453
445 454 def start(self):
446 455 """Start a kernel with wx event loop support."""
447 456
448 457 import wx
449 458 from IPython.lib.guisupport import start_event_loop_wx
450 459
451 460 doi = self.do_one_iteration
452 461 # Wx uses milliseconds
453 462 poll_interval = int(1000*self._poll_interval)
454 463
455 464 # We have to put the wx.Timer in a wx.Frame for it to fire properly.
456 465 # We make the Frame hidden when we create it in the main app below.
457 466 class TimerFrame(wx.Frame):
458 467 def __init__(self, func):
459 468 wx.Frame.__init__(self, None, -1)
460 469 self.timer = wx.Timer(self)
461 470 # Units for the timer are in milliseconds
462 471 self.timer.Start(poll_interval)
463 472 self.Bind(wx.EVT_TIMER, self.on_timer)
464 473 self.func = func
465 474
466 475 def on_timer(self, event):
467 476 self.func()
468 477
469 478 # We need a custom wx.App to create our Frame subclass that has the
470 479 # wx.Timer to drive the ZMQ event loop.
471 480 class IPWxApp(wx.App):
472 481 def OnInit(self):
473 482 self.frame = TimerFrame(doi)
474 483 self.frame.Show(False)
475 484 return True
476 485
477 486 # The redirect=False here makes sure that wx doesn't replace
478 487 # sys.stdout/stderr with its own classes.
479 488 self.app = IPWxApp(redirect=False)
480 489 start_event_loop_wx(self.app)
481 490
482 491
483 492 class TkKernel(Kernel):
484 493 """A Kernel subclass with Tk support."""
485 494
486 495 def start(self):
487 496 """Start a Tk enabled event loop."""
488 497
489 498 import Tkinter
490 499 doi = self.do_one_iteration
491 500 # Tk uses milliseconds
492 501 poll_interval = int(1000*self._poll_interval)
493 502 # For Tkinter, we create a Tk object and call its withdraw method.
494 503 class Timer(object):
495 504 def __init__(self, func):
496 505 self.app = Tkinter.Tk()
497 506 self.app.withdraw()
498 507 self.func = func
499 508
500 509 def on_timer(self):
501 510 self.func()
502 511 self.app.after(poll_interval, self.on_timer)
503 512
504 513 def start(self):
505 514 self.on_timer() # Call it once to get things going.
506 515 self.app.mainloop()
507 516
508 517 self.timer = Timer(doi)
509 518 self.timer.start()
510 519
511 520
512 521 class GTKKernel(Kernel):
513 522 """A Kernel subclass with GTK support."""
514 523
515 524 def start(self):
516 525 """Start the kernel, coordinating with the GTK event loop"""
517 526 from .gui.gtkembed import GTKEmbed
518 527
519 528 gtk_kernel = GTKEmbed(self)
520 529 gtk_kernel.start()
521 530
522 531
523 532 #-----------------------------------------------------------------------------
524 533 # Kernel main and launch functions
525 534 #-----------------------------------------------------------------------------
526 535
527 536 def launch_kernel(ip=None, xrep_port=0, pub_port=0, req_port=0, hb_port=0,
528 537 independent=False, pylab=False, colors=None):
529 538 """Launches a localhost kernel, binding to the specified ports.
530 539
531 540 Parameters
532 541 ----------
533 542 ip : str, optional
534 543 The ip address the kernel will bind to.
535 544
536 545 xrep_port : int, optional
537 546 The port to use for XREP channel.
538 547
539 548 pub_port : int, optional
540 549 The port to use for the SUB channel.
541 550
542 551 req_port : int, optional
543 552 The port to use for the REQ (raw input) channel.
544 553
545 554 hb_port : int, optional
546 555 The port to use for the hearbeat REP channel.
547 556
548 557 independent : bool, optional (default False)
549 558 If set, the kernel process is guaranteed to survive if this process
550 559 dies. If not set, an effort is made to ensure that the kernel is killed
551 560 when this process dies. Note that in this case it is still good practice
552 561 to kill kernels manually before exiting.
553 562
554 563 pylab : bool or string, optional (default False)
555 564 If not False, the kernel will be launched with pylab enabled. If a
556 565 string is passed, matplotlib will use the specified backend. Otherwise,
557 566 matplotlib's default backend will be used.
558 567
559 568 colors : None or string, optional (default None)
560 569 If not None, specify the color scheme. One of (NoColor, LightBG, Linux)
561 570
562 571 Returns
563 572 -------
564 573 A tuple of form:
565 574 (kernel_process, xrep_port, pub_port, req_port)
566 575 where kernel_process is a Popen object and the ports are integers.
567 576 """
568 577 extra_arguments = []
569 578 if pylab:
570 579 extra_arguments.append('--pylab')
571 580 if isinstance(pylab, basestring):
572 581 extra_arguments.append(pylab)
573 582 if ip is not None:
574 583 extra_arguments.append('--ip')
575 584 if isinstance(ip, basestring):
576 585 extra_arguments.append(ip)
577 586 if colors is not None:
578 587 extra_arguments.append('--colors')
579 588 extra_arguments.append(colors)
580 589 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
581 590 xrep_port, pub_port, req_port, hb_port,
582 591 independent, extra_arguments)
583 592
584 593
585 594 def main():
586 595 """ The IPython kernel main entry point.
587 596 """
588 597 parser = make_argument_parser()
589 598 parser.add_argument('--pylab', type=str, metavar='GUI', nargs='?',
590 599 const='auto', help = \
591 600 "Pre-load matplotlib and numpy for interactive use. If GUI is not \
592 601 given, the GUI backend is matplotlib's, otherwise use one of: \
593 602 ['tk', 'gtk', 'qt', 'wx', 'inline'].")
594 603 parser.add_argument('--colors',
595 604 type=str, dest='colors',
596 605 help="Set the color scheme (NoColor, Linux, and LightBG).",
597 606 metavar='ZMQInteractiveShell.colors')
598 607 namespace = parser.parse_args()
599 608
600 609 kernel_class = Kernel
601 610
602 611 kernel_classes = {
603 612 'qt' : QtKernel,
604 613 'qt4': QtKernel,
605 614 'inline': Kernel,
606 615 'wx' : WxKernel,
607 616 'tk' : TkKernel,
608 617 'gtk': GTKKernel,
609 618 }
610 619 if namespace.pylab:
611 620 if namespace.pylab == 'auto':
612 621 gui, backend = pylabtools.find_gui_and_backend()
613 622 else:
614 623 gui, backend = pylabtools.find_gui_and_backend(namespace.pylab)
615 624 kernel_class = kernel_classes.get(gui)
616 625 if kernel_class is None:
617 626 raise ValueError('GUI is not supported: %r' % gui)
618 627 pylabtools.activate_matplotlib(backend)
619 628 if namespace.colors:
620 629 ZMQInteractiveShell.colors=namespace.colors
621 630
622 631 kernel = make_kernel(namespace, kernel_class, OutStream)
623 632
624 633 if namespace.pylab:
625 634 pylabtools.import_pylab(kernel.shell.user_ns, backend,
626 635 shell=kernel.shell)
627 636
628 637 start_kernel(namespace, kernel)
629 638
630 639
631 640 if __name__ == '__main__':
632 641 main()
@@ -1,908 +1,909 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2010 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 from Queue import Queue, Empty
21 21 from subprocess import Popen
22 22 import signal
23 23 import sys
24 24 from threading import Thread
25 25 import time
26 import logging
26 27
27 28 # System library imports.
28 29 import zmq
29 30 from zmq import POLLIN, POLLOUT, POLLERR
30 31 from zmq.eventloop import ioloop
31 32
32 33 # Local imports.
33 34 from IPython.utils import io
34 35 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
35 36 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
36 37 from session import Session, Message
37 38
38 39 #-----------------------------------------------------------------------------
39 40 # Constants and exceptions
40 41 #-----------------------------------------------------------------------------
41 42
42 43 class InvalidPortNumber(Exception):
43 44 pass
44 45
45 46 #-----------------------------------------------------------------------------
46 47 # Utility functions
47 48 #-----------------------------------------------------------------------------
48 49
49 50 # some utilities to validate message structure, these might get moved elsewhere
50 51 # if they prove to have more generic utility
51 52
52 53 def validate_string_list(lst):
53 54 """Validate that the input is a list of strings.
54 55
55 56 Raises ValueError if not."""
56 57 if not isinstance(lst, list):
57 58 raise ValueError('input %r must be a list' % lst)
58 59 for x in lst:
59 60 if not isinstance(x, basestring):
60 61 raise ValueError('element %r in list must be a string' % x)
61 62
62 63
63 64 def validate_string_dict(dct):
64 65 """Validate that the input is a dict with string keys and values.
65 66
66 67 Raises ValueError if not."""
67 68 for k,v in dct.iteritems():
68 69 if not isinstance(k, basestring):
69 70 raise ValueError('key %r in dict must be a string' % k)
70 71 if not isinstance(v, basestring):
71 72 raise ValueError('value %r in dict must be a string' % v)
72 73
73 74
74 75 #-----------------------------------------------------------------------------
75 76 # ZMQ Socket Channel classes
76 77 #-----------------------------------------------------------------------------
77 78
78 79 class ZmqSocketChannel(Thread):
79 80 """The base class for the channels that use ZMQ sockets.
80 81 """
81 82 context = None
82 83 session = None
83 84 socket = None
84 85 ioloop = None
85 86 iostate = None
86 87 _address = None
87 88
88 89 def __init__(self, context, session, address):
89 90 """Create a channel
90 91
91 92 Parameters
92 93 ----------
93 94 context : :class:`zmq.Context`
94 95 The ZMQ context to use.
95 96 session : :class:`session.Session`
96 97 The session to use.
97 98 address : tuple
98 99 Standard (ip, port) tuple that the kernel is listening on.
99 100 """
100 101 super(ZmqSocketChannel, self).__init__()
101 102 self.daemon = True
102 103
103 104 self.context = context
104 105 self.session = session
105 106 if address[1] == 0:
106 107 message = 'The port number for a channel cannot be 0.'
107 108 raise InvalidPortNumber(message)
108 109 self._address = address
109 110
110 111 def stop(self):
111 112 """Stop the channel's activity.
112 113
113 114 This calls :method:`Thread.join` and returns when the thread
114 115 terminates. :class:`RuntimeError` will be raised if
115 116 :method:`self.start` is called again.
116 117 """
117 118 self.join()
118 119
119 120 @property
120 121 def address(self):
121 122 """Get the channel's address as an (ip, port) tuple.
122 123
123 124 By the default, the address is (localhost, 0), where 0 means a random
124 125 port.
125 126 """
126 127 return self._address
127 128
128 129 def add_io_state(self, state):
129 130 """Add IO state to the eventloop.
130 131
131 132 Parameters
132 133 ----------
133 134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
134 135 The IO state flag to set.
135 136
136 137 This is thread safe as it uses the thread safe IOLoop.add_callback.
137 138 """
138 139 def add_io_state_callback():
139 140 if not self.iostate & state:
140 141 self.iostate = self.iostate | state
141 142 self.ioloop.update_handler(self.socket, self.iostate)
142 143 self.ioloop.add_callback(add_io_state_callback)
143 144
144 145 def drop_io_state(self, state):
145 146 """Drop IO state from the eventloop.
146 147
147 148 Parameters
148 149 ----------
149 150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 151 The IO state flag to set.
151 152
152 153 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 154 """
154 155 def drop_io_state_callback():
155 156 if self.iostate & state:
156 157 self.iostate = self.iostate & (~state)
157 158 self.ioloop.update_handler(self.socket, self.iostate)
158 159 self.ioloop.add_callback(drop_io_state_callback)
159 160
160 161
161 162 class XReqSocketChannel(ZmqSocketChannel):
162 163 """The XREQ channel for issues request/replies to the kernel.
163 164 """
164 165
165 166 command_queue = None
166 167
167 168 def __init__(self, context, session, address):
168 169 super(XReqSocketChannel, self).__init__(context, session, address)
169 170 self.command_queue = Queue()
170 171 self.ioloop = ioloop.IOLoop()
171 172
172 173 def run(self):
173 174 """The thread's main activity. Call start() instead."""
174 175 self.socket = self.context.socket(zmq.XREQ)
175 176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
176 177 self.socket.connect('tcp://%s:%i' % self.address)
177 178 self.iostate = POLLERR|POLLIN
178 179 self.ioloop.add_handler(self.socket, self._handle_events,
179 180 self.iostate)
180 181 self.ioloop.start()
181 182
182 183 def stop(self):
183 184 self.ioloop.stop()
184 185 super(XReqSocketChannel, self).stop()
185 186
186 187 def call_handlers(self, msg):
187 188 """This method is called in the ioloop thread when a message arrives.
188 189
189 190 Subclasses should override this method to handle incoming messages.
190 191 It is important to remember that this method is called in the thread
191 192 so that some logic must be done to ensure that the application leve
192 193 handlers are called in the application thread.
193 194 """
194 195 raise NotImplementedError('call_handlers must be defined in a subclass.')
195 196
196 197 def execute(self, code, silent=False,
197 198 user_variables=None, user_expressions=None):
198 199 """Execute code in the kernel.
199 200
200 201 Parameters
201 202 ----------
202 203 code : str
203 204 A string of Python code.
204 205
205 206 silent : bool, optional (default False)
206 207 If set, the kernel will execute the code as quietly possible.
207 208
208 209 user_variables : list, optional
209 210 A list of variable names to pull from the user's namespace. They
210 211 will come back as a dict with these names as keys and their
211 212 :func:`repr` as values.
212 213
213 214 user_expressions : dict, optional
214 215 A dict with string keys and to pull from the user's
215 216 namespace. They will come back as a dict with these names as keys
216 217 and their :func:`repr` as values.
217 218
218 219 Returns
219 220 -------
220 221 The msg_id of the message sent.
221 222 """
222 223 if user_variables is None:
223 224 user_variables = []
224 225 if user_expressions is None:
225 226 user_expressions = {}
226 227
227 228 # Don't waste network traffic if inputs are invalid
228 229 if not isinstance(code, basestring):
229 230 raise ValueError('code %r must be a string' % code)
230 231 validate_string_list(user_variables)
231 232 validate_string_dict(user_expressions)
232 233
233 234 # Create class for content/msg creation. Related to, but possibly
234 235 # not in Session.
235 236 content = dict(code=code, silent=silent,
236 237 user_variables=user_variables,
237 238 user_expressions=user_expressions)
238 239 msg = self.session.msg('execute_request', content)
239 240 self._queue_request(msg)
240 241 return msg['header']['msg_id']
241 242
242 243 def complete(self, text, line, cursor_pos, block=None):
243 244 """Tab complete text in the kernel's namespace.
244 245
245 246 Parameters
246 247 ----------
247 248 text : str
248 249 The text to complete.
249 250 line : str
250 251 The full line of text that is the surrounding context for the
251 252 text to complete.
252 253 cursor_pos : int
253 254 The position of the cursor in the line where the completion was
254 255 requested.
255 256 block : str, optional
256 257 The full block of code in which the completion is being requested.
257 258
258 259 Returns
259 260 -------
260 261 The msg_id of the message sent.
261 262 """
262 263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
263 264 msg = self.session.msg('complete_request', content)
264 265 self._queue_request(msg)
265 266 return msg['header']['msg_id']
266 267
267 268 def object_info(self, oname):
268 269 """Get metadata information about an object.
269 270
270 271 Parameters
271 272 ----------
272 273 oname : str
273 274 A string specifying the object name.
274 275
275 276 Returns
276 277 -------
277 278 The msg_id of the message sent.
278 279 """
279 280 content = dict(oname=oname)
280 281 msg = self.session.msg('object_info_request', content)
281 282 self._queue_request(msg)
282 283 return msg['header']['msg_id']
283 284
284 285 def history(self, index=None, raw=False, output=True):
285 286 """Get the history list.
286 287
287 288 Parameters
288 289 ----------
289 290 index : n or (n1, n2) or None
290 291 If n, then the last entries. If a tuple, then all in
291 292 range(n1, n2). If None, then all entries. Raises IndexError if
292 293 the format of index is incorrect.
293 294 raw : bool
294 295 If True, return the raw input.
295 296 output : bool
296 297 If True, then return the output as well.
297 298
298 299 Returns
299 300 -------
300 301 The msg_id of the message sent.
301 302 """
302 303 content = dict(index=index, raw=raw, output=output)
303 304 msg = self.session.msg('history_request', content)
304 305 self._queue_request(msg)
305 306 return msg['header']['msg_id']
306 307
307 308 def shutdown(self, restart=False):
308 309 """Request an immediate kernel shutdown.
309 310
310 311 Upon receipt of the (empty) reply, client code can safely assume that
311 312 the kernel has shut down and it's safe to forcefully terminate it if
312 313 it's still alive.
313 314
314 315 The kernel will send the reply via a function registered with Python's
315 316 atexit module, ensuring it's truly done as the kernel is done with all
316 317 normal operation.
317 318 """
318 319 # Send quit message to kernel. Once we implement kernel-side setattr,
319 320 # this should probably be done that way, but for now this will do.
320 321 msg = self.session.msg('shutdown_request', {'restart':restart})
321 322 self._queue_request(msg)
322 323 return msg['header']['msg_id']
323 324
324 325 def _handle_events(self, socket, events):
325 326 if events & POLLERR:
326 327 self._handle_err()
327 328 if events & POLLOUT:
328 329 self._handle_send()
329 330 if events & POLLIN:
330 331 self._handle_recv()
331 332
332 333 def _handle_recv(self):
333 334 ident,msg = self.session.recv(self.socket, 0)
334 335 self.call_handlers(msg)
335 336
336 337 def _handle_send(self):
337 338 try:
338 339 msg = self.command_queue.get(False)
339 340 except Empty:
340 341 pass
341 342 else:
342 343 self.session.send(self.socket,msg)
343 344 if self.command_queue.empty():
344 345 self.drop_io_state(POLLOUT)
345 346
346 347 def _handle_err(self):
347 348 # We don't want to let this go silently, so eventually we should log.
348 349 raise zmq.ZMQError()
349 350
350 351 def _queue_request(self, msg):
351 352 self.command_queue.put(msg)
352 353 self.add_io_state(POLLOUT)
353 354
354 355
355 356 class SubSocketChannel(ZmqSocketChannel):
356 357 """The SUB channel which listens for messages that the kernel publishes.
357 358 """
358 359
359 360 def __init__(self, context, session, address):
360 361 super(SubSocketChannel, self).__init__(context, session, address)
361 362 self.ioloop = ioloop.IOLoop()
362 363
363 364 def run(self):
364 365 """The thread's main activity. Call start() instead."""
365 366 self.socket = self.context.socket(zmq.SUB)
366 367 self.socket.setsockopt(zmq.SUBSCRIBE,'')
367 368 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
368 369 self.socket.connect('tcp://%s:%i' % self.address)
369 370 self.iostate = POLLIN|POLLERR
370 371 self.ioloop.add_handler(self.socket, self._handle_events,
371 372 self.iostate)
372 373 self.ioloop.start()
373 374
374 375 def stop(self):
375 376 self.ioloop.stop()
376 377 super(SubSocketChannel, self).stop()
377 378
378 379 def call_handlers(self, msg):
379 380 """This method is called in the ioloop thread when a message arrives.
380 381
381 382 Subclasses should override this method to handle incoming messages.
382 383 It is important to remember that this method is called in the thread
383 384 so that some logic must be done to ensure that the application leve
384 385 handlers are called in the application thread.
385 386 """
386 387 raise NotImplementedError('call_handlers must be defined in a subclass.')
387 388
388 389 def flush(self, timeout=1.0):
389 390 """Immediately processes all pending messages on the SUB channel.
390 391
391 392 Callers should use this method to ensure that :method:`call_handlers`
392 393 has been called for all messages that have been received on the
393 394 0MQ SUB socket of this channel.
394 395
395 396 This method is thread safe.
396 397
397 398 Parameters
398 399 ----------
399 400 timeout : float, optional
400 401 The maximum amount of time to spend flushing, in seconds. The
401 402 default is one second.
402 403 """
403 404 # We do the IOLoop callback process twice to ensure that the IOLoop
404 405 # gets to perform at least one full poll.
405 406 stop_time = time.time() + timeout
406 407 for i in xrange(2):
407 408 self._flushed = False
408 409 self.ioloop.add_callback(self._flush)
409 410 while not self._flushed and time.time() < stop_time:
410 411 time.sleep(0.01)
411 412
412 413 def _handle_events(self, socket, events):
413 414 # Turn on and off POLLOUT depending on if we have made a request
414 415 if events & POLLERR:
415 416 self._handle_err()
416 417 if events & POLLIN:
417 418 self._handle_recv()
418 419
419 420 def _handle_err(self):
420 421 # We don't want to let this go silently, so eventually we should log.
421 422 raise zmq.ZMQError()
422 423
423 424 def _handle_recv(self):
424 425 # Get all of the messages we can
425 426 while True:
426 427 try:
427 428 ident,msg = self.session.recv(self.socket)
428 429 except zmq.ZMQError:
429 430 # Check the errno?
430 431 # Will this trigger POLLERR?
431 432 break
432 433 else:
433 434 if msg is None:
434 435 break
435 436 self.call_handlers(msg)
436 437
437 438 def _flush(self):
438 439 """Callback for :method:`self.flush`."""
439 440 self._flushed = True
440 441
441 442
442 443 class RepSocketChannel(ZmqSocketChannel):
443 444 """A reply channel to handle raw_input requests that the kernel makes."""
444 445
445 446 msg_queue = None
446 447
447 448 def __init__(self, context, session, address):
448 449 super(RepSocketChannel, self).__init__(context, session, address)
449 450 self.ioloop = ioloop.IOLoop()
450 451 self.msg_queue = Queue()
451 452
452 453 def run(self):
453 454 """The thread's main activity. Call start() instead."""
454 455 self.socket = self.context.socket(zmq.XREQ)
455 456 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
456 457 self.socket.connect('tcp://%s:%i' % self.address)
457 458 self.iostate = POLLERR|POLLIN
458 459 self.ioloop.add_handler(self.socket, self._handle_events,
459 460 self.iostate)
460 461 self.ioloop.start()
461 462
462 463 def stop(self):
463 464 self.ioloop.stop()
464 465 super(RepSocketChannel, self).stop()
465 466
466 467 def call_handlers(self, msg):
467 468 """This method is called in the ioloop thread when a message arrives.
468 469
469 470 Subclasses should override this method to handle incoming messages.
470 471 It is important to remember that this method is called in the thread
471 472 so that some logic must be done to ensure that the application leve
472 473 handlers are called in the application thread.
473 474 """
474 475 raise NotImplementedError('call_handlers must be defined in a subclass.')
475 476
476 477 def input(self, string):
477 478 """Send a string of raw input to the kernel."""
478 479 content = dict(value=string)
479 480 msg = self.session.msg('input_reply', content)
480 481 self._queue_reply(msg)
481 482
482 483 def _handle_events(self, socket, events):
483 484 if events & POLLERR:
484 485 self._handle_err()
485 486 if events & POLLOUT:
486 487 self._handle_send()
487 488 if events & POLLIN:
488 489 self._handle_recv()
489 490
490 491 def _handle_recv(self):
491 492 ident,msg = self.session.recv(self.socket, 0)
492 493 self.call_handlers(msg)
493 494
494 495 def _handle_send(self):
495 496 try:
496 497 msg = self.msg_queue.get(False)
497 498 except Empty:
498 499 pass
499 500 else:
500 501 self.session.send(self.socket,msg)
501 502 if self.msg_queue.empty():
502 503 self.drop_io_state(POLLOUT)
503 504
504 505 def _handle_err(self):
505 506 # We don't want to let this go silently, so eventually we should log.
506 507 raise zmq.ZMQError()
507 508
508 509 def _queue_reply(self, msg):
509 510 self.msg_queue.put(msg)
510 511 self.add_io_state(POLLOUT)
511 512
512 513
513 514 class HBSocketChannel(ZmqSocketChannel):
514 515 """The heartbeat channel which monitors the kernel heartbeat.
515 516
516 517 Note that the heartbeat channel is paused by default. As long as you start
517 518 this channel, the kernel manager will ensure that it is paused and un-paused
518 519 as appropriate.
519 520 """
520 521
521 522 time_to_dead = 3.0
522 523 socket = None
523 524 poller = None
524 525 _running = None
525 526 _pause = None
526 527
527 528 def __init__(self, context, session, address):
528 529 super(HBSocketChannel, self).__init__(context, session, address)
529 530 self._running = False
530 531 self._pause = True
531 532
532 533 def _create_socket(self):
533 534 self.socket = self.context.socket(zmq.REQ)
534 535 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
535 536 self.socket.connect('tcp://%s:%i' % self.address)
536 537 self.poller = zmq.Poller()
537 538 self.poller.register(self.socket, zmq.POLLIN)
538 539
539 540 def run(self):
540 541 """The thread's main activity. Call start() instead."""
541 542 self._create_socket()
542 543 self._running = True
543 544 while self._running:
544 545 if self._pause:
545 546 time.sleep(self.time_to_dead)
546 547 else:
547 548 since_last_heartbeat = 0.0
548 549 request_time = time.time()
549 550 try:
550 551 #io.rprint('Ping from HB channel') # dbg
551 552 self.socket.send(b'ping')
552 553 except zmq.ZMQError, e:
553 554 #io.rprint('*** HB Error:', e) # dbg
554 555 if e.errno == zmq.EFSM:
555 556 #io.rprint('sleep...', self.time_to_dead) # dbg
556 557 time.sleep(self.time_to_dead)
557 558 self._create_socket()
558 559 else:
559 560 raise
560 561 else:
561 562 while True:
562 563 try:
563 564 self.socket.recv(zmq.NOBLOCK)
564 565 except zmq.ZMQError, e:
565 566 #io.rprint('*** HB Error 2:', e) # dbg
566 567 if e.errno == zmq.EAGAIN:
567 568 before_poll = time.time()
568 569 until_dead = self.time_to_dead - (before_poll -
569 570 request_time)
570 571
571 572 # When the return value of poll() is an empty
572 573 # list, that is when things have gone wrong
573 574 # (zeromq bug). As long as it is not an empty
574 575 # list, poll is working correctly even if it
575 576 # returns quickly. Note: poll timeout is in
576 577 # milliseconds.
577 578 self.poller.poll(1000*until_dead)
578 579
579 580 since_last_heartbeat = time.time()-request_time
580 581 if since_last_heartbeat > self.time_to_dead:
581 582 self.call_handlers(since_last_heartbeat)
582 583 break
583 584 else:
584 585 # FIXME: We should probably log this instead.
585 586 raise
586 587 else:
587 588 until_dead = self.time_to_dead - (time.time() -
588 589 request_time)
589 590 if until_dead > 0.0:
590 591 #io.rprint('sleep...', self.time_to_dead) # dbg
591 592 time.sleep(until_dead)
592 593 break
593 594
594 595 def pause(self):
595 596 """Pause the heartbeat."""
596 597 self._pause = True
597 598
598 599 def unpause(self):
599 600 """Unpause the heartbeat."""
600 601 self._pause = False
601 602
602 603 def is_beating(self):
603 604 """Is the heartbeat running and not paused."""
604 605 if self.is_alive() and not self._pause:
605 606 return True
606 607 else:
607 608 return False
608 609
609 610 def stop(self):
610 611 self._running = False
611 612 super(HBSocketChannel, self).stop()
612 613
613 614 def call_handlers(self, since_last_heartbeat):
614 615 """This method is called in the ioloop thread when a message arrives.
615 616
616 617 Subclasses should override this method to handle incoming messages.
617 618 It is important to remember that this method is called in the thread
618 619 so that some logic must be done to ensure that the application leve
619 620 handlers are called in the application thread.
620 621 """
621 622 raise NotImplementedError('call_handlers must be defined in a subclass.')
622 623
623 624
624 625 #-----------------------------------------------------------------------------
625 626 # Main kernel manager class
626 627 #-----------------------------------------------------------------------------
627 628
628 629 class KernelManager(HasTraits):
629 630 """ Manages a kernel for a frontend.
630 631
631 632 The SUB channel is for the frontend to receive messages published by the
632 633 kernel.
633 634
634 635 The REQ channel is for the frontend to make requests of the kernel.
635 636
636 637 The REP channel is for the kernel to request stdin (raw_input) from the
637 638 frontend.
638 639 """
639 640 # The PyZMQ Context to use for communication with the kernel.
640 641 context = Instance(zmq.Context,(),{})
641 642
642 643 # The Session to use for communication with the kernel.
643 644 session = Instance(Session,(),{})
644 645
645 646 # The kernel process with which the KernelManager is communicating.
646 647 kernel = Instance(Popen)
647 648
648 649 # The addresses for the communication channels.
649 650 xreq_address = TCPAddress((LOCALHOST, 0))
650 651 sub_address = TCPAddress((LOCALHOST, 0))
651 652 rep_address = TCPAddress((LOCALHOST, 0))
652 653 hb_address = TCPAddress((LOCALHOST, 0))
653 654
654 655 # The classes to use for the various channels.
655 656 xreq_channel_class = Type(XReqSocketChannel)
656 657 sub_channel_class = Type(SubSocketChannel)
657 658 rep_channel_class = Type(RepSocketChannel)
658 659 hb_channel_class = Type(HBSocketChannel)
659 660
660 661 # Protected traits.
661 662 _launch_args = Any
662 663 _xreq_channel = Any
663 664 _sub_channel = Any
664 665 _rep_channel = Any
665 666 _hb_channel = Any
666 667
667 668 def __init__(self, **kwargs):
668 669 super(KernelManager, self).__init__(**kwargs)
669 670 # Uncomment this to try closing the context.
670 671 # atexit.register(self.context.close)
671 672
672 673 #--------------------------------------------------------------------------
673 674 # Channel management methods:
674 675 #--------------------------------------------------------------------------
675 676
676 677 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
677 678 """Starts the channels for this kernel.
678 679
679 680 This will create the channels if they do not exist and then start
680 681 them. If port numbers of 0 are being used (random ports) then you
681 682 must first call :method:`start_kernel`. If the channels have been
682 683 stopped and you call this, :class:`RuntimeError` will be raised.
683 684 """
684 685 if xreq:
685 686 self.xreq_channel.start()
686 687 if sub:
687 688 self.sub_channel.start()
688 689 if rep:
689 690 self.rep_channel.start()
690 691 if hb:
691 692 self.hb_channel.start()
692 693
693 694 def stop_channels(self):
694 695 """Stops all the running channels for this kernel.
695 696 """
696 697 if self.xreq_channel.is_alive():
697 698 self.xreq_channel.stop()
698 699 if self.sub_channel.is_alive():
699 700 self.sub_channel.stop()
700 701 if self.rep_channel.is_alive():
701 702 self.rep_channel.stop()
702 703 if self.hb_channel.is_alive():
703 704 self.hb_channel.stop()
704 705
705 706 @property
706 707 def channels_running(self):
707 708 """Are any of the channels created and running?"""
708 709 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
709 710 self.rep_channel.is_alive() or self.hb_channel.is_alive())
710 711
711 712 #--------------------------------------------------------------------------
712 713 # Kernel process management methods:
713 714 #--------------------------------------------------------------------------
714 715
715 716 def start_kernel(self, **kw):
716 717 """Starts a kernel process and configures the manager to use it.
717 718
718 719 If random ports (port=0) are being used, this method must be called
719 720 before the channels are created.
720 721
721 722 Parameters:
722 723 -----------
723 724 ipython : bool, optional (default True)
724 725 Whether to use an IPython kernel instead of a plain Python kernel.
725 726 """
726 727 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
727 728 self.rep_address, self.hb_address
728 729 if xreq[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
729 730 rep[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
730 731 raise RuntimeError("Can only launch a kernel on a local interface. "
731 732 "Make sure that the '*_address' attributes are "
732 733 "configured properly. "
733 734 "Currently valid addresses are: %s"%LOCAL_IPS
734 735 )
735 736
736 737 self._launch_args = kw.copy()
737 738 if kw.pop('ipython', True):
738 739 from ipkernel import launch_kernel
739 740 else:
740 741 from pykernel import launch_kernel
741 742 self.kernel, xrep, pub, req, _hb = launch_kernel(
742 743 xrep_port=xreq[1], pub_port=sub[1],
743 744 req_port=rep[1], hb_port=hb[1], **kw)
744 745 self.xreq_address = (xreq[0], xrep)
745 746 self.sub_address = (sub[0], pub)
746 747 self.rep_address = (rep[0], req)
747 748 self.hb_address = (hb[0], _hb)
748 749
749 750 def shutdown_kernel(self, restart=False):
750 751 """ Attempts to the stop the kernel process cleanly. If the kernel
751 752 cannot be stopped, it is killed, if possible.
752 753 """
753 754 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
754 755 if sys.platform == 'win32':
755 756 self.kill_kernel()
756 757 return
757 758
758 759 # Pause the heart beat channel if it exists.
759 760 if self._hb_channel is not None:
760 761 self._hb_channel.pause()
761 762
762 763 # Don't send any additional kernel kill messages immediately, to give
763 764 # the kernel a chance to properly execute shutdown actions. Wait for at
764 765 # most 1s, checking every 0.1s.
765 766 self.xreq_channel.shutdown(restart=restart)
766 767 for i in range(10):
767 768 if self.is_alive:
768 769 time.sleep(0.1)
769 770 else:
770 771 break
771 772 else:
772 773 # OK, we've waited long enough.
773 774 if self.has_kernel:
774 775 self.kill_kernel()
775 776
776 777 def restart_kernel(self, now=False):
777 778 """Restarts a kernel with the same arguments that were used to launch
778 779 it. If the old kernel was launched with random ports, the same ports
779 780 will be used for the new kernel.
780 781
781 782 Parameters
782 783 ----------
783 784 now : bool, optional
784 785 If True, the kernel is forcefully restarted *immediately*, without
785 786 having a chance to do any cleanup action. Otherwise the kernel is
786 787 given 1s to clean up before a forceful restart is issued.
787 788
788 789 In all cases the kernel is restarted, the only difference is whether
789 790 it is given a chance to perform a clean shutdown or not.
790 791 """
791 792 if self._launch_args is None:
792 793 raise RuntimeError("Cannot restart the kernel. "
793 794 "No previous call to 'start_kernel'.")
794 795 else:
795 796 if self.has_kernel:
796 797 if now:
797 798 self.kill_kernel()
798 799 else:
799 800 self.shutdown_kernel(restart=True)
800 801 self.start_kernel(**self._launch_args)
801 802
802 803 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
803 804 # unless there is some delay here.
804 805 if sys.platform == 'win32':
805 806 time.sleep(0.2)
806 807
807 808 @property
808 809 def has_kernel(self):
809 810 """Returns whether a kernel process has been specified for the kernel
810 811 manager.
811 812 """
812 813 return self.kernel is not None
813 814
814 815 def kill_kernel(self):
815 816 """ Kill the running kernel. """
816 817 if self.has_kernel:
817 818 # Pause the heart beat channel if it exists.
818 819 if self._hb_channel is not None:
819 820 self._hb_channel.pause()
820 821
821 822 # Attempt to kill the kernel.
822 823 try:
823 824 self.kernel.kill()
824 825 except OSError, e:
825 826 # In Windows, we will get an Access Denied error if the process
826 827 # has already terminated. Ignore it.
827 828 if not (sys.platform == 'win32' and e.winerror == 5):
828 829 raise
829 830 self.kernel = None
830 831 else:
831 832 raise RuntimeError("Cannot kill kernel. No kernel is running!")
832 833
833 834 def interrupt_kernel(self):
834 835 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
835 836 well supported on all platforms.
836 837 """
837 838 if self.has_kernel:
838 839 if sys.platform == 'win32':
839 840 from parentpoller import ParentPollerWindows as Poller
840 841 Poller.send_interrupt(self.kernel.win32_interrupt_event)
841 842 else:
842 843 self.kernel.send_signal(signal.SIGINT)
843 844 else:
844 845 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
845 846
846 847 def signal_kernel(self, signum):
847 848 """ Sends a signal to the kernel. Note that since only SIGTERM is
848 849 supported on Windows, this function is only useful on Unix systems.
849 850 """
850 851 if self.has_kernel:
851 852 self.kernel.send_signal(signum)
852 853 else:
853 854 raise RuntimeError("Cannot signal kernel. No kernel is running!")
854 855
855 856 @property
856 857 def is_alive(self):
857 858 """Is the kernel process still running?"""
858 859 # FIXME: not using a heartbeat means this method is broken for any
859 860 # remote kernel, it's only capable of handling local kernels.
860 861 if self.has_kernel:
861 862 if self.kernel.poll() is None:
862 863 return True
863 864 else:
864 865 return False
865 866 else:
866 867 # We didn't start the kernel with this KernelManager so we don't
867 868 # know if it is running. We should use a heartbeat for this case.
868 869 return True
869 870
870 871 #--------------------------------------------------------------------------
871 872 # Channels used for communication with the kernel:
872 873 #--------------------------------------------------------------------------
873 874
874 875 @property
875 876 def xreq_channel(self):
876 877 """Get the REQ socket channel object to make requests of the kernel."""
877 878 if self._xreq_channel is None:
878 879 self._xreq_channel = self.xreq_channel_class(self.context,
879 880 self.session,
880 881 self.xreq_address)
881 882 return self._xreq_channel
882 883
883 884 @property
884 885 def sub_channel(self):
885 886 """Get the SUB socket channel object."""
886 887 if self._sub_channel is None:
887 888 self._sub_channel = self.sub_channel_class(self.context,
888 889 self.session,
889 890 self.sub_address)
890 891 return self._sub_channel
891 892
892 893 @property
893 894 def rep_channel(self):
894 895 """Get the REP socket channel object to handle stdin (raw_input)."""
895 896 if self._rep_channel is None:
896 897 self._rep_channel = self.rep_channel_class(self.context,
897 898 self.session,
898 899 self.rep_address)
899 900 return self._rep_channel
900 901
901 902 @property
902 903 def hb_channel(self):
903 904 """Get the REP socket channel object to handle stdin (raw_input)."""
904 905 if self._hb_channel is None:
905 906 self._hb_channel = self.hb_channel_class(self.context,
906 907 self.session,
907 908 self.hb_address)
908 909 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now