historyapp.py
159 lines
| 5.9 KiB
| text/x-python
|
PythonLexer
Thomas Kluyver
|
r9723 | # encoding: utf-8 | ||
""" | ||||
An application for managing IPython history. | ||||
To be invoked as the `ipython history` subcommand. | ||||
""" | ||||
from __future__ import print_function | ||||
import os | ||||
import sqlite3 | ||||
from IPython.config.application import Application | ||||
from IPython.core.application import BaseIPythonApplication | ||||
from IPython.utils.traitlets import Bool, Int, Dict | ||||
Paul Ivanov
|
r13612 | from IPython.utils.io import ask_yes_no | ||
Thomas Kluyver
|
r9723 | |||
# Help text for the `trim` subcommand (used as HistoryTrim.description).
# The figure 1000 quoted here matches the default value of HistoryTrim.keep.
# HistoryApp takes the first line of this string as the subcommand's one-line
# summary, so keep that line self-contained.
trim_hist_help = """Trim the IPython history database to the last 1000 entries.
This actually copies the last 1000 entries to a new database, and then replaces
the old file with the new. Use the `--keep=` argument to specify a number
other than 1000.
"""

# Help text for the `clear` subcommand (used as HistoryClear.description).
# As above, HistoryApp uses only the first line as the subcommand summary.
clear_hist_help = """Clear the IPython history database, deleting all entries.
Because this is a destructive operation, IPython will prompt the user if they
really want to do this. Passing a `-f` flag will force clearing without a
prompt.
This is an handy alias to `ipython history trim --keep=0`
"""
Thomas Kluyver
|
class HistoryTrim(BaseIPythonApplication):
    """Subcommand that trims the history database to its most recent entries.

    The newest ``keep`` input lines are copied, together with the output and
    session rows belonging to the oldest retained session onwards, into a
    freshly created SQLite file which then replaces ``history.sqlite`` in the
    profile directory.
    """
    description = trim_hist_help

    # The backup file is written as ``history.sqlite.old.<N>`` (see start()).
    backup = Bool(False, config=True,
        help="Keep the old history file as history.sqlite.old.<N>")

    keep = Int(1000, config=True,
        help="Number of recent lines to keep in the database.")

    flags = Dict(dict(
        backup = ({'HistoryTrim' : {'backup' : True}},
            backup.get_metadata('help')
        )
    ))

    aliases=Dict(dict(
        keep = 'HistoryTrim.keep'
    ))

    def start(self):
        """Rewrite the profile's history database keeping only ``self.keep`` lines.

        Does nothing (apart from printing a message) when ``keep`` is negative
        or when the database already holds at most ``keep`` input lines.
        When ``self.backup`` is true the previous database is renamed to the
        first unused ``history.sqlite.old.<N>``; otherwise it is deleted.
        """
        if self.keep < 0:
            # A negative value would be passed to LIMIT and then trip up the
            # bookkeeping below (e.g. popping from an empty list), so refuse
            # it up front without touching any file.
            print("Cannot keep a negative number of entries (got %d)." % self.keep)
            print("Not doing anything. Use --keep= argument with a value of 0 or more")
            return

        profile_dir = self.profile_dir.location
        hist_file = os.path.join(profile_dir, 'history.sqlite')

        outputs = []
        sessions = []
        con = sqlite3.connect(hist_file)
        try:
            # Grab the recent history from the current database. One row more
            # than requested is fetched so we can tell whether there is
            # anything to trim at all.
            inputs = list(con.execute('SELECT session, line, source, source_raw FROM '
                                    'history ORDER BY session DESC, line DESC LIMIT ?', (self.keep+1,)))
            if len(inputs) <= self.keep:
                print("There are already at most %d entries in the history database." % self.keep)
                print("Not doing anything. Use --keep= argument to keep fewer entries")
                return

            print("Trimming history to the most recent %d entries." % self.keep)

            inputs.pop() # Remove the extra element we got to check the length.
            inputs.reverse()  # Back to chronological order for insertion.
            if inputs:
                # inputs is empty when keep == 0; then there is nothing to copy.
                first_session = inputs[0][0]
                outputs = list(con.execute('SELECT session, line, output FROM '
                                           'output_history WHERE session >= ?', (first_session,)))
                sessions = list(con.execute('SELECT session, start, end, num_cmds, remark FROM '
                                            'sessions WHERE session >= ?', (first_session,)))
        finally:
            # Always release the source database, including on the early
            # return above and when a query raises.
            con.close()

        # Create the new history database.
        new_hist_file = os.path.join(profile_dir, 'history.sqlite.new')
        i = 0
        while os.path.exists(new_hist_file):
            # Make sure we don't interfere with an existing file.
            i += 1
            new_hist_file = os.path.join(profile_dir, 'history.sqlite.new'+str(i))

        new_db = sqlite3.connect(new_hist_file)
        try:
            new_db.execute("""CREATE TABLE IF NOT EXISTS sessions (session integer
                            primary key autoincrement, start timestamp,
                            end timestamp, num_cmds integer, remark text)""")
            new_db.execute("""CREATE TABLE IF NOT EXISTS history
                        (session integer, line integer, source text, source_raw text,
                        PRIMARY KEY (session, line))""")
            new_db.execute("""CREATE TABLE IF NOT EXISTS output_history
                            (session integer, line integer, output text,
                            PRIMARY KEY (session, line))""")
            new_db.commit()

            if inputs:
                with new_db:
                    # Add the recent history into the new database.
                    new_db.executemany('insert into sessions values (?,?,?,?,?)', sessions)
                    new_db.executemany('insert into history values (?,?,?,?)', inputs)
                    new_db.executemany('insert into output_history values (?,?,?)', outputs)
        finally:
            new_db.close()

        if self.backup:
            # Pick the first history.sqlite.old.<N> that is not taken yet.
            i = 1
            backup_hist_file = os.path.join(profile_dir, 'history.sqlite.old.%d' % i)
            while os.path.exists(backup_hist_file):
                i += 1
                backup_hist_file = os.path.join(profile_dir, 'history.sqlite.old.%d' % i)
            os.rename(hist_file, backup_hist_file)
            print("Backed up longer history file to", backup_hist_file)
        else:
            os.remove(hist_file)

        # The old file is out of the way (renamed or removed); move the
        # trimmed database into its place.
        os.rename(new_hist_file, hist_file)
Paul Ivanov
|
class HistoryClear(HistoryTrim):
    """Subcommand that empties the history database.

    Reuses HistoryTrim with ``keep`` pinned to 0 and asks for confirmation
    first unless ``force`` is set.
    """
    description = clear_hist_help

    # Fixed at zero and not configurable: clearing always drops every line.
    keep = Int(0, config=False,
        help="Number of recent lines to keep in the database.")

    force = Bool(False, config=True,
        help="Don't prompt user for confirmation")

    # NOTE(review): `--force` targets the 'HistoryClear' config section while
    # `-f` targets 'HistoryTrim' -- confirm this difference is intentional.
    flags = Dict({
        'force': ({'HistoryClear' : {'force' : True}},
                  force.get_metadata('help')),
        'f': ({'HistoryTrim' : {'force' : True}},
              force.get_metadata('help')),
    })

    # No aliases here, unlike the parent class which defines `keep`.
    aliases = Dict()

    def start(self):
        """Trim the history to zero entries, prompting first unless forced."""
        confirmed = self.force or ask_yes_no(
            "Really delete all ipython history? ",
            default="no", interrupt="no")
        if not confirmed:
            return
        HistoryTrim.start(self)
Thomas Kluyver
|
r9723 | |||
class HistoryApp(Application):
    """Entry point for the `ipython history` subcommand.

    It performs no work itself; it only dispatches to one of the registered
    subcommands (``trim`` or ``clear``).
    """
    name = u'ipython-history'
    description = "Manage the IPython history database."

    # Each entry maps a subcommand name to (application class, summary); the
    # summary is the first line of that application's description.
    subcommands = Dict(dict(
        trim = (HistoryTrim, HistoryTrim.description.splitlines()[0]),
        clear = (HistoryClear, HistoryClear.description.splitlines()[0]),
    ))

    def start(self):
        """Run the selected subcommand, or print usage and exit with status 1."""
        if self.subapp is None:
            # Format a sorted list rather than the raw keys() object so the
            # message is stable and does not show a `dict_keys(...)` repr on
            # Python 3. The explicit 1-tuple keeps the list from being
            # treated as the %-format argument sequence.
            print("No subcommand specified. Must specify one of: %s" %
                  (sorted(self.subcommands),))
            print()
            self.print_description()
            self.print_subcommands()
            self.exit(1)
        else:
            return self.subapp.start()