##// END OF EJS Templates
Add test code to make sure that history database errors caused by duplicate session/line numbers are handled correctly (don't crash, get new session).
Thomas Kluyver -
Show More
@@ -1,109 +1,114 b''
1 """Tests for the IPython tab-completion machinery.
1 """Tests for the IPython tab-completion machinery.
2 """
2 """
3 #-----------------------------------------------------------------------------
3 #-----------------------------------------------------------------------------
4 # Module imports
4 # Module imports
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6
6
7 # stdlib
7 # stdlib
8 import os
8 import os
9 import sys
9 import sys
10 import unittest
10 import unittest
11
11
12 # third party
12 # third party
13 import nose.tools as nt
13 import nose.tools as nt
14
14
15 # our own packages
15 # our own packages
16 from IPython.utils.tempdir import TemporaryDirectory
16 from IPython.utils.tempdir import TemporaryDirectory
17 from IPython.core.history import HistoryManager, extract_hist_ranges
17 from IPython.core.history import HistoryManager, extract_hist_ranges
18
18
19 def test_history():
19 def test_history():
20
20
21 ip = get_ipython()
21 ip = get_ipython()
22 with TemporaryDirectory() as tmpdir:
22 with TemporaryDirectory() as tmpdir:
23 #tmpdir = '/software/temp'
23 #tmpdir = '/software/temp'
24 histfile = os.path.realpath(os.path.join(tmpdir, 'history.sqlite'))
24 histfile = os.path.realpath(os.path.join(tmpdir, 'history.sqlite'))
25 # Ensure that we restore the history management that we mess with in
25 # Ensure that we restore the history management that we mess with in
26 # this test doesn't affect the IPython instance used by the test suite
26 # this test doesn't affect the IPython instance used by the test suite
27 # beyond this test.
27 # beyond this test.
28 hist_manager_ori = ip.history_manager
28 hist_manager_ori = ip.history_manager
29 try:
29 try:
30 ip.history_manager = HistoryManager(shell=ip)
30 ip.history_manager = HistoryManager(shell=ip)
31 ip.history_manager.hist_file = histfile
31 ip.history_manager.hist_file = histfile
32 ip.history_manager.init_db() # Has to be called after changing file
32 ip.history_manager.init_db() # Has to be called after changing file
33 ip.history_manager.reset()
33 ip.history_manager.reset()
34 print 'test',histfile
34 print 'test',histfile
35 hist = ['a=1', 'def f():\n test = 1\n return test', 'b=2']
35 hist = ['a=1', 'def f():\n test = 1\n return test', 'b=2']
36 for i, h in enumerate(hist, start=1):
36 for i, h in enumerate(hist, start=1):
37 ip.history_manager.store_inputs(i, h)
37 ip.history_manager.store_inputs(i, h)
38
38
39 ip.history_manager.db_log_output = True
39 ip.history_manager.db_log_output = True
40 # Doesn't match the input, but we'll just check it's stored.
40 # Doesn't match the input, but we'll just check it's stored.
41 ip.history_manager.output_hist_reprs[3].append("spam")
41 ip.history_manager.output_hist_reprs[3].append("spam")
42 ip.history_manager.store_output(3)
42 ip.history_manager.store_output(3)
43
43
44 nt.assert_equal(ip.history_manager.input_hist_raw, [''] + hist)
44 nt.assert_equal(ip.history_manager.input_hist_raw, [''] + hist)
45
45
46 # Check lines were written to DB
46 # Check lines were written to DB
47 c = ip.history_manager.db.execute("SELECT source_raw FROM history")
47 c = ip.history_manager.db.execute("SELECT source_raw FROM history")
48 nt.assert_equal([x for x, in c], hist)
48 nt.assert_equal([x for x, in c], hist)
49
49
50 # New session
50 # New session
51 ip.history_manager.reset()
51 ip.history_manager.reset()
52 newcmds = ["z=5","class X(object):\n pass", "k='p'"]
52 newcmds = ["z=5","class X(object):\n pass", "k='p'"]
53 for i, cmd in enumerate(newcmds, start=1):
53 for i, cmd in enumerate(newcmds, start=1):
54 ip.history_manager.store_inputs(i, cmd)
54 ip.history_manager.store_inputs(i, cmd)
55 gothist = ip.history_manager.get_range(start=1, stop=4)
55 gothist = ip.history_manager.get_range(start=1, stop=4)
56 nt.assert_equal(list(gothist), zip([0,0,0],[1,2,3], newcmds))
56 nt.assert_equal(list(gothist), zip([0,0,0],[1,2,3], newcmds))
57 # Previous session:
57 # Previous session:
58 gothist = ip.history_manager.get_range(-1, 1, 4)
58 gothist = ip.history_manager.get_range(-1, 1, 4)
59 nt.assert_equal(list(gothist), zip([1,1,1],[1,2,3], hist))
59 nt.assert_equal(list(gothist), zip([1,1,1],[1,2,3], hist))
60
60
61 # Check get_hist_tail
61 # Check get_hist_tail
62 gothist = ip.history_manager.get_tail(4, output=True,
62 gothist = ip.history_manager.get_tail(4, output=True,
63 include_latest=True)
63 include_latest=True)
64 expected = [(1, 3, (hist[-1], ["spam"])),
64 expected = [(1, 3, (hist[-1], ["spam"])),
65 (2, 1, (newcmds[0], None)),
65 (2, 1, (newcmds[0], None)),
66 (2, 2, (newcmds[1], None)),
66 (2, 2, (newcmds[1], None)),
67 (2, 3, (newcmds[2], None)),]
67 (2, 3, (newcmds[2], None)),]
68 nt.assert_equal(list(gothist), expected)
68 nt.assert_equal(list(gothist), expected)
69
69
70 gothist = ip.history_manager.get_tail(2)
70 gothist = ip.history_manager.get_tail(2)
71 expected = [(2, 1, newcmds[0]),
71 expected = [(2, 1, newcmds[0]),
72 (2, 2, newcmds[1])]
72 (2, 2, newcmds[1])]
73 nt.assert_equal(list(gothist), expected)
73 nt.assert_equal(list(gothist), expected)
74
74
75 # Check get_hist_search
75 # Check get_hist_search
76 gothist = ip.history_manager.search("*test*")
76 gothist = ip.history_manager.search("*test*")
77 nt.assert_equal(list(gothist), [(1,2,hist[1])] )
77 nt.assert_equal(list(gothist), [(1,2,hist[1])] )
78 gothist = ip.history_manager.search("b*", output=True)
78 gothist = ip.history_manager.search("b*", output=True)
79 nt.assert_equal(list(gothist), [(1,3,(hist[2],["spam"]))] )
79 nt.assert_equal(list(gothist), [(1,3,(hist[2],["spam"]))] )
80
80
81 # Cross testing: check that magic %save can get previous session.
81 # Cross testing: check that magic %save can get previous session.
82 testfilename = os.path.realpath(os.path.join(tmpdir, "test.py"))
82 testfilename = os.path.realpath(os.path.join(tmpdir, "test.py"))
83 ip.magic_save(testfilename + " ~1/1-3")
83 ip.magic_save(testfilename + " ~1/1-3")
84 testfile = open(testfilename, "r")
84 testfile = open(testfilename, "r")
85 nt.assert_equal(testfile.read(), "\n".join(hist))
85 nt.assert_equal(testfile.read(), "\n".join(hist))
86
87 # Duplicate line numbers - check that it doesn't crash, and
88 # gets a new session
89 ip.history_manager.store_inputs(1, "rogue")
90 nt.assert_equal(ip.history_manager.session_number, 3)
86 finally:
91 finally:
87 # Restore history manager
92 # Restore history manager
88 ip.history_manager = hist_manager_ori
93 ip.history_manager = hist_manager_ori
89
94
90 def test_extract_hist_ranges():
95 def test_extract_hist_ranges():
91 instr = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5"
96 instr = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5"
92 expected = [(0, 1, 2), # 0 == current session
97 expected = [(0, 1, 2), # 0 == current session
93 (2, 3, 4),
98 (2, 3, 4),
94 (-4, 5, 7),
99 (-4, 5, 7),
95 (-4, 7, 10),
100 (-4, 7, 10),
96 (-9, 2, None), # None == to end
101 (-9, 2, None), # None == to end
97 (-8, 1, None),
102 (-8, 1, None),
98 (-7, 1, 6)]
103 (-7, 1, 6)]
99 actual = list(extract_hist_ranges(instr))
104 actual = list(extract_hist_ranges(instr))
100 nt.assert_equal(actual, expected)
105 nt.assert_equal(actual, expected)
101
106
102 def test_magic_rerun():
107 def test_magic_rerun():
103 """Simple test for %rerun (no args -> rerun last line)"""
108 """Simple test for %rerun (no args -> rerun last line)"""
104 ip = get_ipython()
109 ip = get_ipython()
105 ip.run_cell("a = 10")
110 ip.run_cell("a = 10")
106 ip.run_cell("a += 1")
111 ip.run_cell("a += 1")
107 nt.assert_equal(ip.user_ns["a"], 11)
112 nt.assert_equal(ip.user_ns["a"], 11)
108 ip.run_cell("%rerun")
113 ip.run_cell("%rerun")
109 nt.assert_equal(ip.user_ns["a"], 12)
114 nt.assert_equal(ip.user_ns["a"], 12)
General Comments 0
You need to be logged in to leave comments. Login now