##// END OF EJS Templates
Also add a test to check that an error is raised on timeout by default
Jessica B. Hamrick -
Show More
@@ -1,137 +1,149
1 """
1 """
2 Module with tests for the execute preprocessor.
2 Module with tests for the execute preprocessor.
3 """
3 """
4
4
5 # Copyright (c) IPython Development Team.
5 # Copyright (c) IPython Development Team.
6 # Distributed under the terms of the Modified BSD License.
6 # Distributed under the terms of the Modified BSD License.
7
7
8 import copy
8 import copy
9 import glob
9 import glob
10 import io
10 import io
11 import os
11 import os
12 import re
12 import re
13
13
14 try:
15 from queue import Empty # Py 3
16 except ImportError:
17 from Queue import Empty # Py 2
18
14 from IPython import nbformat
19 from IPython import nbformat
15
20
16 from .base import PreprocessorTestsBase
21 from .base import PreprocessorTestsBase
17 from ..execute import ExecutePreprocessor
22 from ..execute import ExecutePreprocessor
18
23
19 from IPython.nbconvert.filters import strip_ansi
24 from IPython.nbconvert.filters import strip_ansi
25 from nose.tools import assert_raises
20
26
21 addr_pat = re.compile(r'0x[0-9a-f]{7,9}')
27 addr_pat = re.compile(r'0x[0-9a-f]{7,9}')
22
28
23 class TestExecute(PreprocessorTestsBase):
29 class TestExecute(PreprocessorTestsBase):
24 """Contains test functions for execute.py"""
30 """Contains test functions for execute.py"""
25
31
26 @staticmethod
32 @staticmethod
27 def normalize_output(output):
33 def normalize_output(output):
28 """
34 """
29 Normalizes outputs for comparison.
35 Normalizes outputs for comparison.
30 """
36 """
31 output = dict(output)
37 output = dict(output)
32 if 'metadata' in output:
38 if 'metadata' in output:
33 del output['metadata']
39 del output['metadata']
34 if 'text' in output:
40 if 'text' in output:
35 output['text'] = re.sub(addr_pat, '<HEXADDR>', output['text'])
41 output['text'] = re.sub(addr_pat, '<HEXADDR>', output['text'])
36 if 'text/plain' in output.get('data', {}):
42 if 'text/plain' in output.get('data', {}):
37 output['data']['text/plain'] = \
43 output['data']['text/plain'] = \
38 re.sub(addr_pat, '<HEXADDR>', output['data']['text/plain'])
44 re.sub(addr_pat, '<HEXADDR>', output['data']['text/plain'])
39 if 'traceback' in output:
45 if 'traceback' in output:
40 tb = []
46 tb = []
41 for line in output['traceback']:
47 for line in output['traceback']:
42 tb.append(strip_ansi(line))
48 tb.append(strip_ansi(line))
43 output['traceback'] = tb
49 output['traceback'] = tb
44
50
45 return output
51 return output
46
52
47
53
48 def assert_notebooks_equal(self, expected, actual):
54 def assert_notebooks_equal(self, expected, actual):
49 expected_cells = expected['cells']
55 expected_cells = expected['cells']
50 actual_cells = actual['cells']
56 actual_cells = actual['cells']
51 self.assertEqual(len(expected_cells), len(actual_cells))
57 self.assertEqual(len(expected_cells), len(actual_cells))
52
58
53 for expected_cell, actual_cell in zip(expected_cells, actual_cells):
59 for expected_cell, actual_cell in zip(expected_cells, actual_cells):
54 expected_outputs = expected_cell.get('outputs', [])
60 expected_outputs = expected_cell.get('outputs', [])
55 actual_outputs = actual_cell.get('outputs', [])
61 actual_outputs = actual_cell.get('outputs', [])
56 normalized_expected_outputs = list(map(self.normalize_output, expected_outputs))
62 normalized_expected_outputs = list(map(self.normalize_output, expected_outputs))
57 normalized_actual_outputs = list(map(self.normalize_output, actual_outputs))
63 normalized_actual_outputs = list(map(self.normalize_output, actual_outputs))
58 self.assertEqual(normalized_expected_outputs, normalized_actual_outputs)
64 self.assertEqual(normalized_expected_outputs, normalized_actual_outputs)
59
65
60 expected_execution_count = expected_cell.get('execution_count', None)
66 expected_execution_count = expected_cell.get('execution_count', None)
61 actual_execution_count = actual_cell.get('execution_count', None)
67 actual_execution_count = actual_cell.get('execution_count', None)
62 self.assertEqual(expected_execution_count, actual_execution_count)
68 self.assertEqual(expected_execution_count, actual_execution_count)
63
69
64
70
65 def build_preprocessor(self, opts):
71 def build_preprocessor(self, opts):
66 """Make an instance of a preprocessor"""
72 """Make an instance of a preprocessor"""
67 preprocessor = ExecutePreprocessor()
73 preprocessor = ExecutePreprocessor()
68 preprocessor.enabled = True
74 preprocessor.enabled = True
69 for opt in opts:
75 for opt in opts:
70 setattr(preprocessor, opt, opts[opt])
76 setattr(preprocessor, opt, opts[opt])
71 return preprocessor
77 return preprocessor
72
78
73
79
74 def test_constructor(self):
80 def test_constructor(self):
75 """Can a ExecutePreprocessor be constructed?"""
81 """Can a ExecutePreprocessor be constructed?"""
76 self.build_preprocessor({})
82 self.build_preprocessor({})
77
83
78
84
79 def run_notebook(self, filename, opts, resources):
85 def run_notebook(self, filename, opts, resources):
80 """Loads and runs a notebook, returning both the version prior to
86 """Loads and runs a notebook, returning both the version prior to
81 running it and the version after running it.
87 running it and the version after running it.
82
88
83 """
89 """
84 with io.open(filename) as f:
90 with io.open(filename) as f:
85 input_nb = nbformat.read(f, 4)
91 input_nb = nbformat.read(f, 4)
86 preprocessor = self.build_preprocessor(opts)
92 preprocessor = self.build_preprocessor(opts)
87 cleaned_input_nb = copy.deepcopy(input_nb)
93 cleaned_input_nb = copy.deepcopy(input_nb)
88 for cell in cleaned_input_nb.cells:
94 for cell in cleaned_input_nb.cells:
89 if 'execution_count' in cell:
95 if 'execution_count' in cell:
90 del cell['execution_count']
96 del cell['execution_count']
91 cell['outputs'] = []
97 cell['outputs'] = []
92 output_nb, _ = preprocessor(cleaned_input_nb, resources)
98 output_nb, _ = preprocessor(cleaned_input_nb, resources)
93 return input_nb, output_nb
99 return input_nb, output_nb
94
100
95 def test_run_notebooks(self):
101 def test_run_notebooks(self):
96 """Runs a series of test notebooks and compares them to their actual output"""
102 """Runs a series of test notebooks and compares them to their actual output"""
97 current_dir = os.path.dirname(__file__)
103 current_dir = os.path.dirname(__file__)
98 input_files = glob.glob(os.path.join(current_dir, 'files', '*.ipynb'))
104 input_files = glob.glob(os.path.join(current_dir, 'files', '*.ipynb'))
99 for filename in input_files:
105 for filename in input_files:
100 if os.path.basename(filename) == "Disable Stdin.ipynb":
106 if os.path.basename(filename) == "Disable Stdin.ipynb":
101 continue
107 continue
102 elif os.path.basename(filename) == "Interrupt.ipynb":
108 elif os.path.basename(filename) == "Interrupt.ipynb":
103 opts = dict(timeout=1, interrupt_on_timeout=True)
109 opts = dict(timeout=1, interrupt_on_timeout=True)
104 else:
110 else:
105 opts = {}
111 opts = {}
106 res = self.build_resources()
112 res = self.build_resources()
107 res['metadata']['path'] = os.path.dirname(filename)
113 res['metadata']['path'] = os.path.dirname(filename)
108 input_nb, output_nb = self.run_notebook(filename, opts, res)
114 input_nb, output_nb = self.run_notebook(filename, opts, res)
109 self.assert_notebooks_equal(input_nb, output_nb)
115 self.assert_notebooks_equal(input_nb, output_nb)
110
116
111 def test_empty_path(self):
117 def test_empty_path(self):
112 """Can the kernel be started when the path is empty?"""
118 """Can the kernel be started when the path is empty?"""
113 current_dir = os.path.dirname(__file__)
119 current_dir = os.path.dirname(__file__)
114 filename = os.path.join(current_dir, 'files', 'HelloWorld.ipynb')
120 filename = os.path.join(current_dir, 'files', 'HelloWorld.ipynb')
115 res = self.build_resources()
121 res = self.build_resources()
116 res['metadata']['path'] = ''
122 res['metadata']['path'] = ''
117 input_nb, output_nb = self.run_notebook(filename, {}, res)
123 input_nb, output_nb = self.run_notebook(filename, {}, res)
118 self.assert_notebooks_equal(input_nb, output_nb)
124 self.assert_notebooks_equal(input_nb, output_nb)
119
125
120 def test_disable_stdin(self):
126 def test_disable_stdin(self):
121 """Test disabling standard input"""
127 """Test disabling standard input"""
122 current_dir = os.path.dirname(__file__)
128 current_dir = os.path.dirname(__file__)
123 filename = os.path.join(current_dir, 'files', 'Disable Stdin.ipynb')
129 filename = os.path.join(current_dir, 'files', 'Disable Stdin.ipynb')
124 res = self.build_resources()
130 res = self.build_resources()
125 res['metadata']['path'] = os.path.dirname(filename)
131 res['metadata']['path'] = os.path.dirname(filename)
126 input_nb, output_nb = self.run_notebook(filename, {}, res)
132 input_nb, output_nb = self.run_notebook(filename, {}, res)
127
133
128 # We need to special-case this particular notebook, because the
134 # We need to special-case this particular notebook, because the
129 # traceback contains machine-specific stuff like where IPython
135 # traceback contains machine-specific stuff like where IPython
130 # is installed. It is sufficient here to just check that an error
136 # is installed. It is sufficient here to just check that an error
131 # was thrown, and that it was a StdinNotImplementedError
137 # was thrown, and that it was a StdinNotImplementedError
132 self.assertEqual(len(output_nb['cells']), 1)
138 self.assertEqual(len(output_nb['cells']), 1)
133 self.assertEqual(len(output_nb['cells'][0]['outputs']), 1)
139 self.assertEqual(len(output_nb['cells'][0]['outputs']), 1)
134 output = output_nb['cells'][0]['outputs'][0]
140 output = output_nb['cells'][0]['outputs'][0]
135 self.assertEqual(output['output_type'], 'error')
141 self.assertEqual(output['output_type'], 'error')
136 self.assertEqual(output['ename'], 'StdinNotImplementedError')
142 self.assertEqual(output['ename'], 'StdinNotImplementedError')
137 self.assertEqual(output['evalue'], 'raw_input was called, but this frontend does not support input requests.')
143 self.assertEqual(output['evalue'], 'raw_input was called, but this frontend does not support input requests.')
144
145 def test_timeout(self):
146 """Check that an error is raised when a computation times out"""
147 current_dir = os.path.dirname(__file__)
148 filename = os.path.join(current_dir, 'files', 'Interrupt.ipynb')
149 assert_raises(Empty, self.run_notebook, filename, dict(timeout=1))
General Comments 0
You need to be logged in to leave comments. Login now