Show More
@@ -13,11 +13,13 b'' | |||||
13 | from __future__ import print_function |
|
13 | from __future__ import print_function | |
14 |
|
14 | |||
15 | # Stdlib imports |
|
15 | # Stdlib imports | |
|
16 | import atexit | |||
16 | import datetime |
|
17 | import datetime | |
17 | import json |
|
18 | import json | |
18 | import os |
|
19 | import os | |
19 | import re |
|
20 | import re | |
20 | import sqlite3 |
|
21 | import sqlite3 | |
|
22 | import threading | |||
21 |
|
23 | |||
22 | from collections import defaultdict |
|
24 | from collections import defaultdict | |
23 |
|
25 | |||
@@ -76,6 +78,11 b' class HistoryManager(Configurable):' | |||||
76 | db_input_cache = List() |
|
78 | db_input_cache = List() | |
77 | db_output_cache = List() |
|
79 | db_output_cache = List() | |
78 |
|
80 | |||
|
81 | # History saving in separate thread | |||
|
82 | save_thread = Instance('IPython.core.history.HistorySavingThread') | |||
|
83 | # N.B. Event is a function returning an instance of _Event. | |||
|
84 | save_flag = Instance(threading._Event) | |||
|
85 | ||||
79 | # Private interface |
|
86 | # Private interface | |
80 | # Variables used to store the three last inputs from the user. On each new |
|
87 | # Variables used to store the three last inputs from the user. On each new | |
81 | # history update, we populate the user's namespace with these, shifted as |
|
88 | # history update, we populate the user's namespace with these, shifted as | |
@@ -119,6 +126,12 b' class HistoryManager(Configurable):' | |||||
119 | else: |
|
126 | else: | |
120 | # The hist_file is probably :memory: or something else. |
|
127 | # The hist_file is probably :memory: or something else. | |
121 | raise |
|
128 | raise | |
|
129 | ||||
|
130 | self.save_flag = threading.Event() | |||
|
131 | self.db_input_cache_lock = threading.Lock() | |||
|
132 | self.db_output_cache_lock = threading.Lock() | |||
|
133 | self.save_thread = HistorySavingThread(self) | |||
|
134 | self.save_thread.start() | |||
122 |
|
135 | |||
123 | self.new_session() |
|
136 | self.new_session() | |
124 |
|
137 | |||
@@ -139,10 +152,13 b' class HistoryManager(Configurable):' | |||||
139 | PRIMARY KEY (session, line))""") |
|
152 | PRIMARY KEY (session, line))""") | |
140 | self.db.commit() |
|
153 | self.db.commit() | |
141 |
|
154 | |||
142 | def new_session(self): |
|
155 | def new_session(self, conn=None): | |
143 | """Get a new session number.""" |
|
156 | """Get a new session number.""" | |
144 | with self.db: |
|
157 | if conn is None: | |
145 | cur = self.db.execute("""INSERT INTO sessions VALUES (NULL, ?, NULL, |
|
158 | conn = self.db | |
|
159 | ||||
|
160 | with conn: | |||
|
161 | cur = conn.execute("""INSERT INTO sessions VALUES (NULL, ?, NULL, | |||
146 | NULL, "") """, (datetime.datetime.now(),)) |
|
162 | NULL, "") """, (datetime.datetime.now(),)) | |
147 | self.session_number = cur.lastrowid |
|
163 | self.session_number = cur.lastrowid | |
148 |
|
164 | |||
@@ -373,10 +389,11 b' class HistoryManager(Configurable):' | |||||
373 | self.input_hist_parsed.append(source) |
|
389 | self.input_hist_parsed.append(source) | |
374 | self.input_hist_raw.append(source_raw) |
|
390 | self.input_hist_raw.append(source_raw) | |
375 |
|
391 | |||
376 | self.db_input_cache.append((line_num, source, source_raw)) |
|
392 | with self.db_input_cache_lock: | |
377 | # Trigger to flush cache and write to DB. |
|
393 | self.db_input_cache.append((line_num, source, source_raw)) | |
378 | if len(self.db_input_cache) >= self.db_cache_size: |
|
394 | # Trigger to flush cache and write to DB. | |
379 | self.writeout_cache() |
|
395 | if len(self.db_input_cache) >= self.db_cache_size: | |
|
396 | self.save_flag.set() | |||
380 |
|
397 | |||
381 | # update the auto _i variables |
|
398 | # update the auto _i variables | |
382 | self._iii = self._ii |
|
399 | self._iii = self._ii | |
@@ -406,45 +423,90 b' class HistoryManager(Configurable):' | |||||
406 | return |
|
423 | return | |
407 | output = json.dumps(self.output_hist_reprs[line_num]) |
|
424 | output = json.dumps(self.output_hist_reprs[line_num]) | |
408 |
|
425 | |||
409 |
self.db_output_cache |
|
426 | with self.db_output_cache_lock: | |
|
427 | self.db_output_cache.append((line_num, output)) | |||
410 | if self.db_cache_size <= 1: |
|
428 | if self.db_cache_size <= 1: | |
411 |
self. |
|
429 | self.save_flag.set() | |
412 |
|
|
430 | ||
413 | def _writeout_input_cache(self): |
|
431 | def _writeout_input_cache(self, conn): | |
414 |
with |
|
432 | with conn: | |
415 | for line in self.db_input_cache: |
|
433 | for line in self.db_input_cache: | |
416 |
|
|
434 | conn.execute("INSERT INTO history VALUES (?, ?, ?, ?)", | |
417 | (self.session_number,)+line) |
|
435 | (self.session_number,)+line) | |
418 |
|
436 | |||
419 | def _writeout_output_cache(self): |
|
437 | def _writeout_output_cache(self, conn): | |
420 |
with |
|
438 | with conn: | |
421 | for line in self.db_output_cache: |
|
439 | for line in self.db_output_cache: | |
422 |
|
|
440 | conn.execute("INSERT INTO output_history VALUES (?, ?, ?)", | |
423 | (self.session_number,)+line) |
|
441 | (self.session_number,)+line) | |
424 |
|
442 | |||
425 | def writeout_cache(self): |
|
443 | def writeout_cache(self, conn=None): | |
426 | """Write any entries in the cache to the database.""" |
|
444 | """Write any entries in the cache to the database.""" | |
427 | try: |
|
445 | if conn is None: | |
428 | self._writeout_input_cache() |
|
446 | conn = self.db | |
429 | except sqlite3.IntegrityError: |
|
|||
430 | self.new_session() |
|
|||
431 | print("ERROR! Session/line number was not unique in", |
|
|||
432 | "database. History logging moved to new session", |
|
|||
433 | self.session_number) |
|
|||
434 | try: # Try writing to the new session. If this fails, don't recurse |
|
|||
435 | self._writeout_input_cache() |
|
|||
436 | except sqlite3.IntegrityError: |
|
|||
437 | pass |
|
|||
438 | finally: |
|
|||
439 | self.db_input_cache = [] |
|
|||
440 |
|
447 | |||
|
448 | with self.db_input_cache_lock: | |||
|
449 | try: | |||
|
450 | self._writeout_input_cache(conn) | |||
|
451 | except sqlite3.IntegrityError: | |||
|
452 | self.new_session(conn) | |||
|
453 | print("ERROR! Session/line number was not unique in", | |||
|
454 | "database. History logging moved to new session", | |||
|
455 | self.session_number) | |||
|
456 | try: # Try writing to the new session. If this fails, don't recurse | |||
|
457 | self._writeout_input_cache(conn) | |||
|
458 | except sqlite3.IntegrityError: | |||
|
459 | pass | |||
|
460 | finally: | |||
|
461 | self.db_input_cache = [] | |||
|
462 | ||||
|
463 | with self.db_output_cache_lock: | |||
|
464 | try: | |||
|
465 | self._writeout_output_cache(conn) | |||
|
466 | except sqlite3.IntegrityError: | |||
|
467 | print("!! Session/line number for output was not unique", | |||
|
468 | "in database. Output will not be stored.") | |||
|
469 | finally: | |||
|
470 | self.db_output_cache = [] | |||
|
471 | ||||
|
472 | ||||
|
473 | class HistorySavingThread(threading.Thread): | |||
|
474 | """This thread takes care of writing history to the database, so that | |||
|
475 | the UI isn't held up while that happens. | |||
|
476 | ||||
|
477 | It waits for the HistoryManager's save_flag to be set, then writes out | |||
|
478 | the history cache. The main thread is responsible for setting the flag when | |||
|
479 | the cache size reaches a defined threshold.""" | |||
|
480 | daemon = True | |||
|
481 | stop_now = False | |||
|
482 | def __init__(self, history_manager): | |||
|
483 | super(HistorySavingThread, self).__init__() | |||
|
484 | self.history_manager = history_manager | |||
|
485 | atexit.register(self.stop) | |||
|
486 | ||||
|
487 | def run(self): | |||
|
488 | # We need a separate db connection per thread: | |||
441 | try: |
|
489 | try: | |
442 | self._writeout_output_cache() |
|
490 | self.db = sqlite3.connect(self.history_manager.hist_file) | |
443 | except sqlite3.IntegrityError: |
|
491 | while True: | |
444 | print("!! Session/line number for output was not unique", |
|
492 | self.history_manager.save_flag.wait() | |
445 | "in database. Output will not be stored.") |
|
493 | if self.stop_now: | |
446 | finally: |
|
494 | return | |
447 | self.db_output_cache = [] |
|
495 | self.history_manager.save_flag.clear() | |
|
496 | self.history_manager.writeout_cache(self.db) | |||
|
497 | except Exception as e: | |||
|
498 | print(("The history saving thread hit an unexpected error (%s)." | |||
|
499 | "History will not be written to the database.") % repr(e)) | |||
|
500 | ||||
|
501 | def stop(self): | |||
|
502 | """This can be called from the main thread to safely stop this thread. | |||
|
503 | ||||
|
504 | Note that it does not attempt to write out remaining history before | |||
|
505 | exiting. That should be done by calling the HistoryManager's | |||
|
506 | end_session method.""" | |||
|
507 | self.stop_now = True | |||
|
508 | self.history_manager.save_flag.set() | |||
|
509 | self.join() | |||
448 |
|
510 | |||
449 |
|
511 | |||
450 | # To match, e.g. ~5/8-~2/3 |
|
512 | # To match, e.g. ~5/8-~2/3 |
@@ -23,10 +23,10 b' def setUp():' | |||||
23 | def test_history(): |
|
23 | def test_history(): | |
24 | ip = get_ipython() |
|
24 | ip = get_ipython() | |
25 | with TemporaryDirectory() as tmpdir: |
|
25 | with TemporaryDirectory() as tmpdir: | |
26 | # Make a new :memory: DB. |
|
|||
27 | hist_manager_ori = ip.history_manager |
|
26 | hist_manager_ori = ip.history_manager | |
|
27 | hist_file = os.path.join(tmpdir, 'history.sqlite') | |||
28 | try: |
|
28 | try: | |
29 |
ip.history_manager = HistoryManager(shell=ip, hist_file= |
|
29 | ip.history_manager = HistoryManager(shell=ip, hist_file=hist_file) | |
30 | hist = ['a=1', 'def f():\n test = 1\n return test', u"b='€Æ¾÷ß'"] |
|
30 | hist = ['a=1', 'def f():\n test = 1\n return test', u"b='€Æ¾÷ß'"] | |
31 | for i, h in enumerate(hist, start=1): |
|
31 | for i, h in enumerate(hist, start=1): | |
32 | ip.history_manager.store_inputs(i, h) |
|
32 | ip.history_manager.store_inputs(i, h) | |
@@ -38,10 +38,7 b' def test_history():' | |||||
38 |
|
38 | |||
39 | nt.assert_equal(ip.history_manager.input_hist_raw, [''] + hist) |
|
39 | nt.assert_equal(ip.history_manager.input_hist_raw, [''] + hist) | |
40 |
|
40 | |||
41 | # Check lines were written to DB |
|
41 | ||
42 | c = ip.history_manager.db.execute("SELECT source_raw FROM history") |
|
|||
43 | nt.assert_equal([x for x, in c], hist) |
|
|||
44 |
|
||||
45 | # New session |
|
42 | # New session | |
46 | ip.history_manager.reset() |
|
43 | ip.history_manager.reset() | |
47 | newcmds = ["z=5","class X(object):\n pass", "k='p'"] |
|
44 | newcmds = ["z=5","class X(object):\n pass", "k='p'"] | |
@@ -83,6 +80,7 b' def test_history():' | |||||
83 | # Duplicate line numbers - check that it doesn't crash, and |
|
80 | # Duplicate line numbers - check that it doesn't crash, and | |
84 | # gets a new session |
|
81 | # gets a new session | |
85 | ip.history_manager.store_inputs(1, "rogue") |
|
82 | ip.history_manager.store_inputs(1, "rogue") | |
|
83 | ip.history_manager.writeout_cache() | |||
86 | nt.assert_equal(ip.history_manager.session_number, 3) |
|
84 | nt.assert_equal(ip.history_manager.session_number, 3) | |
87 | finally: |
|
85 | finally: | |
88 | # Restore history manager |
|
86 | # Restore history manager |
@@ -167,7 +167,8 b' def default_config():' | |||||
167 | config.TerminalInteractiveShell.colors = 'NoColor' |
|
167 | config.TerminalInteractiveShell.colors = 'NoColor' | |
168 | config.TerminalTerminalInteractiveShell.term_title = False, |
|
168 | config.TerminalTerminalInteractiveShell.term_title = False, | |
169 | config.TerminalInteractiveShell.autocall = 0 |
|
169 | config.TerminalInteractiveShell.autocall = 0 | |
170 |
config.HistoryManager.hist_file = u' |
|
170 | config.HistoryManager.hist_file = u'test_hist.sqlite' | |
|
171 | config.HistoryManager.db_cache_size = 10000 | |||
171 | return config |
|
172 | return config | |
172 |
|
173 | |||
173 |
|
174 |
General Comments 0
You need to be logged in to leave comments.
Login now