|
|
# coding: utf-8
|
|
|
"""Tests for the IPython tab-completion machinery.
|
|
|
"""
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Module imports
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
# stdlib
|
|
|
import io
|
|
|
import sqlite3
|
|
|
import sys
|
|
|
import tempfile
|
|
|
from datetime import datetime
|
|
|
from pathlib import Path
|
|
|
|
|
|
from tempfile import TemporaryDirectory
|
|
|
# our own packages
|
|
|
from traitlets.config.loader import Config
|
|
|
|
|
|
from IPython.core.history import HistoryAccessor, HistoryManager, extract_hist_ranges
|
|
|
|
|
|
|
|
|
def test_proper_default_encoding():
|
|
|
assert sys.getdefaultencoding() == "utf-8"
|
|
|
|
|
|
def test_history():
|
|
|
ip = get_ipython()
|
|
|
with TemporaryDirectory() as tmpdir:
|
|
|
tmp_path = Path(tmpdir)
|
|
|
hist_manager_ori = ip.history_manager
|
|
|
hist_file = tmp_path / "history.sqlite"
|
|
|
try:
|
|
|
ip.history_manager = HistoryManager(shell=ip, hist_file=hist_file)
|
|
|
hist = ["a=1", "def f():\n test = 1\n return test", "b='€Æ¾÷ß'"]
|
|
|
for i, h in enumerate(hist, start=1):
|
|
|
ip.history_manager.store_inputs(i, h)
|
|
|
|
|
|
ip.history_manager.db_log_output = True
|
|
|
# Doesn't match the input, but we'll just check it's stored.
|
|
|
ip.history_manager.output_hist_reprs[3] = "spam"
|
|
|
ip.history_manager.store_output(3)
|
|
|
|
|
|
assert ip.history_manager.input_hist_raw == [""] + hist
|
|
|
|
|
|
# Detailed tests for _get_range_session
|
|
|
grs = ip.history_manager._get_range_session
|
|
|
assert list(grs(start=2, stop=-1)) == list(zip([0], [2], hist[1:-1]))
|
|
|
assert list(grs(start=-2)) == list(zip([0, 0], [2, 3], hist[-2:]))
|
|
|
assert list(grs(output=True)) == list(
|
|
|
zip([0, 0, 0], [1, 2, 3], zip(hist, [None, None, "spam"]))
|
|
|
)
|
|
|
|
|
|
# Check whether specifying a range beyond the end of the current
|
|
|
# session results in an error (gh-804)
|
|
|
ip.run_line_magic("hist", "2-500")
|
|
|
|
|
|
# Check that we can write non-ascii characters to a file
|
|
|
ip.run_line_magic("hist", "-f %s" % (tmp_path / "test1"))
|
|
|
ip.run_line_magic("hist", "-pf %s" % (tmp_path / "test2"))
|
|
|
ip.run_line_magic("hist", "-nf %s" % (tmp_path / "test3"))
|
|
|
ip.run_line_magic("save", "%s 1-10" % (tmp_path / "test4"))
|
|
|
|
|
|
# New session
|
|
|
ip.history_manager.reset()
|
|
|
newcmds = ["z=5", "class X(object):\n pass", "k='p'", "z=5"]
|
|
|
for i, cmd in enumerate(newcmds, start=1):
|
|
|
ip.history_manager.store_inputs(i, cmd)
|
|
|
gothist = ip.history_manager.get_range(start=1, stop=4)
|
|
|
assert list(gothist) == list(zip([0, 0, 0], [1, 2, 3], newcmds))
|
|
|
# Previous session:
|
|
|
gothist = ip.history_manager.get_range(-1, 1, 4)
|
|
|
assert list(gothist) == list(zip([1, 1, 1], [1, 2, 3], hist))
|
|
|
|
|
|
newhist = [(2, i, c) for (i, c) in enumerate(newcmds, 1)]
|
|
|
|
|
|
# Check get_hist_tail
|
|
|
gothist = ip.history_manager.get_tail(5, output=True,
|
|
|
include_latest=True)
|
|
|
expected = [(1, 3, (hist[-1], "spam"))] \
|
|
|
+ [(s, n, (c, None)) for (s, n, c) in newhist]
|
|
|
assert list(gothist) == expected
|
|
|
|
|
|
gothist = ip.history_manager.get_tail(2)
|
|
|
expected = newhist[-3:-1]
|
|
|
assert list(gothist) == expected
|
|
|
|
|
|
# Check get_hist_search
|
|
|
|
|
|
gothist = ip.history_manager.search("*test*")
|
|
|
assert list(gothist) == [(1, 2, hist[1])]
|
|
|
|
|
|
gothist = ip.history_manager.search("*=*")
|
|
|
assert list(gothist) == [
|
|
|
(1, 1, hist[0]),
|
|
|
(1, 2, hist[1]),
|
|
|
(1, 3, hist[2]),
|
|
|
newhist[0],
|
|
|
newhist[2],
|
|
|
newhist[3],
|
|
|
]
|
|
|
|
|
|
gothist = ip.history_manager.search("*=*", n=4)
|
|
|
assert list(gothist) == [
|
|
|
(1, 3, hist[2]),
|
|
|
newhist[0],
|
|
|
newhist[2],
|
|
|
newhist[3],
|
|
|
]
|
|
|
|
|
|
gothist = ip.history_manager.search("*=*", unique=True)
|
|
|
assert list(gothist) == [
|
|
|
(1, 1, hist[0]),
|
|
|
(1, 2, hist[1]),
|
|
|
(1, 3, hist[2]),
|
|
|
newhist[2],
|
|
|
newhist[3],
|
|
|
]
|
|
|
|
|
|
gothist = ip.history_manager.search("*=*", unique=True, n=3)
|
|
|
assert list(gothist) == [(1, 3, hist[2]), newhist[2], newhist[3]]
|
|
|
|
|
|
gothist = ip.history_manager.search("b*", output=True)
|
|
|
assert list(gothist) == [(1, 3, (hist[2], "spam"))]
|
|
|
|
|
|
# Cross testing: check that magic %save can get previous session.
|
|
|
testfilename = (tmp_path / "test.py").resolve()
|
|
|
ip.run_line_magic("save", str(testfilename) + " ~1/1-3")
|
|
|
with io.open(testfilename, encoding="utf-8") as testfile:
|
|
|
assert testfile.read() == "# coding: utf-8\n" + "\n".join(hist) + "\n"
|
|
|
|
|
|
# Duplicate line numbers - check that it doesn't crash, and
|
|
|
# gets a new session
|
|
|
ip.history_manager.store_inputs(1, "rogue")
|
|
|
ip.history_manager.writeout_cache()
|
|
|
assert ip.history_manager.session_number == 3
|
|
|
|
|
|
# Check that session and line values are not just max values
|
|
|
sessid, lineno, entry = newhist[-1]
|
|
|
assert lineno > 1
|
|
|
ip.history_manager.reset()
|
|
|
lineno = 1
|
|
|
ip.history_manager.store_inputs(lineno, entry)
|
|
|
gothist = ip.history_manager.search("*=*", unique=True)
|
|
|
hist = list(gothist)[-1]
|
|
|
assert sessid < hist[0]
|
|
|
assert hist[1:] == (lineno, entry)
|
|
|
finally:
|
|
|
# Ensure saving thread is shut down before we try to clean up the files
|
|
|
ip.history_manager.save_thread.stop()
|
|
|
# Forcibly close database rather than relying on garbage collection
|
|
|
ip.history_manager.db.close()
|
|
|
# Restore history manager
|
|
|
ip.history_manager = hist_manager_ori
|
|
|
|
|
|
|
|
|
def test_extract_hist_ranges():
|
|
|
instr = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5 ~10/"
|
|
|
expected = [(0, 1, 2), # 0 == current session
|
|
|
(2, 3, 4),
|
|
|
(-4, 5, 7),
|
|
|
(-4, 7, 10),
|
|
|
(-9, 2, None), # None == to end
|
|
|
(-8, 1, None),
|
|
|
(-7, 1, 6),
|
|
|
(-10, 1, None)]
|
|
|
actual = list(extract_hist_ranges(instr))
|
|
|
assert actual == expected
|
|
|
|
|
|
|
|
|
def test_extract_hist_ranges_empty_str():
|
|
|
instr = ""
|
|
|
expected = [(0, 1, None)] # 0 == current session, None == to end
|
|
|
actual = list(extract_hist_ranges(instr))
|
|
|
assert actual == expected
|
|
|
|
|
|
|
|
|
def test_magic_rerun():
|
|
|
"""Simple test for %rerun (no args -> rerun last line)"""
|
|
|
ip = get_ipython()
|
|
|
ip.run_cell("a = 10", store_history=True)
|
|
|
ip.run_cell("a += 1", store_history=True)
|
|
|
assert ip.user_ns["a"] == 11
|
|
|
ip.run_cell("%rerun", store_history=True)
|
|
|
assert ip.user_ns["a"] == 12
|
|
|
|
|
|
def test_timestamp_type():
|
|
|
ip = get_ipython()
|
|
|
info = ip.history_manager.get_session_info()
|
|
|
assert isinstance(info[1], datetime)
|
|
|
|
|
|
def test_hist_file_config():
|
|
|
cfg = Config()
|
|
|
tfile = tempfile.NamedTemporaryFile(delete=False)
|
|
|
cfg.HistoryManager.hist_file = Path(tfile.name)
|
|
|
try:
|
|
|
hm = HistoryManager(shell=get_ipython(), config=cfg)
|
|
|
assert hm.hist_file == cfg.HistoryManager.hist_file
|
|
|
finally:
|
|
|
try:
|
|
|
Path(tfile.name).unlink()
|
|
|
except OSError:
|
|
|
# same catch as in testing.tools.TempFileMixin
|
|
|
# On Windows, even though we close the file, we still can't
|
|
|
# delete it. I have no clue why
|
|
|
pass
|
|
|
|
|
|
def test_histmanager_disabled():
|
|
|
"""Ensure that disabling the history manager doesn't create a database."""
|
|
|
cfg = Config()
|
|
|
cfg.HistoryAccessor.enabled = False
|
|
|
|
|
|
ip = get_ipython()
|
|
|
with TemporaryDirectory() as tmpdir:
|
|
|
hist_manager_ori = ip.history_manager
|
|
|
hist_file = Path(tmpdir) / "history.sqlite"
|
|
|
cfg.HistoryManager.hist_file = hist_file
|
|
|
try:
|
|
|
ip.history_manager = HistoryManager(shell=ip, config=cfg)
|
|
|
hist = ["a=1", "def f():\n test = 1\n return test", "b='€Æ¾÷ß'"]
|
|
|
for i, h in enumerate(hist, start=1):
|
|
|
ip.history_manager.store_inputs(i, h)
|
|
|
assert ip.history_manager.input_hist_raw == [""] + hist
|
|
|
ip.history_manager.reset()
|
|
|
ip.history_manager.end_session()
|
|
|
finally:
|
|
|
ip.history_manager = hist_manager_ori
|
|
|
|
|
|
# hist_file should not be created
|
|
|
assert hist_file.exists() is False
|
|
|
|
|
|
|
|
|
def test_get_tail_session_awareness():
|
|
|
"""Test .get_tail() is:
|
|
|
- session specific in HistoryManager
|
|
|
- session agnostic in HistoryAccessor
|
|
|
same for .get_last_session_id()
|
|
|
"""
|
|
|
ip = get_ipython()
|
|
|
with TemporaryDirectory() as tmpdir:
|
|
|
tmp_path = Path(tmpdir)
|
|
|
hist_file = tmp_path / "history.sqlite"
|
|
|
get_source = lambda x: x[2]
|
|
|
hm1 = None
|
|
|
hm2 = None
|
|
|
ha = None
|
|
|
try:
|
|
|
# hm1 creates a new session and adds history entries,
|
|
|
# ha catches up
|
|
|
hm1 = HistoryManager(shell=ip, hist_file=hist_file)
|
|
|
hm1_last_sid = hm1.get_last_session_id
|
|
|
ha = HistoryAccessor(hist_file=hist_file)
|
|
|
ha_last_sid = ha.get_last_session_id
|
|
|
|
|
|
hist1 = ["a=1", "b=1", "c=1"]
|
|
|
for i, h in enumerate(hist1 + [""], start=1):
|
|
|
hm1.store_inputs(i, h)
|
|
|
assert list(map(get_source, hm1.get_tail())) == hist1
|
|
|
assert list(map(get_source, ha.get_tail())) == hist1
|
|
|
sid1 = hm1_last_sid()
|
|
|
assert sid1 is not None
|
|
|
assert ha_last_sid() == sid1
|
|
|
|
|
|
# hm2 creates a new session and adds entries,
|
|
|
# ha catches up
|
|
|
hm2 = HistoryManager(shell=ip, hist_file=hist_file)
|
|
|
hm2_last_sid = hm2.get_last_session_id
|
|
|
|
|
|
hist2 = ["a=2", "b=2", "c=2"]
|
|
|
for i, h in enumerate(hist2 + [""], start=1):
|
|
|
hm2.store_inputs(i, h)
|
|
|
tail = hm2.get_tail(n=3)
|
|
|
assert list(map(get_source, tail)) == hist2
|
|
|
tail = ha.get_tail(n=3)
|
|
|
assert list(map(get_source, tail)) == hist2
|
|
|
sid2 = hm2_last_sid()
|
|
|
assert sid2 is not None
|
|
|
assert ha_last_sid() == sid2
|
|
|
assert sid2 != sid1
|
|
|
|
|
|
# but hm1 still maintains its point of reference
|
|
|
# and adding more entries to it doesn't change others
|
|
|
# immediate perspective
|
|
|
assert hm1_last_sid() == sid1
|
|
|
tail = hm1.get_tail(n=3)
|
|
|
assert list(map(get_source, tail)) == hist1
|
|
|
|
|
|
hist3 = ["a=3", "b=3", "c=3"]
|
|
|
for i, h in enumerate(hist3 + [""], start=5):
|
|
|
hm1.store_inputs(i, h)
|
|
|
tail = hm1.get_tail(n=7)
|
|
|
assert list(map(get_source, tail)) == hist1 + [""] + hist3
|
|
|
tail = hm2.get_tail(n=3)
|
|
|
assert list(map(get_source, tail)) == hist2
|
|
|
tail = ha.get_tail(n=3)
|
|
|
assert list(map(get_source, tail)) == hist2
|
|
|
assert hm1_last_sid() == sid1
|
|
|
assert hm2_last_sid() == sid2
|
|
|
assert ha_last_sid() == sid2
|
|
|
finally:
|
|
|
if hm1:
|
|
|
hm1.save_thread.stop()
|
|
|
hm1.db.close()
|
|
|
if hm2:
|
|
|
hm2.save_thread.stop()
|
|
|
hm2.db.close()
|
|
|
if ha:
|
|
|
ha.db.close()
|
|
|
|