##// END OF EJS Templates
Specify encoding in remainining instances of io.open
Thomas Kluyver -
Show More
@@ -1,944 +1,944 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for various magic functions.
3 3
4 4 Needs to be run by nose (to make ipython session available).
5 5 """
6 6 from __future__ import absolute_import
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Imports
10 10 #-----------------------------------------------------------------------------
11 11
12 12 import io
13 13 import os
14 14 import sys
15 15 from unittest import TestCase
16 16
17 17 try:
18 18 from importlib import invalidate_caches # Required from Python 3.3
19 19 except ImportError:
20 20 def invalidate_caches():
21 21 pass
22 22
23 23 import nose.tools as nt
24 24
25 25 from IPython.core import magic
26 26 from IPython.core.magic import (Magics, magics_class, line_magic,
27 27 cell_magic, line_cell_magic,
28 28 register_line_magic, register_cell_magic,
29 29 register_line_cell_magic)
30 30 from IPython.core.magics import execution, script, code
31 31 from IPython.nbformat.v3.tests.nbexamples import nb0
32 32 from IPython.nbformat import current
33 33 from IPython.testing import decorators as dec
34 34 from IPython.testing import tools as tt
35 35 from IPython.utils import py3compat
36 36 from IPython.utils.io import capture_output
37 37 from IPython.utils.tempdir import TemporaryDirectory
38 38 from IPython.utils.process import find_cmd
39 39
40 40 if py3compat.PY3:
41 41 from io import StringIO
42 42 else:
43 43 from StringIO import StringIO
44 44
45 45 #-----------------------------------------------------------------------------
46 46 # Test functions begin
47 47 #-----------------------------------------------------------------------------
48 48
49 49 @magic.magics_class
50 50 class DummyMagics(magic.Magics): pass
51 51
52 52 def test_extract_code_ranges():
53 53 instr = "1 3 5-6 7-9 10:15 17: :10 10- -13 :"
54 54 expected = [(0, 1),
55 55 (2, 3),
56 56 (4, 6),
57 57 (6, 9),
58 58 (9, 14),
59 59 (16, None),
60 60 (None, 9),
61 61 (9, None),
62 62 (None, 13),
63 63 (None, None)]
64 64 actual = list(code.extract_code_ranges(instr))
65 65 nt.assert_equal(actual, expected)
66 66
67 67 def test_extract_symbols():
68 68 source = """import foo\na = 10\ndef b():\n return 42\n\n\nclass A: pass\n\n\n"""
69 69 symbols_args = ["a", "b", "A", "A,b", "A,a", "z"]
70 70 expected = [([], ['a']),
71 71 (["def b():\n return 42\n"], []),
72 72 (["class A: pass\n"], []),
73 73 (["class A: pass\n", "def b():\n return 42\n"], []),
74 74 (["class A: pass\n"], ['a']),
75 75 ([], ['z'])]
76 76 for symbols, exp in zip(symbols_args, expected):
77 77 nt.assert_equal(code.extract_symbols(source, symbols), exp)
78 78
79 79
80 80 def test_extract_symbols_raises_exception_with_non_python_code():
81 81 source = ("=begin A Ruby program :)=end\n"
82 82 "def hello\n"
83 83 "puts 'Hello world'\n"
84 84 "end")
85 85 with nt.assert_raises(SyntaxError):
86 86 code.extract_symbols(source, "hello")
87 87
88 88 def test_config():
89 89 """ test that config magic does not raise
90 90 can happen if Configurable init is moved too early into
91 91 Magics.__init__ as then a Config object will be registerd as a
92 92 magic.
93 93 """
94 94 ## should not raise.
95 95 _ip.magic('config')
96 96
97 97 def test_rehashx():
98 98 # clear up everything
99 99 _ip = get_ipython()
100 100 _ip.alias_manager.clear_aliases()
101 101 del _ip.db['syscmdlist']
102 102
103 103 _ip.magic('rehashx')
104 104 # Practically ALL ipython development systems will have more than 10 aliases
105 105
106 106 nt.assert_true(len(_ip.alias_manager.aliases) > 10)
107 107 for name, cmd in _ip.alias_manager.aliases:
108 108 # we must strip dots from alias names
109 109 nt.assert_not_in('.', name)
110 110
111 111 # rehashx must fill up syscmdlist
112 112 scoms = _ip.db['syscmdlist']
113 113 nt.assert_true(len(scoms) > 10)
114 114
115 115
116 116 def test_magic_parse_options():
117 117 """Test that we don't mangle paths when parsing magic options."""
118 118 ip = get_ipython()
119 119 path = 'c:\\x'
120 120 m = DummyMagics(ip)
121 121 opts = m.parse_options('-f %s' % path,'f:')[0]
122 122 # argv splitting is os-dependent
123 123 if os.name == 'posix':
124 124 expected = 'c:x'
125 125 else:
126 126 expected = path
127 127 nt.assert_equal(opts['f'], expected)
128 128
129 129 def test_magic_parse_long_options():
130 130 """Magic.parse_options can handle --foo=bar long options"""
131 131 ip = get_ipython()
132 132 m = DummyMagics(ip)
133 133 opts, _ = m.parse_options('--foo --bar=bubble', 'a', 'foo', 'bar=')
134 134 nt.assert_in('foo', opts)
135 135 nt.assert_in('bar', opts)
136 136 nt.assert_equal(opts['bar'], "bubble")
137 137
138 138
139 139 @dec.skip_without('sqlite3')
140 140 def doctest_hist_f():
141 141 """Test %hist -f with temporary filename.
142 142
143 143 In [9]: import tempfile
144 144
145 145 In [10]: tfile = tempfile.mktemp('.py','tmp-ipython-')
146 146
147 147 In [11]: %hist -nl -f $tfile 3
148 148
149 149 In [13]: import os; os.unlink(tfile)
150 150 """
151 151
152 152
153 153 @dec.skip_without('sqlite3')
154 154 def doctest_hist_r():
155 155 """Test %hist -r
156 156
157 157 XXX - This test is not recording the output correctly. For some reason, in
158 158 testing mode the raw history isn't getting populated. No idea why.
159 159 Disabling the output checking for now, though at least we do run it.
160 160
161 161 In [1]: 'hist' in _ip.lsmagic()
162 162 Out[1]: True
163 163
164 164 In [2]: x=1
165 165
166 166 In [3]: %hist -rl 2
167 167 x=1 # random
168 168 %hist -r 2
169 169 """
170 170
171 171
172 172 @dec.skip_without('sqlite3')
173 173 def doctest_hist_op():
174 174 """Test %hist -op
175 175
176 176 In [1]: class b(float):
177 177 ...: pass
178 178 ...:
179 179
180 180 In [2]: class s(object):
181 181 ...: def __str__(self):
182 182 ...: return 's'
183 183 ...:
184 184
185 185 In [3]:
186 186
187 187 In [4]: class r(b):
188 188 ...: def __repr__(self):
189 189 ...: return 'r'
190 190 ...:
191 191
192 192 In [5]: class sr(s,r): pass
193 193 ...:
194 194
195 195 In [6]:
196 196
197 197 In [7]: bb=b()
198 198
199 199 In [8]: ss=s()
200 200
201 201 In [9]: rr=r()
202 202
203 203 In [10]: ssrr=sr()
204 204
205 205 In [11]: 4.5
206 206 Out[11]: 4.5
207 207
208 208 In [12]: str(ss)
209 209 Out[12]: 's'
210 210
211 211 In [13]:
212 212
213 213 In [14]: %hist -op
214 214 >>> class b:
215 215 ... pass
216 216 ...
217 217 >>> class s(b):
218 218 ... def __str__(self):
219 219 ... return 's'
220 220 ...
221 221 >>>
222 222 >>> class r(b):
223 223 ... def __repr__(self):
224 224 ... return 'r'
225 225 ...
226 226 >>> class sr(s,r): pass
227 227 >>>
228 228 >>> bb=b()
229 229 >>> ss=s()
230 230 >>> rr=r()
231 231 >>> ssrr=sr()
232 232 >>> 4.5
233 233 4.5
234 234 >>> str(ss)
235 235 's'
236 236 >>>
237 237 """
238 238
239 239
240 240 @dec.skip_without('sqlite3')
241 241 def test_macro():
242 242 ip = get_ipython()
243 243 ip.history_manager.reset() # Clear any existing history.
244 244 cmds = ["a=1", "def b():\n return a**2", "print(a,b())"]
245 245 for i, cmd in enumerate(cmds, start=1):
246 246 ip.history_manager.store_inputs(i, cmd)
247 247 ip.magic("macro test 1-3")
248 248 nt.assert_equal(ip.user_ns["test"].value, "\n".join(cmds)+"\n")
249 249
250 250 # List macros
251 251 nt.assert_in("test", ip.magic("macro"))
252 252
253 253
254 254 @dec.skip_without('sqlite3')
255 255 def test_macro_run():
256 256 """Test that we can run a multi-line macro successfully."""
257 257 ip = get_ipython()
258 258 ip.history_manager.reset()
259 259 cmds = ["a=10", "a+=1", py3compat.doctest_refactor_print("print a"),
260 260 "%macro test 2-3"]
261 261 for cmd in cmds:
262 262 ip.run_cell(cmd, store_history=True)
263 263 nt.assert_equal(ip.user_ns["test"].value,
264 264 py3compat.doctest_refactor_print("a+=1\nprint a\n"))
265 265 with tt.AssertPrints("12"):
266 266 ip.run_cell("test")
267 267 with tt.AssertPrints("13"):
268 268 ip.run_cell("test")
269 269
270 270
271 271 def test_magic_magic():
272 272 """Test %magic"""
273 273 ip = get_ipython()
274 274 with capture_output() as captured:
275 275 ip.magic("magic")
276 276
277 277 stdout = captured.stdout
278 278 nt.assert_in('%magic', stdout)
279 279 nt.assert_in('IPython', stdout)
280 280 nt.assert_in('Available', stdout)
281 281
282 282
283 283 @dec.skipif_not_numpy
284 284 def test_numpy_reset_array_undec():
285 285 "Test '%reset array' functionality"
286 286 _ip.ex('import numpy as np')
287 287 _ip.ex('a = np.empty(2)')
288 288 nt.assert_in('a', _ip.user_ns)
289 289 _ip.magic('reset -f array')
290 290 nt.assert_not_in('a', _ip.user_ns)
291 291
292 292 def test_reset_out():
293 293 "Test '%reset out' magic"
294 294 _ip.run_cell("parrot = 'dead'", store_history=True)
295 295 # test '%reset -f out', make an Out prompt
296 296 _ip.run_cell("parrot", store_history=True)
297 297 nt.assert_true('dead' in [_ip.user_ns[x] for x in ('_','__','___')])
298 298 _ip.magic('reset -f out')
299 299 nt.assert_false('dead' in [_ip.user_ns[x] for x in ('_','__','___')])
300 300 nt.assert_equal(len(_ip.user_ns['Out']), 0)
301 301
302 302 def test_reset_in():
303 303 "Test '%reset in' magic"
304 304 # test '%reset -f in'
305 305 _ip.run_cell("parrot", store_history=True)
306 306 nt.assert_true('parrot' in [_ip.user_ns[x] for x in ('_i','_ii','_iii')])
307 307 _ip.magic('%reset -f in')
308 308 nt.assert_false('parrot' in [_ip.user_ns[x] for x in ('_i','_ii','_iii')])
309 309 nt.assert_equal(len(set(_ip.user_ns['In'])), 1)
310 310
311 311 def test_reset_dhist():
312 312 "Test '%reset dhist' magic"
313 313 _ip.run_cell("tmp = [d for d in _dh]") # copy before clearing
314 314 _ip.magic('cd ' + os.path.dirname(nt.__file__))
315 315 _ip.magic('cd -')
316 316 nt.assert_true(len(_ip.user_ns['_dh']) > 0)
317 317 _ip.magic('reset -f dhist')
318 318 nt.assert_equal(len(_ip.user_ns['_dh']), 0)
319 319 _ip.run_cell("_dh = [d for d in tmp]") #restore
320 320
321 321 def test_reset_in_length():
322 322 "Test that '%reset in' preserves In[] length"
323 323 _ip.run_cell("print 'foo'")
324 324 _ip.run_cell("reset -f in")
325 325 nt.assert_equal(len(_ip.user_ns['In']), _ip.displayhook.prompt_count+1)
326 326
327 327 def test_tb_syntaxerror():
328 328 """test %tb after a SyntaxError"""
329 329 ip = get_ipython()
330 330 ip.run_cell("for")
331 331
332 332 # trap and validate stdout
333 333 save_stdout = sys.stdout
334 334 try:
335 335 sys.stdout = StringIO()
336 336 ip.run_cell("%tb")
337 337 out = sys.stdout.getvalue()
338 338 finally:
339 339 sys.stdout = save_stdout
340 340 # trim output, and only check the last line
341 341 last_line = out.rstrip().splitlines()[-1].strip()
342 342 nt.assert_equal(last_line, "SyntaxError: invalid syntax")
343 343
344 344
345 345 def test_time():
346 346 ip = get_ipython()
347 347
348 348 with tt.AssertPrints("Wall time: "):
349 349 ip.run_cell("%time None")
350 350
351 351 ip.run_cell("def f(kmjy):\n"
352 352 " %time print (2*kmjy)")
353 353
354 354 with tt.AssertPrints("Wall time: "):
355 355 with tt.AssertPrints("hihi", suppress=False):
356 356 ip.run_cell("f('hi')")
357 357
358 358
359 359 @dec.skip_win32
360 360 def test_time2():
361 361 ip = get_ipython()
362 362
363 363 with tt.AssertPrints("CPU times: user "):
364 364 ip.run_cell("%time None")
365 365
366 366 def test_time3():
367 367 """Erroneous magic function calls, issue gh-3334"""
368 368 ip = get_ipython()
369 369 ip.user_ns.pop('run', None)
370 370
371 371 with tt.AssertNotPrints("not found", channel='stderr'):
372 372 ip.run_cell("%%time\n"
373 373 "run = 0\n"
374 374 "run += 1")
375 375
376 376 def test_doctest_mode():
377 377 "Toggle doctest_mode twice, it should be a no-op and run without error"
378 378 _ip.magic('doctest_mode')
379 379 _ip.magic('doctest_mode')
380 380
381 381
382 382 def test_parse_options():
383 383 """Tests for basic options parsing in magics."""
384 384 # These are only the most minimal of tests, more should be added later. At
385 385 # the very least we check that basic text/unicode calls work OK.
386 386 m = DummyMagics(_ip)
387 387 nt.assert_equal(m.parse_options('foo', '')[1], 'foo')
388 388 nt.assert_equal(m.parse_options(u'foo', '')[1], u'foo')
389 389
390 390
391 391 def test_dirops():
392 392 """Test various directory handling operations."""
393 393 # curpath = lambda :os.path.splitdrive(py3compat.getcwd())[1].replace('\\','/')
394 394 curpath = py3compat.getcwd
395 395 startdir = py3compat.getcwd()
396 396 ipdir = os.path.realpath(_ip.ipython_dir)
397 397 try:
398 398 _ip.magic('cd "%s"' % ipdir)
399 399 nt.assert_equal(curpath(), ipdir)
400 400 _ip.magic('cd -')
401 401 nt.assert_equal(curpath(), startdir)
402 402 _ip.magic('pushd "%s"' % ipdir)
403 403 nt.assert_equal(curpath(), ipdir)
404 404 _ip.magic('popd')
405 405 nt.assert_equal(curpath(), startdir)
406 406 finally:
407 407 os.chdir(startdir)
408 408
409 409
410 410 def test_xmode():
411 411 # Calling xmode three times should be a no-op
412 412 xmode = _ip.InteractiveTB.mode
413 413 for i in range(3):
414 414 _ip.magic("xmode")
415 415 nt.assert_equal(_ip.InteractiveTB.mode, xmode)
416 416
417 417 def test_reset_hard():
418 418 monitor = []
419 419 class A(object):
420 420 def __del__(self):
421 421 monitor.append(1)
422 422 def __repr__(self):
423 423 return "<A instance>"
424 424
425 425 _ip.user_ns["a"] = A()
426 426 _ip.run_cell("a")
427 427
428 428 nt.assert_equal(monitor, [])
429 429 _ip.magic("reset -f")
430 430 nt.assert_equal(monitor, [1])
431 431
432 432 class TestXdel(tt.TempFileMixin):
433 433 def test_xdel(self):
434 434 """Test that references from %run are cleared by xdel."""
435 435 src = ("class A(object):\n"
436 436 " monitor = []\n"
437 437 " def __del__(self):\n"
438 438 " self.monitor.append(1)\n"
439 439 "a = A()\n")
440 440 self.mktmp(src)
441 441 # %run creates some hidden references...
442 442 _ip.magic("run %s" % self.fname)
443 443 # ... as does the displayhook.
444 444 _ip.run_cell("a")
445 445
446 446 monitor = _ip.user_ns["A"].monitor
447 447 nt.assert_equal(monitor, [])
448 448
449 449 _ip.magic("xdel a")
450 450
451 451 # Check that a's __del__ method has been called.
452 452 nt.assert_equal(monitor, [1])
453 453
454 454 def doctest_who():
455 455 """doctest for %who
456 456
457 457 In [1]: %reset -f
458 458
459 459 In [2]: alpha = 123
460 460
461 461 In [3]: beta = 'beta'
462 462
463 463 In [4]: %who int
464 464 alpha
465 465
466 466 In [5]: %who str
467 467 beta
468 468
469 469 In [6]: %whos
470 470 Variable Type Data/Info
471 471 ----------------------------
472 472 alpha int 123
473 473 beta str beta
474 474
475 475 In [7]: %who_ls
476 476 Out[7]: ['alpha', 'beta']
477 477 """
478 478
479 479 def test_whos():
480 480 """Check that whos is protected against objects where repr() fails."""
481 481 class A(object):
482 482 def __repr__(self):
483 483 raise Exception()
484 484 _ip.user_ns['a'] = A()
485 485 _ip.magic("whos")
486 486
487 487 @py3compat.u_format
488 488 def doctest_precision():
489 489 """doctest for %precision
490 490
491 491 In [1]: f = get_ipython().display_formatter.formatters['text/plain']
492 492
493 493 In [2]: %precision 5
494 494 Out[2]: {u}'%.5f'
495 495
496 496 In [3]: f.float_format
497 497 Out[3]: {u}'%.5f'
498 498
499 499 In [4]: %precision %e
500 500 Out[4]: {u}'%e'
501 501
502 502 In [5]: f(3.1415927)
503 503 Out[5]: {u}'3.141593e+00'
504 504 """
505 505
506 506 def test_psearch():
507 507 with tt.AssertPrints("dict.fromkeys"):
508 508 _ip.run_cell("dict.fr*?")
509 509
510 510 def test_timeit_shlex():
511 511 """test shlex issues with timeit (#1109)"""
512 512 _ip.ex("def f(*a,**kw): pass")
513 513 _ip.magic('timeit -n1 "this is a bug".count(" ")')
514 514 _ip.magic('timeit -r1 -n1 f(" ", 1)')
515 515 _ip.magic('timeit -r1 -n1 f(" ", 1, " ", 2, " ")')
516 516 _ip.magic('timeit -r1 -n1 ("a " + "b")')
517 517 _ip.magic('timeit -r1 -n1 f("a " + "b")')
518 518 _ip.magic('timeit -r1 -n1 f("a " + "b ")')
519 519
520 520
521 521 def test_timeit_arguments():
522 522 "Test valid timeit arguments, should not cause SyntaxError (GH #1269)"
523 523 _ip.magic("timeit ('#')")
524 524
525 525
526 526 def test_timeit_special_syntax():
527 527 "Test %%timeit with IPython special syntax"
528 528 @register_line_magic
529 529 def lmagic(line):
530 530 ip = get_ipython()
531 531 ip.user_ns['lmagic_out'] = line
532 532
533 533 # line mode test
534 534 _ip.run_line_magic('timeit', '-n1 -r1 %lmagic my line')
535 535 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line')
536 536 # cell mode test
537 537 _ip.run_cell_magic('timeit', '-n1 -r1', '%lmagic my line2')
538 538 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line2')
539 539
540 540 def test_timeit_return():
541 541 """
542 542 test wether timeit -o return object
543 543 """
544 544
545 545 res = _ip.run_line_magic('timeit','-n10 -r10 -o 1')
546 546 assert(res is not None)
547 547
548 548 def test_timeit_quiet():
549 549 """
550 550 test quiet option of timeit magic
551 551 """
552 552 with tt.AssertNotPrints("loops"):
553 553 _ip.run_cell("%timeit -n1 -r1 -q 1")
554 554
555 555 @dec.skipif(execution.profile is None)
556 556 def test_prun_special_syntax():
557 557 "Test %%prun with IPython special syntax"
558 558 @register_line_magic
559 559 def lmagic(line):
560 560 ip = get_ipython()
561 561 ip.user_ns['lmagic_out'] = line
562 562
563 563 # line mode test
564 564 _ip.run_line_magic('prun', '-q %lmagic my line')
565 565 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line')
566 566 # cell mode test
567 567 _ip.run_cell_magic('prun', '-q', '%lmagic my line2')
568 568 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line2')
569 569
570 570 @dec.skipif(execution.profile is None)
571 571 def test_prun_quotes():
572 572 "Test that prun does not clobber string escapes (GH #1302)"
573 573 _ip.magic(r"prun -q x = '\t'")
574 574 nt.assert_equal(_ip.user_ns['x'], '\t')
575 575
576 576 def test_extension():
577 577 tmpdir = TemporaryDirectory()
578 578 orig_ipython_dir = _ip.ipython_dir
579 579 try:
580 580 _ip.ipython_dir = tmpdir.name
581 581 nt.assert_raises(ImportError, _ip.magic, "load_ext daft_extension")
582 582 url = os.path.join(os.path.dirname(__file__), "daft_extension.py")
583 583 _ip.magic("install_ext %s" % url)
584 584 _ip.user_ns.pop('arq', None)
585 585 invalidate_caches() # Clear import caches
586 586 _ip.magic("load_ext daft_extension")
587 587 nt.assert_equal(_ip.user_ns['arq'], 185)
588 588 _ip.magic("unload_ext daft_extension")
589 589 assert 'arq' not in _ip.user_ns
590 590 finally:
591 591 _ip.ipython_dir = orig_ipython_dir
592 592 tmpdir.cleanup()
593 593
594 594 def test_notebook_export_json():
595 595 with TemporaryDirectory() as td:
596 596 outfile = os.path.join(td, "nb.ipynb")
597 597 _ip.ex(py3compat.u_format(u"u = {u}'hΓ©llo'"))
598 598 _ip.magic("notebook -e %s" % outfile)
599 599
600 600 def test_notebook_export_py():
601 601 with TemporaryDirectory() as td:
602 602 outfile = os.path.join(td, "nb.py")
603 603 _ip.ex(py3compat.u_format(u"u = {u}'hΓ©llo'"))
604 604 _ip.magic("notebook -e %s" % outfile)
605 605
606 606 def test_notebook_reformat_py():
607 607 with TemporaryDirectory() as td:
608 608 infile = os.path.join(td, "nb.ipynb")
609 609 with io.open(infile, 'w', encoding='utf-8') as f:
610 610 current.write(nb0, f, 'json')
611 611
612 612 _ip.ex(py3compat.u_format(u"u = {u}'hΓ©llo'"))
613 613 _ip.magic("notebook -f py %s" % infile)
614 614
615 615 def test_notebook_reformat_json():
616 616 with TemporaryDirectory() as td:
617 617 infile = os.path.join(td, "nb.py")
618 618 with io.open(infile, 'w', encoding='utf-8') as f:
619 619 current.write(nb0, f, 'py')
620 620
621 621 _ip.ex(py3compat.u_format(u"u = {u}'hΓ©llo'"))
622 622 _ip.magic("notebook -f ipynb %s" % infile)
623 623 _ip.magic("notebook -f json %s" % infile)
624 624
625 625 def test_env():
626 626 env = _ip.magic("env")
627 627 assert isinstance(env, dict), type(env)
628 628
629 629
630 630 class CellMagicTestCase(TestCase):
631 631
632 632 def check_ident(self, magic):
633 633 # Manually called, we get the result
634 634 out = _ip.run_cell_magic(magic, 'a', 'b')
635 635 nt.assert_equal(out, ('a','b'))
636 636 # Via run_cell, it goes into the user's namespace via displayhook
637 637 _ip.run_cell('%%' + magic +' c\nd')
638 638 nt.assert_equal(_ip.user_ns['_'], ('c','d'))
639 639
640 640 def test_cell_magic_func_deco(self):
641 641 "Cell magic using simple decorator"
642 642 @register_cell_magic
643 643 def cellm(line, cell):
644 644 return line, cell
645 645
646 646 self.check_ident('cellm')
647 647
648 648 def test_cell_magic_reg(self):
649 649 "Cell magic manually registered"
650 650 def cellm(line, cell):
651 651 return line, cell
652 652
653 653 _ip.register_magic_function(cellm, 'cell', 'cellm2')
654 654 self.check_ident('cellm2')
655 655
656 656 def test_cell_magic_class(self):
657 657 "Cell magics declared via a class"
658 658 @magics_class
659 659 class MyMagics(Magics):
660 660
661 661 @cell_magic
662 662 def cellm3(self, line, cell):
663 663 return line, cell
664 664
665 665 _ip.register_magics(MyMagics)
666 666 self.check_ident('cellm3')
667 667
668 668 def test_cell_magic_class2(self):
669 669 "Cell magics declared via a class, #2"
670 670 @magics_class
671 671 class MyMagics2(Magics):
672 672
673 673 @cell_magic('cellm4')
674 674 def cellm33(self, line, cell):
675 675 return line, cell
676 676
677 677 _ip.register_magics(MyMagics2)
678 678 self.check_ident('cellm4')
679 679 # Check that nothing is registered as 'cellm33'
680 680 c33 = _ip.find_cell_magic('cellm33')
681 681 nt.assert_equal(c33, None)
682 682
683 683 def test_file():
684 684 """Basic %%file"""
685 685 ip = get_ipython()
686 686 with TemporaryDirectory() as td:
687 687 fname = os.path.join(td, 'file1')
688 688 ip.run_cell_magic("file", fname, u'\n'.join([
689 689 'line1',
690 690 'line2',
691 691 ]))
692 692 with open(fname) as f:
693 693 s = f.read()
694 694 nt.assert_in('line1\n', s)
695 695 nt.assert_in('line2', s)
696 696
697 697 def test_file_var_expand():
698 698 """%%file $filename"""
699 699 ip = get_ipython()
700 700 with TemporaryDirectory() as td:
701 701 fname = os.path.join(td, 'file1')
702 702 ip.user_ns['filename'] = fname
703 703 ip.run_cell_magic("file", '$filename', u'\n'.join([
704 704 'line1',
705 705 'line2',
706 706 ]))
707 707 with open(fname) as f:
708 708 s = f.read()
709 709 nt.assert_in('line1\n', s)
710 710 nt.assert_in('line2', s)
711 711
712 712 def test_file_unicode():
713 713 """%%file with unicode cell"""
714 714 ip = get_ipython()
715 715 with TemporaryDirectory() as td:
716 716 fname = os.path.join(td, 'file1')
717 717 ip.run_cell_magic("file", fname, u'\n'.join([
718 718 u'linΓ©1',
719 719 u'linΓ©2',
720 720 ]))
721 721 with io.open(fname, encoding='utf-8') as f:
722 722 s = f.read()
723 723 nt.assert_in(u'linΓ©1\n', s)
724 724 nt.assert_in(u'linΓ©2', s)
725 725
726 726 def test_file_amend():
727 727 """%%file -a amends files"""
728 728 ip = get_ipython()
729 729 with TemporaryDirectory() as td:
730 730 fname = os.path.join(td, 'file2')
731 731 ip.run_cell_magic("file", fname, u'\n'.join([
732 732 'line1',
733 733 'line2',
734 734 ]))
735 735 ip.run_cell_magic("file", "-a %s" % fname, u'\n'.join([
736 736 'line3',
737 737 'line4',
738 738 ]))
739 739 with open(fname) as f:
740 740 s = f.read()
741 741 nt.assert_in('line1\n', s)
742 742 nt.assert_in('line3\n', s)
743 743
744 744
745 745 def test_script_config():
746 746 ip = get_ipython()
747 747 ip.config.ScriptMagics.script_magics = ['whoda']
748 748 sm = script.ScriptMagics(shell=ip)
749 749 nt.assert_in('whoda', sm.magics['cell'])
750 750
751 751 @dec.skip_win32
752 752 def test_script_out():
753 753 ip = get_ipython()
754 754 ip.run_cell_magic("script", "--out output sh", "echo 'hi'")
755 755 nt.assert_equal(ip.user_ns['output'], 'hi\n')
756 756
757 757 @dec.skip_win32
758 758 def test_script_err():
759 759 ip = get_ipython()
760 760 ip.run_cell_magic("script", "--err error sh", "echo 'hello' >&2")
761 761 nt.assert_equal(ip.user_ns['error'], 'hello\n')
762 762
763 763 @dec.skip_win32
764 764 def test_script_out_err():
765 765 ip = get_ipython()
766 766 ip.run_cell_magic("script", "--out output --err error sh", "echo 'hi'\necho 'hello' >&2")
767 767 nt.assert_equal(ip.user_ns['output'], 'hi\n')
768 768 nt.assert_equal(ip.user_ns['error'], 'hello\n')
769 769
770 770 @dec.skip_win32
771 771 def test_script_bg_out():
772 772 ip = get_ipython()
773 773 ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'")
774 774 nt.assert_equal(ip.user_ns['output'].read(), b'hi\n')
775 775
776 776 @dec.skip_win32
777 777 def test_script_bg_err():
778 778 ip = get_ipython()
779 779 ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2")
780 780 nt.assert_equal(ip.user_ns['error'].read(), b'hello\n')
781 781
782 782 @dec.skip_win32
783 783 def test_script_bg_out_err():
784 784 ip = get_ipython()
785 785 ip.run_cell_magic("script", "--bg --out output --err error sh", "echo 'hi'\necho 'hello' >&2")
786 786 nt.assert_equal(ip.user_ns['output'].read(), b'hi\n')
787 787 nt.assert_equal(ip.user_ns['error'].read(), b'hello\n')
788 788
789 789 def test_script_defaults():
790 790 ip = get_ipython()
791 791 for cmd in ['sh', 'bash', 'perl', 'ruby']:
792 792 try:
793 793 find_cmd(cmd)
794 794 except Exception:
795 795 pass
796 796 else:
797 797 nt.assert_in(cmd, ip.magics_manager.magics['cell'])
798 798
799 799
800 800 @magics_class
801 801 class FooFoo(Magics):
802 802 """class with both %foo and %%foo magics"""
803 803 @line_magic('foo')
804 804 def line_foo(self, line):
805 805 "I am line foo"
806 806 pass
807 807
808 808 @cell_magic("foo")
809 809 def cell_foo(self, line, cell):
810 810 "I am cell foo, not line foo"
811 811 pass
812 812
813 813 def test_line_cell_info():
814 814 """%%foo and %foo magics are distinguishable to inspect"""
815 815 ip = get_ipython()
816 816 ip.magics_manager.register(FooFoo)
817 817 oinfo = ip.object_inspect('foo')
818 818 nt.assert_true(oinfo['found'])
819 819 nt.assert_true(oinfo['ismagic'])
820 820
821 821 oinfo = ip.object_inspect('%%foo')
822 822 nt.assert_true(oinfo['found'])
823 823 nt.assert_true(oinfo['ismagic'])
824 824 nt.assert_equal(oinfo['docstring'], FooFoo.cell_foo.__doc__)
825 825
826 826 oinfo = ip.object_inspect('%foo')
827 827 nt.assert_true(oinfo['found'])
828 828 nt.assert_true(oinfo['ismagic'])
829 829 nt.assert_equal(oinfo['docstring'], FooFoo.line_foo.__doc__)
830 830
831 831 def test_multiple_magics():
832 832 ip = get_ipython()
833 833 foo1 = FooFoo(ip)
834 834 foo2 = FooFoo(ip)
835 835 mm = ip.magics_manager
836 836 mm.register(foo1)
837 837 nt.assert_true(mm.magics['line']['foo'].__self__ is foo1)
838 838 mm.register(foo2)
839 839 nt.assert_true(mm.magics['line']['foo'].__self__ is foo2)
840 840
841 841 def test_alias_magic():
842 842 """Test %alias_magic."""
843 843 ip = get_ipython()
844 844 mm = ip.magics_manager
845 845
846 846 # Basic operation: both cell and line magics are created, if possible.
847 847 ip.run_line_magic('alias_magic', 'timeit_alias timeit')
848 848 nt.assert_in('timeit_alias', mm.magics['line'])
849 849 nt.assert_in('timeit_alias', mm.magics['cell'])
850 850
851 851 # --cell is specified, line magic not created.
852 852 ip.run_line_magic('alias_magic', '--cell timeit_cell_alias timeit')
853 853 nt.assert_not_in('timeit_cell_alias', mm.magics['line'])
854 854 nt.assert_in('timeit_cell_alias', mm.magics['cell'])
855 855
856 856 # Test that line alias is created successfully.
857 857 ip.run_line_magic('alias_magic', '--line env_alias env')
858 858 nt.assert_equal(ip.run_line_magic('env', ''),
859 859 ip.run_line_magic('env_alias', ''))
860 860
861 861 def test_save():
862 862 """Test %save."""
863 863 ip = get_ipython()
864 864 ip.history_manager.reset() # Clear any existing history.
865 865 cmds = [u"a=1", u"def b():\n return a**2", u"print(a, b())"]
866 866 for i, cmd in enumerate(cmds, start=1):
867 867 ip.history_manager.store_inputs(i, cmd)
868 868 with TemporaryDirectory() as tmpdir:
869 869 file = os.path.join(tmpdir, "testsave.py")
870 870 ip.run_line_magic("save", "%s 1-10" % file)
871 871 with open(file) as f:
872 872 content = f.read()
873 873 nt.assert_equal(content.count(cmds[0]), 1)
874 874 nt.assert_in('coding: utf-8', content)
875 875 ip.run_line_magic("save", "-a %s 1-10" % file)
876 876 with open(file) as f:
877 877 content = f.read()
878 878 nt.assert_equal(content.count(cmds[0]), 2)
879 879 nt.assert_in('coding: utf-8', content)
880 880
881 881
882 882 def test_store():
883 883 """Test %store."""
884 884 ip = get_ipython()
885 885 ip.run_line_magic('load_ext', 'storemagic')
886 886
887 887 # make sure the storage is empty
888 888 ip.run_line_magic('store', '-z')
889 889 ip.user_ns['var'] = 42
890 890 ip.run_line_magic('store', 'var')
891 891 ip.user_ns['var'] = 39
892 892 ip.run_line_magic('store', '-r')
893 893 nt.assert_equal(ip.user_ns['var'], 42)
894 894
895 895 ip.run_line_magic('store', '-d var')
896 896 ip.user_ns['var'] = 39
897 897 ip.run_line_magic('store' , '-r')
898 898 nt.assert_equal(ip.user_ns['var'], 39)
899 899
900 900
901 901 def _run_edit_test(arg_s, exp_filename=None,
902 902 exp_lineno=-1,
903 903 exp_contents=None,
904 904 exp_is_temp=None):
905 905 ip = get_ipython()
906 906 M = code.CodeMagics(ip)
907 907 last_call = ['','']
908 908 opts,args = M.parse_options(arg_s,'prxn:')
909 909 filename, lineno, is_temp = M._find_edit_target(ip, args, opts, last_call)
910 910
911 911 if exp_filename is not None:
912 912 nt.assert_equal(exp_filename, filename)
913 913 if exp_contents is not None:
914 with io.open(filename, 'r') as f:
914 with io.open(filename, 'r', encoding='utf-8') as f:
915 915 contents = f.read()
916 916 nt.assert_equal(exp_contents, contents)
917 917 if exp_lineno != -1:
918 918 nt.assert_equal(exp_lineno, lineno)
919 919 if exp_is_temp is not None:
920 920 nt.assert_equal(exp_is_temp, is_temp)
921 921
922 922
923 923 def test_edit_interactive():
924 924 """%edit on interactively defined objects"""
925 925 ip = get_ipython()
926 926 n = ip.execution_count
927 927 ip.run_cell(u"def foo(): return 1", store_history=True)
928 928
929 929 try:
930 930 _run_edit_test("foo")
931 931 except code.InteractivelyDefined as e:
932 932 nt.assert_equal(e.index, n)
933 933 else:
934 934 raise AssertionError("Should have raised InteractivelyDefined")
935 935
936 936
937 937 def test_edit_cell():
938 938 """%edit [cell id]"""
939 939 ip = get_ipython()
940 940
941 941 ip.run_cell(u"def foo(): return 1", store_history=True)
942 942
943 943 # test
944 944 _run_edit_test("1", exp_contents=ip.user_ns['In'][1], exp_is_temp=True)
@@ -1,318 +1,319 b''
1 1 # coding: utf-8
2 2 """Test the notebooks webservice API."""
3 3
4 4 import io
5 5 import json
6 6 import os
7 7 import shutil
8 8 from unicodedata import normalize
9 9
10 10 pjoin = os.path.join
11 11
12 12 import requests
13 13
14 14 from IPython.html.utils import url_path_join, url_escape
15 15 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
16 16 from IPython.nbformat import current
17 17 from IPython.nbformat.current import (new_notebook, write, read, new_worksheet,
18 18 new_heading_cell, to_notebook_json)
19 19 from IPython.nbformat import v2
20 20 from IPython.utils import py3compat
21 21 from IPython.utils.data import uniq_stable
22 22
23 23
24 24 class NBAPI(object):
25 25 """Wrapper for notebook API calls."""
26 26 def __init__(self, base_url):
27 27 self.base_url = base_url
28 28
29 29 def _req(self, verb, path, body=None):
30 30 response = requests.request(verb,
31 31 url_path_join(self.base_url, 'api/notebooks', path),
32 32 data=body,
33 33 )
34 34 response.raise_for_status()
35 35 return response
36 36
37 37 def list(self, path='/'):
38 38 return self._req('GET', path)
39 39
40 40 def read(self, name, path='/'):
41 41 return self._req('GET', url_path_join(path, name))
42 42
43 43 def create_untitled(self, path='/'):
44 44 return self._req('POST', path)
45 45
46 46 def upload_untitled(self, body, path='/'):
47 47 return self._req('POST', path, body)
48 48
49 49 def copy_untitled(self, copy_from, path='/'):
50 50 body = json.dumps({'copy_from':copy_from})
51 51 return self._req('POST', path, body)
52 52
53 53 def create(self, name, path='/'):
54 54 return self._req('PUT', url_path_join(path, name))
55 55
56 56 def upload(self, name, body, path='/'):
57 57 return self._req('PUT', url_path_join(path, name), body)
58 58
59 59 def copy(self, copy_from, copy_to, path='/'):
60 60 body = json.dumps({'copy_from':copy_from})
61 61 return self._req('PUT', url_path_join(path, copy_to), body)
62 62
63 63 def save(self, name, body, path='/'):
64 64 return self._req('PUT', url_path_join(path, name), body)
65 65
66 66 def delete(self, name, path='/'):
67 67 return self._req('DELETE', url_path_join(path, name))
68 68
69 69 def rename(self, name, path, new_name):
70 70 body = json.dumps({'name': new_name})
71 71 return self._req('PATCH', url_path_join(path, name), body)
72 72
73 73 def get_checkpoints(self, name, path):
74 74 return self._req('GET', url_path_join(path, name, 'checkpoints'))
75 75
76 76 def new_checkpoint(self, name, path):
77 77 return self._req('POST', url_path_join(path, name, 'checkpoints'))
78 78
79 79 def restore_checkpoint(self, name, path, checkpoint_id):
80 80 return self._req('POST', url_path_join(path, name, 'checkpoints', checkpoint_id))
81 81
82 82 def delete_checkpoint(self, name, path, checkpoint_id):
83 83 return self._req('DELETE', url_path_join(path, name, 'checkpoints', checkpoint_id))
84 84
85 85 class APITest(NotebookTestBase):
86 86 """Test the kernels web service API"""
87 87 dirs_nbs = [('', 'inroot'),
88 88 ('Directory with spaces in', 'inspace'),
89 89 (u'unicodΓ©', 'innonascii'),
90 90 ('foo', 'a'),
91 91 ('foo', 'b'),
92 92 ('foo', 'name with spaces'),
93 93 ('foo', u'unicodΓ©'),
94 94 ('foo/bar', 'baz'),
95 95 (u'Γ₯ b', u'Γ§ d')
96 96 ]
97 97
98 98 dirs = uniq_stable([d for (d,n) in dirs_nbs])
99 99 del dirs[0] # remove ''
100 100
101 101 def setUp(self):
102 102 nbdir = self.notebook_dir.name
103 103
104 104 for d in self.dirs:
105 105 d.replace('/', os.sep)
106 106 if not os.path.isdir(pjoin(nbdir, d)):
107 107 os.mkdir(pjoin(nbdir, d))
108 108
109 109 for d, name in self.dirs_nbs:
110 110 d = d.replace('/', os.sep)
111 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w') as f:
111 with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
112 encoding='utf-8') as f:
112 113 nb = new_notebook(name=name)
113 114 write(nb, f, format='ipynb')
114 115
115 116 self.nb_api = NBAPI(self.base_url())
116 117
117 118 def tearDown(self):
118 119 nbdir = self.notebook_dir.name
119 120
120 121 for dname in ['foo', 'Directory with spaces in', u'unicodΓ©', u'Γ₯ b']:
121 122 shutil.rmtree(pjoin(nbdir, dname), ignore_errors=True)
122 123
123 124 if os.path.isfile(pjoin(nbdir, 'inroot.ipynb')):
124 125 os.unlink(pjoin(nbdir, 'inroot.ipynb'))
125 126
126 127 def test_list_notebooks(self):
127 128 nbs = self.nb_api.list().json()
128 129 self.assertEqual(len(nbs), 1)
129 130 self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
130 131
131 132 nbs = self.nb_api.list('/Directory with spaces in/').json()
132 133 self.assertEqual(len(nbs), 1)
133 134 self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
134 135
135 136 nbs = self.nb_api.list(u'/unicodΓ©/').json()
136 137 self.assertEqual(len(nbs), 1)
137 138 self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
138 139 self.assertEqual(nbs[0]['path'], u'unicodΓ©')
139 140
140 141 nbs = self.nb_api.list('/foo/bar/').json()
141 142 self.assertEqual(len(nbs), 1)
142 143 self.assertEqual(nbs[0]['name'], 'baz.ipynb')
143 144 self.assertEqual(nbs[0]['path'], 'foo/bar')
144 145
145 146 nbs = self.nb_api.list('foo').json()
146 147 self.assertEqual(len(nbs), 4)
147 148 nbnames = { normalize('NFC', n['name']) for n in nbs }
148 149 expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodΓ©.ipynb']
149 150 expected = { normalize('NFC', name) for name in expected }
150 151 self.assertEqual(nbnames, expected)
151 152
152 153 def test_list_nonexistant_dir(self):
153 154 with assert_http_error(404):
154 155 self.nb_api.list('nonexistant')
155 156
156 157 def test_get_contents(self):
157 158 for d, name in self.dirs_nbs:
158 159 nb = self.nb_api.read('%s.ipynb' % name, d+'/').json()
159 160 self.assertEqual(nb['name'], u'%s.ipynb' % name)
160 161 self.assertIn('content', nb)
161 162 self.assertIn('metadata', nb['content'])
162 163 self.assertIsInstance(nb['content']['metadata'], dict)
163 164
164 165 # Name that doesn't exist - should be a 404
165 166 with assert_http_error(404):
166 167 self.nb_api.read('q.ipynb', 'foo')
167 168
168 169 def _check_nb_created(self, resp, name, path):
169 170 self.assertEqual(resp.status_code, 201)
170 171 location_header = py3compat.str_to_unicode(resp.headers['Location'])
171 172 self.assertEqual(location_header, url_escape(url_path_join(u'/api/notebooks', path, name)))
172 173 self.assertEqual(resp.json()['name'], name)
173 174 assert os.path.isfile(pjoin(
174 175 self.notebook_dir.name,
175 176 path.replace('/', os.sep),
176 177 name,
177 178 ))
178 179
179 180 def test_create_untitled(self):
180 181 resp = self.nb_api.create_untitled(path=u'Γ₯ b')
181 182 self._check_nb_created(resp, 'Untitled0.ipynb', u'Γ₯ b')
182 183
183 184 # Second time
184 185 resp = self.nb_api.create_untitled(path=u'Γ₯ b')
185 186 self._check_nb_created(resp, 'Untitled1.ipynb', u'Γ₯ b')
186 187
187 188 # And two directories down
188 189 resp = self.nb_api.create_untitled(path='foo/bar')
189 190 self._check_nb_created(resp, 'Untitled0.ipynb', 'foo/bar')
190 191
191 192 def test_upload_untitled(self):
192 193 nb = new_notebook(name='Upload test')
193 194 nbmodel = {'content': nb}
194 195 resp = self.nb_api.upload_untitled(path=u'Γ₯ b',
195 196 body=json.dumps(nbmodel))
196 197 self._check_nb_created(resp, 'Untitled0.ipynb', u'Γ₯ b')
197 198
198 199 def test_upload(self):
199 200 nb = new_notebook(name=u'ignored')
200 201 nbmodel = {'content': nb}
201 202 resp = self.nb_api.upload(u'Upload tΓ©st.ipynb', path=u'Γ₯ b',
202 203 body=json.dumps(nbmodel))
203 204 self._check_nb_created(resp, u'Upload tΓ©st.ipynb', u'Γ₯ b')
204 205
205 206 def test_upload_v2(self):
206 207 nb = v2.new_notebook()
207 208 ws = v2.new_worksheet()
208 209 nb.worksheets.append(ws)
209 210 ws.cells.append(v2.new_code_cell(input='print("hi")'))
210 211 nbmodel = {'content': nb}
211 212 resp = self.nb_api.upload(u'Upload tΓ©st.ipynb', path=u'Γ₯ b',
212 213 body=json.dumps(nbmodel))
213 214 self._check_nb_created(resp, u'Upload tΓ©st.ipynb', u'Γ₯ b')
214 215 resp = self.nb_api.read(u'Upload tΓ©st.ipynb', u'Γ₯ b')
215 216 data = resp.json()
216 217 self.assertEqual(data['content']['nbformat'], current.nbformat)
217 218 self.assertEqual(data['content']['orig_nbformat'], 2)
218 219
219 220 def test_copy_untitled(self):
220 221 resp = self.nb_api.copy_untitled(u'Γ§ d.ipynb', path=u'Γ₯ b')
221 222 self._check_nb_created(resp, u'Γ§ d-Copy0.ipynb', u'Γ₯ b')
222 223
223 224 def test_copy(self):
224 225 resp = self.nb_api.copy(u'Γ§ d.ipynb', u'cΓΈpy.ipynb', path=u'Γ₯ b')
225 226 self._check_nb_created(resp, u'cΓΈpy.ipynb', u'Γ₯ b')
226 227
227 228 def test_delete(self):
228 229 for d, name in self.dirs_nbs:
229 230 resp = self.nb_api.delete('%s.ipynb' % name, d)
230 231 self.assertEqual(resp.status_code, 204)
231 232
232 233 for d in self.dirs + ['/']:
233 234 nbs = self.nb_api.list(d).json()
234 235 self.assertEqual(len(nbs), 0)
235 236
236 237 def test_rename(self):
237 238 resp = self.nb_api.rename('a.ipynb', 'foo', 'z.ipynb')
238 239 self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
239 240 self.assertEqual(resp.json()['name'], 'z.ipynb')
240 241 assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
241 242
242 243 nbs = self.nb_api.list('foo').json()
243 244 nbnames = set(n['name'] for n in nbs)
244 245 self.assertIn('z.ipynb', nbnames)
245 246 self.assertNotIn('a.ipynb', nbnames)
246 247
247 248 def test_save(self):
248 249 resp = self.nb_api.read('a.ipynb', 'foo')
249 250 nbcontent = json.loads(resp.text)['content']
250 251 nb = to_notebook_json(nbcontent)
251 252 ws = new_worksheet()
252 253 nb.worksheets = [ws]
253 254 ws.cells.append(new_heading_cell(u'Created by test Β³'))
254 255
255 256 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb}
256 257 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
257 258
258 259 nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
259 260 with io.open(nbfile, 'r', encoding='utf-8') as f:
260 261 newnb = read(f, format='ipynb')
261 262 self.assertEqual(newnb.worksheets[0].cells[0].source,
262 263 u'Created by test Β³')
263 264 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
264 265 newnb = to_notebook_json(nbcontent)
265 266 self.assertEqual(newnb.worksheets[0].cells[0].source,
266 267 u'Created by test Β³')
267 268
268 269 # Save and rename
269 270 nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb}
270 271 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
271 272 saved = resp.json()
272 273 self.assertEqual(saved['name'], 'a2.ipynb')
273 274 self.assertEqual(saved['path'], 'foo/bar')
274 275 assert os.path.isfile(pjoin(self.notebook_dir.name,'foo','bar','a2.ipynb'))
275 276 assert not os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'a.ipynb'))
276 277 with assert_http_error(404):
277 278 self.nb_api.read('a.ipynb', 'foo')
278 279
279 280 def test_checkpoints(self):
280 281 resp = self.nb_api.read('a.ipynb', 'foo')
281 282 r = self.nb_api.new_checkpoint('a.ipynb', 'foo')
282 283 self.assertEqual(r.status_code, 201)
283 284 cp1 = r.json()
284 285 self.assertEqual(set(cp1), {'id', 'last_modified'})
285 286 self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
286 287
287 288 # Modify it
288 289 nbcontent = json.loads(resp.text)['content']
289 290 nb = to_notebook_json(nbcontent)
290 291 ws = new_worksheet()
291 292 nb.worksheets = [ws]
292 293 hcell = new_heading_cell('Created by test')
293 294 ws.cells.append(hcell)
294 295 # Save
295 296 nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb}
296 297 resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
297 298
298 299 # List checkpoints
299 300 cps = self.nb_api.get_checkpoints('a.ipynb', 'foo').json()
300 301 self.assertEqual(cps, [cp1])
301 302
302 303 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
303 304 nb = to_notebook_json(nbcontent)
304 305 self.assertEqual(nb.worksheets[0].cells[0].source, 'Created by test')
305 306
306 307 # Restore cp1
307 308 r = self.nb_api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
308 309 self.assertEqual(r.status_code, 204)
309 310 nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
310 311 nb = to_notebook_json(nbcontent)
311 312 self.assertEqual(nb.worksheets, [])
312 313
313 314 # Delete cp1
314 315 r = self.nb_api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
315 316 self.assertEqual(r.status_code, 204)
316 317 cps = self.nb_api.get_checkpoints('a.ipynb', 'foo').json()
317 318 self.assertEqual(cps, [])
318 319
@@ -1,114 +1,115 b''
1 1 """Test the sessions web service API."""
2 2
3 3 import errno
4 4 import io
5 5 import os
6 6 import json
7 7 import requests
8 8 import shutil
9 9
10 10 pjoin = os.path.join
11 11
12 12 from IPython.html.utils import url_path_join
13 13 from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
14 14 from IPython.nbformat.current import new_notebook, write
15 15
16 16 class SessionAPI(object):
17 17 """Wrapper for notebook API calls."""
18 18 def __init__(self, base_url):
19 19 self.base_url = base_url
20 20
21 21 def _req(self, verb, path, body=None):
22 22 response = requests.request(verb,
23 23 url_path_join(self.base_url, 'api/sessions', path), data=body)
24 24
25 25 if 400 <= response.status_code < 600:
26 26 try:
27 27 response.reason = response.json()['message']
28 28 except:
29 29 pass
30 30 response.raise_for_status()
31 31
32 32 return response
33 33
34 34 def list(self):
35 35 return self._req('GET', '')
36 36
37 37 def get(self, id):
38 38 return self._req('GET', id)
39 39
40 40 def create(self, name, path):
41 41 body = json.dumps({'notebook': {'name':name, 'path':path}})
42 42 return self._req('POST', '', body)
43 43
44 44 def modify(self, id, name, path):
45 45 body = json.dumps({'notebook': {'name':name, 'path':path}})
46 46 return self._req('PATCH', id, body)
47 47
48 48 def delete(self, id):
49 49 return self._req('DELETE', id)
50 50
51 51 class SessionAPITest(NotebookTestBase):
52 52 """Test the sessions web service API"""
53 53 def setUp(self):
54 54 nbdir = self.notebook_dir.name
55 55 try:
56 56 os.mkdir(pjoin(nbdir, 'foo'))
57 57 except OSError as e:
58 58 # Deleting the folder in an earlier test may have failed
59 59 if e.errno != errno.EEXIST:
60 60 raise
61 61
62 with io.open(pjoin(nbdir, 'foo', 'nb1.ipynb'), 'w') as f:
62 with io.open(pjoin(nbdir, 'foo', 'nb1.ipynb'), 'w',
63 encoding='utf-8') as f:
63 64 nb = new_notebook(name='nb1')
64 65 write(nb, f, format='ipynb')
65 66
66 67 self.sess_api = SessionAPI(self.base_url())
67 68
68 69 def tearDown(self):
69 70 for session in self.sess_api.list().json():
70 71 self.sess_api.delete(session['id'])
71 72 shutil.rmtree(pjoin(self.notebook_dir.name, 'foo'),
72 73 ignore_errors=True)
73 74
74 75 def test_create(self):
75 76 sessions = self.sess_api.list().json()
76 77 self.assertEqual(len(sessions), 0)
77 78
78 79 resp = self.sess_api.create('nb1.ipynb', 'foo')
79 80 self.assertEqual(resp.status_code, 201)
80 81 newsession = resp.json()
81 82 self.assertIn('id', newsession)
82 83 self.assertEqual(newsession['notebook']['name'], 'nb1.ipynb')
83 84 self.assertEqual(newsession['notebook']['path'], 'foo')
84 85 self.assertEqual(resp.headers['Location'], '/api/sessions/{0}'.format(newsession['id']))
85 86
86 87 sessions = self.sess_api.list().json()
87 88 self.assertEqual(sessions, [newsession])
88 89
89 90 # Retrieve it
90 91 sid = newsession['id']
91 92 got = self.sess_api.get(sid).json()
92 93 self.assertEqual(got, newsession)
93 94
94 95 def test_delete(self):
95 96 newsession = self.sess_api.create('nb1.ipynb', 'foo').json()
96 97 sid = newsession['id']
97 98
98 99 resp = self.sess_api.delete(sid)
99 100 self.assertEqual(resp.status_code, 204)
100 101
101 102 sessions = self.sess_api.list().json()
102 103 self.assertEqual(sessions, [])
103 104
104 105 with assert_http_error(404):
105 106 self.sess_api.get(sid)
106 107
107 108 def test_modify(self):
108 109 newsession = self.sess_api.create('nb1.ipynb', 'foo').json()
109 110 sid = newsession['id']
110 111
111 112 changed = self.sess_api.modify(sid, 'nb2.ipynb', '').json()
112 113 self.assertEqual(changed['id'], sid)
113 114 self.assertEqual(changed['notebook']['name'], 'nb2.ipynb')
114 115 self.assertEqual(changed['notebook']['path'], '')
General Comments 0
You need to be logged in to leave comments. Login now