##// END OF EJS Templates
Update history.py to use pathlib...
Jakub Klus -
Show More
@@ -6,7 +6,7 b''
6 6
7 7 import atexit
8 8 import datetime
9 import os
9 from pathlib import Path
10 10 import re
11 11 import sqlite3
12 12 import threading
@@ -16,8 +16,8 b' from decorator import decorator'
16 16 from IPython.utils.decorators import undoc
17 17 from IPython.paths import locate_profile
18 18 from traitlets import (
19 Any, Bool, Dict, Instance, Integer, List, Unicode, TraitError,
20 default, observe,
19 Any, Bool, Dict, Instance, Integer, List, Unicode, Union, TraitError,
20 default, observe
21 21 )
22 22
23 23 #-----------------------------------------------------------------------------
@@ -73,17 +73,18 b' def catch_corrupt_db(f, self, *a, **kw):'
73 73 if self._corrupt_db_counter > self._corrupt_db_limit:
74 74 self.hist_file = ':memory:'
75 75 self.log.error("Failed to load history too many times, history will not be saved.")
76 elif os.path.isfile(self.hist_file):
76 elif self.hist_file.is_file():
77 77 # move the file out of the way
78 base, ext = os.path.splitext(self.hist_file)
79 size = os.stat(self.hist_file).st_size
78 base = str(self.hist_file.parent / self.hist_file.stem)
79 ext = self.hist_file.suffix
80 size = self.hist_file.stat().st_size
80 81 if size >= _SAVE_DB_SIZE:
81 82 # if there's significant content, avoid clobbering
82 83 now = datetime.datetime.now().isoformat().replace(':', '.')
83 84 newpath = base + '-corrupt-' + now + ext
84 85 # don't clobber previous corrupt backups
85 86 for i in range(100):
86 if not os.path.isfile(newpath):
87 if not Path(newpath).exists():
87 88 break
88 89 else:
89 90 newpath = base + '-corrupt-' + now + (u'-%i' % i) + ext
@@ -91,7 +92,7 b' def catch_corrupt_db(f, self, *a, **kw):'
91 92 # not much content, possibly empty; don't worry about clobbering
92 93 # maybe we should just delete it?
93 94 newpath = base + '-corrupt' + ext
94 os.rename(self.hist_file, newpath)
95 self.hist_file.rename(newpath)
95 96 self.log.error("History file was moved to %s and a new file created.", newpath)
96 97 self.init_db()
97 98 return []
@@ -128,7 +129,7 b' class HistoryAccessor(HistoryAccessorBase):'
128 129 _corrupt_db_limit = 2
129 130
130 131 # String holding the path to the history file
131 hist_file = Unicode(
132 hist_file = Union([Instance(Path), Unicode()],
132 133 help="""Path to file to use for SQLite history database.
133 134
134 135 By default, IPython will put the history database in the IPython
@@ -144,7 +145,8 b' class HistoryAccessor(HistoryAccessorBase):'
144 145 you can also use the specific value `:memory:` (including the colon
145 146 at both end but not the back ticks), to avoid creating an history file.
146 147
147 """).tag(config=True)
148 """
149 ).tag(config=True)
148 150
149 151 enabled = Bool(True,
150 152 help="""enable the SQLite history
@@ -176,7 +178,7 b' class HistoryAccessor(HistoryAccessorBase):'
176 178 (self.__class__.__name__, new)
177 179 raise TraitError(msg)
178 180
179 def __init__(self, profile='default', hist_file=u'', **traits):
181 def __init__(self, profile='default', hist_file="", **traits):
180 182 """Create a new history accessor.
181 183
182 184 Parameters
@@ -197,7 +199,9 b' class HistoryAccessor(HistoryAccessorBase):'
197 199 if hist_file:
198 200 self.hist_file = hist_file
199 201
200 if self.hist_file == u'':
202 try:
203 self.hist_file
204 except TraitError:
201 205 # No one has set the hist_file, yet.
202 206 self.hist_file = self._get_hist_file_name(profile)
203 207
@@ -214,7 +218,7 b' class HistoryAccessor(HistoryAccessorBase):'
214 218 profile : str
215 219 The name of a profile which has a history file.
216 220 """
217 return os.path.join(locate_profile(profile), 'history.sqlite')
221 return Path(locate_profile(profile)) / 'history.sqlite'
218 222
219 223 @catch_corrupt_db
220 224 def init_db(self):
@@ -461,7 +465,7 b' class HistoryManager(HistoryAccessor):'
461 465 @default('dir_hist')
462 466 def _dir_hist_default(self):
463 467 try:
464 return [os.getcwd()]
468 return [Path.cwd()]
465 469 except OSError:
466 470 return []
467 471
@@ -531,7 +535,7 b' class HistoryManager(HistoryAccessor):'
531 535 The profile parameter is ignored, but must exist for compatibility with
532 536 the parent class."""
533 537 profile_dir = self.shell.profile_dir.location
534 return os.path.join(profile_dir, 'history.sqlite')
538 return Path(profile_dir)/'history.sqlite'
535 539
536 540 @only_when_enabled
537 541 def new_session(self, conn=None):
@@ -564,7 +568,7 b' class HistoryManager(HistoryAccessor):'
564 568 optionally open a new session."""
565 569 self.output_hist.clear()
566 570 # The directory history can't be completely empty
567 self.dir_hist[:] = [os.getcwd()]
571 self.dir_hist[:] = [Path.cwd()]
568 572
569 573 if new_session:
570 574 if self.session_number:
@@ -7,7 +7,7 b''
7 7
8 8 # stdlib
9 9 import io
10 import os
10 from pathlib import Path
11 11 import sys
12 12 import tempfile
13 13 from datetime import datetime
@@ -29,8 +29,9 b' def test_proper_default_encoding():'
29 29 def test_history():
30 30 ip = get_ipython()
31 31 with TemporaryDirectory() as tmpdir:
32 tmp_path = Path(tmpdir)
32 33 hist_manager_ori = ip.history_manager
33 hist_file = os.path.join(tmpdir, 'history.sqlite')
34 hist_file = tmp_path / 'history.sqlite'
34 35 try:
35 36 ip.history_manager = HistoryManager(shell=ip, hist_file=hist_file)
36 37 hist = [u'a=1', u'def f():\n test = 1\n return test', u"b='€Æ¾÷ß'"]
@@ -55,10 +56,10 b' def test_history():'
55 56 ip.magic('%hist 2-500')
56 57
57 58 # Check that we can write non-ascii characters to a file
58 ip.magic("%%hist -f %s" % os.path.join(tmpdir, "test1"))
59 ip.magic("%%hist -pf %s" % os.path.join(tmpdir, "test2"))
60 ip.magic("%%hist -nf %s" % os.path.join(tmpdir, "test3"))
61 ip.magic("%%save %s 1-10" % os.path.join(tmpdir, "test4"))
59 ip.magic("%%hist -f %s" % (tmp_path / "test1"))
60 ip.magic("%%hist -pf %s" % (tmp_path / "test2"))
61 ip.magic("%%hist -nf %s" % (tmp_path / "test3"))
62 ip.magic("%%save %s 1-10" % (tmp_path / "test4"))
62 63
63 64 # New session
64 65 ip.history_manager.reset()
@@ -126,8 +127,8 b' def test_history():'
126 127 nt.assert_equal(list(gothist), [(1,3,(hist[2],"spam"))] )
127 128
128 129 # Cross testing: check that magic %save can get previous session.
129 testfilename = os.path.realpath(os.path.join(tmpdir, "test.py"))
130 ip.magic("save " + testfilename + " ~1/1-3")
130 testfilename = (tmp_path /"test.py").resolve()
131 ip.magic("save " + str(testfilename) + " ~1/1-3")
131 132 with io.open(testfilename, encoding='utf-8') as testfile:
132 133 nt.assert_equal(testfile.read(),
133 134 u"# coding: utf-8\n" + u"\n".join(hist)+u"\n")
@@ -176,13 +177,13 b' def test_timestamp_type():'
176 177 def test_hist_file_config():
177 178 cfg = Config()
178 179 tfile = tempfile.NamedTemporaryFile(delete=False)
179 cfg.HistoryManager.hist_file = tfile.name
180 cfg.HistoryManager.hist_file = Path(tfile.name)
180 181 try:
181 182 hm = HistoryManager(shell=get_ipython(), config=cfg)
182 183 nt.assert_equal(hm.hist_file, cfg.HistoryManager.hist_file)
183 184 finally:
184 185 try:
185 os.remove(tfile.name)
186 Path(tfile.name).unlink()
186 187 except OSError:
187 188 # same catch as in testing.tools.TempFileMixin
188 189 # On Windows, even though we close the file, we still can't
@@ -197,7 +198,7 b' def test_histmanager_disabled():'
197 198 ip = get_ipython()
198 199 with TemporaryDirectory() as tmpdir:
199 200 hist_manager_ori = ip.history_manager
200 hist_file = os.path.join(tmpdir, 'history.sqlite')
201 hist_file = Path(tmpdir) / 'history.sqlite'
201 202 cfg.HistoryManager.hist_file = hist_file
202 203 try:
203 204 ip.history_manager = HistoryManager(shell=ip, config=cfg)
@@ -211,4 +212,4 b' def test_histmanager_disabled():'
211 212 ip.history_manager = hist_manager_ori
212 213
213 214 # hist_file should not be created
214 nt.assert_false(os.path.exists(hist_file))
215 nt.assert_false(hist_file.exists())
@@ -10,6 +10,7 b' Authors'
10 10 # Distributed under the terms of the Modified BSD License.
11 11
12 12 import os
13 from pathlib import Path
13 14 import re
14 15 import sys
15 16 import tempfile
@@ -142,7 +143,7 b' def default_config():'
142 143 config.TerminalTerminalInteractiveShell.term_title = False,
143 144 config.TerminalInteractiveShell.autocall = 0
144 145 f = tempfile.NamedTemporaryFile(suffix=u'test_hist.sqlite', delete=False)
145 config.HistoryManager.hist_file = f.name
146 config.HistoryManager.hist_file = Path(f.name)
146 147 f.close()
147 148 config.HistoryManager.db_cache_size = 10000
148 149 return config
General Comments 0
You need to be logged in to leave comments. Login now