##// END OF EJS Templates
Merge pull request #6448 from takluyver/kernel-context-mgr...
Min RK -
r17909:c306a915 merge
parent child Browse files
Show More
@@ -6,6 +6,6 b' from . import zmq'
6 from .connect import *
6 from .connect import *
7 from .launcher import *
7 from .launcher import *
8 from .client import KernelClient
8 from .client import KernelClient
9 from .manager import KernelManager
9 from .manager import KernelManager, run_kernel
10 from .blocking import BlockingKernelClient
10 from .blocking import BlockingKernelClient
11 from .multikernelmanager import MultiKernelManager
11 from .multikernelmanager import MultiKernelManager
@@ -5,12 +5,17 b''
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7
7
8 from contextlib import contextmanager
8 import os
9 import os
9 import re
10 import re
10 import signal
11 import signal
11 import sys
12 import sys
12 import time
13 import time
13 import warnings
14 import warnings
15 try:
16 from queue import Empty # Py 3
17 except ImportError:
18 from Queue import Empty # Py 2
14
19
15 import zmq
20 import zmq
16
21
@@ -410,3 +415,39 b' class KernelManager(ConnectionFileMixin):'
410
415
411 KernelManagerABC.register(KernelManager)
416 KernelManagerABC.register(KernelManager)
412
417
418
419 def start_new_kernel(startup_timeout=60, kernel_name='python', **kwargs):
420 """Start a new kernel, and return its Manager and Client"""
421 km = KernelManager(kernel_name=kernel_name)
422 km.start_kernel(**kwargs)
423 kc = km.client()
424 kc.start_channels()
425
426 kc.kernel_info()
427 kc.get_shell_msg(block=True, timeout=startup_timeout)
428
429 # Flush channels
430 for channel in (kc.shell_channel, kc.iopub_channel):
431 while True:
432 try:
433 channel.get_msg(block=True, timeout=0.1)
434 except Empty:
435 break
436 return km, kc
437
438 @contextmanager
439 def run_kernel(**kwargs):
440 """Context manager to create a kernel in a subprocess.
441
442 The kernel is shut down when the context exits.
443
444 Returns
445 -------
446 kernel_client: connected KernelClient instance
447 """
448 km, kc = start_new_kernel(**kwargs)
449 try:
450 yield kc
451 finally:
452 kc.stop_channels()
453 km.shutdown_kernel(now=True)
@@ -15,7 +15,7 b' except ImportError:'
15 import nose
15 import nose
16 import nose.tools as nt
16 import nose.tools as nt
17
17
18 from IPython.kernel import KernelManager
18 from IPython.kernel import manager
19
19
20 #-------------------------------------------------------------------------------
20 #-------------------------------------------------------------------------------
21 # Globals
21 # Globals
@@ -30,22 +30,13 b' KC = None'
30 #-------------------------------------------------------------------------------
30 #-------------------------------------------------------------------------------
31 # code
31 # code
32 #-------------------------------------------------------------------------------
32 #-------------------------------------------------------------------------------
33
33 def start_new_kernel(**kwargs):
34
34 """start a new kernel, and return its Manager and Client
35 def start_new_kernel(argv=None):
36 """start a new kernel, and return its Manager and Client"""
37 km = KernelManager()
38 kwargs = dict(stdout=nose.iptest_stdstreams_fileno(), stderr=STDOUT)
39 if argv:
40 kwargs['extra_arguments'] = argv
41 km.start_kernel(**kwargs)
42 kc = km.client()
43 kc.start_channels()
44
35
45 msg_id = kc.kernel_info()
36 Integrates with our output capturing for tests.
46 kc.get_shell_msg(block=True, timeout=STARTUP_TIMEOUT)
37 """
47 flush_channels(kc)
38 kwargs.update(dict(stdout=nose.iptest_stdstreams_fileno(), stderr=STDOUT))
48 return km, kc
39 return manager.start_new_kernel(startup_timeout=STARTUP_TIMEOUT, **kwargs)
49
40
50 def flush_channels(kc=None):
41 def flush_channels(kc=None):
51 """flush any messages waiting on the queue"""
42 """flush any messages waiting on the queue"""
@@ -121,7 +112,6 b' def stop_global_kernel():'
121 KM.shutdown_kernel(now=True)
112 KM.shutdown_kernel(now=True)
122 KM = None
113 KM = None
123
114
124 @contextmanager
125 def new_kernel(argv=None):
115 def new_kernel(argv=None):
126 """Context manager for a new kernel in a subprocess
116 """Context manager for a new kernel in a subprocess
127
117
@@ -131,13 +121,11 b' def new_kernel(argv=None):'
131 -------
121 -------
132 kernel_client: connected KernelClient instance
122 kernel_client: connected KernelClient instance
133 """
123 """
134 km, kc = start_new_kernel(argv)
124 kwargs = dict(stdout=nose.iptest_stdstreams_fileno(), stderr=STDOUT,
135 try:
125 startup_timeout=STARTUP_TIMEOUT)
136 yield kc
126 if argv is not None:
137 finally:
127 kwargs['extra_arguments'] = argv
138 kc.stop_channels()
128 return manager.run_kernel(**kwargs)
139 km.shutdown_kernel(now=True)
140
141
129
142 def assemble_output(iopub):
130 def assemble_output(iopub):
143 """assemble stdout/err from an execution"""
131 """assemble stdout/err from an execution"""
@@ -46,38 +46,15 b' class ExecutePreprocessor(Preprocessor):'
46 }
46 }
47
47
48 extra_arguments = List(Unicode)
48 extra_arguments = List(Unicode)
49
50 def _create_client(self):
51 from IPython.kernel import KernelManager
52 self.km = KernelManager()
53 self.km.write_connection_file()
54 self.kc = self.km.client()
55 self.kc.start_channels()
56 self.km.start_kernel(extra_arguments=self.extra_arguments, stderr=open(os.devnull, 'w'))
57 self.iopub = self.kc.iopub_channel
58 self.shell = self.kc.shell_channel
59 self.shell.kernel_info()
60 try:
61 self.shell.get_msg(timeout=self.timeout)
62 except Empty:
63 self.log.error("Timeout waiting for kernel_info reply")
64 raise
65 # flush IOPub
66 while True:
67 try:
68 self.iopub.get_msg(block=True, timeout=0.25)
69 except Empty:
70 break
71
72 def _shutdown_client(self):
73 self.kc.stop_channels()
74 self.km.shutdown_kernel()
75 del self.km
76
49
77 def preprocess(self, nb, resources):
50 def preprocess(self, nb, resources):
78 self._create_client()
51 from IPython.kernel import run_kernel
79 nb, resources = super(ExecutePreprocessor, self).preprocess(nb, resources)
52 kernel_name = nb.metadata.get('kernelspec', {}).get('name', 'python')
80 self._shutdown_client()
53 with run_kernel(kernel_name=kernel_name,
54 extra_arguments=self.extra_arguments,
55 stderr=open(os.devnull, 'w')) as kc:
56 self.kc = kc
57 nb, resources = super(ExecutePreprocessor, self).preprocess(nb, resources)
81 return nb, resources
58 return nb, resources
82
59
83 def preprocess_cell(self, cell, resources, cell_index):
60 def preprocess_cell(self, cell, resources, cell_index):
@@ -87,7 +64,7 b' class ExecutePreprocessor(Preprocessor):'
87 if cell.cell_type != 'code':
64 if cell.cell_type != 'code':
88 return cell, resources
65 return cell, resources
89 try:
66 try:
90 outputs = self.run_cell(self.shell, self.iopub, cell)
67 outputs = self.run_cell(self.kc.shell_channel, self.kc.iopub_channel, cell)
91 except Exception as e:
68 except Exception as e:
92 self.log.error("failed to run cell: " + repr(e))
69 self.log.error("failed to run cell: " + repr(e))
93 self.log.error(str(cell.input))
70 self.log.error(str(cell.input))
General Comments 0
You need to be logged in to leave comments. Login now