##// END OF EJS Templates
Made is_alive a method of KernelManager and MultiKernelManager....
Brian E. Granger -
Show More
@@ -1,466 +1,466 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Frontend of ipython working with python-zmq
3 3
4 4 Ipython's frontend, is a ipython interface that send request to kernel and proccess the kernel's outputs.
5 5
6 6 For more details, see the ipython-zmq design
7 7 """
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18 from __future__ import print_function
19 19
20 20 import bdb
21 21 import signal
22 22 import os
23 23 import sys
24 24 import time
25 25 import subprocess
26 26 from io import BytesIO
27 27 import base64
28 28
29 29 from Queue import Empty
30 30
31 31 try:
32 32 from contextlib import nested
33 33 except:
34 34 from IPython.utils.nested_context import nested
35 35
36 36 from IPython.core.alias import AliasManager, AliasError
37 37 from IPython.core import page
38 38 from IPython.utils.warn import warn, error, fatal
39 39 from IPython.utils import io
40 40 from IPython.utils.traitlets import List, Enum, Any
41 41 from IPython.utils.tempdir import NamedFileInTemporaryDirectory
42 42
43 43 from IPython.frontend.terminal.interactiveshell import TerminalInteractiveShell
44 44 from IPython.frontend.terminal.console.completer import ZMQCompleter
45 45
46 46
47 47 class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
48 48 """A subclass of TerminalInteractiveShell that uses the 0MQ kernel"""
49 49 _executing = False
50 50
51 51 image_handler = Enum(('PIL', 'stream', 'tempfile', 'callable'),
52 52 config=True, help=
53 53 """
54 54 Handler for image type output. This is useful, for example,
55 55 when connecting to the kernel in which pylab inline backend is
56 56 activated. There are four handlers defined. 'PIL': Use
57 57 Python Imaging Library to popup image; 'stream': Use an
58 58 external program to show the image. Image will be fed into
59 59 the STDIN of the program. You will need to configure
60 60 `stream_image_handler`; 'tempfile': Use an external program to
61 61 show the image. Image will be saved in a temporally file and
62 62 the program is called with the temporally file. You will need
63 63 to configure `tempfile_image_handler`; 'callable': You can set
64 64 any Python callable which is called with the image data. You
65 65 will need to configure `callable_image_handler`.
66 66 """
67 67 )
68 68
69 69 stream_image_handler = List(config=True, help=
70 70 """
71 71 Command to invoke an image viewer program when you are using
72 72 'stream' image handler. This option is a list of string where
73 73 the first element is the command itself and reminders are the
74 74 options for the command. Raw image data is given as STDIN to
75 75 the program.
76 76 """
77 77 )
78 78
79 79 tempfile_image_handler = List(config=True, help=
80 80 """
81 81 Command to invoke an image viewer program when you are using
82 82 'tempfile' image handler. This option is a list of string
83 83 where the first element is the command itself and reminders
84 84 are the options for the command. You can use {file} and
85 85 {format} in the string to represent the location of the
86 86 generated image file and image format.
87 87 """
88 88 )
89 89
90 90 callable_image_handler = Any(config=True, help=
91 91 """
92 92 Callable object called via 'callable' image handler with one
93 93 argument, `data`, which is `msg["content"]["data"]` where
94 94 `msg` is the message from iopub channel. For exmaple, you can
95 95 find base64 encoded PNG data as `data['image/png']`.
96 96 """
97 97 )
98 98
99 99 mime_preference = List(
100 100 default_value=['image/png', 'image/jpeg', 'image/svg+xml'],
101 101 config=True, allow_none=False, help=
102 102 """
103 103 Preferred object representation MIME type in order. First
104 104 matched MIME type will be used.
105 105 """
106 106 )
107 107
108 108 def __init__(self, *args, **kwargs):
109 109 self.km = kwargs.pop('kernel_manager')
110 110 self.session_id = self.km.session.session
111 111 super(ZMQTerminalInteractiveShell, self).__init__(*args, **kwargs)
112 112
113 113 def init_completer(self):
114 114 """Initialize the completion machinery.
115 115
116 116 This creates completion machinery that can be used by client code,
117 117 either interactively in-process (typically triggered by the readline
118 118 library), programatically (such as in test suites) or out-of-prcess
119 119 (typically over the network by remote frontends).
120 120 """
121 121 from IPython.core.completerlib import (module_completer,
122 122 magic_run_completer, cd_completer)
123 123
124 124 self.Completer = ZMQCompleter(self, self.km)
125 125
126 126
127 127 self.set_hook('complete_command', module_completer, str_key = 'import')
128 128 self.set_hook('complete_command', module_completer, str_key = 'from')
129 129 self.set_hook('complete_command', magic_run_completer, str_key = '%run')
130 130 self.set_hook('complete_command', cd_completer, str_key = '%cd')
131 131
132 132 # Only configure readline if we truly are using readline. IPython can
133 133 # do tab-completion over the network, in GUIs, etc, where readline
134 134 # itself may be absent
135 135 if self.has_readline:
136 136 self.set_readline_completer()
137 137
138 138 def run_cell(self, cell, store_history=True):
139 139 """Run a complete IPython cell.
140 140
141 141 Parameters
142 142 ----------
143 143 cell : str
144 144 The code (including IPython code such as %magic functions) to run.
145 145 store_history : bool
146 146 If True, the raw and translated cell will be stored in IPython's
147 147 history. For user code calling back into IPython's machinery, this
148 148 should be set to False.
149 149 """
150 150 if (not cell) or cell.isspace():
151 151 return
152 152
153 153 if cell.strip() == 'exit':
154 154 # explicitly handle 'exit' command
155 155 return self.ask_exit()
156 156
157 157 self._executing = True
158 158 # flush stale replies, which could have been ignored, due to missed heartbeats
159 159 while self.km.shell_channel.msg_ready():
160 160 self.km.shell_channel.get_msg()
161 161 # shell_channel.execute takes 'hidden', which is the inverse of store_hist
162 162 msg_id = self.km.shell_channel.execute(cell, not store_history)
163 while not self.km.shell_channel.msg_ready() and self.km.is_alive:
163 while not self.km.shell_channel.msg_ready() and self.km.is_alive():
164 164 try:
165 165 self.handle_stdin_request(timeout=0.05)
166 166 except Empty:
167 167 # display intermediate print statements, etc.
168 168 self.handle_iopub()
169 169 pass
170 170 if self.km.shell_channel.msg_ready():
171 171 self.handle_execute_reply(msg_id)
172 172 self._executing = False
173 173
174 174 #-----------------
175 175 # message handlers
176 176 #-----------------
177 177
178 178 def handle_execute_reply(self, msg_id):
179 179 msg = self.km.shell_channel.get_msg()
180 180 if msg["parent_header"].get("msg_id", None) == msg_id:
181 181
182 182 self.handle_iopub()
183 183
184 184 content = msg["content"]
185 185 status = content['status']
186 186
187 187 if status == 'aborted':
188 188 self.write('Aborted\n')
189 189 return
190 190 elif status == 'ok':
191 191 # print execution payloads as well:
192 192 for item in content["payload"]:
193 193 text = item.get('text', None)
194 194 if text:
195 195 page.page(text)
196 196
197 197 elif status == 'error':
198 198 for frame in content["traceback"]:
199 199 print(frame, file=io.stderr)
200 200
201 201 self.execution_count = int(content["execution_count"] + 1)
202 202
203 203
204 204 def handle_iopub(self):
205 205 """ Method to procces subscribe channel's messages
206 206
207 207 This method reads a message and processes the content in different
208 208 outputs like stdout, stderr, pyout and status
209 209
210 210 Arguments:
211 211 sub_msg: message receive from kernel in the sub socket channel
212 212 capture by kernel manager.
213 213 """
214 214 while self.km.iopub_channel.msg_ready():
215 215 sub_msg = self.km.iopub_channel.get_msg()
216 216 msg_type = sub_msg['header']['msg_type']
217 217 parent = sub_msg["parent_header"]
218 218 if (not parent) or self.session_id == parent['session']:
219 219 if msg_type == 'status' :
220 220 if sub_msg["content"]["execution_state"] == "busy" :
221 221 pass
222 222
223 223 elif msg_type == 'stream' :
224 224 if sub_msg["content"]["name"] == "stdout":
225 225 print(sub_msg["content"]["data"], file=io.stdout, end="")
226 226 io.stdout.flush()
227 227 elif sub_msg["content"]["name"] == "stderr" :
228 228 print(sub_msg["content"]["data"], file=io.stderr, end="")
229 229 io.stderr.flush()
230 230
231 231 elif msg_type == 'pyout':
232 232 self.execution_count = int(sub_msg["content"]["execution_count"])
233 233 format_dict = sub_msg["content"]["data"]
234 234 self.handle_rich_data(format_dict)
235 235 # taken from DisplayHook.__call__:
236 236 hook = self.displayhook
237 237 hook.start_displayhook()
238 238 hook.write_output_prompt()
239 239 hook.write_format_data(format_dict)
240 240 hook.log_output(format_dict)
241 241 hook.finish_displayhook()
242 242
243 243 elif msg_type == 'display_data':
244 244 self.handle_rich_data(sub_msg["content"]["data"])
245 245
246 246 _imagemime = {
247 247 'image/png': 'png',
248 248 'image/jpeg': 'jpeg',
249 249 'image/svg+xml': 'svg',
250 250 }
251 251
252 252 def handle_rich_data(self, data):
253 253 for mime in self.mime_preference:
254 254 if mime in data and mime in self._imagemime:
255 255 self.handle_image(data, mime)
256 256 return
257 257
258 258 def handle_image(self, data, mime):
259 259 handler = getattr(
260 260 self, 'handle_image_{0}'.format(self.image_handler), None)
261 261 if handler:
262 262 handler(data, mime)
263 263
264 264 def handle_image_PIL(self, data, mime):
265 265 if mime not in ('image/png', 'image/jpeg'):
266 266 return
267 267 import PIL.Image
268 268 raw = base64.decodestring(data[mime].encode('ascii'))
269 269 img = PIL.Image.open(BytesIO(raw))
270 270 img.show()
271 271
272 272 def handle_image_stream(self, data, mime):
273 273 raw = base64.decodestring(data[mime].encode('ascii'))
274 274 imageformat = self._imagemime[mime]
275 275 fmt = dict(format=imageformat)
276 276 args = [s.format(**fmt) for s in self.stream_image_handler]
277 277 with open(os.devnull, 'w') as devnull:
278 278 proc = subprocess.Popen(
279 279 args, stdin=subprocess.PIPE,
280 280 stdout=devnull, stderr=devnull)
281 281 proc.communicate(raw)
282 282
283 283 def handle_image_tempfile(self, data, mime):
284 284 raw = base64.decodestring(data[mime].encode('ascii'))
285 285 imageformat = self._imagemime[mime]
286 286 filename = 'tmp.{0}'.format(imageformat)
287 287 with nested(NamedFileInTemporaryDirectory(filename),
288 288 open(os.devnull, 'w')) as (f, devnull):
289 289 f.write(raw)
290 290 f.flush()
291 291 fmt = dict(file=f.name, format=imageformat)
292 292 args = [s.format(**fmt) for s in self.tempfile_image_handler]
293 293 subprocess.call(args, stdout=devnull, stderr=devnull)
294 294
295 295 def handle_image_callable(self, data, mime):
296 296 self.callable_image_handler(data)
297 297
298 298 def handle_stdin_request(self, timeout=0.1):
299 299 """ Method to capture raw_input
300 300 """
301 301 msg_rep = self.km.stdin_channel.get_msg(timeout=timeout)
302 302 # in case any iopub came while we were waiting:
303 303 self.handle_iopub()
304 304 if self.session_id == msg_rep["parent_header"].get("session"):
305 305 # wrap SIGINT handler
306 306 real_handler = signal.getsignal(signal.SIGINT)
307 307 def double_int(sig,frame):
308 308 # call real handler (forwards sigint to kernel),
309 309 # then raise local interrupt, stopping local raw_input
310 310 real_handler(sig,frame)
311 311 raise KeyboardInterrupt
312 312 signal.signal(signal.SIGINT, double_int)
313 313
314 314 try:
315 315 raw_data = raw_input(msg_rep["content"]["prompt"])
316 316 except EOFError:
317 317 # turn EOFError into EOF character
318 318 raw_data = '\x04'
319 319 except KeyboardInterrupt:
320 320 sys.stdout.write('\n')
321 321 return
322 322 finally:
323 323 # restore SIGINT handler
324 324 signal.signal(signal.SIGINT, real_handler)
325 325
326 326 # only send stdin reply if there *was not* another request
327 327 # or execution finished while we were reading.
328 328 if not (self.km.stdin_channel.msg_ready() or self.km.shell_channel.msg_ready()):
329 329 self.km.stdin_channel.input(raw_data)
330 330
331 331 def mainloop(self, display_banner=False):
332 332 while True:
333 333 try:
334 334 self.interact(display_banner=display_banner)
335 335 #self.interact_with_readline()
336 336 # XXX for testing of a readline-decoupled repl loop, call
337 337 # interact_with_readline above
338 338 break
339 339 except KeyboardInterrupt:
340 340 # this should not be necessary, but KeyboardInterrupt
341 341 # handling seems rather unpredictable...
342 342 self.write("\nKeyboardInterrupt in interact()\n")
343 343
344 344 def wait_for_kernel(self, timeout=None):
345 345 """method to wait for a kernel to be ready"""
346 346 tic = time.time()
347 347 self.km.hb_channel.unpause()
348 348 while True:
349 349 self.run_cell('1', False)
350 350 if self.km.hb_channel.is_beating():
351 351 # heart failure was not the reason this returned
352 352 break
353 353 else:
354 354 # heart failed
355 355 if timeout is not None and (time.time() - tic) > timeout:
356 356 return False
357 357 return True
358 358
359 359 def interact(self, display_banner=None):
360 360 """Closely emulate the interactive Python console."""
361 361
362 362 # batch run -> do not interact
363 363 if self.exit_now:
364 364 return
365 365
366 366 if display_banner is None:
367 367 display_banner = self.display_banner
368 368
369 369 if isinstance(display_banner, basestring):
370 370 self.show_banner(display_banner)
371 371 elif display_banner:
372 372 self.show_banner()
373 373
374 374 more = False
375 375
376 376 # run a non-empty no-op, so that we don't get a prompt until
377 377 # we know the kernel is ready. This keeps the connection
378 378 # message above the first prompt.
379 379 if not self.wait_for_kernel(3):
380 380 error("Kernel did not respond\n")
381 381 return
382 382
383 383 if self.has_readline:
384 384 self.readline_startup_hook(self.pre_readline)
385 385 hlen_b4_cell = self.readline.get_current_history_length()
386 386 else:
387 387 hlen_b4_cell = 0
388 388 # exit_now is set by a call to %Exit or %Quit, through the
389 389 # ask_exit callback.
390 390
391 391 while not self.exit_now:
392 if not self.km.is_alive:
392 if not self.km.is_alive():
393 393 # kernel died, prompt for action or exit
394 394 action = "restart" if self.km.has_kernel else "wait for restart"
395 395 ans = self.ask_yes_no("kernel died, %s ([y]/n)?" % action, default='y')
396 396 if ans:
397 397 if self.km.has_kernel:
398 398 self.km.restart_kernel(True)
399 399 self.wait_for_kernel(3)
400 400 else:
401 401 self.exit_now = True
402 402 continue
403 403 try:
404 404 # protect prompt block from KeyboardInterrupt
405 405 # when sitting on ctrl-C
406 406 self.hooks.pre_prompt_hook()
407 407 if more:
408 408 try:
409 409 prompt = self.prompt_manager.render('in2')
410 410 except Exception:
411 411 self.showtraceback()
412 412 if self.autoindent:
413 413 self.rl_do_indent = True
414 414
415 415 else:
416 416 try:
417 417 prompt = self.separate_in + self.prompt_manager.render('in')
418 418 except Exception:
419 419 self.showtraceback()
420 420
421 421 line = self.raw_input(prompt)
422 422 if self.exit_now:
423 423 # quick exit on sys.std[in|out] close
424 424 break
425 425 if self.autoindent:
426 426 self.rl_do_indent = False
427 427
428 428 except KeyboardInterrupt:
429 429 #double-guard against keyboardinterrupts during kbdint handling
430 430 try:
431 431 self.write('\nKeyboardInterrupt\n')
432 432 source_raw = self.input_splitter.source_raw_reset()[1]
433 433 hlen_b4_cell = self._replace_rlhist_multiline(source_raw, hlen_b4_cell)
434 434 more = False
435 435 except KeyboardInterrupt:
436 436 pass
437 437 except EOFError:
438 438 if self.autoindent:
439 439 self.rl_do_indent = False
440 440 if self.has_readline:
441 441 self.readline_startup_hook(None)
442 442 self.write('\n')
443 443 self.exit()
444 444 except bdb.BdbQuit:
445 445 warn('The Python debugger has exited with a BdbQuit exception.\n'
446 446 'Because of how pdb handles the stack, it is impossible\n'
447 447 'for IPython to properly format this particular exception.\n'
448 448 'IPython will resume normal operation.')
449 449 except:
450 450 # exceptions here are VERY RARE, but they can be triggered
451 451 # asynchronously by signal handlers, for example.
452 452 self.showtraceback()
453 453 else:
454 454 self.input_splitter.push(line)
455 455 more = self.input_splitter.push_accepts_more()
456 456 if (self.SyntaxTB.last_syntax_error and
457 457 self.autoedit_syntax):
458 458 self.edit_syntax_error()
459 459 if not more:
460 460 source_raw = self.input_splitter.source_raw_reset()[1]
461 461 hlen_b4_cell = self._replace_rlhist_multiline(source_raw, hlen_b4_cell)
462 462 self.run_cell(source_raw)
463 463
464 464
465 465 # Turn off the exit flag, so the mainloop can be restarted if desired
466 466 self.exit_now = False
@@ -1,314 +1,313 b''
1 1 """ A kernel manager for in-process kernels. """
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Copyright (C) 2012 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-----------------------------------------------------------------------------
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Imports
12 12 #-----------------------------------------------------------------------------
13 13
14 14 # Local imports.
15 15 from IPython.config.configurable import Configurable
16 16 from IPython.utils.traitlets import Any, Instance, Type
17 17 from IPython.kernel.kernelmanagerabc import (
18 18 ShellChannelABC, IOPubChannelABC,
19 19 HBChannelABC, StdInChannelABC,
20 20 KernelManagerABC
21 21 )
22 22
23 23 from .socket import DummySocket
24 24
25 25 #-----------------------------------------------------------------------------
26 26 # Channel classes
27 27 #-----------------------------------------------------------------------------
28 28
29 29 class InProcessChannel(object):
30 30 """Base class for in-process channels."""
31 31
32 32 def __init__(self, manager):
33 33 super(InProcessChannel, self).__init__()
34 34 self.manager = manager
35 35 self._is_alive = False
36 36
37 37 #--------------------------------------------------------------------------
38 38 # Channel interface
39 39 #--------------------------------------------------------------------------
40 40
41 41 def is_alive(self):
42 42 return self._is_alive
43 43
44 44 def start(self):
45 45 self._is_alive = True
46 46
47 47 def stop(self):
48 48 self._is_alive = False
49 49
50 50 def call_handlers(self, msg):
51 51 """ This method is called in the main thread when a message arrives.
52 52
53 53 Subclasses should override this method to handle incoming messages.
54 54 """
55 55 raise NotImplementedError('call_handlers must be defined in a subclass.')
56 56
57 57 #--------------------------------------------------------------------------
58 58 # InProcessChannel interface
59 59 #--------------------------------------------------------------------------
60 60
61 61 def call_handlers_later(self, *args, **kwds):
62 62 """ Call the message handlers later.
63 63
64 64 The default implementation just calls the handlers immediately, but this
65 65 method exists so that GUI toolkits can defer calling the handlers until
66 66 after the event loop has run, as expected by GUI frontends.
67 67 """
68 68 self.call_handlers(*args, **kwds)
69 69
70 70 def process_events(self):
71 71 """ Process any pending GUI events.
72 72
73 73 This method will be never be called from a frontend without an event
74 74 loop (e.g., a terminal frontend).
75 75 """
76 76 raise NotImplementedError
77 77
78 78
79 79 class InProcessShellChannel(InProcessChannel):
80 80 """See `IPython.kernel.kernelmanager.ShellChannel` for docstrings."""
81 81
82 82 # flag for whether execute requests should be allowed to call raw_input
83 83 allow_stdin = True
84 84
85 85 #--------------------------------------------------------------------------
86 86 # ShellChannel interface
87 87 #--------------------------------------------------------------------------
88 88
89 89 def execute(self, code, silent=False, store_history=True,
90 90 user_variables=[], user_expressions={}, allow_stdin=None):
91 91 if allow_stdin is None:
92 92 allow_stdin = self.allow_stdin
93 93 content = dict(code=code, silent=silent, store_history=store_history,
94 94 user_variables=user_variables,
95 95 user_expressions=user_expressions,
96 96 allow_stdin=allow_stdin)
97 97 msg = self.manager.session.msg('execute_request', content)
98 98 self._dispatch_to_kernel(msg)
99 99 return msg['header']['msg_id']
100 100
101 101 def complete(self, text, line, cursor_pos, block=None):
102 102 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
103 103 msg = self.manager.session.msg('complete_request', content)
104 104 self._dispatch_to_kernel(msg)
105 105 return msg['header']['msg_id']
106 106
107 107 def object_info(self, oname, detail_level=0):
108 108 content = dict(oname=oname, detail_level=detail_level)
109 109 msg = self.manager.session.msg('object_info_request', content)
110 110 self._dispatch_to_kernel(msg)
111 111 return msg['header']['msg_id']
112 112
113 113 def history(self, raw=True, output=False, hist_access_type='range', **kwds):
114 114 content = dict(raw=raw, output=output,
115 115 hist_access_type=hist_access_type, **kwds)
116 116 msg = self.manager.session.msg('history_request', content)
117 117 self._dispatch_to_kernel(msg)
118 118 return msg['header']['msg_id']
119 119
120 120 def shutdown(self, restart=False):
121 121 # FIXME: What to do here?
122 122 raise NotImplementedError('Cannot shutdown in-process kernel')
123 123
124 124 #--------------------------------------------------------------------------
125 125 # Protected interface
126 126 #--------------------------------------------------------------------------
127 127
128 128 def _dispatch_to_kernel(self, msg):
129 129 """ Send a message to the kernel and handle a reply.
130 130 """
131 131 kernel = self.manager.kernel
132 132 if kernel is None:
133 133 raise RuntimeError('Cannot send request. No kernel exists.')
134 134
135 135 stream = DummySocket()
136 136 self.manager.session.send(stream, msg)
137 137 msg_parts = stream.recv_multipart()
138 138 kernel.dispatch_shell(stream, msg_parts)
139 139
140 140 idents, reply_msg = self.manager.session.recv(stream, copy=False)
141 141 self.call_handlers_later(reply_msg)
142 142
143 143
144 144 class InProcessIOPubChannel(InProcessChannel):
145 145 """See `IPython.kernel.kernelmanager.IOPubChannel` for docstrings."""
146 146
147 147 def flush(self, timeout=1.0):
148 148 pass
149 149
150 150
151 151 class InProcessStdInChannel(InProcessChannel):
152 152 """See `IPython.kernel.kernelmanager.StdInChannel` for docstrings."""
153 153
154 154 def input(self, string):
155 155 kernel = self.manager.kernel
156 156 if kernel is None:
157 157 raise RuntimeError('Cannot send input reply. No kernel exists.')
158 158 kernel.raw_input_str = string
159 159
160 160
161 161 class InProcessHBChannel(InProcessChannel):
162 162 """See `IPython.kernel.kernelmanager.HBChannel` for docstrings."""
163 163
164 164 time_to_dead = 3.0
165 165
166 166 def __init__(self, *args, **kwds):
167 167 super(InProcessHBChannel, self).__init__(*args, **kwds)
168 168 self._pause = True
169 169
170 170 def pause(self):
171 171 self._pause = True
172 172
173 173 def unpause(self):
174 174 self._pause = False
175 175
176 176 def is_beating(self):
177 177 return not self._pause
178 178
179 179
180 180 #-----------------------------------------------------------------------------
181 181 # Main kernel manager class
182 182 #-----------------------------------------------------------------------------
183 183
184 184 class InProcessKernelManager(Configurable):
185 185 """A manager for an in-process kernel.
186 186
187 187 This class implements the interface of
188 188 `IPython.kernel.kernelmanagerabc.KernelManagerABC` and allows
189 189 (asynchronous) frontends to be used seamlessly with an in-process kernel.
190 190
191 191 See `IPython.kernel.kernelmanager.KernelManager` for docstrings.
192 192 """
193 193
194 194 # The Session to use for building messages.
195 195 session = Instance('IPython.kernel.zmq.session.Session')
196 196 def _session_default(self):
197 197 from IPython.kernel.zmq.session import Session
198 198 return Session(config=self.config)
199 199
200 200 # The kernel process with which the KernelManager is communicating.
201 201 kernel = Instance('IPython.kernel.inprocess.ipkernel.InProcessKernel')
202 202
203 203 # The classes to use for the various channels.
204 204 shell_channel_class = Type(InProcessShellChannel)
205 205 iopub_channel_class = Type(InProcessIOPubChannel)
206 206 stdin_channel_class = Type(InProcessStdInChannel)
207 207 hb_channel_class = Type(InProcessHBChannel)
208 208
209 209 # Protected traits.
210 210 _shell_channel = Any
211 211 _iopub_channel = Any
212 212 _stdin_channel = Any
213 213 _hb_channel = Any
214 214
215 215 #--------------------------------------------------------------------------
216 216 # Channel management methods.
217 217 #--------------------------------------------------------------------------
218 218
219 219 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
220 220 if shell:
221 221 self.shell_channel.start()
222 222 if iopub:
223 223 self.iopub_channel.start()
224 224 if stdin:
225 225 self.stdin_channel.start()
226 226 self.shell_channel.allow_stdin = True
227 227 else:
228 228 self.shell_channel.allow_stdin = False
229 229 if hb:
230 230 self.hb_channel.start()
231 231
232 232 def stop_channels(self):
233 233 if self.shell_channel.is_alive():
234 234 self.shell_channel.stop()
235 235 if self.iopub_channel.is_alive():
236 236 self.iopub_channel.stop()
237 237 if self.stdin_channel.is_alive():
238 238 self.stdin_channel.stop()
239 239 if self.hb_channel.is_alive():
240 240 self.hb_channel.stop()
241 241
242 242 @property
243 243 def channels_running(self):
244 244 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
245 245 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
246 246
247 247 @property
248 248 def shell_channel(self):
249 249 if self._shell_channel is None:
250 250 self._shell_channel = self.shell_channel_class(self)
251 251 return self._shell_channel
252 252
253 253 @property
254 254 def iopub_channel(self):
255 255 if self._iopub_channel is None:
256 256 self._iopub_channel = self.iopub_channel_class(self)
257 257 return self._iopub_channel
258 258
259 259 @property
260 260 def stdin_channel(self):
261 261 if self._stdin_channel is None:
262 262 self._stdin_channel = self.stdin_channel_class(self)
263 263 return self._stdin_channel
264 264
265 265 @property
266 266 def hb_channel(self):
267 267 if self._hb_channel is None:
268 268 self._hb_channel = self.hb_channel_class(self)
269 269 return self._hb_channel
270 270
271 271 #--------------------------------------------------------------------------
272 272 # Kernel management methods:
273 273 #--------------------------------------------------------------------------
274 274
275 275 def start_kernel(self, **kwds):
276 276 from IPython.kernel.inprocess.ipkernel import InProcessKernel
277 277 self.kernel = InProcessKernel()
278 278 self.kernel.frontends.append(self)
279 279
280 280 def shutdown_kernel(self):
281 281 self._kill_kernel()
282 282
283 283 def restart_kernel(self, now=False, **kwds):
284 284 self.shutdown_kernel()
285 285 self.start_kernel(**kwds)
286 286
287 287 @property
288 288 def has_kernel(self):
289 289 return self.kernel is not None
290 290
291 291 def _kill_kernel(self):
292 292 self.kernel.frontends.remove(self)
293 293 self.kernel = None
294 294
295 295 def interrupt_kernel(self):
296 296 raise NotImplementedError("Cannot interrupt in-process kernel.")
297 297
298 298 def signal_kernel(self, signum):
299 299 raise NotImplementedError("Cannot signal in-process kernel.")
300 300
301 @property
302 301 def is_alive(self):
303 302 return True
304 303
305 304
306 305 #-----------------------------------------------------------------------------
307 306 # ABC Registration
308 307 #-----------------------------------------------------------------------------
309 308
310 309 ShellChannelABC.register(InProcessShellChannel)
311 310 IOPubChannelABC.register(InProcessIOPubChannel)
312 311 HBChannelABC.register(InProcessHBChannel)
313 312 StdInChannelABC.register(InProcessStdInChannel)
314 313 KernelManagerABC.register(InProcessKernelManager)
@@ -1,1130 +1,1129 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import absolute_import
19 19
20 20 # Standard library imports
21 21 import atexit
22 22 import errno
23 23 import json
24 24 from subprocess import Popen
25 25 import os
26 26 import signal
27 27 import sys
28 28 from threading import Thread
29 29 import time
30 30
31 31 # System library imports
32 32 import zmq
33 33 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
34 34 # during garbage collection of threads at exit:
35 35 from zmq import ZMQError
36 36 from zmq.eventloop import ioloop, zmqstream
37 37
38 38 # Local imports
39 39 from IPython.config.configurable import Configurable
40 40 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
41 41 from IPython.utils.traitlets import (
42 42 Any, Instance, Type, Unicode, List, Integer, Bool, CaselessStrEnum
43 43 )
44 44 from IPython.utils.py3compat import str_to_bytes
45 45 from IPython.kernel import (
46 46 write_connection_file,
47 47 make_ipkernel_cmd,
48 48 launch_kernel,
49 49 )
50 50 from .zmq.session import Session
51 51 from .kernelmanagerabc import (
52 52 ShellChannelABC, IOPubChannelABC,
53 53 HBChannelABC, StdInChannelABC,
54 54 KernelManagerABC
55 55 )
56 56
57 57
58 58 #-----------------------------------------------------------------------------
59 59 # Constants and exceptions
60 60 #-----------------------------------------------------------------------------
61 61
62 62 class InvalidPortNumber(Exception):
63 63 pass
64 64
65 65 #-----------------------------------------------------------------------------
66 66 # Utility functions
67 67 #-----------------------------------------------------------------------------
68 68
69 69 # some utilities to validate message structure, these might get moved elsewhere
70 70 # if they prove to have more generic utility
71 71
72 72 def validate_string_list(lst):
73 73 """Validate that the input is a list of strings.
74 74
75 75 Raises ValueError if not."""
76 76 if not isinstance(lst, list):
77 77 raise ValueError('input %r must be a list' % lst)
78 78 for x in lst:
79 79 if not isinstance(x, basestring):
80 80 raise ValueError('element %r in list must be a string' % x)
81 81
82 82
83 83 def validate_string_dict(dct):
84 84 """Validate that the input is a dict with string keys and values.
85 85
86 86 Raises ValueError if not."""
87 87 for k,v in dct.iteritems():
88 88 if not isinstance(k, basestring):
89 89 raise ValueError('key %r in dict must be a string' % k)
90 90 if not isinstance(v, basestring):
91 91 raise ValueError('value %r in dict must be a string' % v)
92 92
93 93
94 94 #-----------------------------------------------------------------------------
95 95 # ZMQ Socket Channel classes
96 96 #-----------------------------------------------------------------------------
97 97
98 98 class ZMQSocketChannel(Thread):
99 99 """The base class for the channels that use ZMQ sockets."""
100 100 context = None
101 101 session = None
102 102 socket = None
103 103 ioloop = None
104 104 stream = None
105 105 _address = None
106 106 _exiting = False
107 107
108 108 def __init__(self, context, session, address):
109 109 """Create a channel.
110 110
111 111 Parameters
112 112 ----------
113 113 context : :class:`zmq.Context`
114 114 The ZMQ context to use.
115 115 session : :class:`session.Session`
116 116 The session to use.
117 117 address : zmq url
118 118 Standard (ip, port) tuple that the kernel is listening on.
119 119 """
120 120 super(ZMQSocketChannel, self).__init__()
121 121 self.daemon = True
122 122
123 123 self.context = context
124 124 self.session = session
125 125 if isinstance(address, tuple):
126 126 if address[1] == 0:
127 127 message = 'The port number for a channel cannot be 0.'
128 128 raise InvalidPortNumber(message)
129 129 address = "tcp://%s:%i" % address
130 130 self._address = address
131 131 atexit.register(self._notice_exit)
132 132
133 133 def _notice_exit(self):
134 134 self._exiting = True
135 135
136 136 def _run_loop(self):
137 137 """Run my loop, ignoring EINTR events in the poller"""
138 138 while True:
139 139 try:
140 140 self.ioloop.start()
141 141 except ZMQError as e:
142 142 if e.errno == errno.EINTR:
143 143 continue
144 144 else:
145 145 raise
146 146 except Exception:
147 147 if self._exiting:
148 148 break
149 149 else:
150 150 raise
151 151 else:
152 152 break
153 153
154 154 def stop(self):
155 155 """Stop the channel's event loop and join its thread.
156 156
157 157 This calls :method:`Thread.join` and returns when the thread
158 158 terminates. :class:`RuntimeError` will be raised if
159 159 :method:`self.start` is called again.
160 160 """
161 161 self.join()
162 162
163 163 @property
164 164 def address(self):
165 165 """Get the channel's address as a zmq url string.
166 166
167 167 These URLS have the form: 'tcp://127.0.0.1:5555'.
168 168 """
169 169 return self._address
170 170
171 171 def _queue_send(self, msg):
172 172 """Queue a message to be sent from the IOLoop's thread.
173 173
174 174 Parameters
175 175 ----------
176 176 msg : message to send
177 177
178 178 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
179 179 thread control of the action.
180 180 """
181 181 def thread_send():
182 182 self.session.send(self.stream, msg)
183 183 self.ioloop.add_callback(thread_send)
184 184
185 185 def _handle_recv(self, msg):
186 186 """Callback for stream.on_recv.
187 187
188 188 Unpacks message, and calls handlers with it.
189 189 """
190 190 ident,smsg = self.session.feed_identities(msg)
191 191 self.call_handlers(self.session.unserialize(smsg))
192 192
193 193
194 194
195 195 class ShellChannel(ZMQSocketChannel):
196 196 """The shell channel for issuing request/replies to the kernel."""
197 197
198 198 command_queue = None
199 199 # flag for whether execute requests should be allowed to call raw_input:
200 200 allow_stdin = True
201 201
202 202 def __init__(self, context, session, address):
203 203 super(ShellChannel, self).__init__(context, session, address)
204 204 self.ioloop = ioloop.IOLoop()
205 205
206 206 def run(self):
207 207 """The thread's main activity. Call start() instead."""
208 208 self.socket = self.context.socket(zmq.DEALER)
209 209 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
210 210 self.socket.connect(self.address)
211 211 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
212 212 self.stream.on_recv(self._handle_recv)
213 213 self._run_loop()
214 214 try:
215 215 self.socket.close()
216 216 except:
217 217 pass
218 218
219 219 def stop(self):
220 220 """Stop the channel's event loop and join its thread."""
221 221 self.ioloop.stop()
222 222 super(ShellChannel, self).stop()
223 223
224 224 def call_handlers(self, msg):
225 225 """This method is called in the ioloop thread when a message arrives.
226 226
227 227 Subclasses should override this method to handle incoming messages.
228 228 It is important to remember that this method is called in the thread
229 229 so that some logic must be done to ensure that the application leve
230 230 handlers are called in the application thread.
231 231 """
232 232 raise NotImplementedError('call_handlers must be defined in a subclass.')
233 233
234 234 def execute(self, code, silent=False, store_history=True,
235 235 user_variables=None, user_expressions=None, allow_stdin=None):
236 236 """Execute code in the kernel.
237 237
238 238 Parameters
239 239 ----------
240 240 code : str
241 241 A string of Python code.
242 242
243 243 silent : bool, optional (default False)
244 244 If set, the kernel will execute the code as quietly possible, and
245 245 will force store_history to be False.
246 246
247 247 store_history : bool, optional (default True)
248 248 If set, the kernel will store command history. This is forced
249 249 to be False if silent is True.
250 250
251 251 user_variables : list, optional
252 252 A list of variable names to pull from the user's namespace. They
253 253 will come back as a dict with these names as keys and their
254 254 :func:`repr` as values.
255 255
256 256 user_expressions : dict, optional
257 257 A dict mapping names to expressions to be evaluated in the user's
258 258 dict. The expression values are returned as strings formatted using
259 259 :func:`repr`.
260 260
261 261 allow_stdin : bool, optional (default self.allow_stdin)
262 262 Flag for whether the kernel can send stdin requests to frontends.
263 263
264 264 Some frontends (e.g. the Notebook) do not support stdin requests.
265 265 If raw_input is called from code executed from such a frontend, a
266 266 StdinNotImplementedError will be raised.
267 267
268 268 Returns
269 269 -------
270 270 The msg_id of the message sent.
271 271 """
272 272 if user_variables is None:
273 273 user_variables = []
274 274 if user_expressions is None:
275 275 user_expressions = {}
276 276 if allow_stdin is None:
277 277 allow_stdin = self.allow_stdin
278 278
279 279
280 280 # Don't waste network traffic if inputs are invalid
281 281 if not isinstance(code, basestring):
282 282 raise ValueError('code %r must be a string' % code)
283 283 validate_string_list(user_variables)
284 284 validate_string_dict(user_expressions)
285 285
286 286 # Create class for content/msg creation. Related to, but possibly
287 287 # not in Session.
288 288 content = dict(code=code, silent=silent, store_history=store_history,
289 289 user_variables=user_variables,
290 290 user_expressions=user_expressions,
291 291 allow_stdin=allow_stdin,
292 292 )
293 293 msg = self.session.msg('execute_request', content)
294 294 self._queue_send(msg)
295 295 return msg['header']['msg_id']
296 296
297 297 def complete(self, text, line, cursor_pos, block=None):
298 298 """Tab complete text in the kernel's namespace.
299 299
300 300 Parameters
301 301 ----------
302 302 text : str
303 303 The text to complete.
304 304 line : str
305 305 The full line of text that is the surrounding context for the
306 306 text to complete.
307 307 cursor_pos : int
308 308 The position of the cursor in the line where the completion was
309 309 requested.
310 310 block : str, optional
311 311 The full block of code in which the completion is being requested.
312 312
313 313 Returns
314 314 -------
315 315 The msg_id of the message sent.
316 316 """
317 317 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
318 318 msg = self.session.msg('complete_request', content)
319 319 self._queue_send(msg)
320 320 return msg['header']['msg_id']
321 321
322 322 def object_info(self, oname, detail_level=0):
323 323 """Get metadata information about an object in the kernel's namespace.
324 324
325 325 Parameters
326 326 ----------
327 327 oname : str
328 328 A string specifying the object name.
329 329 detail_level : int, optional
330 330 The level of detail for the introspection (0-2)
331 331
332 332 Returns
333 333 -------
334 334 The msg_id of the message sent.
335 335 """
336 336 content = dict(oname=oname, detail_level=detail_level)
337 337 msg = self.session.msg('object_info_request', content)
338 338 self._queue_send(msg)
339 339 return msg['header']['msg_id']
340 340
341 341 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
342 342 """Get entries from the kernel's history list.
343 343
344 344 Parameters
345 345 ----------
346 346 raw : bool
347 347 If True, return the raw input.
348 348 output : bool
349 349 If True, then return the output as well.
350 350 hist_access_type : str
351 351 'range' (fill in session, start and stop params), 'tail' (fill in n)
352 352 or 'search' (fill in pattern param).
353 353
354 354 session : int
355 355 For a range request, the session from which to get lines. Session
356 356 numbers are positive integers; negative ones count back from the
357 357 current session.
358 358 start : int
359 359 The first line number of a history range.
360 360 stop : int
361 361 The final (excluded) line number of a history range.
362 362
363 363 n : int
364 364 The number of lines of history to get for a tail request.
365 365
366 366 pattern : str
367 367 The glob-syntax pattern for a search request.
368 368
369 369 Returns
370 370 -------
371 371 The msg_id of the message sent.
372 372 """
373 373 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
374 374 **kwargs)
375 375 msg = self.session.msg('history_request', content)
376 376 self._queue_send(msg)
377 377 return msg['header']['msg_id']
378 378
379 379 def kernel_info(self):
380 380 """Request kernel info."""
381 381 msg = self.session.msg('kernel_info_request')
382 382 self._queue_send(msg)
383 383 return msg['header']['msg_id']
384 384
385 385 def shutdown(self, restart=False):
386 386 """Request an immediate kernel shutdown.
387 387
388 388 Upon receipt of the (empty) reply, client code can safely assume that
389 389 the kernel has shut down and it's safe to forcefully terminate it if
390 390 it's still alive.
391 391
392 392 The kernel will send the reply via a function registered with Python's
393 393 atexit module, ensuring it's truly done as the kernel is done with all
394 394 normal operation.
395 395 """
396 396 # Send quit message to kernel. Once we implement kernel-side setattr,
397 397 # this should probably be done that way, but for now this will do.
398 398 msg = self.session.msg('shutdown_request', {'restart':restart})
399 399 self._queue_send(msg)
400 400 return msg['header']['msg_id']
401 401
402 402
403 403
404 404 class IOPubChannel(ZMQSocketChannel):
405 405 """The iopub channel which listens for messages that the kernel publishes.
406 406
407 407 This channel is where all output is published to frontends.
408 408 """
409 409
410 410 def __init__(self, context, session, address):
411 411 super(IOPubChannel, self).__init__(context, session, address)
412 412 self.ioloop = ioloop.IOLoop()
413 413
414 414 def run(self):
415 415 """The thread's main activity. Call start() instead."""
416 416 self.socket = self.context.socket(zmq.SUB)
417 417 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
418 418 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
419 419 self.socket.connect(self.address)
420 420 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
421 421 self.stream.on_recv(self._handle_recv)
422 422 self._run_loop()
423 423 try:
424 424 self.socket.close()
425 425 except:
426 426 pass
427 427
428 428 def stop(self):
429 429 """Stop the channel's event loop and join its thread."""
430 430 self.ioloop.stop()
431 431 super(IOPubChannel, self).stop()
432 432
433 433 def call_handlers(self, msg):
434 434 """This method is called in the ioloop thread when a message arrives.
435 435
436 436 Subclasses should override this method to handle incoming messages.
437 437 It is important to remember that this method is called in the thread
438 438 so that some logic must be done to ensure that the application leve
439 439 handlers are called in the application thread.
440 440 """
441 441 raise NotImplementedError('call_handlers must be defined in a subclass.')
442 442
443 443 def flush(self, timeout=1.0):
444 444 """Immediately processes all pending messages on the iopub channel.
445 445
446 446 Callers should use this method to ensure that :method:`call_handlers`
447 447 has been called for all messages that have been received on the
448 448 0MQ SUB socket of this channel.
449 449
450 450 This method is thread safe.
451 451
452 452 Parameters
453 453 ----------
454 454 timeout : float, optional
455 455 The maximum amount of time to spend flushing, in seconds. The
456 456 default is one second.
457 457 """
458 458 # We do the IOLoop callback process twice to ensure that the IOLoop
459 459 # gets to perform at least one full poll.
460 460 stop_time = time.time() + timeout
461 461 for i in xrange(2):
462 462 self._flushed = False
463 463 self.ioloop.add_callback(self._flush)
464 464 while not self._flushed and time.time() < stop_time:
465 465 time.sleep(0.01)
466 466
467 467 def _flush(self):
468 468 """Callback for :method:`self.flush`."""
469 469 self.stream.flush()
470 470 self._flushed = True
471 471
472 472
473 473 class StdInChannel(ZMQSocketChannel):
474 474 """The stdin channel to handle raw_input requests that the kernel makes."""
475 475
476 476 msg_queue = None
477 477
478 478 def __init__(self, context, session, address):
479 479 super(StdInChannel, self).__init__(context, session, address)
480 480 self.ioloop = ioloop.IOLoop()
481 481
482 482 def run(self):
483 483 """The thread's main activity. Call start() instead."""
484 484 self.socket = self.context.socket(zmq.DEALER)
485 485 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
486 486 self.socket.connect(self.address)
487 487 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
488 488 self.stream.on_recv(self._handle_recv)
489 489 self._run_loop()
490 490 try:
491 491 self.socket.close()
492 492 except:
493 493 pass
494 494
495 495 def stop(self):
496 496 """Stop the channel's event loop and join its thread."""
497 497 self.ioloop.stop()
498 498 super(StdInChannel, self).stop()
499 499
500 500 def call_handlers(self, msg):
501 501 """This method is called in the ioloop thread when a message arrives.
502 502
503 503 Subclasses should override this method to handle incoming messages.
504 504 It is important to remember that this method is called in the thread
505 505 so that some logic must be done to ensure that the application leve
506 506 handlers are called in the application thread.
507 507 """
508 508 raise NotImplementedError('call_handlers must be defined in a subclass.')
509 509
510 510 def input(self, string):
511 511 """Send a string of raw input to the kernel."""
512 512 content = dict(value=string)
513 513 msg = self.session.msg('input_reply', content)
514 514 self._queue_send(msg)
515 515
516 516
517 517 class HBChannel(ZMQSocketChannel):
518 518 """The heartbeat channel which monitors the kernel heartbeat.
519 519
520 520 Note that the heartbeat channel is paused by default. As long as you start
521 521 this channel, the kernel manager will ensure that it is paused and un-paused
522 522 as appropriate.
523 523 """
524 524
525 525 time_to_dead = 3.0
526 526 socket = None
527 527 poller = None
528 528 _running = None
529 529 _pause = None
530 530 _beating = None
531 531
532 532 def __init__(self, context, session, address):
533 533 super(HBChannel, self).__init__(context, session, address)
534 534 self._running = False
535 535 self._pause =True
536 536 self.poller = zmq.Poller()
537 537
538 538 def _create_socket(self):
539 539 if self.socket is not None:
540 540 # close previous socket, before opening a new one
541 541 self.poller.unregister(self.socket)
542 542 self.socket.close()
543 543 self.socket = self.context.socket(zmq.REQ)
544 544 self.socket.setsockopt(zmq.LINGER, 0)
545 545 self.socket.connect(self.address)
546 546
547 547 self.poller.register(self.socket, zmq.POLLIN)
548 548
549 549 def _poll(self, start_time):
550 550 """poll for heartbeat replies until we reach self.time_to_dead.
551 551
552 552 Ignores interrupts, and returns the result of poll(), which
553 553 will be an empty list if no messages arrived before the timeout,
554 554 or the event tuple if there is a message to receive.
555 555 """
556 556
557 557 until_dead = self.time_to_dead - (time.time() - start_time)
558 558 # ensure poll at least once
559 559 until_dead = max(until_dead, 1e-3)
560 560 events = []
561 561 while True:
562 562 try:
563 563 events = self.poller.poll(1000 * until_dead)
564 564 except ZMQError as e:
565 565 if e.errno == errno.EINTR:
566 566 # ignore interrupts during heartbeat
567 567 # this may never actually happen
568 568 until_dead = self.time_to_dead - (time.time() - start_time)
569 569 until_dead = max(until_dead, 1e-3)
570 570 pass
571 571 else:
572 572 raise
573 573 except Exception:
574 574 if self._exiting:
575 575 break
576 576 else:
577 577 raise
578 578 else:
579 579 break
580 580 return events
581 581
582 582 def run(self):
583 583 """The thread's main activity. Call start() instead."""
584 584 self._create_socket()
585 585 self._running = True
586 586 self._beating = True
587 587
588 588 while self._running:
589 589 if self._pause:
590 590 # just sleep, and skip the rest of the loop
591 591 time.sleep(self.time_to_dead)
592 592 continue
593 593
594 594 since_last_heartbeat = 0.0
595 595 # io.rprint('Ping from HB channel') # dbg
596 596 # no need to catch EFSM here, because the previous event was
597 597 # either a recv or connect, which cannot be followed by EFSM
598 598 self.socket.send(b'ping')
599 599 request_time = time.time()
600 600 ready = self._poll(request_time)
601 601 if ready:
602 602 self._beating = True
603 603 # the poll above guarantees we have something to recv
604 604 self.socket.recv()
605 605 # sleep the remainder of the cycle
606 606 remainder = self.time_to_dead - (time.time() - request_time)
607 607 if remainder > 0:
608 608 time.sleep(remainder)
609 609 continue
610 610 else:
611 611 # nothing was received within the time limit, signal heart failure
612 612 self._beating = False
613 613 since_last_heartbeat = time.time() - request_time
614 614 self.call_handlers(since_last_heartbeat)
615 615 # and close/reopen the socket, because the REQ/REP cycle has been broken
616 616 self._create_socket()
617 617 continue
618 618 try:
619 619 self.socket.close()
620 620 except:
621 621 pass
622 622
623 623 def pause(self):
624 624 """Pause the heartbeat."""
625 625 self._pause = True
626 626
627 627 def unpause(self):
628 628 """Unpause the heartbeat."""
629 629 self._pause = False
630 630
631 631 def is_beating(self):
632 632 """Is the heartbeat running and responsive (and not paused)."""
633 633 if self.is_alive() and not self._pause and self._beating:
634 634 return True
635 635 else:
636 636 return False
637 637
638 638 def stop(self):
639 639 """Stop the channel's event loop and join its thread."""
640 640 self._running = False
641 641 super(HBChannel, self).stop()
642 642
643 643 def call_handlers(self, since_last_heartbeat):
644 644 """This method is called in the ioloop thread when a message arrives.
645 645
646 646 Subclasses should override this method to handle incoming messages.
647 647 It is important to remember that this method is called in the thread
648 648 so that some logic must be done to ensure that the application level
649 649 handlers are called in the application thread.
650 650 """
651 651 raise NotImplementedError('call_handlers must be defined in a subclass.')
652 652
653 653
654 654 #-----------------------------------------------------------------------------
655 655 # Main kernel manager class
656 656 #-----------------------------------------------------------------------------
657 657
658 658 class KernelManager(Configurable):
659 659 """Manages a single kernel on this host along with its channels.
660 660
661 661 There are four channels associated with each kernel:
662 662
663 663 * shell: for request/reply calls to the kernel.
664 664 * iopub: for the kernel to publish results to frontends.
665 665 * hb: for monitoring the kernel's heartbeat.
666 666 * stdin: for frontends to reply to raw_input calls in the kernel.
667 667
668 668 The usage of the channels that this class manages is optional. It is
669 669 entirely possible to connect to the kernels directly using ZeroMQ
670 670 sockets. These channels are useful primarily for talking to a kernel
671 671 whose :class:`KernelManager` is in the same process.
672 672
673 673 This version manages kernels started using Popen.
674 674 """
675 675 # The PyZMQ Context to use for communication with the kernel.
676 676 context = Instance(zmq.Context)
677 677 def _context_default(self):
678 678 return zmq.Context.instance()
679 679
680 680 # The Session to use for communication with the kernel.
681 681 session = Instance(Session)
682 682 def _session_default(self):
683 683 return Session(config=self.config)
684 684
685 685 # The kernel process with which the KernelManager is communicating.
686 686 # generally a Popen instance
687 687 kernel = Any()
688 688
689 689 kernel_cmd = List(Unicode, config=True,
690 690 help="""The Popen Command to launch the kernel.
691 691 Override this if you have a custom
692 692 """
693 693 )
694 694 def _kernel_cmd_changed(self, name, old, new):
695 695 self.ipython_kernel = False
696 696
697 697 ipython_kernel = Bool(True)
698 698
699 699
700 700 # The addresses for the communication channels.
701 701 connection_file = Unicode('')
702 702
703 703 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
704 704
705 705 ip = Unicode(LOCALHOST, config=True,
706 706 help="""Set the kernel\'s IP address [default localhost].
707 707 If the IP address is something other than localhost, then
708 708 Consoles on other machines will be able to connect
709 709 to the Kernel, so be careful!"""
710 710 )
711 711 def _ip_default(self):
712 712 if self.transport == 'ipc':
713 713 if self.connection_file:
714 714 return os.path.splitext(self.connection_file)[0] + '-ipc'
715 715 else:
716 716 return 'kernel-ipc'
717 717 else:
718 718 return LOCALHOST
719 719 def _ip_changed(self, name, old, new):
720 720 if new == '*':
721 721 self.ip = '0.0.0.0'
722 722 shell_port = Integer(0)
723 723 iopub_port = Integer(0)
724 724 stdin_port = Integer(0)
725 725 hb_port = Integer(0)
726 726
727 727 # The classes to use for the various channels.
728 728 shell_channel_class = Type(ShellChannel)
729 729 iopub_channel_class = Type(IOPubChannel)
730 730 stdin_channel_class = Type(StdInChannel)
731 731 hb_channel_class = Type(HBChannel)
732 732
733 733 # Protected traits.
734 734 _launch_args = Any
735 735 _shell_channel = Any
736 736 _iopub_channel = Any
737 737 _stdin_channel = Any
738 738 _hb_channel = Any
739 739 _connection_file_written=Bool(False)
740 740
741 741 def __del__(self):
742 742 self.cleanup_connection_file()
743 743
744 744 #--------------------------------------------------------------------------
745 745 # Channel management methods:
746 746 #--------------------------------------------------------------------------
747 747
748 748 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
749 749 """Starts the channels for this kernel.
750 750
751 751 This will create the channels if they do not exist and then start
752 752 them (their activity runs in a thread). If port numbers of 0 are
753 753 being used (random ports) then you must first call
754 754 :method:`start_kernel`. If the channels have been stopped and you
755 755 call this, :class:`RuntimeError` will be raised.
756 756 """
757 757 if shell:
758 758 self.shell_channel.start()
759 759 if iopub:
760 760 self.iopub_channel.start()
761 761 if stdin:
762 762 self.stdin_channel.start()
763 763 self.shell_channel.allow_stdin = True
764 764 else:
765 765 self.shell_channel.allow_stdin = False
766 766 if hb:
767 767 self.hb_channel.start()
768 768
769 769 def stop_channels(self):
770 770 """Stops all the running channels for this kernel.
771 771
772 772 This stops their event loops and joins their threads.
773 773 """
774 774 if self.shell_channel.is_alive():
775 775 self.shell_channel.stop()
776 776 if self.iopub_channel.is_alive():
777 777 self.iopub_channel.stop()
778 778 if self.stdin_channel.is_alive():
779 779 self.stdin_channel.stop()
780 780 if self.hb_channel.is_alive():
781 781 self.hb_channel.stop()
782 782
783 783 @property
784 784 def channels_running(self):
785 785 """Are any of the channels created and running?"""
786 786 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
787 787 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
788 788
789 789 def _make_url(self, port):
790 790 """Make a zmq url with a port.
791 791
792 792 There are two cases that this handles:
793 793
794 794 * tcp: tcp://ip:port
795 795 * ipc: ipc://ip-port
796 796 """
797 797 if self.transport == 'tcp':
798 798 return "tcp://%s:%i" % (self.ip, port)
799 799 else:
800 800 return "%s://%s-%s" % (self.transport, self.ip, port)
801 801
802 802 @property
803 803 def shell_channel(self):
804 804 """Get the shell channel object for this kernel."""
805 805 if self._shell_channel is None:
806 806 self._shell_channel = self.shell_channel_class(
807 807 self.context, self.session, self._make_url(self.shell_port)
808 808 )
809 809 return self._shell_channel
810 810
811 811 @property
812 812 def iopub_channel(self):
813 813 """Get the iopub channel object for this kernel."""
814 814 if self._iopub_channel is None:
815 815 self._iopub_channel = self.iopub_channel_class(
816 816 self.context, self.session, self._make_url(self.iopub_port)
817 817 )
818 818 return self._iopub_channel
819 819
820 820 @property
821 821 def stdin_channel(self):
822 822 """Get the stdin channel object for this kernel."""
823 823 if self._stdin_channel is None:
824 824 self._stdin_channel = self.stdin_channel_class(
825 825 self.context, self.session, self._make_url(self.stdin_port)
826 826 )
827 827 return self._stdin_channel
828 828
829 829 @property
830 830 def hb_channel(self):
831 831 """Get the hb channel object for this kernel."""
832 832 if self._hb_channel is None:
833 833 self._hb_channel = self.hb_channel_class(
834 834 self.context, self.session, self._make_url(self.hb_port)
835 835 )
836 836 return self._hb_channel
837 837
838 838 #--------------------------------------------------------------------------
839 839 # Connection and ipc file management
840 840 #--------------------------------------------------------------------------
841 841
842 842 def cleanup_connection_file(self):
843 843 """Cleanup connection file *if we wrote it*
844 844
845 845 Will not raise if the connection file was already removed somehow.
846 846 """
847 847 if self._connection_file_written:
848 848 # cleanup connection files on full shutdown of kernel we started
849 849 self._connection_file_written = False
850 850 try:
851 851 os.remove(self.connection_file)
852 852 except (IOError, OSError, AttributeError):
853 853 pass
854 854
855 855 def cleanup_ipc_files(self):
856 856 """Cleanup ipc files if we wrote them."""
857 857 if self.transport != 'ipc':
858 858 return
859 859 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
860 860 ipcfile = "%s-%i" % (self.ip, port)
861 861 try:
862 862 os.remove(ipcfile)
863 863 except (IOError, OSError):
864 864 pass
865 865
866 866 def load_connection_file(self):
867 867 """Load connection info from JSON dict in self.connection_file."""
868 868 with open(self.connection_file) as f:
869 869 cfg = json.loads(f.read())
870 870
871 871 from pprint import pprint
872 872 pprint(cfg)
873 873 self.transport = cfg.get('transport', 'tcp')
874 874 self.ip = cfg['ip']
875 875 self.shell_port = cfg['shell_port']
876 876 self.stdin_port = cfg['stdin_port']
877 877 self.iopub_port = cfg['iopub_port']
878 878 self.hb_port = cfg['hb_port']
879 879 self.session.key = str_to_bytes(cfg['key'])
880 880
881 881 def write_connection_file(self):
882 882 """Write connection info to JSON dict in self.connection_file."""
883 883 if self._connection_file_written:
884 884 return
885 885 self.connection_file,cfg = write_connection_file(self.connection_file,
886 886 transport=self.transport, ip=self.ip, key=self.session.key,
887 887 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
888 888 shell_port=self.shell_port, hb_port=self.hb_port)
889 889 # write_connection_file also sets default ports:
890 890 self.shell_port = cfg['shell_port']
891 891 self.stdin_port = cfg['stdin_port']
892 892 self.iopub_port = cfg['iopub_port']
893 893 self.hb_port = cfg['hb_port']
894 894
895 895 self._connection_file_written = True
896 896
897 897 #--------------------------------------------------------------------------
898 898 # Kernel management
899 899 #--------------------------------------------------------------------------
900 900
901 901 def format_kernel_cmd(self, **kw):
902 902 """format templated args (e.g. {connection_file})"""
903 903 if self.kernel_cmd:
904 904 cmd = self.kernel_cmd
905 905 else:
906 906 cmd = make_ipkernel_cmd(
907 907 'from IPython.kernel.zmq.kernelapp import main; main()',
908 908 **kw
909 909 )
910 910 ns = dict(connection_file=self.connection_file)
911 911 ns.update(self._launch_args)
912 912 return [ c.format(**ns) for c in cmd ]
913 913
914 914 def _launch_kernel(self, kernel_cmd, **kw):
915 915 """actually launch the kernel
916 916
917 917 override in a subclass to launch kernel subprocesses differently
918 918 """
919 919 return launch_kernel(kernel_cmd, **kw)
920 920
921 921 def start_kernel(self, **kw):
922 922 """Starts a kernel on this host in a separate process.
923 923
924 924 If random ports (port=0) are being used, this method must be called
925 925 before the channels are created.
926 926
927 927 Parameters:
928 928 -----------
929 929 **kw : optional
930 930 keyword arguments that are passed down to build the kernel_cmd
931 931 and launching the kernel (e.g. Popen kwargs).
932 932 """
933 933 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
934 934 raise RuntimeError("Can only launch a kernel on a local interface. "
935 935 "Make sure that the '*_address' attributes are "
936 936 "configured properly. "
937 937 "Currently valid addresses are: %s"%LOCAL_IPS
938 938 )
939 939
940 940 # write connection file / get default ports
941 941 self.write_connection_file()
942 942
943 943 # save kwargs for use in restart
944 944 self._launch_args = kw.copy()
945 945 # build the Popen cmd
946 946 kernel_cmd = self.format_kernel_cmd(**kw)
947 947 # launch the kernel subprocess
948 948 self.kernel = self._launch_kernel(kernel_cmd,
949 949 ipython_kernel=self.ipython_kernel,
950 950 **kw)
951 951
952 952 def shutdown_kernel(self, now=False, restart=False):
953 953 """Attempts to the stop the kernel process cleanly.
954 954
955 955 This attempts to shutdown the kernels cleanly by:
956 956
957 957 1. Sending it a shutdown message over the shell channel.
958 958 2. If that fails, the kernel is shutdown forcibly by sending it
959 959 a signal.
960 960
961 961 Parameters:
962 962 -----------
963 963 now : bool
964 964 Should the kernel be forcible killed *now*. This skips the
965 965 first, nice shutdown attempt.
966 966 restart: bool
967 967 Will this kernel be restarted after it is shutdown. When this
968 968 is True, connection files will not be cleaned up.
969 969 """
970 970 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
971 971 if sys.platform == 'win32':
972 972 self._kill_kernel()
973 973 return
974 974
975 975 # Pause the heart beat channel if it exists.
976 976 if self._hb_channel is not None:
977 977 self._hb_channel.pause()
978 978
979 979 if now:
980 980 if self.has_kernel:
981 981 self._kill_kernel()
982 982 else:
983 983 # Don't send any additional kernel kill messages immediately, to give
984 984 # the kernel a chance to properly execute shutdown actions. Wait for at
985 985 # most 1s, checking every 0.1s.
986 986 self.shell_channel.shutdown(restart=restart)
987 987 for i in range(10):
988 if self.is_alive:
988 if self.is_alive():
989 989 time.sleep(0.1)
990 990 else:
991 991 break
992 992 else:
993 993 # OK, we've waited long enough.
994 994 if self.has_kernel:
995 995 self._kill_kernel()
996 996
997 997 if not restart:
998 998 self.cleanup_connection_file()
999 999 self.cleanup_ipc_files()
1000 1000 else:
1001 1001 self.cleanup_ipc_files()
1002 1002
1003 1003 def restart_kernel(self, now=False, **kw):
1004 1004 """Restarts a kernel with the arguments that were used to launch it.
1005 1005
1006 1006 If the old kernel was launched with random ports, the same ports will be
1007 1007 used for the new kernel. The same connection file is used again.
1008 1008
1009 1009 Parameters
1010 1010 ----------
1011 1011 now : bool, optional
1012 1012 If True, the kernel is forcefully restarted *immediately*, without
1013 1013 having a chance to do any cleanup action. Otherwise the kernel is
1014 1014 given 1s to clean up before a forceful restart is issued.
1015 1015
1016 1016 In all cases the kernel is restarted, the only difference is whether
1017 1017 it is given a chance to perform a clean shutdown or not.
1018 1018
1019 1019 **kw : optional
1020 1020 Any options specified here will overwrite those used to launch the
1021 1021 kernel.
1022 1022 """
1023 1023 if self._launch_args is None:
1024 1024 raise RuntimeError("Cannot restart the kernel. "
1025 1025 "No previous call to 'start_kernel'.")
1026 1026 else:
1027 1027 # Stop currently running kernel.
1028 1028 self.shutdown_kernel(now=now, restart=True)
1029 1029
1030 1030 # Start new kernel.
1031 1031 self._launch_args.update(kw)
1032 1032 self.start_kernel(**self._launch_args)
1033 1033
1034 1034 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
1035 1035 # unless there is some delay here.
1036 1036 if sys.platform == 'win32':
1037 1037 time.sleep(0.2)
1038 1038
1039 1039 @property
1040 1040 def has_kernel(self):
1041 1041 """Has a kernel been started that we are managing."""
1042 1042 return self.kernel is not None
1043 1043
1044 1044 def _kill_kernel(self):
1045 1045 """Kill the running kernel.
1046 1046
1047 1047 This is a private method, callers should use shutdown_kernel(now=True).
1048 1048 """
1049 1049 if self.has_kernel:
1050 1050 # Pause the heart beat channel if it exists.
1051 1051 if self._hb_channel is not None:
1052 1052 self._hb_channel.pause()
1053 1053
1054 1054 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1055 1055 # TerminateProcess() on Win32).
1056 1056 try:
1057 1057 self.kernel.kill()
1058 1058 except OSError as e:
1059 1059 # In Windows, we will get an Access Denied error if the process
1060 1060 # has already terminated. Ignore it.
1061 1061 if sys.platform == 'win32':
1062 1062 if e.winerror != 5:
1063 1063 raise
1064 1064 # On Unix, we may get an ESRCH error if the process has already
1065 1065 # terminated. Ignore it.
1066 1066 else:
1067 1067 from errno import ESRCH
1068 1068 if e.errno != ESRCH:
1069 1069 raise
1070 1070
1071 1071 # Block until the kernel terminates.
1072 1072 self.kernel.wait()
1073 1073 self.kernel = None
1074 1074 else:
1075 1075 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1076 1076
1077 1077 def interrupt_kernel(self):
1078 1078 """Interrupts the kernel by sending it a signal.
1079 1079
1080 1080 Unlike ``signal_kernel``, this operation is well supported on all
1081 1081 platforms.
1082 1082 """
1083 1083 if self.has_kernel:
1084 1084 if sys.platform == 'win32':
1085 1085 from .zmq.parentpoller import ParentPollerWindows as Poller
1086 1086 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1087 1087 else:
1088 1088 self.kernel.send_signal(signal.SIGINT)
1089 1089 else:
1090 1090 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1091 1091
1092 1092 def signal_kernel(self, signum):
1093 1093 """Sends a signal to the kernel.
1094 1094
1095 1095 Note that since only SIGTERM is supported on Windows, this function is
1096 1096 only useful on Unix systems.
1097 1097 """
1098 1098 if self.has_kernel:
1099 1099 self.kernel.send_signal(signum)
1100 1100 else:
1101 1101 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1102 1102
1103 @property
1104 1103 def is_alive(self):
1105 1104 """Is the kernel process still running?"""
1106 1105 if self.has_kernel:
1107 1106 if self.kernel.poll() is None:
1108 1107 return True
1109 1108 else:
1110 1109 return False
1111 1110 elif self._hb_channel is not None:
1112 1111 # We didn't start the kernel with this KernelManager so we
1113 1112 # use the heartbeat.
1114 1113 return self._hb_channel.is_beating()
1115 1114 else:
1116 1115 # no heartbeat and not local, we can't tell if it's running,
1117 1116 # so naively return True
1118 1117 return True
1119 1118
1120 1119
1121 1120 #-----------------------------------------------------------------------------
1122 1121 # ABC Registration
1123 1122 #-----------------------------------------------------------------------------
1124 1123
1125 1124 ShellChannelABC.register(ShellChannel)
1126 1125 IOPubChannelABC.register(IOPubChannel)
1127 1126 HBChannelABC.register(HBChannel)
1128 1127 StdInChannelABC.register(StdInChannel)
1129 1128 KernelManagerABC.register(KernelManager)
1130 1129
@@ -1,226 +1,226 b''
1 1 """Abstract base classes for kernel manager and channels."""
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Copyright (C) 2013 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-----------------------------------------------------------------------------
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Imports
12 12 #-----------------------------------------------------------------------------
13 13
14 14 # Standard library imports.
15 15 import abc
16 16
17 17 #-----------------------------------------------------------------------------
18 18 # Channels
19 19 #-----------------------------------------------------------------------------
20 20
21 21
22 22 class ChannelABC(object):
23 23 """A base class for all channel ABCs."""
24 24
25 25 __metaclass__ = abc.ABCMeta
26 26
27 27 @abc.abstractmethod
28 28 def start(self):
29 29 pass
30 30
31 31 @abc.abstractmethod
32 32 def stop(self):
33 33 pass
34 34
35 35 @abc.abstractmethod
36 36 def is_alive(self):
37 37 pass
38 38
39 39
40 40 class ShellChannelABC(ChannelABC):
41 41 """ShellChannel ABC.
42 42
43 43 The docstrings for this class can be found in the base implementation:
44 44
45 45 `IPython.kernel.kernelmanager.ShellChannel`
46 46 """
47 47
48 48 @abc.abstractproperty
49 49 def allow_stdin(self):
50 50 pass
51 51
52 52 @abc.abstractmethod
53 53 def execute(self, code, silent=False, store_history=True,
54 54 user_variables=None, user_expressions=None, allow_stdin=None):
55 55 pass
56 56
57 57 @abc.abstractmethod
58 58 def complete(self, text, line, cursor_pos, block=None):
59 59 pass
60 60
61 61 @abc.abstractmethod
62 62 def object_info(self, oname, detail_level=0):
63 63 pass
64 64
65 65 @abc.abstractmethod
66 66 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
67 67 pass
68 68
69 69 @abc.abstractmethod
70 70 def kernel_info(self):
71 71 pass
72 72
73 73 @abc.abstractmethod
74 74 def shutdown(self, restart=False):
75 75 pass
76 76
77 77
78 78 class IOPubChannelABC(ChannelABC):
79 79 """IOPubChannel ABC.
80 80
81 81 The docstrings for this class can be found in the base implementation:
82 82
83 83 `IPython.kernel.kernelmanager.IOPubChannel`
84 84 """
85 85
86 86 @abc.abstractmethod
87 87 def flush(self, timeout=1.0):
88 88 pass
89 89
90 90
91 91 class StdInChannelABC(ChannelABC):
92 92 """StdInChannel ABC.
93 93
94 94 The docstrings for this class can be found in the base implementation:
95 95
96 96 `IPython.kernel.kernelmanager.StdInChannel`
97 97 """
98 98
99 99 @abc.abstractmethod
100 100 def input(self, string):
101 101 pass
102 102
103 103
104 104 class HBChannelABC(ChannelABC):
105 105 """HBChannel ABC.
106 106
107 107 The docstrings for this class can be found in the base implementation:
108 108
109 109 `IPython.kernel.kernelmanager.HBChannel`
110 110 """
111 111
112 112 @abc.abstractproperty
113 113 def time_to_dead(self):
114 114 pass
115 115
116 116 @abc.abstractmethod
117 117 def pause(self):
118 118 pass
119 119
120 120 @abc.abstractmethod
121 121 def unpause(self):
122 122 pass
123 123
124 124 @abc.abstractmethod
125 125 def is_beating(self):
126 126 pass
127 127
128 128
129 129 #-----------------------------------------------------------------------------
130 130 # Main kernel manager class
131 131 #-----------------------------------------------------------------------------
132 132
133 133 class KernelManagerABC(object):
134 134 """KernelManager ABC.
135 135
136 136 The docstrings for this class can be found in the base implementation:
137 137
138 138 `IPython.kernel.kernelmanager.KernelManager`
139 139 """
140 140
141 141 __metaclass__ = abc.ABCMeta
142 142
143 143 @abc.abstractproperty
144 144 def kernel(self):
145 145 pass
146 146
147 147 @abc.abstractproperty
148 148 def shell_channel_class(self):
149 149 pass
150 150
151 151 @abc.abstractproperty
152 152 def iopub_channel_class(self):
153 153 pass
154 154
155 155 @abc.abstractproperty
156 156 def hb_channel_class(self):
157 157 pass
158 158
159 159 @abc.abstractproperty
160 160 def stdin_channel_class(self):
161 161 pass
162 162
163 163 #--------------------------------------------------------------------------
164 164 # Channel management methods
165 165 #--------------------------------------------------------------------------
166 166
167 167 @abc.abstractmethod
168 168 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
169 169 pass
170 170
171 171 @abc.abstractmethod
172 172 def stop_channels(self):
173 173 pass
174 174
175 175 @abc.abstractproperty
176 176 def channels_running(self):
177 177 pass
178 178
179 179 @abc.abstractproperty
180 180 def shell_channel(self):
181 181 pass
182 182
183 183 @abc.abstractproperty
184 184 def iopub_channel(self):
185 185 pass
186 186
187 187 @abc.abstractproperty
188 188 def stdin_channel(self):
189 189 pass
190 190
191 191 @abc.abstractproperty
192 192 def hb_channel(self):
193 193 pass
194 194
195 195 #--------------------------------------------------------------------------
196 196 # Kernel management
197 197 #--------------------------------------------------------------------------
198 198
199 199 @abc.abstractmethod
200 200 def start_kernel(self, **kw):
201 201 pass
202 202
203 203 @abc.abstractmethod
204 204 def shutdown_kernel(self, now=False, restart=False):
205 205 pass
206 206
207 207 @abc.abstractmethod
208 208 def restart_kernel(self, now=False, **kw):
209 209 pass
210 210
211 211 @abc.abstractproperty
212 212 def has_kernel(self):
213 213 pass
214 214
215 215 @abc.abstractmethod
216 216 def interrupt_kernel(self):
217 217 pass
218 218
219 219 @abc.abstractmethod
220 220 def signal_kernel(self, signum):
221 221 pass
222 222
223 @abc.abstractproperty
223 @abc.abstractmethod
224 224 def is_alive(self):
225 225 pass
226 226
@@ -1,259 +1,272 b''
1 1 """A kernel manager for multiple kernels
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2013 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 from __future__ import absolute_import
20 20
21 21 import os
22 22 import uuid
23 23
24 24 import zmq
25 25 from zmq.eventloop.zmqstream import ZMQStream
26 26
27 27 from IPython.config.configurable import LoggingConfigurable
28 28 from IPython.utils.importstring import import_item
29 29 from IPython.utils.traitlets import (
30 30 Instance, Dict, Unicode, Any, DottedObjectName,
31 31 )
32 32 #-----------------------------------------------------------------------------
33 33 # Classes
34 34 #-----------------------------------------------------------------------------
35 35
36 36 class DuplicateKernelError(Exception):
37 37 pass
38 38
39 39
40 40 class MultiKernelManager(LoggingConfigurable):
41 41 """A class for managing multiple kernels."""
42 42
43 43 kernel_manager_class = DottedObjectName(
44 44 "IPython.kernel.blockingkernelmanager.BlockingKernelManager", config=True,
45 45 help="""The kernel manager class. This is configurable to allow
46 46 subclassing of the KernelManager for customized behavior.
47 47 """
48 48 )
49 49 def _kernel_manager_class_changed(self, name, old, new):
50 50 self.kernel_manager_factory = import_item(new)
51 51
52 52 kernel_manager_factory = Any(help="this is kernel_manager_class after import")
53 53 def _kernel_manager_factory_default(self):
54 54 return import_item(self.kernel_manager_class)
55 55
56 56 context = Instance('zmq.Context')
57 57 def _context_default(self):
58 58 return zmq.Context.instance()
59 59
60 60 connection_dir = Unicode('')
61 61
62 62 _kernels = Dict()
63 63
64 64 def list_kernel_ids(self):
65 65 """Return a list of the kernel ids of the active kernels."""
66 66 # Create a copy so we can iterate over kernels in operations
67 67 # that delete keys.
68 68 return list(self._kernels.keys())
69 69
70 70 def __len__(self):
71 71 """Return the number of running kernels."""
72 72 return len(self.list_kernel_ids())
73 73
74 74 def __contains__(self, kernel_id):
75 75 return kernel_id in self._kernels
76 76
77 77 def start_kernel(self, **kwargs):
78 78 """Start a new kernel.
79 79
80 80 The caller can pick a kernel_id by passing one in as a keyword arg,
81 81 otherwise one will be picked using a uuid.
82 82
83 83 To silence the kernel's stdout/stderr, call this using::
84 84
85 85 km.start_kernel(stdout=PIPE, stderr=PIPE)
86 86
87 87 """
88 88 kernel_id = kwargs.pop('kernel_id', unicode(uuid.uuid4()))
89 89 if kernel_id in self:
90 90 raise DuplicateKernelError('Kernel already exists: %s' % kernel_id)
91 91 # kernel_manager_factory is the constructor for the KernelManager
92 92 # subclass we are using. It can be configured as any Configurable,
93 93 # including things like its transport and ip.
94 94 km = self.kernel_manager_factory(connection_file=os.path.join(
95 95 self.connection_dir, "kernel-%s.json" % kernel_id),
96 96 config=self.config,
97 97 )
98 98 km.start_kernel(**kwargs)
99 99 # start just the shell channel, needed for graceful restart
100 100 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
101 101 self._kernels[kernel_id] = km
102 102 return kernel_id
103 103
104 104 def shutdown_kernel(self, kernel_id, now=False):
105 105 """Shutdown a kernel by its kernel uuid.
106 106
107 107 Parameters
108 108 ==========
109 109 kernel_id : uuid
110 110 The id of the kernel to shutdown.
111 111 now : bool
112 112 Should the kernel be shutdown forcibly using a signal.
113 113 """
114 114 k = self.get_kernel(kernel_id)
115 115 k.shutdown_kernel(now=now)
116 116 k.shell_channel.stop()
117 117 del self._kernels[kernel_id]
118 118
119 119 def shutdown_all(self, now=False):
120 120 """Shutdown all kernels."""
121 121 for kid in self.list_kernel_ids():
122 122 self.shutdown_kernel(kid, now=now)
123 123
124 124 def interrupt_kernel(self, kernel_id):
125 125 """Interrupt (SIGINT) the kernel by its uuid.
126 126
127 127 Parameters
128 128 ==========
129 129 kernel_id : uuid
130 130 The id of the kernel to interrupt.
131 131 """
132 132 return self.get_kernel(kernel_id).interrupt_kernel()
133 133
134 134 def signal_kernel(self, kernel_id, signum):
135 135 """Sends a signal to the kernel by its uuid.
136 136
137 137 Note that since only SIGTERM is supported on Windows, this function
138 138 is only useful on Unix systems.
139 139
140 140 Parameters
141 141 ==========
142 142 kernel_id : uuid
143 143 The id of the kernel to signal.
144 144 """
145 145 return self.get_kernel(kernel_id).signal_kernel(signum)
146 146
147 147 def restart_kernel(self, kernel_id):
148 148 """Restart a kernel by its uuid, keeping the same ports.
149 149
150 150 Parameters
151 151 ==========
152 152 kernel_id : uuid
153 153 The id of the kernel to interrupt.
154 154 """
155 155 return self.get_kernel(kernel_id).restart_kernel()
156 156
157 def is_alive(self, kernel_id):
158 """Is the kernel alive.
159
160 This calls KernelManager.is_alive() which calls Popen.poll on the
161 actual kernel subprocess.
162
163 Parameters
164 ==========
165 kernel_id : uuid
166 The id of the kernel.
167 """
168 return self.get_kernel(kernel_id).is_alive()
169
157 170 def get_kernel(self, kernel_id):
158 171 """Get the single KernelManager object for a kernel by its uuid.
159 172
160 173 Parameters
161 174 ==========
162 175 kernel_id : uuid
163 176 The id of the kernel.
164 177 """
165 178 km = self._kernels.get(kernel_id)
166 179 if km is not None:
167 180 return km
168 181 else:
169 182 raise KeyError("Kernel with id not found: %s" % kernel_id)
170 183
171 184 def get_connection_info(self, kernel_id):
172 185 """Return a dictionary of connection data for a kernel.
173 186
174 187 Parameters
175 188 ==========
176 189 kernel_id : uuid
177 190 The id of the kernel.
178 191
179 192 Returns
180 193 =======
181 194 connection_dict : dict
182 195 A dict of the information needed to connect to a kernel.
183 196 This includes the ip address and the integer port
184 197 numbers of the different channels (stdin_port, iopub_port,
185 198 shell_port, hb_port).
186 199 """
187 200 km = self.get_kernel(kernel_id)
188 201 return dict(transport=km.transport,
189 202 ip=km.ip,
190 203 shell_port=km.shell_port,
191 204 iopub_port=km.iopub_port,
192 205 stdin_port=km.stdin_port,
193 206 hb_port=km.hb_port,
194 207 )
195 208
196 209 def _make_url(self, transport, ip, port):
197 210 """Make a ZeroMQ URL for a given transport, ip and port."""
198 211 if transport == 'tcp':
199 212 return "tcp://%s:%i" % (ip, port)
200 213 else:
201 214 return "%s://%s-%s" % (transport, ip, port)
202 215
203 216 def _create_connected_stream(self, kernel_id, socket_type, channel):
204 217 """Create a connected ZMQStream for a kernel."""
205 218 cinfo = self.get_connection_info(kernel_id)
206 219 url = self._make_url(cinfo['transport'], cinfo['ip'],
207 220 cinfo['%s_port' % channel]
208 221 )
209 222 sock = self.context.socket(socket_type)
210 223 self.log.info("Connecting to: %s" % url)
211 224 sock.connect(url)
212 225 return ZMQStream(sock)
213 226
214 227 def create_iopub_stream(self, kernel_id):
215 228 """Return a ZMQStream object connected to the iopub channel.
216 229
217 230 Parameters
218 231 ==========
219 232 kernel_id : uuid
220 233 The id of the kernel.
221 234
222 235 Returns
223 236 =======
224 237 stream : ZMQStream
225 238 """
226 239 iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB, 'iopub')
227 240 iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
228 241 return iopub_stream
229 242
230 243 def create_shell_stream(self, kernel_id):
231 244 """Return a ZMQStream object connected to the shell channel.
232 245
233 246 Parameters
234 247 ==========
235 248 kernel_id : uuid
236 249 The id of the kernel.
237 250
238 251 Returns
239 252 =======
240 253 stream : ZMQStream
241 254 """
242 255 shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER, 'shell')
243 256 return shell_stream
244 257
245 258 def create_hb_stream(self, kernel_id):
246 259 """Return a ZMQStream object connected to the hb channel.
247 260
248 261 Parameters
249 262 ==========
250 263 kernel_id : uuid
251 264 The id of the kernel.
252 265
253 266 Returns
254 267 =======
255 268 stream : ZMQStream
256 269 """
257 270 hb_stream = self._create_connected_stream(kernel_id, zmq.REQ, 'hb')
258 271 return hb_stream
259 272
@@ -1,48 +1,50 b''
1 1 """Tests for the notebook kernel and session manager"""
2 2
3 3 from subprocess import PIPE
4 4 import time
5 5 from unittest import TestCase
6 6
7 7 from IPython.testing import decorators as dec
8 8
9 9 from IPython.config.loader import Config
10 10 from IPython.kernel.kernelmanager import KernelManager
11 11
12 12 class TestKernelManager(TestCase):
13 13
14 14 def _get_tcp_km(self):
15 15 return KernelManager()
16 16
17 17 def _get_ipc_km(self):
18 18 c = Config()
19 19 c.KernelManager.transport = 'ipc'
20 20 c.KernelManager.ip = 'test'
21 21 km = KernelManager(config=c)
22 22 return km
23 23
24 24 def _run_lifecycle(self, km):
25 25 km.start_kernel(stdout=PIPE, stderr=PIPE)
26 self.assertTrue(km.is_alive())
26 27 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
27 28 km.restart_kernel()
29 self.assertTrue(km.is_alive())
28 30 # We need a delay here to give the restarting kernel a chance to
29 31 # restart. Otherwise, the interrupt will kill it, causing the test
30 32 # suite to hang. The reason it *hangs* is that the shutdown
31 33 # message for the restart sometimes hasn't been sent to the kernel.
32 34 # Because linger is oo on the shell channel, the context can't
33 35 # close until the message is sent to the kernel, which is not dead.
34 36 time.sleep(1.0)
35 37 km.interrupt_kernel()
36 38 self.assertTrue(isinstance(km, KernelManager))
37 39 km.shutdown_kernel()
38 40 km.shell_channel.stop()
39 41
40 42 def test_tcp_lifecycle(self):
41 43 km = self._get_tcp_km()
42 44 self._run_lifecycle(km)
43 45
44 46 @dec.skip_win32
45 47 def test_ipc_lifecycle(self):
46 48 km = self._get_ipc_km()
47 49 self._run_lifecycle(km)
48 50
@@ -1,81 +1,83 b''
1 1 """Tests for the notebook kernel and session manager."""
2 2
3 3 from subprocess import PIPE
4 4 import time
5 5 from unittest import TestCase
6 6
7 7 from IPython.testing import decorators as dec
8 8
9 9 from IPython.config.loader import Config
10 10 from IPython.utils.localinterfaces import LOCALHOST
11 11 from IPython.kernel.kernelmanager import KernelManager
12 12 from IPython.kernel.multikernelmanager import MultiKernelManager
13 13
14 14 class TestKernelManager(TestCase):
15 15
16 16 def _get_tcp_km(self):
17 17 return MultiKernelManager()
18 18
19 19 def _get_ipc_km(self):
20 20 c = Config()
21 21 c.KernelManager.transport = 'ipc'
22 22 c.KernelManager.ip = 'test'
23 23 km = MultiKernelManager(config=c)
24 24 return km
25 25
26 26 def _run_lifecycle(self, km):
27 27 kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
28 self.assertTrue(km.is_alive(kid))
28 29 self.assertTrue(kid in km)
29 30 self.assertTrue(kid in km.list_kernel_ids())
30 31 self.assertEqual(len(km),1)
31 32 km.restart_kernel(kid)
33 self.assertTrue(km.is_alive(kid))
32 34 self.assertTrue(kid in km.list_kernel_ids())
33 35 # We need a delay here to give the restarting kernel a chance to
34 36 # restart. Otherwise, the interrupt will kill it, causing the test
35 37 # suite to hang. The reason it *hangs* is that the shutdown
36 38 # message for the restart sometimes hasn't been sent to the kernel.
37 39 # Because linger is oo on the shell channel, the context can't
38 40 # close until the message is sent to the kernel, which is not dead.
39 41 time.sleep(1.0)
40 42 km.interrupt_kernel(kid)
41 43 k = km.get_kernel(kid)
42 44 self.assertTrue(isinstance(k, KernelManager))
43 45 km.shutdown_kernel(kid)
44 46 self.assertTrue(not kid in km)
45 47
46 48 def _run_cinfo(self, km, transport, ip):
47 49 kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
48 50 k = km.get_kernel(kid)
49 51 cinfo = km.get_connection_info(kid)
50 52 self.assertEqual(transport, cinfo['transport'])
51 53 self.assertEqual(ip, cinfo['ip'])
52 54 self.assertTrue('stdin_port' in cinfo)
53 55 self.assertTrue('iopub_port' in cinfo)
54 56 stream = km.create_iopub_stream(kid)
55 57 stream.close()
56 58 self.assertTrue('shell_port' in cinfo)
57 59 stream = km.create_shell_stream(kid)
58 60 stream.close()
59 61 self.assertTrue('hb_port' in cinfo)
60 62 stream = km.create_hb_stream(kid)
61 63 stream.close()
62 64 km.shutdown_kernel(kid)
63 65
64 66 def test_tcp_lifecycle(self):
65 67 km = self._get_tcp_km()
66 68 self._run_lifecycle(km)
67 69
68 70 def test_tcp_cinfo(self):
69 71 km = self._get_tcp_km()
70 72 self._run_cinfo(km, 'tcp', LOCALHOST)
71 73
72 74 @dec.skip_win32
73 75 def test_ipc_lifecycle(self):
74 76 km = self._get_ipc_km()
75 77 self._run_lifecycle(km)
76 78
77 79 @dec.skip_win32
78 80 def test_ipc_cinfo(self):
79 81 km = self._get_ipc_km()
80 82 self._run_cinfo(km, 'ipc', 'test')
81 83
General Comments 0
You need to be logged in to leave comments. Login now