##// END OF EJS Templates
Remove EventManager reset methods, because they violate encapsulation....
Nathaniel J. Smith -
Show More
@@ -1,139 +1,131 b''
1 1 """Infrastructure for registering and firing callbacks on application events.
2 2
3 3 Unlike :mod:`IPython.core.hooks`, which lets end users set single functions to
4 4 be called at specific times, or a collection of alternative methods to try,
5 5 callbacks are designed to be used by extension authors. A number of callbacks
6 6 can be registered for the same event without needing to be aware of one another.
7 7
8 8 The functions defined in this module are no-ops indicating the names of available
9 9 events and the arguments which will be passed to them.
10 10
11 11 .. note::
12 12
13 13 This API is experimental in IPython 2.0, and may be revised in future versions.
14 14 """
15 15 from __future__ import print_function
16 16
17 17 class EventManager(object):
18 18 """Manage a collection of events and a sequence of callbacks for each.
19 19
20 20 This is attached to :class:`~IPython.core.interactiveshell.InteractiveShell`
21 21 instances as an ``events`` attribute.
22 22
23 23 .. note::
24 24
25 25 This API is experimental in IPython 2.0, and may be revised in future versions.
26 26 """
27 27 def __init__(self, shell, available_events):
28 28 """Initialise the :class:`CallbackManager`.
29 29
30 30 Parameters
31 31 ----------
32 32 shell
33 33 The :class:`~IPython.core.interactiveshell.InteractiveShell` instance
34 34 available_callbacks
35 35 An iterable of names for callback events.
36 36 """
37 37 self.shell = shell
38 38 self.callbacks = {n:[] for n in available_events}
39 39
40 40 def register(self, event, function):
41 41 """Register a new event callback
42 42
43 43 Parameters
44 44 ----------
45 45 event : str
46 46 The event for which to register this callback.
47 47 function : callable
48 48 A function to be called on the given event. It should take the same
49 49 parameters as the appropriate callback prototype.
50 50
51 51 Raises
52 52 ------
53 53 TypeError
54 54 If ``function`` is not callable.
55 55 KeyError
56 56 If ``event`` is not one of the known events.
57 57 """
58 58 if not callable(function):
59 59 raise TypeError('Need a callable, got %r' % function)
60 60 self.callbacks[event].append(function)
61 61
62 62 def unregister(self, event, function):
63 63 """Remove a callback from the given event."""
64 64 self.callbacks[event].remove(function)
65 65
66 def reset(self, event):
67 """Clear all callbacks for the given event."""
68 self.callbacks[event] = []
69
70 def reset_all(self):
71 """Clear all callbacks for all events."""
72 self.callbacks = {n:[] for n in self.callbacks}
73
74 66 def trigger(self, event, *args, **kwargs):
75 67 """Call callbacks for ``event``.
76 68
77 69 Any additional arguments are passed to all callbacks registered for this
78 70 event. Exceptions raised by callbacks are caught, and a message printed.
79 71 """
80 72 for func in self.callbacks[event]:
81 73 try:
82 74 func(*args, **kwargs)
83 75 except Exception:
84 76 print("Error in callback {} (for {}):".format(func, event))
85 77 self.shell.showtraceback()
86 78
87 79 # event_name -> prototype mapping
88 80 available_events = {}
89 81
90 82 def _define_event(callback_proto):
91 83 available_events[callback_proto.__name__] = callback_proto
92 84 return callback_proto
93 85
94 86 # ------------------------------------------------------------------------------
95 87 # Callback prototypes
96 88 #
97 89 # No-op functions which describe the names of available events and the
98 90 # signatures of callbacks for those events.
99 91 # ------------------------------------------------------------------------------
100 92
101 93 @_define_event
102 94 def pre_execute():
103 95 """Fires before code is executed in response to user/frontend action.
104 96
105 97 This includes comm and widget messages and silent execution, as well as user
106 98 code cells."""
107 99 pass
108 100
109 101 @_define_event
110 102 def pre_run_cell():
111 103 """Fires before user-entered code runs."""
112 104 pass
113 105
114 106 @_define_event
115 107 def post_execute():
116 108 """Fires after code is executed in response to user/frontend action.
117 109
118 110 This includes comm and widget messages and silent execution, as well as user
119 111 code cells."""
120 112 pass
121 113
122 114 @_define_event
123 115 def post_run_cell():
124 116 """Fires after user-entered code runs."""
125 117 pass
126 118
127 119 @_define_event
128 120 def shell_initialized(ip):
129 121 """Fires after initialisation of :class:`~IPython.core.interactiveshell.InteractiveShell`.
130 122
131 123 This is before extensions and startup scripts are loaded, so it can only be
132 124 set by subclassing.
133 125
134 126 Parameters
135 127 ----------
136 128 ip : :class:`~IPython.core.interactiveshell.InteractiveShell`
137 129 The newly initialised shell.
138 130 """
139 131 pass
@@ -1,46 +1,32 b''
1 1 import unittest
2 2 try: # Python 3.3 +
3 3 from unittest.mock import Mock
4 4 except ImportError:
5 5 from mock import Mock
6 6
7 7 from IPython.core import events
8 8 import IPython.testing.tools as tt
9 9
10 10 def ping_received():
11 11 pass
12 12
13 13 class CallbackTests(unittest.TestCase):
14 14 def setUp(self):
15 15 self.em = events.EventManager(get_ipython(), {'ping_received': ping_received})
16 16
17 17 def test_register_unregister(self):
18 18 cb = Mock()
19 19
20 20 self.em.register('ping_received', cb)
21 21 self.em.trigger('ping_received')
22 22 self.assertEqual(cb.call_count, 1)
23 23
24 24 self.em.unregister('ping_received', cb)
25 25 self.em.trigger('ping_received')
26 26 self.assertEqual(cb.call_count, 1)
27 27
28 def test_reset(self):
29 cb = Mock()
30 self.em.register('ping_received', cb)
31 self.em.reset('ping_received')
32 self.em.trigger('ping_received')
33 assert not cb.called
34
35 def test_reset_all(self):
36 cb = Mock()
37 self.em.register('ping_received', cb)
38 self.em.reset_all()
39 self.em.trigger('ping_received')
40 assert not cb.called
41
42 28 def test_cb_error(self):
43 29 cb = Mock(side_effect=ValueError)
44 30 self.em.register('ping_received', cb)
45 31 with tt.AssertPrints("Error in callback"):
46 self.em.trigger('ping_received') No newline at end of file
32 self.em.trigger('ping_received')
@@ -1,842 +1,845 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for the key interactiveshell module.
3 3
4 4 Historically the main classes in interactiveshell have been under-tested. This
5 5 module should grow as many single-method tests as possible to trap many of the
6 6 recurring bugs we seem to encounter with high-level interaction.
7 7 """
8 8
9 9 # Copyright (c) IPython Development Team.
10 10 # Distributed under the terms of the Modified BSD License.
11 11
12 12 import ast
13 13 import os
14 14 import signal
15 15 import shutil
16 16 import sys
17 17 import tempfile
18 18 import unittest
19 19 try:
20 20 from unittest import mock
21 21 except ImportError:
22 22 import mock
23 23 from os.path import join
24 24
25 25 import nose.tools as nt
26 26
27 27 from IPython.core.error import InputRejected
28 28 from IPython.core.inputtransformer import InputTransformer
29 29 from IPython.testing.decorators import (
30 30 skipif, skip_win32, onlyif_unicode_paths, onlyif_cmds_exist,
31 31 )
32 32 from IPython.testing import tools as tt
33 33 from IPython.utils import io
34 34 from IPython.utils.process import find_cmd
35 35 from IPython.utils import py3compat
36 36 from IPython.utils.py3compat import unicode_type, PY3
37 37
38 38 if PY3:
39 39 from io import StringIO
40 40 else:
41 41 from StringIO import StringIO
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Globals
45 45 #-----------------------------------------------------------------------------
46 46 # This is used by every single test, no point repeating it ad nauseam
47 47 ip = get_ipython()
48 48
49 49 #-----------------------------------------------------------------------------
50 50 # Tests
51 51 #-----------------------------------------------------------------------------
52 52
53 53 class InteractiveShellTestCase(unittest.TestCase):
54 54 def test_naked_string_cells(self):
55 55 """Test that cells with only naked strings are fully executed"""
56 56 # First, single-line inputs
57 57 ip.run_cell('"a"\n')
58 58 self.assertEqual(ip.user_ns['_'], 'a')
59 59 # And also multi-line cells
60 60 ip.run_cell('"""a\nb"""\n')
61 61 self.assertEqual(ip.user_ns['_'], 'a\nb')
62 62
63 63 def test_run_empty_cell(self):
64 64 """Just make sure we don't get a horrible error with a blank
65 65 cell of input. Yes, I did overlook that."""
66 66 old_xc = ip.execution_count
67 67 ip.run_cell('')
68 68 self.assertEqual(ip.execution_count, old_xc)
69 69
70 70 def test_run_cell_multiline(self):
71 71 """Multi-block, multi-line cells must execute correctly.
72 72 """
73 73 src = '\n'.join(["x=1",
74 74 "y=2",
75 75 "if 1:",
76 76 " x += 1",
77 77 " y += 1",])
78 78 ip.run_cell(src)
79 79 self.assertEqual(ip.user_ns['x'], 2)
80 80 self.assertEqual(ip.user_ns['y'], 3)
81 81
82 82 def test_multiline_string_cells(self):
83 83 "Code sprinkled with multiline strings should execute (GH-306)"
84 84 ip.run_cell('tmp=0')
85 85 self.assertEqual(ip.user_ns['tmp'], 0)
86 86 ip.run_cell('tmp=1;"""a\nb"""\n')
87 87 self.assertEqual(ip.user_ns['tmp'], 1)
88 88
89 89 def test_dont_cache_with_semicolon(self):
90 90 "Ending a line with semicolon should not cache the returned object (GH-307)"
91 91 oldlen = len(ip.user_ns['Out'])
92 92 for cell in ['1;', '1;1;']:
93 93 ip.run_cell(cell, store_history=True)
94 94 newlen = len(ip.user_ns['Out'])
95 95 self.assertEqual(oldlen, newlen)
96 96 i = 0
97 97 #also test the default caching behavior
98 98 for cell in ['1', '1;1']:
99 99 ip.run_cell(cell, store_history=True)
100 100 newlen = len(ip.user_ns['Out'])
101 101 i += 1
102 102 self.assertEqual(oldlen+i, newlen)
103 103
104 104 def test_In_variable(self):
105 105 "Verify that In variable grows with user input (GH-284)"
106 106 oldlen = len(ip.user_ns['In'])
107 107 ip.run_cell('1;', store_history=True)
108 108 newlen = len(ip.user_ns['In'])
109 109 self.assertEqual(oldlen+1, newlen)
110 110 self.assertEqual(ip.user_ns['In'][-1],'1;')
111 111
112 112 def test_magic_names_in_string(self):
113 113 ip.run_cell('a = """\n%exit\n"""')
114 114 self.assertEqual(ip.user_ns['a'], '\n%exit\n')
115 115
116 116 def test_trailing_newline(self):
117 117 """test that running !(command) does not raise a SyntaxError"""
118 118 ip.run_cell('!(true)\n', False)
119 119 ip.run_cell('!(true)\n\n\n', False)
120 120
121 121 def test_gh_597(self):
122 122 """Pretty-printing lists of objects with non-ascii reprs may cause
123 123 problems."""
124 124 class Spam(object):
125 125 def __repr__(self):
126 126 return "\xe9"*50
127 127 import IPython.core.formatters
128 128 f = IPython.core.formatters.PlainTextFormatter()
129 129 f([Spam(),Spam()])
130 130
131 131
132 132 def test_future_flags(self):
133 133 """Check that future flags are used for parsing code (gh-777)"""
134 134 ip.run_cell('from __future__ import print_function')
135 135 try:
136 136 ip.run_cell('prfunc_return_val = print(1,2, sep=" ")')
137 137 assert 'prfunc_return_val' in ip.user_ns
138 138 finally:
139 139 # Reset compiler flags so we don't mess up other tests.
140 140 ip.compile.reset_compiler_flags()
141 141
142 142 def test_future_unicode(self):
143 143 """Check that unicode_literals is imported from __future__ (gh #786)"""
144 144 try:
145 145 ip.run_cell(u'byte_str = "a"')
146 146 assert isinstance(ip.user_ns['byte_str'], str) # string literals are byte strings by default
147 147 ip.run_cell('from __future__ import unicode_literals')
148 148 ip.run_cell(u'unicode_str = "a"')
149 149 assert isinstance(ip.user_ns['unicode_str'], unicode_type) # strings literals are now unicode
150 150 finally:
151 151 # Reset compiler flags so we don't mess up other tests.
152 152 ip.compile.reset_compiler_flags()
153 153
154 154 def test_can_pickle(self):
155 155 "Can we pickle objects defined interactively (GH-29)"
156 156 ip = get_ipython()
157 157 ip.reset()
158 158 ip.run_cell(("class Mylist(list):\n"
159 159 " def __init__(self,x=[]):\n"
160 160 " list.__init__(self,x)"))
161 161 ip.run_cell("w=Mylist([1,2,3])")
162 162
163 163 from pickle import dumps
164 164
165 165 # We need to swap in our main module - this is only necessary
166 166 # inside the test framework, because IPython puts the interactive module
167 167 # in place (but the test framework undoes this).
168 168 _main = sys.modules['__main__']
169 169 sys.modules['__main__'] = ip.user_module
170 170 try:
171 171 res = dumps(ip.user_ns["w"])
172 172 finally:
173 173 sys.modules['__main__'] = _main
174 174 self.assertTrue(isinstance(res, bytes))
175 175
176 176 def test_global_ns(self):
177 177 "Code in functions must be able to access variables outside them."
178 178 ip = get_ipython()
179 179 ip.run_cell("a = 10")
180 180 ip.run_cell(("def f(x):\n"
181 181 " return x + a"))
182 182 ip.run_cell("b = f(12)")
183 183 self.assertEqual(ip.user_ns["b"], 22)
184 184
185 185 def test_bad_custom_tb(self):
186 186 """Check that InteractiveShell is protected from bad custom exception handlers"""
187 187 from IPython.utils import io
188 188 save_stderr = io.stderr
189 189 try:
190 190 # capture stderr
191 191 io.stderr = StringIO()
192 192 ip.set_custom_exc((IOError,), lambda etype,value,tb: 1/0)
193 193 self.assertEqual(ip.custom_exceptions, (IOError,))
194 194 ip.run_cell(u'raise IOError("foo")')
195 195 self.assertEqual(ip.custom_exceptions, ())
196 196 self.assertTrue("Custom TB Handler failed" in io.stderr.getvalue())
197 197 finally:
198 198 io.stderr = save_stderr
199 199
200 200 def test_bad_custom_tb_return(self):
201 201 """Check that InteractiveShell is protected from bad return types in custom exception handlers"""
202 202 from IPython.utils import io
203 203 save_stderr = io.stderr
204 204 try:
205 205 # capture stderr
206 206 io.stderr = StringIO()
207 207 ip.set_custom_exc((NameError,),lambda etype,value,tb, tb_offset=None: 1)
208 208 self.assertEqual(ip.custom_exceptions, (NameError,))
209 209 ip.run_cell(u'a=abracadabra')
210 210 self.assertEqual(ip.custom_exceptions, ())
211 211 self.assertTrue("Custom TB Handler failed" in io.stderr.getvalue())
212 212 finally:
213 213 io.stderr = save_stderr
214 214
215 215 def test_drop_by_id(self):
216 216 myvars = {"a":object(), "b":object(), "c": object()}
217 217 ip.push(myvars, interactive=False)
218 218 for name in myvars:
219 219 assert name in ip.user_ns, name
220 220 assert name in ip.user_ns_hidden, name
221 221 ip.user_ns['b'] = 12
222 222 ip.drop_by_id(myvars)
223 223 for name in ["a", "c"]:
224 224 assert name not in ip.user_ns, name
225 225 assert name not in ip.user_ns_hidden, name
226 226 assert ip.user_ns['b'] == 12
227 227 ip.reset()
228 228
229 229 def test_var_expand(self):
230 230 ip.user_ns['f'] = u'Ca\xf1o'
231 231 self.assertEqual(ip.var_expand(u'echo $f'), u'echo Ca\xf1o')
232 232 self.assertEqual(ip.var_expand(u'echo {f}'), u'echo Ca\xf1o')
233 233 self.assertEqual(ip.var_expand(u'echo {f[:-1]}'), u'echo Ca\xf1')
234 234 self.assertEqual(ip.var_expand(u'echo {1*2}'), u'echo 2')
235 235
236 236 ip.user_ns['f'] = b'Ca\xc3\xb1o'
237 237 # This should not raise any exception:
238 238 ip.var_expand(u'echo $f')
239 239
240 240 def test_var_expand_local(self):
241 241 """Test local variable expansion in !system and %magic calls"""
242 242 # !system
243 243 ip.run_cell('def test():\n'
244 244 ' lvar = "ttt"\n'
245 245 ' ret = !echo {lvar}\n'
246 246 ' return ret[0]\n')
247 247 res = ip.user_ns['test']()
248 248 nt.assert_in('ttt', res)
249 249
250 250 # %magic
251 251 ip.run_cell('def makemacro():\n'
252 252 ' macroname = "macro_var_expand_locals"\n'
253 253 ' %macro {macroname} codestr\n')
254 254 ip.user_ns['codestr'] = "str(12)"
255 255 ip.run_cell('makemacro()')
256 256 nt.assert_in('macro_var_expand_locals', ip.user_ns)
257 257
258 258 def test_var_expand_self(self):
259 259 """Test variable expansion with the name 'self', which was failing.
260 260
261 261 See https://github.com/ipython/ipython/issues/1878#issuecomment-7698218
262 262 """
263 263 ip.run_cell('class cTest:\n'
264 264 ' classvar="see me"\n'
265 265 ' def test(self):\n'
266 266 ' res = !echo Variable: {self.classvar}\n'
267 267 ' return res[0]\n')
268 268 nt.assert_in('see me', ip.user_ns['cTest']().test())
269 269
270 270 def test_bad_var_expand(self):
271 271 """var_expand on invalid formats shouldn't raise"""
272 272 # SyntaxError
273 273 self.assertEqual(ip.var_expand(u"{'a':5}"), u"{'a':5}")
274 274 # NameError
275 275 self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}")
276 276 # ZeroDivisionError
277 277 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
278 278
279 279 def test_silent_postexec(self):
280 280 """run_cell(silent=True) doesn't invoke pre/post_run_cell callbacks"""
281 281 pre_explicit = mock.Mock()
282 282 pre_always = mock.Mock()
283 283 post_explicit = mock.Mock()
284 284 post_always = mock.Mock()
285 285
286 286 ip.events.register('pre_run_cell', pre_explicit)
287 287 ip.events.register('pre_execute', pre_always)
288 288 ip.events.register('post_run_cell', post_explicit)
289 289 ip.events.register('post_execute', post_always)
290 290
291 291 try:
292 292 ip.run_cell("1", silent=True)
293 293 assert pre_always.called
294 294 assert not pre_explicit.called
295 295 assert post_always.called
296 296 assert not post_explicit.called
297 297 # double-check that non-silent exec did what we expected
298 298 # silent to avoid
299 299 ip.run_cell("1")
300 300 assert pre_explicit.called
301 301 assert post_explicit.called
302 302 finally:
303 303 # remove post-exec
304 ip.events.reset_all()
304 ip.events.unregister('pre_run_cell', pre_explicit)
305 ip.events.unregister('pre_execute', pre_always)
306 ip.events.unregister('post_run_cell', post_explicit)
307 ip.events.unregister('post_execute', post_always)
305 308
306 309 def test_silent_noadvance(self):
307 310 """run_cell(silent=True) doesn't advance execution_count"""
308 311 ec = ip.execution_count
309 312 # silent should force store_history=False
310 313 ip.run_cell("1", store_history=True, silent=True)
311 314
312 315 self.assertEqual(ec, ip.execution_count)
313 316 # double-check that non-silent exec did what we expected
314 317 # silent to avoid
315 318 ip.run_cell("1", store_history=True)
316 319 self.assertEqual(ec+1, ip.execution_count)
317 320
318 321 def test_silent_nodisplayhook(self):
319 322 """run_cell(silent=True) doesn't trigger displayhook"""
320 323 d = dict(called=False)
321 324
322 325 trap = ip.display_trap
323 326 save_hook = trap.hook
324 327
325 328 def failing_hook(*args, **kwargs):
326 329 d['called'] = True
327 330
328 331 try:
329 332 trap.hook = failing_hook
330 333 ip.run_cell("1", silent=True)
331 334 self.assertFalse(d['called'])
332 335 # double-check that non-silent exec did what we expected
333 336 # silent to avoid
334 337 ip.run_cell("1")
335 338 self.assertTrue(d['called'])
336 339 finally:
337 340 trap.hook = save_hook
338 341
339 342 @skipif(sys.version_info[0] >= 3, "softspace removed in py3")
340 343 def test_print_softspace(self):
341 344 """Verify that softspace is handled correctly when executing multiple
342 345 statements.
343 346
344 347 In [1]: print 1; print 2
345 348 1
346 349 2
347 350
348 351 In [2]: print 1,; print 2
349 352 1 2
350 353 """
351 354
352 355 def test_ofind_line_magic(self):
353 356 from IPython.core.magic import register_line_magic
354 357
355 358 @register_line_magic
356 359 def lmagic(line):
357 360 "A line magic"
358 361
359 362 # Get info on line magic
360 363 lfind = ip._ofind('lmagic')
361 364 info = dict(found=True, isalias=False, ismagic=True,
362 365 namespace = 'IPython internal', obj= lmagic.__wrapped__,
363 366 parent = None)
364 367 nt.assert_equal(lfind, info)
365 368
366 369 def test_ofind_cell_magic(self):
367 370 from IPython.core.magic import register_cell_magic
368 371
369 372 @register_cell_magic
370 373 def cmagic(line, cell):
371 374 "A cell magic"
372 375
373 376 # Get info on cell magic
374 377 find = ip._ofind('cmagic')
375 378 info = dict(found=True, isalias=False, ismagic=True,
376 379 namespace = 'IPython internal', obj= cmagic.__wrapped__,
377 380 parent = None)
378 381 nt.assert_equal(find, info)
379 382
380 383 def test_ofind_property_with_error(self):
381 384 class A(object):
382 385 @property
383 386 def foo(self):
384 387 raise NotImplementedError()
385 388 a = A()
386 389
387 390 found = ip._ofind('a.foo', [('locals', locals())])
388 391 info = dict(found=True, isalias=False, ismagic=False,
389 392 namespace='locals', obj=A.foo, parent=a)
390 393 nt.assert_equal(found, info)
391 394
392 395 def test_ofind_multiple_attribute_lookups(self):
393 396 class A(object):
394 397 @property
395 398 def foo(self):
396 399 raise NotImplementedError()
397 400
398 401 a = A()
399 402 a.a = A()
400 403 a.a.a = A()
401 404
402 405 found = ip._ofind('a.a.a.foo', [('locals', locals())])
403 406 info = dict(found=True, isalias=False, ismagic=False,
404 407 namespace='locals', obj=A.foo, parent=a.a.a)
405 408 nt.assert_equal(found, info)
406 409
407 410 def test_ofind_slotted_attributes(self):
408 411 class A(object):
409 412 __slots__ = ['foo']
410 413 def __init__(self):
411 414 self.foo = 'bar'
412 415
413 416 a = A()
414 417 found = ip._ofind('a.foo', [('locals', locals())])
415 418 info = dict(found=True, isalias=False, ismagic=False,
416 419 namespace='locals', obj=a.foo, parent=a)
417 420 nt.assert_equal(found, info)
418 421
419 422 found = ip._ofind('a.bar', [('locals', locals())])
420 423 info = dict(found=False, isalias=False, ismagic=False,
421 424 namespace=None, obj=None, parent=a)
422 425 nt.assert_equal(found, info)
423 426
424 427 def test_ofind_prefers_property_to_instance_level_attribute(self):
425 428 class A(object):
426 429 @property
427 430 def foo(self):
428 431 return 'bar'
429 432 a = A()
430 433 a.__dict__['foo'] = 'baz'
431 434 nt.assert_equal(a.foo, 'bar')
432 435 found = ip._ofind('a.foo', [('locals', locals())])
433 436 nt.assert_is(found['obj'], A.foo)
434 437
435 438 def test_custom_exception(self):
436 439 called = []
437 440 def my_handler(shell, etype, value, tb, tb_offset=None):
438 441 called.append(etype)
439 442 shell.showtraceback((etype, value, tb), tb_offset=tb_offset)
440 443
441 444 ip.set_custom_exc((ValueError,), my_handler)
442 445 try:
443 446 ip.run_cell("raise ValueError('test')")
444 447 # Check that this was called, and only once.
445 448 self.assertEqual(called, [ValueError])
446 449 finally:
447 450 # Reset the custom exception hook
448 451 ip.set_custom_exc((), None)
449 452
450 453 @skipif(sys.version_info[0] >= 3, "no differences with __future__ in py3")
451 454 def test_future_environment(self):
452 455 "Can we run code with & without the shell's __future__ imports?"
453 456 ip.run_cell("from __future__ import division")
454 457 ip.run_cell("a = 1/2", shell_futures=True)
455 458 self.assertEqual(ip.user_ns['a'], 0.5)
456 459 ip.run_cell("b = 1/2", shell_futures=False)
457 460 self.assertEqual(ip.user_ns['b'], 0)
458 461
459 462 ip.compile.reset_compiler_flags()
460 463 # This shouldn't leak to the shell's compiler
461 464 ip.run_cell("from __future__ import division \nc=1/2", shell_futures=False)
462 465 self.assertEqual(ip.user_ns['c'], 0.5)
463 466 ip.run_cell("d = 1/2", shell_futures=True)
464 467 self.assertEqual(ip.user_ns['d'], 0)
465 468
466 469 def test_mktempfile(self):
467 470 filename = ip.mktempfile()
468 471 # Check that we can open the file again on Windows
469 472 with open(filename, 'w') as f:
470 473 f.write('abc')
471 474
472 475 filename = ip.mktempfile(data='blah')
473 476 with open(filename, 'r') as f:
474 477 self.assertEqual(f.read(), 'blah')
475 478
476 479 def test_new_main_mod(self):
477 480 # Smoketest to check that this accepts a unicode module name
478 481 name = u'jiefmw'
479 482 mod = ip.new_main_mod(u'%s.py' % name, name)
480 483 self.assertEqual(mod.__name__, name)
481 484
482 485 class TestSafeExecfileNonAsciiPath(unittest.TestCase):
483 486
484 487 @onlyif_unicode_paths
485 488 def setUp(self):
486 489 self.BASETESTDIR = tempfile.mkdtemp()
487 490 self.TESTDIR = join(self.BASETESTDIR, u"Γ₯Àâ")
488 491 os.mkdir(self.TESTDIR)
489 492 with open(join(self.TESTDIR, u"Γ₯Àâtestscript.py"), "w") as sfile:
490 493 sfile.write("pass\n")
491 494 self.oldpath = py3compat.getcwd()
492 495 os.chdir(self.TESTDIR)
493 496 self.fname = u"Γ₯Àâtestscript.py"
494 497
495 498 def tearDown(self):
496 499 os.chdir(self.oldpath)
497 500 shutil.rmtree(self.BASETESTDIR)
498 501
499 502 @onlyif_unicode_paths
500 503 def test_1(self):
501 504 """Test safe_execfile with non-ascii path
502 505 """
503 506 ip.safe_execfile(self.fname, {}, raise_exceptions=True)
504 507
505 508 class ExitCodeChecks(tt.TempFileMixin):
506 509 def test_exit_code_ok(self):
507 510 self.system('exit 0')
508 511 self.assertEqual(ip.user_ns['_exit_code'], 0)
509 512
510 513 def test_exit_code_error(self):
511 514 self.system('exit 1')
512 515 self.assertEqual(ip.user_ns['_exit_code'], 1)
513 516
514 517 @skipif(not hasattr(signal, 'SIGALRM'))
515 518 def test_exit_code_signal(self):
516 519 self.mktmp("import signal, time\n"
517 520 "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
518 521 "time.sleep(1)\n")
519 522 self.system("%s %s" % (sys.executable, self.fname))
520 523 self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
521 524
522 525 @onlyif_cmds_exist("csh")
523 526 def test_exit_code_signal_csh(self):
524 527 SHELL = os.environ.get('SHELL', None)
525 528 os.environ['SHELL'] = find_cmd("csh")
526 529 try:
527 530 self.test_exit_code_signal()
528 531 finally:
529 532 if SHELL is not None:
530 533 os.environ['SHELL'] = SHELL
531 534 else:
532 535 del os.environ['SHELL']
533 536
534 537 class TestSystemRaw(unittest.TestCase, ExitCodeChecks):
535 538 system = ip.system_raw
536 539
537 540 @onlyif_unicode_paths
538 541 def test_1(self):
539 542 """Test system_raw with non-ascii cmd
540 543 """
541 544 cmd = u'''python -c "'Γ₯Àâ'" '''
542 545 ip.system_raw(cmd)
543 546
544 547 # TODO: Exit codes are currently ignored on Windows.
545 548 class TestSystemPipedExitCode(unittest.TestCase, ExitCodeChecks):
546 549 system = ip.system_piped
547 550
548 551 @skip_win32
549 552 def test_exit_code_ok(self):
550 553 ExitCodeChecks.test_exit_code_ok(self)
551 554
552 555 @skip_win32
553 556 def test_exit_code_error(self):
554 557 ExitCodeChecks.test_exit_code_error(self)
555 558
556 559 @skip_win32
557 560 def test_exit_code_signal(self):
558 561 ExitCodeChecks.test_exit_code_signal(self)
559 562
560 563 class TestModules(unittest.TestCase, tt.TempFileMixin):
561 564 def test_extraneous_loads(self):
562 565 """Test we're not loading modules on startup that we shouldn't.
563 566 """
564 567 self.mktmp("import sys\n"
565 568 "print('numpy' in sys.modules)\n"
566 569 "print('IPython.parallel' in sys.modules)\n"
567 570 "print('IPython.kernel.zmq' in sys.modules)\n"
568 571 )
569 572 out = "False\nFalse\nFalse\n"
570 573 tt.ipexec_validate(self.fname, out)
571 574
572 575 class Negator(ast.NodeTransformer):
573 576 """Negates all number literals in an AST."""
574 577 def visit_Num(self, node):
575 578 node.n = -node.n
576 579 return node
577 580
578 581 class TestAstTransform(unittest.TestCase):
579 582 def setUp(self):
580 583 self.negator = Negator()
581 584 ip.ast_transformers.append(self.negator)
582 585
583 586 def tearDown(self):
584 587 ip.ast_transformers.remove(self.negator)
585 588
586 589 def test_run_cell(self):
587 590 with tt.AssertPrints('-34'):
588 591 ip.run_cell('print (12 + 22)')
589 592
590 593 # A named reference to a number shouldn't be transformed.
591 594 ip.user_ns['n'] = 55
592 595 with tt.AssertNotPrints('-55'):
593 596 ip.run_cell('print (n)')
594 597
595 598 def test_timeit(self):
596 599 called = set()
597 600 def f(x):
598 601 called.add(x)
599 602 ip.push({'f':f})
600 603
601 604 with tt.AssertPrints("best of "):
602 605 ip.run_line_magic("timeit", "-n1 f(1)")
603 606 self.assertEqual(called, set([-1]))
604 607 called.clear()
605 608
606 609 with tt.AssertPrints("best of "):
607 610 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
608 611 self.assertEqual(called, set([-2, -3]))
609 612
610 613 def test_time(self):
611 614 called = []
612 615 def f(x):
613 616 called.append(x)
614 617 ip.push({'f':f})
615 618
616 619 # Test with an expression
617 620 with tt.AssertPrints("Wall time: "):
618 621 ip.run_line_magic("time", "f(5+9)")
619 622 self.assertEqual(called, [-14])
620 623 called[:] = []
621 624
622 625 # Test with a statement (different code path)
623 626 with tt.AssertPrints("Wall time: "):
624 627 ip.run_line_magic("time", "a = f(-3 + -2)")
625 628 self.assertEqual(called, [5])
626 629
627 630 def test_macro(self):
628 631 ip.push({'a':10})
629 632 # The AST transformation makes this do a+=-1
630 633 ip.define_macro("amacro", "a+=1\nprint(a)")
631 634
632 635 with tt.AssertPrints("9"):
633 636 ip.run_cell("amacro")
634 637 with tt.AssertPrints("8"):
635 638 ip.run_cell("amacro")
636 639
637 640 class IntegerWrapper(ast.NodeTransformer):
638 641 """Wraps all integers in a call to Integer()"""
639 642 def visit_Num(self, node):
640 643 if isinstance(node.n, int):
641 644 return ast.Call(func=ast.Name(id='Integer', ctx=ast.Load()),
642 645 args=[node], keywords=[])
643 646 return node
644 647
645 648 class TestAstTransform2(unittest.TestCase):
646 649 def setUp(self):
647 650 self.intwrapper = IntegerWrapper()
648 651 ip.ast_transformers.append(self.intwrapper)
649 652
650 653 self.calls = []
651 654 def Integer(*args):
652 655 self.calls.append(args)
653 656 return args
654 657 ip.push({"Integer": Integer})
655 658
656 659 def tearDown(self):
657 660 ip.ast_transformers.remove(self.intwrapper)
658 661 del ip.user_ns['Integer']
659 662
660 663 def test_run_cell(self):
661 664 ip.run_cell("n = 2")
662 665 self.assertEqual(self.calls, [(2,)])
663 666
664 667 # This shouldn't throw an error
665 668 ip.run_cell("o = 2.0")
666 669 self.assertEqual(ip.user_ns['o'], 2.0)
667 670
668 671 def test_timeit(self):
669 672 called = set()
670 673 def f(x):
671 674 called.add(x)
672 675 ip.push({'f':f})
673 676
674 677 with tt.AssertPrints("best of "):
675 678 ip.run_line_magic("timeit", "-n1 f(1)")
676 679 self.assertEqual(called, set([(1,)]))
677 680 called.clear()
678 681
679 682 with tt.AssertPrints("best of "):
680 683 ip.run_cell_magic("timeit", "-n1 f(2)", "f(3)")
681 684 self.assertEqual(called, set([(2,), (3,)]))
682 685
683 686 class ErrorTransformer(ast.NodeTransformer):
684 687 """Throws an error when it sees a number."""
685 688 def visit_Num(self, node):
686 689 raise ValueError("test")
687 690
688 691 class TestAstTransformError(unittest.TestCase):
689 692 def test_unregistering(self):
690 693 err_transformer = ErrorTransformer()
691 694 ip.ast_transformers.append(err_transformer)
692 695
693 696 with tt.AssertPrints("unregister", channel='stderr'):
694 697 ip.run_cell("1 + 2")
695 698
696 699 # This should have been removed.
697 700 nt.assert_not_in(err_transformer, ip.ast_transformers)
698 701
699 702
700 703 class StringRejector(ast.NodeTransformer):
701 704 """Throws an InputRejected when it sees a string literal.
702 705
703 706 Used to verify that NodeTransformers can signal that a piece of code should
704 707 not be executed by throwing an InputRejected.
705 708 """
706 709
707 710 def visit_Str(self, node):
708 711 raise InputRejected("test")
709 712
710 713
711 714 class TestAstTransformInputRejection(unittest.TestCase):
712 715
713 716 def setUp(self):
714 717 self.transformer = StringRejector()
715 718 ip.ast_transformers.append(self.transformer)
716 719
717 720 def tearDown(self):
718 721 ip.ast_transformers.remove(self.transformer)
719 722
720 723 def test_input_rejection(self):
721 724 """Check that NodeTransformers can reject input."""
722 725
723 726 expect_exception_tb = tt.AssertPrints("InputRejected: test")
724 727 expect_no_cell_output = tt.AssertNotPrints("'unsafe'", suppress=False)
725 728
726 729 # Run the same check twice to verify that the transformer is not
727 730 # disabled after raising.
728 731 with expect_exception_tb, expect_no_cell_output:
729 732 ip.run_cell("'unsafe'")
730 733
731 734 with expect_exception_tb, expect_no_cell_output:
732 735 ip.run_cell("'unsafe'")
733 736
734 737 def test__IPYTHON__():
735 738 # This shouldn't raise a NameError, that's all
736 739 __IPYTHON__
737 740
738 741
739 742 class DummyRepr(object):
740 743 def __repr__(self):
741 744 return "DummyRepr"
742 745
743 746 def _repr_html_(self):
744 747 return "<b>dummy</b>"
745 748
746 749 def _repr_javascript_(self):
747 750 return "console.log('hi');", {'key': 'value'}
748 751
749 752
750 753 def test_user_variables():
751 754 # enable all formatters
752 755 ip.display_formatter.active_types = ip.display_formatter.format_types
753 756
754 757 ip.user_ns['dummy'] = d = DummyRepr()
755 758 keys = set(['dummy', 'doesnotexist'])
756 759 r = ip.user_expressions({ key:key for key in keys})
757 760
758 761 nt.assert_equal(keys, set(r.keys()))
759 762 dummy = r['dummy']
760 763 nt.assert_equal(set(['status', 'data', 'metadata']), set(dummy.keys()))
761 764 nt.assert_equal(dummy['status'], 'ok')
762 765 data = dummy['data']
763 766 metadata = dummy['metadata']
764 767 nt.assert_equal(data.get('text/html'), d._repr_html_())
765 768 js, jsmd = d._repr_javascript_()
766 769 nt.assert_equal(data.get('application/javascript'), js)
767 770 nt.assert_equal(metadata.get('application/javascript'), jsmd)
768 771
769 772 dne = r['doesnotexist']
770 773 nt.assert_equal(dne['status'], 'error')
771 774 nt.assert_equal(dne['ename'], 'NameError')
772 775
773 776 # back to text only
774 777 ip.display_formatter.active_types = ['text/plain']
775 778
776 779 def test_user_expression():
777 780 # enable all formatters
778 781 ip.display_formatter.active_types = ip.display_formatter.format_types
779 782 query = {
780 783 'a' : '1 + 2',
781 784 'b' : '1/0',
782 785 }
783 786 r = ip.user_expressions(query)
784 787 import pprint
785 788 pprint.pprint(r)
786 789 nt.assert_equal(set(r.keys()), set(query.keys()))
787 790 a = r['a']
788 791 nt.assert_equal(set(['status', 'data', 'metadata']), set(a.keys()))
789 792 nt.assert_equal(a['status'], 'ok')
790 793 data = a['data']
791 794 metadata = a['metadata']
792 795 nt.assert_equal(data.get('text/plain'), '3')
793 796
794 797 b = r['b']
795 798 nt.assert_equal(b['status'], 'error')
796 799 nt.assert_equal(b['ename'], 'ZeroDivisionError')
797 800
798 801 # back to text only
799 802 ip.display_formatter.active_types = ['text/plain']
800 803
801 804
802 805
803 806
804 807
805 808 class TestSyntaxErrorTransformer(unittest.TestCase):
806 809 """Check that SyntaxError raised by an input transformer is handled by run_cell()"""
807 810
808 811 class SyntaxErrorTransformer(InputTransformer):
809 812
810 813 def push(self, line):
811 814 pos = line.find('syntaxerror')
812 815 if pos >= 0:
813 816 e = SyntaxError('input contains "syntaxerror"')
814 817 e.text = line
815 818 e.offset = pos + 1
816 819 raise e
817 820 return line
818 821
819 822 def reset(self):
820 823 pass
821 824
822 825 def setUp(self):
823 826 self.transformer = TestSyntaxErrorTransformer.SyntaxErrorTransformer()
824 827 ip.input_splitter.python_line_transforms.append(self.transformer)
825 828 ip.input_transformer_manager.python_line_transforms.append(self.transformer)
826 829
827 830 def tearDown(self):
828 831 ip.input_splitter.python_line_transforms.remove(self.transformer)
829 832 ip.input_transformer_manager.python_line_transforms.remove(self.transformer)
830 833
831 834 def test_syntaxerror_input_transformer(self):
832 835 with tt.AssertPrints('1234'):
833 836 ip.run_cell('1234')
834 837 with tt.AssertPrints('SyntaxError: invalid syntax'):
835 838 ip.run_cell('1 2 3') # plain python syntax error
836 839 with tt.AssertPrints('SyntaxError: input contains "syntaxerror"'):
837 840 ip.run_cell('2345 # syntaxerror') # input transformer syntax error
838 841 with tt.AssertPrints('3456'):
839 842 ip.run_cell('3456')
840 843
841 844
842 845
General Comments 0
You need to be logged in to leave comments. Login now