##// END OF EJS Templates
Merge branch 'history-threaded' of https://github.com/takluyver/ipython into takluyver-history-threaded
Fernando Perez -
r3730:3df44067 merge
parent child Browse files
Show More
@@ -13,11 +13,13 b''
13 from __future__ import print_function
13 from __future__ import print_function
14
14
15 # Stdlib imports
15 # Stdlib imports
16 import atexit
16 import datetime
17 import datetime
17 import json
18 import json
18 import os
19 import os
19 import re
20 import re
20 import sqlite3
21 import sqlite3
22 import threading
21
23
22 from collections import defaultdict
24 from collections import defaultdict
23
25
@@ -76,6 +78,11 b' class HistoryManager(Configurable):'
76 db_input_cache = List()
78 db_input_cache = List()
77 db_output_cache = List()
79 db_output_cache = List()
78
80
81 # History saving in separate thread
82 save_thread = Instance('IPython.core.history.HistorySavingThread')
83 # N.B. Event is a function returning an instance of _Event.
84 save_flag = Instance(threading._Event)
85
79 # Private interface
86 # Private interface
80 # Variables used to store the three last inputs from the user. On each new
87 # Variables used to store the three last inputs from the user. On each new
81 # history update, we populate the user's namespace with these, shifted as
88 # history update, we populate the user's namespace with these, shifted as
@@ -119,6 +126,12 b' class HistoryManager(Configurable):'
119 else:
126 else:
120 # The hist_file is probably :memory: or something else.
127 # The hist_file is probably :memory: or something else.
121 raise
128 raise
129
130 self.save_flag = threading.Event()
131 self.db_input_cache_lock = threading.Lock()
132 self.db_output_cache_lock = threading.Lock()
133 self.save_thread = HistorySavingThread(self)
134 self.save_thread.start()
122
135
123 self.new_session()
136 self.new_session()
124
137
@@ -139,10 +152,13 b' class HistoryManager(Configurable):'
139 PRIMARY KEY (session, line))""")
152 PRIMARY KEY (session, line))""")
140 self.db.commit()
153 self.db.commit()
141
154
142 def new_session(self):
155 def new_session(self, conn=None):
143 """Get a new session number."""
156 """Get a new session number."""
144 with self.db:
157 if conn is None:
145 cur = self.db.execute("""INSERT INTO sessions VALUES (NULL, ?, NULL,
158 conn = self.db
159
160 with conn:
161 cur = conn.execute("""INSERT INTO sessions VALUES (NULL, ?, NULL,
146 NULL, "") """, (datetime.datetime.now(),))
162 NULL, "") """, (datetime.datetime.now(),))
147 self.session_number = cur.lastrowid
163 self.session_number = cur.lastrowid
148
164
@@ -373,10 +389,11 b' class HistoryManager(Configurable):'
373 self.input_hist_parsed.append(source)
389 self.input_hist_parsed.append(source)
374 self.input_hist_raw.append(source_raw)
390 self.input_hist_raw.append(source_raw)
375
391
376 self.db_input_cache.append((line_num, source, source_raw))
392 with self.db_input_cache_lock:
377 # Trigger to flush cache and write to DB.
393 self.db_input_cache.append((line_num, source, source_raw))
378 if len(self.db_input_cache) >= self.db_cache_size:
394 # Trigger to flush cache and write to DB.
379 self.writeout_cache()
395 if len(self.db_input_cache) >= self.db_cache_size:
396 self.save_flag.set()
380
397
381 # update the auto _i variables
398 # update the auto _i variables
382 self._iii = self._ii
399 self._iii = self._ii
@@ -406,45 +423,90 b' class HistoryManager(Configurable):'
406 return
423 return
407 output = json.dumps(self.output_hist_reprs[line_num])
424 output = json.dumps(self.output_hist_reprs[line_num])
408
425
409 self.db_output_cache.append((line_num, output))
426 with self.db_output_cache_lock:
427 self.db_output_cache.append((line_num, output))
410 if self.db_cache_size <= 1:
428 if self.db_cache_size <= 1:
411 self.writeout_cache()
429 self.save_flag.set()
412
430
413 def _writeout_input_cache(self):
431 def _writeout_input_cache(self, conn):
414 with self.db:
432 with conn:
415 for line in self.db_input_cache:
433 for line in self.db_input_cache:
416 self.db.execute("INSERT INTO history VALUES (?, ?, ?, ?)",
434 conn.execute("INSERT INTO history VALUES (?, ?, ?, ?)",
417 (self.session_number,)+line)
435 (self.session_number,)+line)
418
436
419 def _writeout_output_cache(self):
437 def _writeout_output_cache(self, conn):
420 with self.db:
438 with conn:
421 for line in self.db_output_cache:
439 for line in self.db_output_cache:
422 self.db.execute("INSERT INTO output_history VALUES (?, ?, ?)",
440 conn.execute("INSERT INTO output_history VALUES (?, ?, ?)",
423 (self.session_number,)+line)
441 (self.session_number,)+line)
424
442
425 def writeout_cache(self):
443 def writeout_cache(self, conn=None):
426 """Write any entries in the cache to the database."""
444 """Write any entries in the cache to the database."""
427 try:
445 if conn is None:
428 self._writeout_input_cache()
446 conn = self.db
429 except sqlite3.IntegrityError:
430 self.new_session()
431 print("ERROR! Session/line number was not unique in",
432 "database. History logging moved to new session",
433 self.session_number)
434 try: # Try writing to the new session. If this fails, don't recurse
435 self._writeout_input_cache()
436 except sqlite3.IntegrityError:
437 pass
438 finally:
439 self.db_input_cache = []
440
447
448 with self.db_input_cache_lock:
449 try:
450 self._writeout_input_cache(conn)
451 except sqlite3.IntegrityError:
452 self.new_session(conn)
453 print("ERROR! Session/line number was not unique in",
454 "database. History logging moved to new session",
455 self.session_number)
456 try: # Try writing to the new session. If this fails, don't recurse
457 self._writeout_input_cache(conn)
458 except sqlite3.IntegrityError:
459 pass
460 finally:
461 self.db_input_cache = []
462
463 with self.db_output_cache_lock:
464 try:
465 self._writeout_output_cache(conn)
466 except sqlite3.IntegrityError:
467 print("!! Session/line number for output was not unique",
468 "in database. Output will not be stored.")
469 finally:
470 self.db_output_cache = []
471
472
473 class HistorySavingThread(threading.Thread):
474 """This thread takes care of writing history to the database, so that
475 the UI isn't held up while that happens.
476
477 It waits for the HistoryManager's save_flag to be set, then writes out
478 the history cache. The main thread is responsible for setting the flag when
479 the cache size reaches a defined threshold."""
480 daemon = True
481 stop_now = False
482 def __init__(self, history_manager):
483 super(HistorySavingThread, self).__init__()
484 self.history_manager = history_manager
485 atexit.register(self.stop)
486
487 def run(self):
488 # We need a separate db connection per thread:
441 try:
489 try:
442 self._writeout_output_cache()
490 self.db = sqlite3.connect(self.history_manager.hist_file)
443 except sqlite3.IntegrityError:
491 while True:
444 print("!! Session/line number for output was not unique",
492 self.history_manager.save_flag.wait()
445 "in database. Output will not be stored.")
493 if self.stop_now:
446 finally:
494 return
447 self.db_output_cache = []
495 self.history_manager.save_flag.clear()
496 self.history_manager.writeout_cache(self.db)
497 except Exception as e:
498 print(("The history saving thread hit an unexpected error (%s)."
499 "History will not be written to the database.") % repr(e))
500
501 def stop(self):
502 """This can be called from the main thread to safely stop this thread.
503
504 Note that it does not attempt to write out remaining history before
505 exiting. That should be done by calling the HistoryManager's
506 end_session method."""
507 self.stop_now = True
508 self.history_manager.save_flag.set()
509 self.join()
448
510
449
511
450 # To match, e.g. ~5/8-~2/3
512 # To match, e.g. ~5/8-~2/3
@@ -23,10 +23,10 b' def setUp():'
23 def test_history():
23 def test_history():
24 ip = get_ipython()
24 ip = get_ipython()
25 with TemporaryDirectory() as tmpdir:
25 with TemporaryDirectory() as tmpdir:
26 # Make a new :memory: DB.
27 hist_manager_ori = ip.history_manager
26 hist_manager_ori = ip.history_manager
27 hist_file = os.path.join(tmpdir, 'history.sqlite')
28 try:
28 try:
29 ip.history_manager = HistoryManager(shell=ip, hist_file=':memory:')
29 ip.history_manager = HistoryManager(shell=ip, hist_file=hist_file)
30 hist = ['a=1', 'def f():\n test = 1\n return test', u"b='€Æ¾÷ß'"]
30 hist = ['a=1', 'def f():\n test = 1\n return test', u"b='€Æ¾÷ß'"]
31 for i, h in enumerate(hist, start=1):
31 for i, h in enumerate(hist, start=1):
32 ip.history_manager.store_inputs(i, h)
32 ip.history_manager.store_inputs(i, h)
@@ -38,10 +38,7 b' def test_history():'
38
38
39 nt.assert_equal(ip.history_manager.input_hist_raw, [''] + hist)
39 nt.assert_equal(ip.history_manager.input_hist_raw, [''] + hist)
40
40
41 # Check lines were written to DB
41
42 c = ip.history_manager.db.execute("SELECT source_raw FROM history")
43 nt.assert_equal([x for x, in c], hist)
44
45 # New session
42 # New session
46 ip.history_manager.reset()
43 ip.history_manager.reset()
47 newcmds = ["z=5","class X(object):\n pass", "k='p'"]
44 newcmds = ["z=5","class X(object):\n pass", "k='p'"]
@@ -83,6 +80,7 b' def test_history():'
83 # Duplicate line numbers - check that it doesn't crash, and
80 # Duplicate line numbers - check that it doesn't crash, and
84 # gets a new session
81 # gets a new session
85 ip.history_manager.store_inputs(1, "rogue")
82 ip.history_manager.store_inputs(1, "rogue")
83 ip.history_manager.writeout_cache()
86 nt.assert_equal(ip.history_manager.session_number, 3)
84 nt.assert_equal(ip.history_manager.session_number, 3)
87 finally:
85 finally:
88 # Restore history manager
86 # Restore history manager
@@ -167,7 +167,8 b' def default_config():'
167 config.TerminalInteractiveShell.colors = 'NoColor'
167 config.TerminalInteractiveShell.colors = 'NoColor'
168 config.TerminalTerminalInteractiveShell.term_title = False,
168 config.TerminalTerminalInteractiveShell.term_title = False,
169 config.TerminalInteractiveShell.autocall = 0
169 config.TerminalInteractiveShell.autocall = 0
170 config.HistoryManager.hist_file = u':memory:'
170 config.HistoryManager.hist_file = u'test_hist.sqlite'
171 config.HistoryManager.db_cache_size = 10000
171 return config
172 return config
172
173
173
174
General Comments 0
You need to be logged in to leave comments. Login now