##// END OF EJS Templates
Handle comm messages in execute preprocessor
Jessica B. Hamrick -
Show More
@@ -1,141 +1,143 b''
1 """Module containing a preprocessor that removes the outputs from code cells"""
1 """Module containing a preprocessor that removes the outputs from code cells"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import os
6 import os
7 from textwrap import dedent
7 from textwrap import dedent
8
8
9 try:
9 try:
10 from queue import Empty # Py 3
10 from queue import Empty # Py 3
11 except ImportError:
11 except ImportError:
12 from Queue import Empty # Py 2
12 from Queue import Empty # Py 2
13
13
14 from IPython.utils.traitlets import List, Unicode, Bool
14 from IPython.utils.traitlets import List, Unicode, Bool
15
15
16 from IPython.nbformat.v4 import output_from_msg
16 from IPython.nbformat.v4 import output_from_msg
17 from .base import Preprocessor
17 from .base import Preprocessor
18 from IPython.utils.traitlets import Integer
18 from IPython.utils.traitlets import Integer
19
19
20
20
21 class ExecutePreprocessor(Preprocessor):
21 class ExecutePreprocessor(Preprocessor):
22 """
22 """
23 Executes all the cells in a notebook
23 Executes all the cells in a notebook
24 """
24 """
25
25
26 timeout = Integer(30, config=True,
26 timeout = Integer(30, config=True,
27 help="The time to wait (in seconds) for output from executions."
27 help="The time to wait (in seconds) for output from executions."
28 )
28 )
29
29
30 interrupt_on_timeout = Bool(
30 interrupt_on_timeout = Bool(
31 False, config=True,
31 False, config=True,
32 help=dedent(
32 help=dedent(
33 """
33 """
34 If execution of a cell times out, interrupt the kernel and
34 If execution of a cell times out, interrupt the kernel and
35 continue executing other cells rather than throwing an error and
35 continue executing other cells rather than throwing an error and
36 stopping.
36 stopping.
37 """
37 """
38 )
38 )
39 )
39 )
40
40
41 extra_arguments = List(Unicode)
41 extra_arguments = List(Unicode)
42
42
43 def preprocess(self, nb, resources):
43 def preprocess(self, nb, resources):
44 path = resources.get('metadata', {}).get('path', '')
44 path = resources.get('metadata', {}).get('path', '')
45 if path == '':
45 if path == '':
46 path = None
46 path = None
47
47
48 from IPython.kernel.manager import start_new_kernel
48 from IPython.kernel.manager import start_new_kernel
49 kernel_name = nb.metadata.get('kernelspec', {}).get('name', 'python')
49 kernel_name = nb.metadata.get('kernelspec', {}).get('name', 'python')
50 self.log.info("Executing notebook with kernel: %s" % kernel_name)
50 self.log.info("Executing notebook with kernel: %s" % kernel_name)
51 self.km, self.kc = start_new_kernel(
51 self.km, self.kc = start_new_kernel(
52 kernel_name=kernel_name,
52 kernel_name=kernel_name,
53 extra_arguments=self.extra_arguments,
53 extra_arguments=self.extra_arguments,
54 stderr=open(os.devnull, 'w'),
54 stderr=open(os.devnull, 'w'),
55 cwd=path)
55 cwd=path)
56 self.kc.allow_stdin = False
56 self.kc.allow_stdin = False
57
57
58 try:
58 try:
59 nb, resources = super(ExecutePreprocessor, self).preprocess(nb, resources)
59 nb, resources = super(ExecutePreprocessor, self).preprocess(nb, resources)
60 finally:
60 finally:
61 self.kc.stop_channels()
61 self.kc.stop_channels()
62 self.km.shutdown_kernel(now=True)
62 self.km.shutdown_kernel(now=True)
63
63
64 return nb, resources
64 return nb, resources
65
65
66 def preprocess_cell(self, cell, resources, cell_index):
66 def preprocess_cell(self, cell, resources, cell_index):
67 """
67 """
68 Apply a transformation on each code cell. See base.py for details.
68 Apply a transformation on each code cell. See base.py for details.
69 """
69 """
70 if cell.cell_type != 'code':
70 if cell.cell_type != 'code':
71 return cell, resources
71 return cell, resources
72 try:
72 try:
73 outputs = self.run_cell(cell)
73 outputs = self.run_cell(cell)
74 except Exception as e:
74 except Exception as e:
75 self.log.error("failed to run cell: " + repr(e))
75 self.log.error("failed to run cell: " + repr(e))
76 self.log.error(str(cell.source))
76 self.log.error(str(cell.source))
77 raise
77 raise
78 cell.outputs = outputs
78 cell.outputs = outputs
79 return cell, resources
79 return cell, resources
80
80
81 def run_cell(self, cell):
81 def run_cell(self, cell):
82 msg_id = self.kc.execute(cell.source)
82 msg_id = self.kc.execute(cell.source)
83 self.log.debug("Executing cell:\n%s", cell.source)
83 self.log.debug("Executing cell:\n%s", cell.source)
84 # wait for finish, with timeout
84 # wait for finish, with timeout
85 while True:
85 while True:
86 try:
86 try:
87 msg = self.kc.shell_channel.get_msg(timeout=self.timeout)
87 msg = self.kc.shell_channel.get_msg(timeout=self.timeout)
88 except Empty:
88 except Empty:
89 self.log.error("Timeout waiting for execute reply")
89 self.log.error("Timeout waiting for execute reply")
90 if self.interrupt_on_timeout:
90 if self.interrupt_on_timeout:
91 self.log.error("Interrupting kernel")
91 self.log.error("Interrupting kernel")
92 self.km.interrupt_kernel()
92 self.km.interrupt_kernel()
93 break
93 break
94 else:
94 else:
95 raise
95 raise
96
96
97 if msg['parent_header'].get('msg_id') == msg_id:
97 if msg['parent_header'].get('msg_id') == msg_id:
98 break
98 break
99 else:
99 else:
100 # not our reply
100 # not our reply
101 continue
101 continue
102
102
103 outs = []
103 outs = []
104
104
105 while True:
105 while True:
106 try:
106 try:
107 msg = self.kc.iopub_channel.get_msg(timeout=self.timeout)
107 msg = self.kc.iopub_channel.get_msg(timeout=self.timeout)
108 except Empty:
108 except Empty:
109 self.log.warn("Timeout waiting for IOPub output")
109 self.log.warn("Timeout waiting for IOPub output")
110 break
110 break
111 if msg['parent_header'].get('msg_id') != msg_id:
111 if msg['parent_header'].get('msg_id') != msg_id:
112 # not an output from our execution
112 # not an output from our execution
113 continue
113 continue
114
114
115 msg_type = msg['msg_type']
115 msg_type = msg['msg_type']
116 self.log.debug("output: %s", msg_type)
116 self.log.debug("output: %s", msg_type)
117 content = msg['content']
117 content = msg['content']
118
118
119 # set the prompt number for the input and the output
119 # set the prompt number for the input and the output
120 if 'execution_count' in content:
120 if 'execution_count' in content:
121 cell['execution_count'] = content['execution_count']
121 cell['execution_count'] = content['execution_count']
122
122
123 if msg_type == 'status':
123 if msg_type == 'status':
124 if content['execution_state'] == 'idle':
124 if content['execution_state'] == 'idle':
125 break
125 break
126 else:
126 else:
127 continue
127 continue
128 elif msg_type == 'execute_input':
128 elif msg_type == 'execute_input':
129 continue
129 continue
130 elif msg_type == 'clear_output':
130 elif msg_type == 'clear_output':
131 outs = []
131 outs = []
132 continue
132 continue
133 elif msg_type.startswith('comm'):
134 continue
133
135
134 try:
136 try:
135 out = output_from_msg(msg)
137 out = output_from_msg(msg)
136 except ValueError:
138 except ValueError:
137 self.log.error("unhandled iopub msg: " + msg_type)
139 self.log.error("unhandled iopub msg: " + msg_type)
138 else:
140 else:
139 outs.append(out)
141 outs.append(out)
140
142
141 return outs
143 return outs
General Comments 0
You need to be logged in to leave comments. Login now