##// END OF EJS Templates
Use pathlib in historyapp.py
Gal B -
Show More
@@ -1,161 +1,161 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 An application for managing IPython history.
3 An application for managing IPython history.
4
4
5 To be invoked as the `ipython history` subcommand.
5 To be invoked as the `ipython history` subcommand.
6 """
6 """
7
7
8 import os
9 import sqlite3
8 import sqlite3
9 from pathlib import Path
10
10
11 from traitlets.config.application import Application
11 from traitlets.config.application import Application
12 from .application import BaseIPythonApplication
12 from .application import BaseIPythonApplication
13 from traitlets import Bool, Int, Dict
13 from traitlets import Bool, Int, Dict
14 from ..utils.io import ask_yes_no
14 from ..utils.io import ask_yes_no
15
15
16 trim_hist_help = """Trim the IPython history database to the last 1000 entries.
16 trim_hist_help = """Trim the IPython history database to the last 1000 entries.
17
17
18 This actually copies the last 1000 entries to a new database, and then replaces
18 This actually copies the last 1000 entries to a new database, and then replaces
19 the old file with the new. Use the `--keep=` argument to specify a number
19 the old file with the new. Use the `--keep=` argument to specify a number
20 other than 1000.
20 other than 1000.
21 """
21 """
22
22
23 clear_hist_help = """Clear the IPython history database, deleting all entries.
23 clear_hist_help = """Clear the IPython history database, deleting all entries.
24
24
25 Because this is a destructive operation, IPython will prompt the user if they
25 Because this is a destructive operation, IPython will prompt the user if they
26 really want to do this. Passing a `-f` flag will force clearing without a
26 really want to do this. Passing a `-f` flag will force clearing without a
27 prompt.
27 prompt.
28
28
29 This is an handy alias to `ipython history trim --keep=0`
29 This is an handy alias to `ipython history trim --keep=0`
30 """
30 """
31
31
32
32
33 class HistoryTrim(BaseIPythonApplication):
33 class HistoryTrim(BaseIPythonApplication):
34 description = trim_hist_help
34 description = trim_hist_help
35
35
36 backup = Bool(False,
36 backup = Bool(False,
37 help="Keep the old history file as history.sqlite.<N>"
37 help="Keep the old history file as history.sqlite.<N>"
38 ).tag(config=True)
38 ).tag(config=True)
39
39
40 keep = Int(1000,
40 keep = Int(1000,
41 help="Number of recent lines to keep in the database."
41 help="Number of recent lines to keep in the database."
42 ).tag(config=True)
42 ).tag(config=True)
43
43
44 flags = Dict(dict(
44 flags = Dict(dict(
45 backup = ({'HistoryTrim' : {'backup' : True}},
45 backup = ({'HistoryTrim' : {'backup' : True}},
46 backup.help
46 backup.help
47 )
47 )
48 ))
48 ))
49
49
50 aliases=Dict(dict(
50 aliases=Dict(dict(
51 keep = 'HistoryTrim.keep'
51 keep = 'HistoryTrim.keep'
52 ))
52 ))
53
53
54 def start(self):
54 def start(self):
55 profile_dir = self.profile_dir.location
55 profile_dir = Path(self.profile_dir.location)
56 hist_file = os.path.join(profile_dir, 'history.sqlite')
56 hist_file = profile_dir / "history.sqlite"
57 con = sqlite3.connect(hist_file)
57 con = sqlite3.connect(hist_file)
58
58
59 # Grab the recent history from the current database.
59 # Grab the recent history from the current database.
60 inputs = list(con.execute('SELECT session, line, source, source_raw FROM '
60 inputs = list(con.execute('SELECT session, line, source, source_raw FROM '
61 'history ORDER BY session DESC, line DESC LIMIT ?', (self.keep+1,)))
61 'history ORDER BY session DESC, line DESC LIMIT ?', (self.keep+1,)))
62 if len(inputs) <= self.keep:
62 if len(inputs) <= self.keep:
63 print("There are already at most %d entries in the history database." % self.keep)
63 print("There are already at most %d entries in the history database." % self.keep)
64 print("Not doing anything. Use --keep= argument to keep fewer entries")
64 print("Not doing anything. Use --keep= argument to keep fewer entries")
65 return
65 return
66
66
67 print("Trimming history to the most recent %d entries." % self.keep)
67 print("Trimming history to the most recent %d entries." % self.keep)
68
68
69 inputs.pop() # Remove the extra element we got to check the length.
69 inputs.pop() # Remove the extra element we got to check the length.
70 inputs.reverse()
70 inputs.reverse()
71 if inputs:
71 if inputs:
72 first_session = inputs[0][0]
72 first_session = inputs[0][0]
73 outputs = list(con.execute('SELECT session, line, output FROM '
73 outputs = list(con.execute('SELECT session, line, output FROM '
74 'output_history WHERE session >= ?', (first_session,)))
74 'output_history WHERE session >= ?', (first_session,)))
75 sessions = list(con.execute('SELECT session, start, end, num_cmds, remark FROM '
75 sessions = list(con.execute('SELECT session, start, end, num_cmds, remark FROM '
76 'sessions WHERE session >= ?', (first_session,)))
76 'sessions WHERE session >= ?', (first_session,)))
77 con.close()
77 con.close()
78
78
79 # Create the new history database.
79 # Create the new history database.
80 new_hist_file = os.path.join(profile_dir, 'history.sqlite.new')
80 new_hist_file = profile_dir / "history.sqlite.new"
81 i = 0
81 i = 0
82 while os.path.exists(new_hist_file):
82 while new_hist_file.exists():
83 # Make sure we don't interfere with an existing file.
83 # Make sure we don't interfere with an existing file.
84 i += 1
84 i += 1
85 new_hist_file = os.path.join(profile_dir, 'history.sqlite.new'+str(i))
85 new_hist_file = profile_dir / ("history.sqlite.new" + str(i))
86 new_db = sqlite3.connect(new_hist_file)
86 new_db = sqlite3.connect(new_hist_file)
87 new_db.execute("""CREATE TABLE IF NOT EXISTS sessions (session integer
87 new_db.execute("""CREATE TABLE IF NOT EXISTS sessions (session integer
88 primary key autoincrement, start timestamp,
88 primary key autoincrement, start timestamp,
89 end timestamp, num_cmds integer, remark text)""")
89 end timestamp, num_cmds integer, remark text)""")
90 new_db.execute("""CREATE TABLE IF NOT EXISTS history
90 new_db.execute("""CREATE TABLE IF NOT EXISTS history
91 (session integer, line integer, source text, source_raw text,
91 (session integer, line integer, source text, source_raw text,
92 PRIMARY KEY (session, line))""")
92 PRIMARY KEY (session, line))""")
93 new_db.execute("""CREATE TABLE IF NOT EXISTS output_history
93 new_db.execute("""CREATE TABLE IF NOT EXISTS output_history
94 (session integer, line integer, output text,
94 (session integer, line integer, output text,
95 PRIMARY KEY (session, line))""")
95 PRIMARY KEY (session, line))""")
96 new_db.commit()
96 new_db.commit()
97
97
98
98
99 if inputs:
99 if inputs:
100 with new_db:
100 with new_db:
101 # Add the recent history into the new database.
101 # Add the recent history into the new database.
102 new_db.executemany('insert into sessions values (?,?,?,?,?)', sessions)
102 new_db.executemany('insert into sessions values (?,?,?,?,?)', sessions)
103 new_db.executemany('insert into history values (?,?,?,?)', inputs)
103 new_db.executemany('insert into history values (?,?,?,?)', inputs)
104 new_db.executemany('insert into output_history values (?,?,?)', outputs)
104 new_db.executemany('insert into output_history values (?,?,?)', outputs)
105 new_db.close()
105 new_db.close()
106
106
107 if self.backup:
107 if self.backup:
108 i = 1
108 i = 1
109 backup_hist_file = os.path.join(profile_dir, 'history.sqlite.old.%d' % i)
109 backup_hist_file = profile_dir / ("history.sqlite.old.%d" % i)
110 while os.path.exists(backup_hist_file):
110 while backup_hist_file.exists():
111 i += 1
111 i += 1
112 backup_hist_file = os.path.join(profile_dir, 'history.sqlite.old.%d' % i)
112 backup_hist_file = profile_dir / ("history.sqlite.old.%d" % i)
113 os.rename(hist_file, backup_hist_file)
113 hist_file.rename(backup_hist_file)
114 print("Backed up longer history file to", backup_hist_file)
114 print("Backed up longer history file to", backup_hist_file)
115 else:
115 else:
116 os.remove(hist_file)
116 hist_file.unlink()
117
117
118 os.rename(new_hist_file, hist_file)
118 new_hist_file.rename(hist_file)
119
119
120 class HistoryClear(HistoryTrim):
120 class HistoryClear(HistoryTrim):
121 description = clear_hist_help
121 description = clear_hist_help
122 keep = Int(0,
122 keep = Int(0,
123 help="Number of recent lines to keep in the database.")
123 help="Number of recent lines to keep in the database.")
124
124
125 force = Bool(False,
125 force = Bool(False,
126 help="Don't prompt user for confirmation"
126 help="Don't prompt user for confirmation"
127 ).tag(config=True)
127 ).tag(config=True)
128
128
129 flags = Dict(dict(
129 flags = Dict(dict(
130 force = ({'HistoryClear' : {'force' : True}},
130 force = ({'HistoryClear' : {'force' : True}},
131 force.help),
131 force.help),
132 f = ({'HistoryTrim' : {'force' : True}},
132 f = ({'HistoryTrim' : {'force' : True}},
133 force.help
133 force.help
134 )
134 )
135 ))
135 ))
136 aliases = Dict()
136 aliases = Dict()
137
137
138 def start(self):
138 def start(self):
139 if self.force or ask_yes_no("Really delete all ipython history? ",
139 if self.force or ask_yes_no("Really delete all ipython history? ",
140 default="no", interrupt="no"):
140 default="no", interrupt="no"):
141 HistoryTrim.start(self)
141 HistoryTrim.start(self)
142
142
143 class HistoryApp(Application):
143 class HistoryApp(Application):
144 name = u'ipython-history'
144 name = u'ipython-history'
145 description = "Manage the IPython history database."
145 description = "Manage the IPython history database."
146
146
147 subcommands = Dict(dict(
147 subcommands = Dict(dict(
148 trim = (HistoryTrim, HistoryTrim.description.splitlines()[0]),
148 trim = (HistoryTrim, HistoryTrim.description.splitlines()[0]),
149 clear = (HistoryClear, HistoryClear.description.splitlines()[0]),
149 clear = (HistoryClear, HistoryClear.description.splitlines()[0]),
150 ))
150 ))
151
151
152 def start(self):
152 def start(self):
153 if self.subapp is None:
153 if self.subapp is None:
154 print("No subcommand specified. Must specify one of: %s" % \
154 print("No subcommand specified. Must specify one of: %s" % \
155 (self.subcommands.keys()))
155 (self.subcommands.keys()))
156 print()
156 print()
157 self.print_description()
157 self.print_description()
158 self.print_subcommands()
158 self.print_subcommands()
159 self.exit(1)
159 self.exit(1)
160 else:
160 else:
161 return self.subapp.start()
161 return self.subapp.start()
General Comments 0
You need to be logged in to leave comments. Login now