##// END OF EJS Templates
Use pathlib in historyapp.py
Gal B -
Show More
@@ -1,161 +1,161 b''
1 1 # encoding: utf-8
2 2 """
3 3 An application for managing IPython history.
4 4
5 5 To be invoked as the `ipython history` subcommand.
6 6 """
7 7
8 import os
9 8 import sqlite3
9 from pathlib import Path
10 10
11 11 from traitlets.config.application import Application
12 12 from .application import BaseIPythonApplication
13 13 from traitlets import Bool, Int, Dict
14 14 from ..utils.io import ask_yes_no
15 15
16 16 trim_hist_help = """Trim the IPython history database to the last 1000 entries.
17 17
18 18 This actually copies the last 1000 entries to a new database, and then replaces
19 19 the old file with the new. Use the `--keep=` argument to specify a number
20 20 other than 1000.
21 21 """
22 22
23 23 clear_hist_help = """Clear the IPython history database, deleting all entries.
24 24
25 25 Because this is a destructive operation, IPython will prompt the user if they
26 26 really want to do this. Passing a `-f` flag will force clearing without a
27 27 prompt.
28 28
29 29 This is an handy alias to `ipython history trim --keep=0`
30 30 """
31 31
32 32
33 33 class HistoryTrim(BaseIPythonApplication):
34 34 description = trim_hist_help
35 35
36 36 backup = Bool(False,
37 37 help="Keep the old history file as history.sqlite.<N>"
38 38 ).tag(config=True)
39 39
40 40 keep = Int(1000,
41 41 help="Number of recent lines to keep in the database."
42 42 ).tag(config=True)
43 43
44 44 flags = Dict(dict(
45 45 backup = ({'HistoryTrim' : {'backup' : True}},
46 46 backup.help
47 47 )
48 48 ))
49 49
50 50 aliases=Dict(dict(
51 51 keep = 'HistoryTrim.keep'
52 52 ))
53 53
54 54 def start(self):
55 profile_dir = self.profile_dir.location
56 hist_file = os.path.join(profile_dir, 'history.sqlite')
55 profile_dir = Path(self.profile_dir.location)
56 hist_file = profile_dir / "history.sqlite"
57 57 con = sqlite3.connect(hist_file)
58 58
59 59 # Grab the recent history from the current database.
60 60 inputs = list(con.execute('SELECT session, line, source, source_raw FROM '
61 61 'history ORDER BY session DESC, line DESC LIMIT ?', (self.keep+1,)))
62 62 if len(inputs) <= self.keep:
63 63 print("There are already at most %d entries in the history database." % self.keep)
64 64 print("Not doing anything. Use --keep= argument to keep fewer entries")
65 65 return
66 66
67 67 print("Trimming history to the most recent %d entries." % self.keep)
68 68
69 69 inputs.pop() # Remove the extra element we got to check the length.
70 70 inputs.reverse()
71 71 if inputs:
72 72 first_session = inputs[0][0]
73 73 outputs = list(con.execute('SELECT session, line, output FROM '
74 74 'output_history WHERE session >= ?', (first_session,)))
75 75 sessions = list(con.execute('SELECT session, start, end, num_cmds, remark FROM '
76 76 'sessions WHERE session >= ?', (first_session,)))
77 77 con.close()
78 78
79 79 # Create the new history database.
80 new_hist_file = os.path.join(profile_dir, 'history.sqlite.new')
80 new_hist_file = profile_dir / "history.sqlite.new"
81 81 i = 0
82 while os.path.exists(new_hist_file):
82 while new_hist_file.exists():
83 83 # Make sure we don't interfere with an existing file.
84 84 i += 1
85 new_hist_file = os.path.join(profile_dir, 'history.sqlite.new'+str(i))
85 new_hist_file = profile_dir / ("history.sqlite.new" + str(i))
86 86 new_db = sqlite3.connect(new_hist_file)
87 87 new_db.execute("""CREATE TABLE IF NOT EXISTS sessions (session integer
88 88 primary key autoincrement, start timestamp,
89 89 end timestamp, num_cmds integer, remark text)""")
90 90 new_db.execute("""CREATE TABLE IF NOT EXISTS history
91 91 (session integer, line integer, source text, source_raw text,
92 92 PRIMARY KEY (session, line))""")
93 93 new_db.execute("""CREATE TABLE IF NOT EXISTS output_history
94 94 (session integer, line integer, output text,
95 95 PRIMARY KEY (session, line))""")
96 96 new_db.commit()
97 97
98 98
99 99 if inputs:
100 100 with new_db:
101 101 # Add the recent history into the new database.
102 102 new_db.executemany('insert into sessions values (?,?,?,?,?)', sessions)
103 103 new_db.executemany('insert into history values (?,?,?,?)', inputs)
104 104 new_db.executemany('insert into output_history values (?,?,?)', outputs)
105 105 new_db.close()
106 106
107 107 if self.backup:
108 108 i = 1
109 backup_hist_file = os.path.join(profile_dir, 'history.sqlite.old.%d' % i)
110 while os.path.exists(backup_hist_file):
109 backup_hist_file = profile_dir / ("history.sqlite.old.%d" % i)
110 while backup_hist_file.exists():
111 111 i += 1
112 backup_hist_file = os.path.join(profile_dir, 'history.sqlite.old.%d' % i)
113 os.rename(hist_file, backup_hist_file)
112 backup_hist_file = profile_dir / ("history.sqlite.old.%d" % i)
113 hist_file.rename(backup_hist_file)
114 114 print("Backed up longer history file to", backup_hist_file)
115 115 else:
116 os.remove(hist_file)
116 hist_file.unlink()
117 117
118 os.rename(new_hist_file, hist_file)
118 new_hist_file.rename(hist_file)
119 119
120 120 class HistoryClear(HistoryTrim):
121 121 description = clear_hist_help
122 122 keep = Int(0,
123 123 help="Number of recent lines to keep in the database.")
124 124
125 125 force = Bool(False,
126 126 help="Don't prompt user for confirmation"
127 127 ).tag(config=True)
128 128
129 129 flags = Dict(dict(
130 130 force = ({'HistoryClear' : {'force' : True}},
131 131 force.help),
132 132 f = ({'HistoryTrim' : {'force' : True}},
133 133 force.help
134 134 )
135 135 ))
136 136 aliases = Dict()
137 137
138 138 def start(self):
139 139 if self.force or ask_yes_no("Really delete all ipython history? ",
140 140 default="no", interrupt="no"):
141 141 HistoryTrim.start(self)
142 142
143 143 class HistoryApp(Application):
144 144 name = u'ipython-history'
145 145 description = "Manage the IPython history database."
146 146
147 147 subcommands = Dict(dict(
148 148 trim = (HistoryTrim, HistoryTrim.description.splitlines()[0]),
149 149 clear = (HistoryClear, HistoryClear.description.splitlines()[0]),
150 150 ))
151 151
152 152 def start(self):
153 153 if self.subapp is None:
154 154 print("No subcommand specified. Must specify one of: %s" % \
155 155 (self.subcommands.keys()))
156 156 print()
157 157 self.print_description()
158 158 self.print_subcommands()
159 159 self.exit(1)
160 160 else:
161 161 return self.subapp.start()
General Comments 0
You need to be logged in to leave comments. Login now