##// END OF EJS Templates
Merge pull request #2088 from bfroehle/_2083_py32_test_failures...
Thomas Kluyver -
r7811:d6009ff2 merge
parent child Browse files
Show More
@@ -1,743 +1,744 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tests for various magic functions.
3 3
4 4 Needs to be run by nose (to make ipython session available).
5 5 """
6 6 from __future__ import absolute_import
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Imports
10 10 #-----------------------------------------------------------------------------
11 11
12 12 import io
13 13 import os
14 14 import sys
15 15 from StringIO import StringIO
16 16 from unittest import TestCase
17 17
18 18 try:
19 19 from importlib import invalidate_caches # Required from Python 3.3
20 20 except ImportError:
21 21 def invalidate_caches():
22 22 pass
23 23
24 24 import nose.tools as nt
25 25
26 26 from IPython.core import magic
27 27 from IPython.core.magic import (Magics, magics_class, line_magic,
28 28 cell_magic, line_cell_magic,
29 29 register_line_magic, register_cell_magic,
30 30 register_line_cell_magic)
31 31 from IPython.core.magics import execution, script
32 32 from IPython.nbformat.v3.tests.nbexamples import nb0
33 33 from IPython.nbformat import current
34 34 from IPython.testing import decorators as dec
35 35 from IPython.testing import tools as tt
36 36 from IPython.utils import py3compat
37 37 from IPython.utils.tempdir import TemporaryDirectory
38 38 from IPython.utils.process import find_cmd
39 39
40 40 #-----------------------------------------------------------------------------
41 41 # Test functions begin
42 42 #-----------------------------------------------------------------------------
43 43
44 44 @magic.magics_class
45 45 class DummyMagics(magic.Magics): pass
46 46
47 47 def test_rehashx():
48 48 # clear up everything
49 49 _ip = get_ipython()
50 50 _ip.alias_manager.alias_table.clear()
51 51 del _ip.db['syscmdlist']
52 52
53 53 _ip.magic('rehashx')
54 54 # Practically ALL ipython development systems will have more than 10 aliases
55 55
56 56 yield (nt.assert_true, len(_ip.alias_manager.alias_table) > 10)
57 57 for key, val in _ip.alias_manager.alias_table.iteritems():
58 58 # we must strip dots from alias names
59 59 nt.assert_true('.' not in key)
60 60
61 61 # rehashx must fill up syscmdlist
62 62 scoms = _ip.db['syscmdlist']
63 63 yield (nt.assert_true, len(scoms) > 10)
64 64
65 65
66 66 def test_magic_parse_options():
67 67 """Test that we don't mangle paths when parsing magic options."""
68 68 ip = get_ipython()
69 69 path = 'c:\\x'
70 70 m = DummyMagics(ip)
71 71 opts = m.parse_options('-f %s' % path,'f:')[0]
72 72 # argv splitting is os-dependent
73 73 if os.name == 'posix':
74 74 expected = 'c:x'
75 75 else:
76 76 expected = path
77 77 nt.assert_equals(opts['f'], expected)
78 78
79 79 def test_magic_parse_long_options():
80 80 """Magic.parse_options can handle --foo=bar long options"""
81 81 ip = get_ipython()
82 82 m = DummyMagics(ip)
83 83 opts, _ = m.parse_options('--foo --bar=bubble', 'a', 'foo', 'bar=')
84 84 nt.assert_true('foo' in opts)
85 85 nt.assert_true('bar' in opts)
86 86 nt.assert_true(opts['bar'], "bubble")
87 87
88 88
89 89 @dec.skip_without('sqlite3')
90 90 def doctest_hist_f():
91 91 """Test %hist -f with temporary filename.
92 92
93 93 In [9]: import tempfile
94 94
95 95 In [10]: tfile = tempfile.mktemp('.py','tmp-ipython-')
96 96
97 97 In [11]: %hist -nl -f $tfile 3
98 98
99 99 In [13]: import os; os.unlink(tfile)
100 100 """
101 101
102 102
103 103 @dec.skip_without('sqlite3')
104 104 def doctest_hist_r():
105 105 """Test %hist -r
106 106
107 107 XXX - This test is not recording the output correctly. For some reason, in
108 108 testing mode the raw history isn't getting populated. No idea why.
109 109 Disabling the output checking for now, though at least we do run it.
110 110
111 111 In [1]: 'hist' in _ip.lsmagic()
112 112 Out[1]: True
113 113
114 114 In [2]: x=1
115 115
116 116 In [3]: %hist -rl 2
117 117 x=1 # random
118 118 %hist -r 2
119 119 """
120 120
121 121
122 122 @dec.skip_without('sqlite3')
123 123 def doctest_hist_op():
124 124 """Test %hist -op
125 125
126 126 In [1]: class b(float):
127 127 ...: pass
128 128 ...:
129 129
130 130 In [2]: class s(object):
131 131 ...: def __str__(self):
132 132 ...: return 's'
133 133 ...:
134 134
135 135 In [3]:
136 136
137 137 In [4]: class r(b):
138 138 ...: def __repr__(self):
139 139 ...: return 'r'
140 140 ...:
141 141
142 142 In [5]: class sr(s,r): pass
143 143 ...:
144 144
145 145 In [6]:
146 146
147 147 In [7]: bb=b()
148 148
149 149 In [8]: ss=s()
150 150
151 151 In [9]: rr=r()
152 152
153 153 In [10]: ssrr=sr()
154 154
155 155 In [11]: 4.5
156 156 Out[11]: 4.5
157 157
158 158 In [12]: str(ss)
159 159 Out[12]: 's'
160 160
161 161 In [13]:
162 162
163 163 In [14]: %hist -op
164 164 >>> class b:
165 165 ... pass
166 166 ...
167 167 >>> class s(b):
168 168 ... def __str__(self):
169 169 ... return 's'
170 170 ...
171 171 >>>
172 172 >>> class r(b):
173 173 ... def __repr__(self):
174 174 ... return 'r'
175 175 ...
176 176 >>> class sr(s,r): pass
177 177 >>>
178 178 >>> bb=b()
179 179 >>> ss=s()
180 180 >>> rr=r()
181 181 >>> ssrr=sr()
182 182 >>> 4.5
183 183 4.5
184 184 >>> str(ss)
185 185 's'
186 186 >>>
187 187 """
188 188
189 189
190 190 @dec.skip_without('sqlite3')
191 191 def test_macro():
192 192 ip = get_ipython()
193 193 ip.history_manager.reset() # Clear any existing history.
194 194 cmds = ["a=1", "def b():\n return a**2", "print(a,b())"]
195 195 for i, cmd in enumerate(cmds, start=1):
196 196 ip.history_manager.store_inputs(i, cmd)
197 197 ip.magic("macro test 1-3")
198 198 nt.assert_equal(ip.user_ns["test"].value, "\n".join(cmds)+"\n")
199 199
200 200 # List macros.
201 201 assert "test" in ip.magic("macro")
202 202
203 203
204 204 @dec.skip_without('sqlite3')
205 205 def test_macro_run():
206 206 """Test that we can run a multi-line macro successfully."""
207 207 ip = get_ipython()
208 208 ip.history_manager.reset()
209 209 cmds = ["a=10", "a+=1", py3compat.doctest_refactor_print("print a"),
210 210 "%macro test 2-3"]
211 211 for cmd in cmds:
212 212 ip.run_cell(cmd, store_history=True)
213 213 nt.assert_equal(ip.user_ns["test"].value,
214 214 py3compat.doctest_refactor_print("a+=1\nprint a\n"))
215 215 with tt.AssertPrints("12"):
216 216 ip.run_cell("test")
217 217 with tt.AssertPrints("13"):
218 218 ip.run_cell("test")
219 219
220 220
221 221 @dec.skipif_not_numpy
222 222 def test_numpy_reset_array_undec():
223 223 "Test '%reset array' functionality"
224 224 _ip.ex('import numpy as np')
225 225 _ip.ex('a = np.empty(2)')
226 226 yield (nt.assert_true, 'a' in _ip.user_ns)
227 227 _ip.magic('reset -f array')
228 228 yield (nt.assert_false, 'a' in _ip.user_ns)
229 229
230 230 def test_reset_out():
231 231 "Test '%reset out' magic"
232 232 _ip.run_cell("parrot = 'dead'", store_history=True)
233 233 # test '%reset -f out', make an Out prompt
234 234 _ip.run_cell("parrot", store_history=True)
235 235 nt.assert_true('dead' in [_ip.user_ns[x] for x in '_','__','___'])
236 236 _ip.magic('reset -f out')
237 237 nt.assert_false('dead' in [_ip.user_ns[x] for x in '_','__','___'])
238 238 nt.assert_true(len(_ip.user_ns['Out']) == 0)
239 239
240 240 def test_reset_in():
241 241 "Test '%reset in' magic"
242 242 # test '%reset -f in'
243 243 _ip.run_cell("parrot", store_history=True)
244 244 nt.assert_true('parrot' in [_ip.user_ns[x] for x in '_i','_ii','_iii'])
245 245 _ip.magic('%reset -f in')
246 246 nt.assert_false('parrot' in [_ip.user_ns[x] for x in '_i','_ii','_iii'])
247 247 nt.assert_true(len(set(_ip.user_ns['In'])) == 1)
248 248
249 249 def test_reset_dhist():
250 250 "Test '%reset dhist' magic"
251 251 _ip.run_cell("tmp = [d for d in _dh]") # copy before clearing
252 252 _ip.magic('cd ' + os.path.dirname(nt.__file__))
253 253 _ip.magic('cd -')
254 254 nt.assert_true(len(_ip.user_ns['_dh']) > 0)
255 255 _ip.magic('reset -f dhist')
256 256 nt.assert_true(len(_ip.user_ns['_dh']) == 0)
257 257 _ip.run_cell("_dh = [d for d in tmp]") #restore
258 258
259 259 def test_reset_in_length():
260 260 "Test that '%reset in' preserves In[] length"
261 261 _ip.run_cell("print 'foo'")
262 262 _ip.run_cell("reset -f in")
263 263 nt.assert_true(len(_ip.user_ns['In']) == _ip.displayhook.prompt_count+1)
264 264
265 265 def test_time():
266 266 _ip.magic('time None')
267 267
268 268 def test_tb_syntaxerror():
269 269 """test %tb after a SyntaxError"""
270 270 ip = get_ipython()
271 271 ip.run_cell("for")
272 272
273 273 # trap and validate stdout
274 274 save_stdout = sys.stdout
275 275 try:
276 276 sys.stdout = StringIO()
277 277 ip.run_cell("%tb")
278 278 out = sys.stdout.getvalue()
279 279 finally:
280 280 sys.stdout = save_stdout
281 281 # trim output, and only check the last line
282 282 last_line = out.rstrip().splitlines()[-1].strip()
283 283 nt.assert_equals(last_line, "SyntaxError: invalid syntax")
284 284
285 285
286 286 @py3compat.doctest_refactor_print
287 287 def doctest_time():
288 288 """
289 289 In [10]: %time None
290 290 CPU times: user 0.00 s, sys: 0.00 s, total: 0.00 s
291 291 Wall time: 0.00 s
292 292
293 293 In [11]: def f(kmjy):
294 294 ....: %time print 2*kmjy
295 295
296 296 In [12]: f(3)
297 297 6
298 298 CPU times: user 0.00 s, sys: 0.00 s, total: 0.00 s
299 299 Wall time: 0.00 s
300 300 """
301 301
302 302
303 303 def test_doctest_mode():
304 304 "Toggle doctest_mode twice, it should be a no-op and run without error"
305 305 _ip.magic('doctest_mode')
306 306 _ip.magic('doctest_mode')
307 307
308 308
309 309 def test_parse_options():
310 310 """Tests for basic options parsing in magics."""
311 311 # These are only the most minimal of tests, more should be added later. At
312 312 # the very least we check that basic text/unicode calls work OK.
313 313 m = DummyMagics(_ip)
314 314 nt.assert_equal(m.parse_options('foo', '')[1], 'foo')
315 315 nt.assert_equal(m.parse_options(u'foo', '')[1], u'foo')
316 316
317 317
318 318 def test_dirops():
319 319 """Test various directory handling operations."""
320 320 # curpath = lambda :os.path.splitdrive(os.getcwdu())[1].replace('\\','/')
321 321 curpath = os.getcwdu
322 322 startdir = os.getcwdu()
323 323 ipdir = os.path.realpath(_ip.ipython_dir)
324 324 try:
325 325 _ip.magic('cd "%s"' % ipdir)
326 326 nt.assert_equal(curpath(), ipdir)
327 327 _ip.magic('cd -')
328 328 nt.assert_equal(curpath(), startdir)
329 329 _ip.magic('pushd "%s"' % ipdir)
330 330 nt.assert_equal(curpath(), ipdir)
331 331 _ip.magic('popd')
332 332 nt.assert_equal(curpath(), startdir)
333 333 finally:
334 334 os.chdir(startdir)
335 335
336 336
337 337 def test_xmode():
338 338 # Calling xmode three times should be a no-op
339 339 xmode = _ip.InteractiveTB.mode
340 340 for i in range(3):
341 341 _ip.magic("xmode")
342 342 nt.assert_equal(_ip.InteractiveTB.mode, xmode)
343 343
344 344 def test_reset_hard():
345 345 monitor = []
346 346 class A(object):
347 347 def __del__(self):
348 348 monitor.append(1)
349 349 def __repr__(self):
350 350 return "<A instance>"
351 351
352 352 _ip.user_ns["a"] = A()
353 353 _ip.run_cell("a")
354 354
355 355 nt.assert_equal(monitor, [])
356 356 _ip.magic("reset -f")
357 357 nt.assert_equal(monitor, [1])
358 358
359 359 class TestXdel(tt.TempFileMixin):
360 360 def test_xdel(self):
361 361 """Test that references from %run are cleared by xdel."""
362 362 src = ("class A(object):\n"
363 363 " monitor = []\n"
364 364 " def __del__(self):\n"
365 365 " self.monitor.append(1)\n"
366 366 "a = A()\n")
367 367 self.mktmp(src)
368 368 # %run creates some hidden references...
369 369 _ip.magic("run %s" % self.fname)
370 370 # ... as does the displayhook.
371 371 _ip.run_cell("a")
372 372
373 373 monitor = _ip.user_ns["A"].monitor
374 374 nt.assert_equal(monitor, [])
375 375
376 376 _ip.magic("xdel a")
377 377
378 378 # Check that a's __del__ method has been called.
379 379 nt.assert_equal(monitor, [1])
380 380
381 381 def doctest_who():
382 382 """doctest for %who
383 383
384 384 In [1]: %reset -f
385 385
386 386 In [2]: alpha = 123
387 387
388 388 In [3]: beta = 'beta'
389 389
390 390 In [4]: %who int
391 391 alpha
392 392
393 393 In [5]: %who str
394 394 beta
395 395
396 396 In [6]: %whos
397 397 Variable Type Data/Info
398 398 ----------------------------
399 399 alpha int 123
400 400 beta str beta
401 401
402 402 In [7]: %who_ls
403 403 Out[7]: ['alpha', 'beta']
404 404 """
405 405
406 406 def test_whos():
407 407 """Check that whos is protected against objects where repr() fails."""
408 408 class A(object):
409 409 def __repr__(self):
410 410 raise Exception()
411 411 _ip.user_ns['a'] = A()
412 412 _ip.magic("whos")
413 413
414 414 @py3compat.u_format
415 415 def doctest_precision():
416 416 """doctest for %precision
417 417
418 418 In [1]: f = get_ipython().display_formatter.formatters['text/plain']
419 419
420 420 In [2]: %precision 5
421 421 Out[2]: {u}'%.5f'
422 422
423 423 In [3]: f.float_format
424 424 Out[3]: {u}'%.5f'
425 425
426 426 In [4]: %precision %e
427 427 Out[4]: {u}'%e'
428 428
429 429 In [5]: f(3.1415927)
430 430 Out[5]: {u}'3.141593e+00'
431 431 """
432 432
433 433 def test_psearch():
434 434 with tt.AssertPrints("dict.fromkeys"):
435 435 _ip.run_cell("dict.fr*?")
436 436
437 437 def test_timeit_shlex():
438 438 """test shlex issues with timeit (#1109)"""
439 439 _ip.ex("def f(*a,**kw): pass")
440 440 _ip.magic('timeit -n1 "this is a bug".count(" ")')
441 441 _ip.magic('timeit -r1 -n1 f(" ", 1)')
442 442 _ip.magic('timeit -r1 -n1 f(" ", 1, " ", 2, " ")')
443 443 _ip.magic('timeit -r1 -n1 ("a " + "b")')
444 444 _ip.magic('timeit -r1 -n1 f("a " + "b")')
445 445 _ip.magic('timeit -r1 -n1 f("a " + "b ")')
446 446
447 447
448 448 def test_timeit_arguments():
449 449 "Test valid timeit arguments, should not cause SyntaxError (GH #1269)"
450 450 _ip.magic("timeit ('#')")
451 451
452 452
453 453 def test_timeit_special_syntax():
454 454 "Test %%timeit with IPython special syntax"
455 455 from IPython.core.magic import register_line_magic
456 456
457 457 @register_line_magic
458 458 def lmagic(line):
459 459 ip = get_ipython()
460 460 ip.user_ns['lmagic_out'] = line
461 461
462 462 # line mode test
463 463 _ip.run_line_magic('timeit', '-n1 -r1 %lmagic my line')
464 464 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line')
465 465 # cell mode test
466 466 _ip.run_cell_magic('timeit', '-n1 -r1', '%lmagic my line2')
467 467 nt.assert_equal(_ip.user_ns['lmagic_out'], 'my line2')
468 468
469 469
470 470 @dec.skipif(execution.profile is None)
471 471 def test_prun_quotes():
472 472 "Test that prun does not clobber string escapes (GH #1302)"
473 473 _ip.magic(r"prun -q x = '\t'")
474 474 nt.assert_equal(_ip.user_ns['x'], '\t')
475 475
476 476 def test_extension():
477 477 tmpdir = TemporaryDirectory()
478 478 orig_ipython_dir = _ip.ipython_dir
479 479 try:
480 480 _ip.ipython_dir = tmpdir.name
481 481 nt.assert_raises(ImportError, _ip.magic, "load_ext daft_extension")
482 482 url = os.path.join(os.path.dirname(__file__), "daft_extension.py")
483 483 _ip.magic("install_ext %s" % url)
484 484 _ip.user_ns.pop('arq', None)
485 485 invalidate_caches() # Clear import caches
486 486 _ip.magic("load_ext daft_extension")
487 487 tt.assert_equal(_ip.user_ns['arq'], 185)
488 488 _ip.magic("unload_ext daft_extension")
489 489 assert 'arq' not in _ip.user_ns
490 490 finally:
491 491 _ip.ipython_dir = orig_ipython_dir
492 tmpdir.cleanup()
492 493
493 494 def test_notebook_export_json():
494 495 with TemporaryDirectory() as td:
495 496 outfile = os.path.join(td, "nb.ipynb")
496 497 _ip.ex(py3compat.u_format(u"u = {u}'hΓ©llo'"))
497 498 _ip.magic("notebook -e %s" % outfile)
498 499
499 500 def test_notebook_export_py():
500 501 with TemporaryDirectory() as td:
501 502 outfile = os.path.join(td, "nb.py")
502 503 _ip.ex(py3compat.u_format(u"u = {u}'hΓ©llo'"))
503 504 _ip.magic("notebook -e %s" % outfile)
504 505
505 506 def test_notebook_reformat_py():
506 507 with TemporaryDirectory() as td:
507 508 infile = os.path.join(td, "nb.ipynb")
508 509 with io.open(infile, 'w', encoding='utf-8') as f:
509 510 current.write(nb0, f, 'json')
510 511
511 512 _ip.ex(py3compat.u_format(u"u = {u}'hΓ©llo'"))
512 513 _ip.magic("notebook -f py %s" % infile)
513 514
514 515 def test_notebook_reformat_json():
515 516 with TemporaryDirectory() as td:
516 517 infile = os.path.join(td, "nb.py")
517 518 with io.open(infile, 'w', encoding='utf-8') as f:
518 519 current.write(nb0, f, 'py')
519 520
520 521 _ip.ex(py3compat.u_format(u"u = {u}'hΓ©llo'"))
521 522 _ip.magic("notebook -f ipynb %s" % infile)
522 523 _ip.magic("notebook -f json %s" % infile)
523 524
524 525 def test_env():
525 526 env = _ip.magic("env")
526 527 assert isinstance(env, dict), type(env)
527 528
528 529
529 530 class CellMagicTestCase(TestCase):
530 531
531 532 def check_ident(self, magic):
532 533 # Manually called, we get the result
533 534 out = _ip.run_cell_magic(magic, 'a', 'b')
534 535 nt.assert_equals(out, ('a','b'))
535 536 # Via run_cell, it goes into the user's namespace via displayhook
536 537 _ip.run_cell('%%' + magic +' c\nd')
537 538 nt.assert_equals(_ip.user_ns['_'], ('c','d'))
538 539
539 540 def test_cell_magic_func_deco(self):
540 541 "Cell magic using simple decorator"
541 542 @register_cell_magic
542 543 def cellm(line, cell):
543 544 return line, cell
544 545
545 546 self.check_ident('cellm')
546 547
547 548 def test_cell_magic_reg(self):
548 549 "Cell magic manually registered"
549 550 def cellm(line, cell):
550 551 return line, cell
551 552
552 553 _ip.register_magic_function(cellm, 'cell', 'cellm2')
553 554 self.check_ident('cellm2')
554 555
555 556 def test_cell_magic_class(self):
556 557 "Cell magics declared via a class"
557 558 @magics_class
558 559 class MyMagics(Magics):
559 560
560 561 @cell_magic
561 562 def cellm3(self, line, cell):
562 563 return line, cell
563 564
564 565 _ip.register_magics(MyMagics)
565 566 self.check_ident('cellm3')
566 567
567 568 def test_cell_magic_class2(self):
568 569 "Cell magics declared via a class, #2"
569 570 @magics_class
570 571 class MyMagics2(Magics):
571 572
572 573 @cell_magic('cellm4')
573 574 def cellm33(self, line, cell):
574 575 return line, cell
575 576
576 577 _ip.register_magics(MyMagics2)
577 578 self.check_ident('cellm4')
578 579 # Check that nothing is registered as 'cellm33'
579 580 c33 = _ip.find_cell_magic('cellm33')
580 581 nt.assert_equals(c33, None)
581 582
582 583 def test_file():
583 584 """Basic %%file"""
584 585 ip = get_ipython()
585 586 with TemporaryDirectory() as td:
586 587 fname = os.path.join(td, 'file1')
587 588 ip.run_cell_magic("file", fname, u'\n'.join([
588 589 'line1',
589 590 'line2',
590 591 ]))
591 592 with open(fname) as f:
592 593 s = f.read()
593 594 nt.assert_in('line1\n', s)
594 595 nt.assert_in('line2', s)
595 596
596 597 def test_file_unicode():
597 598 """%%file with unicode cell"""
598 599 ip = get_ipython()
599 600 with TemporaryDirectory() as td:
600 601 fname = os.path.join(td, 'file1')
601 602 ip.run_cell_magic("file", fname, u'\n'.join([
602 603 u'linΓ©1',
603 604 u'linΓ©2',
604 605 ]))
605 606 with io.open(fname, encoding='utf-8') as f:
606 607 s = f.read()
607 608 nt.assert_in(u'linΓ©1\n', s)
608 609 nt.assert_in(u'linΓ©2', s)
609 610
610 611 def test_file_amend():
611 612 """%%file -a amends files"""
612 613 ip = get_ipython()
613 614 with TemporaryDirectory() as td:
614 615 fname = os.path.join(td, 'file2')
615 616 ip.run_cell_magic("file", fname, u'\n'.join([
616 617 'line1',
617 618 'line2',
618 619 ]))
619 620 ip.run_cell_magic("file", "-a %s" % fname, u'\n'.join([
620 621 'line3',
621 622 'line4',
622 623 ]))
623 624 with open(fname) as f:
624 625 s = f.read()
625 626 nt.assert_in('line1\n', s)
626 627 nt.assert_in('line3\n', s)
627 628
628 629
629 630 def test_script_config():
630 631 ip = get_ipython()
631 632 ip.config.ScriptMagics.script_magics = ['whoda']
632 633 sm = script.ScriptMagics(shell=ip)
633 634 nt.assert_in('whoda', sm.magics['cell'])
634 635
635 636 @dec.skip_win32
636 637 def test_script_out():
637 638 ip = get_ipython()
638 639 ip.run_cell_magic("script", "--out output sh", "echo 'hi'")
639 640 nt.assert_equals(ip.user_ns['output'], 'hi\n')
640 641
641 642 @dec.skip_win32
642 643 def test_script_err():
643 644 ip = get_ipython()
644 645 ip.run_cell_magic("script", "--err error sh", "echo 'hello' >&2")
645 646 nt.assert_equals(ip.user_ns['error'], 'hello\n')
646 647
647 648 @dec.skip_win32
648 649 def test_script_out_err():
649 650 ip = get_ipython()
650 651 ip.run_cell_magic("script", "--out output --err error sh", "echo 'hi'\necho 'hello' >&2")
651 652 nt.assert_equals(ip.user_ns['output'], 'hi\n')
652 653 nt.assert_equals(ip.user_ns['error'], 'hello\n')
653 654
654 655 @dec.skip_win32
655 656 def test_script_bg_out():
656 657 ip = get_ipython()
657 658 ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'")
658 659 nt.assert_equals(ip.user_ns['output'].read(), b'hi\n')
659 660
660 661 @dec.skip_win32
661 662 def test_script_bg_err():
662 663 ip = get_ipython()
663 664 ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2")
664 665 nt.assert_equals(ip.user_ns['error'].read(), b'hello\n')
665 666
666 667 @dec.skip_win32
667 668 def test_script_bg_out_err():
668 669 ip = get_ipython()
669 670 ip.run_cell_magic("script", "--bg --out output --err error sh", "echo 'hi'\necho 'hello' >&2")
670 671 nt.assert_equals(ip.user_ns['output'].read(), b'hi\n')
671 672 nt.assert_equals(ip.user_ns['error'].read(), b'hello\n')
672 673
673 674 def test_script_defaults():
674 675 ip = get_ipython()
675 676 for cmd in ['sh', 'bash', 'perl', 'ruby']:
676 677 try:
677 678 find_cmd(cmd)
678 679 except Exception:
679 680 pass
680 681 else:
681 682 nt.assert_in(cmd, ip.magics_manager.magics['cell'])
682 683
683 684
684 685 @magics_class
685 686 class FooFoo(Magics):
686 687 """class with both %foo and %%foo magics"""
687 688 @line_magic('foo')
688 689 def line_foo(self, line):
689 690 "I am line foo"
690 691 pass
691 692
692 693 @cell_magic("foo")
693 694 def cell_foo(self, line, cell):
694 695 "I am cell foo, not line foo"
695 696 pass
696 697
697 698 def test_line_cell_info():
698 699 """%%foo and %foo magics are distinguishable to inspect"""
699 700 ip = get_ipython()
700 701 ip.magics_manager.register(FooFoo)
701 702 oinfo = ip.object_inspect('foo')
702 703 nt.assert_true(oinfo['found'])
703 704 nt.assert_true(oinfo['ismagic'])
704 705
705 706 oinfo = ip.object_inspect('%%foo')
706 707 nt.assert_true(oinfo['found'])
707 708 nt.assert_true(oinfo['ismagic'])
708 709 nt.assert_equals(oinfo['docstring'], FooFoo.cell_foo.__doc__)
709 710
710 711 oinfo = ip.object_inspect('%foo')
711 712 nt.assert_true(oinfo['found'])
712 713 nt.assert_true(oinfo['ismagic'])
713 714 nt.assert_equals(oinfo['docstring'], FooFoo.line_foo.__doc__)
714 715
715 716 def test_multiple_magics():
716 717 ip = get_ipython()
717 718 foo1 = FooFoo(ip)
718 719 foo2 = FooFoo(ip)
719 720 mm = ip.magics_manager
720 721 mm.register(foo1)
721 722 nt.assert_true(mm.magics['line']['foo'].im_self is foo1)
722 723 mm.register(foo2)
723 724 nt.assert_true(mm.magics['line']['foo'].im_self is foo2)
724 725
725 726 def test_alias_magic():
726 727 """Test %alias_magic."""
727 728 ip = get_ipython()
728 729 mm = ip.magics_manager
729 730
730 731 # Basic operation: both cell and line magics are created, if possible.
731 732 ip.run_line_magic('alias_magic', 'timeit_alias timeit')
732 733 nt.assert_true('timeit_alias' in mm.magics['line'])
733 734 nt.assert_true('timeit_alias' in mm.magics['cell'])
734 735
735 736 # --cell is specified, line magic not created.
736 737 ip.run_line_magic('alias_magic', '--cell timeit_cell_alias timeit')
737 738 nt.assert_false('timeit_cell_alias' in mm.magics['line'])
738 739 nt.assert_true('timeit_cell_alias' in mm.magics['cell'])
739 740
740 741 # Test that line alias is created successfully.
741 742 ip.run_line_magic('alias_magic', '--line env_alias env')
742 743 nt.assert_equal(ip.run_line_magic('env', ''),
743 744 ip.run_line_magic('env_alias', ''))
@@ -1,364 +1,366 b''
1 1 #!/usr/bin/env python
2 2
3 3 """ PickleShare - a small 'shelve' like datastore with concurrency support
4 4
5 5 Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
6 6 shelve, many processes can access the database simultaneously. Changing a
7 7 value in database is immediately visible to other processes accessing the
8 8 same database.
9 9
10 10 Concurrency is possible because the values are stored in separate files. Hence
11 11 the "database" is a directory where *all* files are governed by PickleShare.
12 12
13 13 Example usage::
14 14
15 15 from pickleshare import *
16 16 db = PickleShareDB('~/testpickleshare')
17 17 db.clear()
18 18 print "Should be empty:",db.items()
19 19 db['hello'] = 15
20 20 db['aku ankka'] = [1,2,313]
21 21 db['paths/are/ok/key'] = [1,(5,46)]
22 22 print db.keys()
23 23 del db['aku ankka']
24 24
25 25 This module is certainly not ZODB, but can be used for low-load
26 26 (non-mission-critical) situations where tiny code size trumps the
27 27 advanced features of a "real" object database.
28 28
29 29 Installation guide: easy_install pickleshare
30 30
31 31 Author: Ville Vainio <vivainio@gmail.com>
32 32 License: MIT open source license.
33 33
34 34 """
35 35
36 36 from IPython.external.path import path as Path
37 37 import os,stat,time
38 38 import collections
39 39 import cPickle as pickle
40 40 import glob
41 41
42 42 def gethashfile(key):
43 43 return ("%02x" % abs(hash(key) % 256))[-2:]
44 44
45 45 _sentinel = object()
46 46
47 47 class PickleShareDB(collections.MutableMapping):
48 48 """ The main 'connection' object for PickleShare database """
49 49 def __init__(self,root):
50 50 """ Return a db object that will manage the specied directory"""
51 51 self.root = Path(root).expanduser().abspath()
52 52 if not self.root.isdir():
53 53 self.root.makedirs()
54 54 # cache has { 'key' : (obj, orig_mod_time) }
55 55 self.cache = {}
56 56
57 57
58 58 def __getitem__(self,key):
59 59 """ db['key'] reading """
60 60 fil = self.root / key
61 61 try:
62 62 mtime = (fil.stat()[stat.ST_MTIME])
63 63 except OSError:
64 64 raise KeyError(key)
65 65
66 66 if fil in self.cache and mtime == self.cache[fil][1]:
67 67 return self.cache[fil][0]
68 68 try:
69 69 # The cached item has expired, need to read
70 obj = pickle.loads(fil.open("rb").read())
70 with fil.open("rb") as f:
71 obj = pickle.loads(f.read())
71 72 except:
72 73 raise KeyError(key)
73 74
74 75 self.cache[fil] = (obj,mtime)
75 76 return obj
76 77
77 78 def __setitem__(self,key,value):
78 79 """ db['key'] = 5 """
79 80 fil = self.root / key
80 81 parent = fil.parent
81 82 if parent and not parent.isdir():
82 83 parent.makedirs()
83 84 # We specify protocol 2, so that we can mostly go between Python 2
84 85 # and Python 3. We can upgrade to protocol 3 when Python 2 is obsolete.
85 pickled = pickle.dump(value,fil.open('wb'), protocol=2)
86 with fil.open('wb') as f:
87 pickled = pickle.dump(value, f, protocol=2)
86 88 try:
87 89 self.cache[fil] = (value,fil.mtime)
88 90 except OSError as e:
89 91 if e.errno != 2:
90 92 raise
91 93
92 94 def hset(self, hashroot, key, value):
93 95 """ hashed set """
94 96 hroot = self.root / hashroot
95 97 if not hroot.isdir():
96 98 hroot.makedirs()
97 99 hfile = hroot / gethashfile(key)
98 100 d = self.get(hfile, {})
99 101 d.update( {key : value})
100 102 self[hfile] = d
101 103
102 104
103 105
104 106 def hget(self, hashroot, key, default = _sentinel, fast_only = True):
105 107 """ hashed get """
106 108 hroot = self.root / hashroot
107 109 hfile = hroot / gethashfile(key)
108 110
109 111 d = self.get(hfile, _sentinel )
110 112 #print "got dict",d,"from",hfile
111 113 if d is _sentinel:
112 114 if fast_only:
113 115 if default is _sentinel:
114 116 raise KeyError(key)
115 117
116 118 return default
117 119
118 120 # slow mode ok, works even after hcompress()
119 121 d = self.hdict(hashroot)
120 122
121 123 return d.get(key, default)
122 124
123 125 def hdict(self, hashroot):
124 126 """ Get all data contained in hashed category 'hashroot' as dict """
125 127 hfiles = self.keys(hashroot + "/*")
126 128 hfiles.sort()
127 129 last = len(hfiles) and hfiles[-1] or ''
128 130 if last.endswith('xx'):
129 131 # print "using xx"
130 132 hfiles = [last] + hfiles[:-1]
131 133
132 134 all = {}
133 135
134 136 for f in hfiles:
135 137 # print "using",f
136 138 try:
137 139 all.update(self[f])
138 140 except KeyError:
139 141 print "Corrupt",f,"deleted - hset is not threadsafe!"
140 142 del self[f]
141 143
142 144 self.uncache(f)
143 145
144 146 return all
145 147
146 148 def hcompress(self, hashroot):
147 149 """ Compress category 'hashroot', so hset is fast again
148 150
149 151 hget will fail if fast_only is True for compressed items (that were
150 152 hset before hcompress).
151 153
152 154 """
153 155 hfiles = self.keys(hashroot + "/*")
154 156 all = {}
155 157 for f in hfiles:
156 158 # print "using",f
157 159 all.update(self[f])
158 160 self.uncache(f)
159 161
160 162 self[hashroot + '/xx'] = all
161 163 for f in hfiles:
162 164 p = self.root / f
163 165 if p.basename() == 'xx':
164 166 continue
165 167 p.remove()
166 168
167 169
168 170
169 171 def __delitem__(self,key):
170 172 """ del db["key"] """
171 173 fil = self.root / key
172 174 self.cache.pop(fil,None)
173 175 try:
174 176 fil.remove()
175 177 except OSError:
176 178 # notfound and permission denied are ok - we
177 179 # lost, the other process wins the conflict
178 180 pass
179 181
180 182 def _normalized(self, p):
181 183 """ Make a key suitable for user's eyes """
182 184 return str(self.root.relpathto(p)).replace('\\','/')
183 185
184 186 def keys(self, globpat = None):
185 187 """ All keys in DB, or all keys matching a glob"""
186 188
187 189 if globpat is None:
188 190 files = self.root.walkfiles()
189 191 else:
190 192 files = [Path(p) for p in glob.glob(self.root/globpat)]
191 193 return [self._normalized(p) for p in files if p.isfile()]
192 194
193 195 def __iter__(self):
194 196 return iter(self.keys())
195 197
196 198 def __len__(self):
197 199 return len(self.keys())
198 200
199 201 def uncache(self,*items):
200 202 """ Removes all, or specified items from cache
201 203
202 204 Use this after reading a large amount of large objects
203 205 to free up memory, when you won't be needing the objects
204 206 for a while.
205 207
206 208 """
207 209 if not items:
208 210 self.cache = {}
209 211 for it in items:
210 212 self.cache.pop(it,None)
211 213
212 214 def waitget(self,key, maxwaittime = 60 ):
213 215 """ Wait (poll) for a key to get a value
214 216
215 217 Will wait for `maxwaittime` seconds before raising a KeyError.
216 218 The call exits normally if the `key` field in db gets a value
217 219 within the timeout period.
218 220
219 221 Use this for synchronizing different processes or for ensuring
220 222 that an unfortunately timed "db['key'] = newvalue" operation
221 223 in another process (which causes all 'get' operation to cause a
222 224 KeyError for the duration of pickling) won't screw up your program
223 225 logic.
224 226 """
225 227
226 228 wtimes = [0.2] * 3 + [0.5] * 2 + [1]
227 229 tries = 0
228 230 waited = 0
229 231 while 1:
230 232 try:
231 233 val = self[key]
232 234 return val
233 235 except KeyError:
234 236 pass
235 237
236 238 if waited > maxwaittime:
237 239 raise KeyError(key)
238 240
239 241 time.sleep(wtimes[tries])
240 242 waited+=wtimes[tries]
241 243 if tries < len(wtimes) -1:
242 244 tries+=1
243 245
244 246 def getlink(self,folder):
245 247 """ Get a convenient link for accessing items """
246 248 return PickleShareLink(self, folder)
247 249
248 250 def __repr__(self):
249 251 return "PickleShareDB('%s')" % self.root
250 252
251 253
252 254
253 255 class PickleShareLink:
254 256 """ A shortdand for accessing nested PickleShare data conveniently.
255 257
256 258 Created through PickleShareDB.getlink(), example::
257 259
258 260 lnk = db.getlink('myobjects/test')
259 261 lnk.foo = 2
260 262 lnk.bar = lnk.foo + 5
261 263
262 264 """
263 265 def __init__(self, db, keydir ):
264 266 self.__dict__.update(locals())
265 267
266 268 def __getattr__(self,key):
267 269 return self.__dict__['db'][self.__dict__['keydir']+'/' + key]
268 270 def __setattr__(self,key,val):
269 271 self.db[self.keydir+'/' + key] = val
270 272 def __repr__(self):
271 273 db = self.__dict__['db']
272 274 keys = db.keys( self.__dict__['keydir'] +"/*")
273 275 return "<PickleShareLink '%s': %s>" % (
274 276 self.__dict__['keydir'],
275 277 ";".join([Path(k).basename() for k in keys]))
276 278
277 279
278 280 def test():
279 281 db = PickleShareDB('~/testpickleshare')
280 282 db.clear()
281 283 print "Should be empty:",db.items()
282 284 db['hello'] = 15
283 285 db['aku ankka'] = [1,2,313]
284 286 db['paths/nest/ok/keyname'] = [1,(5,46)]
285 287 db.hset('hash', 'aku', 12)
286 288 db.hset('hash', 'ankka', 313)
287 289 print "12 =",db.hget('hash','aku')
288 290 print "313 =",db.hget('hash','ankka')
289 291 print "all hashed",db.hdict('hash')
290 292 print db.keys()
291 293 print db.keys('paths/nest/ok/k*')
292 294 print dict(db) # snapsot of whole db
293 295 db.uncache() # frees memory, causes re-reads later
294 296
295 297 # shorthand for accessing deeply nested files
296 298 lnk = db.getlink('myobjects/test')
297 299 lnk.foo = 2
298 300 lnk.bar = lnk.foo + 5
299 301 print lnk.bar # 7
300 302
301 303 def stress():
302 304 db = PickleShareDB('~/fsdbtest')
303 305 import time,sys
304 306 for i in range(1000):
305 307 for j in range(1000):
306 308 if i % 15 == 0 and i < 200:
307 309 if str(j) in db:
308 310 del db[str(j)]
309 311 continue
310 312
311 313 if j%33 == 0:
312 314 time.sleep(0.02)
313 315
314 316 db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
315 317 db.hset('hash',j, db.hget('hash',j,15) + 1 )
316 318
317 319 print i,
318 320 sys.stdout.flush()
319 321 if i % 10 == 0:
320 322 db.uncache()
321 323
322 324 def main():
323 325 import textwrap
324 326 usage = textwrap.dedent("""\
325 327 pickleshare - manage PickleShare databases
326 328
327 329 Usage:
328 330
329 331 pickleshare dump /path/to/db > dump.txt
330 332 pickleshare load /path/to/db < dump.txt
331 333 pickleshare test /path/to/db
332 334 """)
333 335 DB = PickleShareDB
334 336 import sys
335 337 if len(sys.argv) < 2:
336 338 print usage
337 339 return
338 340
339 341 cmd = sys.argv[1]
340 342 args = sys.argv[2:]
341 343 if cmd == 'dump':
342 344 if not args: args= ['.']
343 345 db = DB(args[0])
344 346 import pprint
345 347 pprint.pprint(db.items())
346 348 elif cmd == 'load':
347 349 cont = sys.stdin.read()
348 350 db = DB(args[0])
349 351 data = eval(cont)
350 352 db.clear()
351 353 for k,v in db.items():
352 354 db[k] = v
353 355 elif cmd == 'testwait':
354 356 db = DB(args[0])
355 357 db.clear()
356 358 print db.waitget('250')
357 359 elif cmd == 'test':
358 360 test()
359 361 stress()
360 362
361 363 if __name__== "__main__":
362 364 main()
363 365
364 366
@@ -1,178 +1,179 b''
1 1 # coding: utf-8
2 2 """Compatibility tricks for Python 3. Mainly to do with unicode."""
3 3 import __builtin__
4 4 import functools
5 5 import sys
6 6 import re
7 7 import types
8 8
9 9 from .encoding import DEFAULT_ENCODING
10 10
11 11 orig_open = open
12 12
13 13 def no_code(x, encoding=None):
14 14 return x
15 15
16 16 def decode(s, encoding=None):
17 17 encoding = encoding or DEFAULT_ENCODING
18 18 return s.decode(encoding, "replace")
19 19
20 20 def encode(u, encoding=None):
21 21 encoding = encoding or DEFAULT_ENCODING
22 22 return u.encode(encoding, "replace")
23 23
24 24
25 25 def cast_unicode(s, encoding=None):
26 26 if isinstance(s, bytes):
27 27 return decode(s, encoding)
28 28 return s
29 29
30 30 def cast_bytes(s, encoding=None):
31 31 if not isinstance(s, bytes):
32 32 return encode(s, encoding)
33 33 return s
34 34
35 35 def _modify_str_or_docstring(str_change_func):
36 36 @functools.wraps(str_change_func)
37 37 def wrapper(func_or_str):
38 38 if isinstance(func_or_str, basestring):
39 39 func = None
40 40 doc = func_or_str
41 41 else:
42 42 func = func_or_str
43 43 doc = func.__doc__
44 44
45 45 doc = str_change_func(doc)
46 46
47 47 if func:
48 48 func.__doc__ = doc
49 49 return func
50 50 return doc
51 51 return wrapper
52 52
53 53 if sys.version_info[0] >= 3:
54 54 PY3 = True
55 55
56 56 input = input
57 57 builtin_mod_name = "builtins"
58 58
59 59 str_to_unicode = no_code
60 60 unicode_to_str = no_code
61 61 str_to_bytes = encode
62 62 bytes_to_str = decode
63 63 cast_bytes_py2 = no_code
64 64
65 65 def isidentifier(s, dotted=False):
66 66 if dotted:
67 67 return all(isidentifier(a) for a in s.split("."))
68 68 return s.isidentifier()
69 69
70 70 open = orig_open
71 71
72 72 MethodType = types.MethodType
73 73
74 74 def execfile(fname, glob, loc=None):
75 75 loc = loc if (loc is not None) else glob
76 exec compile(open(fname, 'rb').read(), fname, 'exec') in glob, loc
76 with open(fname, 'rb') as f:
77 exec compile(f.read(), fname, 'exec') in glob, loc
77 78
78 79 # Refactor print statements in doctests.
79 80 _print_statement_re = re.compile(r"\bprint (?P<expr>.*)$", re.MULTILINE)
80 81 def _print_statement_sub(match):
81 82 expr = match.groups('expr')
82 83 return "print(%s)" % expr
83 84
84 85 @_modify_str_or_docstring
85 86 def doctest_refactor_print(doc):
86 87 """Refactor 'print x' statements in a doctest to print(x) style. 2to3
87 88 unfortunately doesn't pick up on our doctests.
88 89
89 90 Can accept a string or a function, so it can be used as a decorator."""
90 91 return _print_statement_re.sub(_print_statement_sub, doc)
91 92
92 93 # Abstract u'abc' syntax:
93 94 @_modify_str_or_docstring
94 95 def u_format(s):
95 96 """"{u}'abc'" --> "'abc'" (Python 3)
96 97
97 98 Accepts a string or a function, so it can be used as a decorator."""
98 99 return s.format(u='')
99 100
100 101 else:
101 102 PY3 = False
102 103
103 104 input = raw_input
104 105 builtin_mod_name = "__builtin__"
105 106
106 107 str_to_unicode = decode
107 108 unicode_to_str = encode
108 109 str_to_bytes = no_code
109 110 bytes_to_str = no_code
110 111 cast_bytes_py2 = cast_bytes
111 112
112 113 import re
113 114 _name_re = re.compile(r"[a-zA-Z_][a-zA-Z0-9_]*$")
114 115 def isidentifier(s, dotted=False):
115 116 if dotted:
116 117 return all(isidentifier(a) for a in s.split("."))
117 118 return bool(_name_re.match(s))
118 119
119 120 class open(object):
120 121 """Wrapper providing key part of Python 3 open() interface."""
121 122 def __init__(self, fname, mode="r", encoding="utf-8"):
122 123 self.f = orig_open(fname, mode)
123 124 self.enc = encoding
124 125
125 126 def write(self, s):
126 127 return self.f.write(s.encode(self.enc))
127 128
128 129 def read(self, size=-1):
129 130 return self.f.read(size).decode(self.enc)
130 131
131 132 def close(self):
132 133 return self.f.close()
133 134
134 135 def __enter__(self):
135 136 return self
136 137
137 138 def __exit__(self, etype, value, traceback):
138 139 self.f.close()
139 140
140 141 def MethodType(func, instance):
141 142 return types.MethodType(func, instance, type(instance))
142 143
143 144 # don't override system execfile on 2.x:
144 145 execfile = execfile
145 146
146 147 def doctest_refactor_print(func_or_str):
147 148 return func_or_str
148 149
149 150
150 151 # Abstract u'abc' syntax:
151 152 @_modify_str_or_docstring
152 153 def u_format(s):
153 154 """"{u}'abc'" --> "u'abc'" (Python 2)
154 155
155 156 Accepts a string or a function, so it can be used as a decorator."""
156 157 return s.format(u='u')
157 158
158 159 if sys.platform == 'win32':
159 160 def execfile(fname, glob=None, loc=None):
160 161 loc = loc if (loc is not None) else glob
161 162 # The rstrip() is necessary b/c trailing whitespace in files will
162 163 # cause an IndentationError in Python 2.6 (this was fixed in 2.7,
163 164 # but we still support 2.6). See issue 1027.
164 165 scripttext = __builtin__.open(fname).read().rstrip() + '\n'
165 166 # compile converts unicode filename to str assuming
166 167 # ascii. Let's do the conversion before calling compile
167 168 if isinstance(fname, unicode):
168 169 filename = unicode_to_str(fname)
169 170 else:
170 171 filename = fname
171 172 exec compile(scripttext, filename, 'exec') in glob, loc
172 173 else:
173 174 def execfile(fname, *where):
174 175 if isinstance(fname, unicode):
175 176 filename = fname.encode(sys.getfilesystemencoding())
176 177 else:
177 178 filename = fname
178 179 __builtin__.execfile(filename, *where)
General Comments 0
You need to be logged in to leave comments. Login now