##// END OF EJS Templates
Simplify magic_history display code, allow get_hist_search to include output in return values, and add some unit tests.
Thomas Kluyver -
Show More
@@ -1,598 +1,589 b''
1 1 """ History related magics and functionality """
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team.
4 4 #
5 5 # Distributed under the terms of the BSD License.
6 6 #
7 7 # The full license is in the file COPYING.txt, distributed with this software.
8 8 #-----------------------------------------------------------------------------
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Imports
12 12 #-----------------------------------------------------------------------------
13 13 from __future__ import print_function
14 14
15 15 # Stdlib imports
16 16 import os
17 17 import re
18 18 import sqlite3
19 19
20 20 # Our own packages
21 21 from IPython.config.configurable import Configurable
22 22 import IPython.utils.io
23 23
24 24 from IPython.testing import decorators as testdec
25 25 from IPython.utils.io import ask_yes_no
26 26 from IPython.utils.traitlets import Bool, Dict, Instance, Int, List, Unicode
27 27 from IPython.utils.warn import warn
28 28
29 29 #-----------------------------------------------------------------------------
30 30 # Classes and functions
31 31 #-----------------------------------------------------------------------------
32 32
33 33 class HistoryManager(Configurable):
34 34 """A class to organize all history-related functionality in one place.
35 35 """
36 36 # Public interface
37 37
38 38 # An instance of the IPython shell we are attached to
39 39 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
40 40 # Lists to hold processed and raw history. These start with a blank entry
41 41 # so that we can index them starting from 1
42 42 input_hist_parsed = List([""])
43 43 input_hist_raw = List([""])
44 44 # A list of directories visited during session
45 45 dir_hist = List()
46 46 # A dict of output history, keyed with ints from the shell's execution count
47 47 output_hist = Dict()
48 48 # String holding the path to the history file
49 49 hist_file = Unicode()
50 50 # The SQLite database
51 51 db = Instance(sqlite3.Connection)
52 52 # The number of the current session in the history database
53 53 session_number = Int()
54 54 # Should we log output to the database? (default no)
55 55 db_log_output = Bool(False, config=True)
56 56 # Write to database every x commands (higher values save disk access & power)
57 57 # Values of 1 or less effectively disable caching.
58 58 db_cache_size = Int(0, config=True)
59 59 # The input and output caches
60 60 db_input_cache = List()
61 61 db_output_cache = List()
62 62
63 63 # Private interface
64 64 # Variables used to store the three last inputs from the user. On each new
65 65 # history update, we populate the user's namespace with these, shifted as
66 66 # necessary.
67 67 _i00, _i, _ii, _iii = '','','',''
68 68
69 69 # A set with all forms of the exit command, so that we don't store them in
70 70 # the history (it's annoying to rewind the first entry and land on an exit
71 71 # call).
72 72 _exit_commands = None
73 73
74 74 def __init__(self, shell, config=None):
75 75 """Create a new history manager associated with a shell instance.
76 76 """
77 77 # We need a pointer back to the shell for various tasks.
78 78 super(HistoryManager, self).__init__(shell=shell, config=config)
79 79
80 80 # list of visited directories
81 81 try:
82 82 self.dir_hist = [os.getcwd()]
83 83 except OSError:
84 84 self.dir_hist = []
85 85
86 86 # Now the history file
87 87 if shell.profile:
88 88 histfname = 'history-%s' % shell.profile
89 89 else:
90 90 histfname = 'history'
91 91 self.hist_file = os.path.join(shell.ipython_dir, histfname + '.sqlite')
92 92 self.init_db()
93 93
94 94 self._i00, self._i, self._ii, self._iii = '','','',''
95 95
96 96 self._exit_commands = set(['Quit', 'quit', 'Exit', 'exit', '%Quit',
97 97 '%quit', '%Exit', '%exit'])
98 98
99 99 def init_db(self):
100 100 """Connect to the database and get new session number."""
101 101 self.db = sqlite3.connect(self.hist_file)
102 102 self.db.execute("""CREATE TABLE IF NOT EXISTS history
103 103 (session integer, line integer, source text, source_raw text,
104 104 PRIMARY KEY (session, line))""")
105 105 # Output history is optional, but ensure the table's there so it can be
106 106 # enabled later.
107 107 self.db.execute("""CREATE TABLE IF NOT EXISTS output_history
108 108 (session integer, line integer, output text,
109 109 PRIMARY KEY (session, line))""")
110 110 cur = self.db.execute("""SELECT name FROM sqlite_master WHERE
111 111 type='table' AND name='singletons'""")
112 112 if not cur.fetchone():
113 113 self.db.execute("""CREATE TABLE singletons
114 114 (name text PRIMARY KEY, value)""")
115 115 self.db.execute("""INSERT INTO singletons VALUES
116 116 ('session_number', 1)""")
117 117 self.db.commit()
118 118 cur = self.db.execute("""SELECT value FROM singletons WHERE
119 119 name='session_number'""")
120 120 self.session_number = cur.fetchone()[0]
121 121
122 122 #Increment by one for next session.
123 123 self.db.execute("""UPDATE singletons SET value=? WHERE
124 124 name='session_number'""", (self.session_number+1,))
125 125 self.db.commit()
126 126
127 127 def get_hist_tail(self, n=10, raw=True, output=False):
128 128 """Get the last n lines from the history database."""
129 129 toget = 'source_raw' if raw else 'source'
130 130 sqlfrom = "history"
131 131 if output:
132 132 sqlfrom = "history LEFT JOIN output_history USING (session, line)"
133 133 toget = "history.%s, output_history.output" % toget
134 134 cur = self.db.execute("SELECT session, line, " + toget +\
135 135 " FROM "+sqlfrom+" ORDER BY session DESC, line DESC LIMIT ?", (n,))
136 136 hist = reversed(cur.fetchall())
137 137 if output:
138 138 return ((ses, lin, (inp, out)) for ses, lin, inp, out in hist)
139 139 return hist
140 140
141 def get_hist_search(self, pattern="*", raw=True):
141 def get_hist_search(self, pattern="*", raw=True, output=False):
142 142 """Search the database using unix glob-style matching (wildcards * and
143 143 ?, escape using \).
144 144
145 145 Returns
146 146 -------
147 147 An iterator over tuples: (session, line_number, command)
148 148 """
149 149 toget = "source_raw" if raw else "source"
150 return self.db.execute("SELECT session, line, " +toget+ \
151 " FROM history WHERE " +toget+ " GLOB ?", (pattern,))
150 tosearch = toget
151 sqlfrom = "history"
152 if output:
153 sqlfrom = "history LEFT JOIN output_history USING (session, line)"
154 toget = "history.%s, output_history.output" % toget
155 tosearch = "history." + tosearch
156 hist = self.db.execute("SELECT session, line, " +toget+ \
157 " FROM "+sqlfrom+" WHERE " +tosearch+ " GLOB ?", (pattern,))
158 if output:
159 return ((ses, lin, (inp, out)) for ses, lin, inp, out in hist)
160 return hist
152 161
153 162 def _get_hist_session(self, start=1, stop=None, raw=True, output=False):
154 163 """Get input and output history from the current session. Called by
155 164 get_history, and takes similar parameters."""
156 165 input_hist = self.input_hist_raw if raw else self.input_hist_parsed
157 166
158 167 n = len(input_hist)
159 168 if start < 0:
160 169 start += n
161 170 if not stop:
162 171 stop = n
163 172 elif stop < 0:
164 173 stop += n
165 174
166 175 for i in range(start, stop):
167 176 if output:
168 177 line = (input_hist[i], repr(self.output_hist.get(i)))
169 178 else:
170 179 line = input_hist[i]
171 180 yield (0, i, line)
172 181
173 182 def get_history(self, session=0, start=1, stop=None, raw=True,output=False):
174 183 """Retrieve input by session.
175 184
176 185 Parameters
177 186 ----------
178 187 session : int
179 188 Session number to retrieve. The current session is 0, and negative
180 189 numbers count back from current session, so -1 is previous session.
181 190 start : int
182 191 First line to retrieve.
183 192 stop : int
184 193 End of line range (excluded from output itself). If None, retrieve
185 194 to the end of the session.
186 195 raw : bool
187 196 If True, return untranslated input
188 197 output : bool
189 198 If True, attempt to include output. This will be 'real' Python
190 199 objects for the current session, or text reprs from previous
191 200 sessions if db_log_output was enabled at the time. Where no output
192 201 is found, None is used.
193 202
194 203 Returns
195 204 -------
196 205 An iterator over the desired lines. Each line is a 3-tuple, either
197 206 (session, line, input) if output is False, or
198 207 (session, line, (input, output)) if output is True.
199 208 """
200 209 if session == 0 or session==self.session_number: # Current session
201 210 return self._get_hist_session(start, stop, raw, output)
202 211 if session < 0:
203 212 session += self.session_number
204 213
205 214 # Assemble the SQL query:
206 215 sqlfrom = "history"
207 216 toget = 'source_raw' if raw else 'source'
208 217 if output:
209 218 sqlfrom = "history LEFT JOIN output_history USING (session, line)"
210 219 toget = "history.%s, output_history.output" % toget
211 220 if stop:
212 221 lineclause = "line >= ? AND line < ?"
213 222 params = (session, start, stop)
214 223 else:
215 224 lineclause = "line>=?"
216 225 params = (session, start)
217 226
218 227 cur = self.db.execute("""SELECT session, line, %s FROM %s WHERE
219 228 session==? AND %s""" %(toget, sqlfrom, lineclause), params)
220 229 if output: # Regroup into 3-tuples
221 230 return ((ses, lin, (inp, out)) for ses, lin, inp, out in cur)
222 231 return cur
223 232
224 233 def get_hist_from_rangestr(self, rangestr, raw=True, output=False):
225 234 """Get lines of history from a string of ranges, as used by magic
226 235 commands %hist, %save, %macro, etc."""
227 236 for sess, s, e in extract_hist_ranges(rangestr):
228 237 for line in self.get_history(sess, s, e, raw=raw, output=output):
229 238 yield line
230 239
231 240 def store_inputs(self, line_num, source, source_raw=None):
232 241 """Store source and raw input in history and create input cache
233 242 variables _i*.
234 243
235 244 Parameters
236 245 ----------
237 246 line_num : int
238 247 The prompt number of this input.
239 248
240 249 source : str
241 250 Python input.
242 251
243 252 source_raw : str, optional
244 253 If given, this is the raw input without any IPython transformations
245 254 applied to it. If not given, ``source`` is used.
246 255 """
247 256 if source_raw is None:
248 257 source_raw = source
249 258
250 259 # do not store exit/quit commands
251 260 if source_raw.strip() in self._exit_commands:
252 261 return
253 262
254 263 self.input_hist_parsed.append(source.rstrip())
255 264 self.input_hist_raw.append(source_raw.rstrip())
256 265
257 266 self.db_input_cache.append((self.session_number, line_num,
258 267 source, source_raw))
259 268 # Trigger to flush cache and write to DB.
260 269 if len(self.db_input_cache) >= self.db_cache_size:
261 270 self.writeout_cache()
262 271
263 272 # update the auto _i variables
264 273 self._iii = self._ii
265 274 self._ii = self._i
266 275 self._i = self._i00
267 276 self._i00 = source_raw
268 277
269 278 # hackish access to user namespace to create _i1,_i2... dynamically
270 279 new_i = '_i%s' % line_num
271 280 to_main = {'_i': self._i,
272 281 '_ii': self._ii,
273 282 '_iii': self._iii,
274 283 new_i : self._i00 }
275 284 self.shell.user_ns.update(to_main)
276 285
277 286 def store_output(self, line_num, output):
278 287 if not self.db_log_output:
279 288 return
280 289 db_row = (self.session_number, line_num, output)
281 290 if self.db_cache_size > 1:
282 291 self.db_output_cache.append(db_row)
283 292 else:
284 293 with self.db:
285 294 self.db.execute("INSERT INTO output_history VALUES (?,?,?)", db_row)
286 295
287 296 def writeout_cache(self):
288 297 #print(self.db_input_cache)
289 298 with self.db:
290 299 self.db.executemany("INSERT INTO history VALUES (?, ?, ?, ?)",
291 300 self.db_input_cache)
292 301 self.db.executemany("INSERT INTO output_history VALUES (?, ?, ?)",
293 302 self.db_output_cache)
294 303 self.db_input_cache = []
295 304 self.db_output_cache = []
296 305
297 306 def sync_inputs(self):
298 307 """Ensure raw and translated histories have same length."""
299 308 lr = len(self.input_hist_raw)
300 309 lp = len(self.input_hist_parsed)
301 310 if lp < lr:
302 311 self.input_hist_raw[:lr-lp] = []
303 312 elif lr < lp:
304 313 self.input_hist_parsed[:lp-lr] = []
305 314
306 315 def reset(self, new_session=True):
307 316 """Clear the current session's history, and (optionally) start a new
308 317 session."""
309 318 self.input_hist_parsed[:] = [""]
310 319 self.input_hist_raw[:] = [""]
311 320 self.output_hist.clear()
312 321 # The directory history can't be completely empty
313 322 self.dir_hist[:] = [os.getcwd()]
314 323
315 324 if new_session:
316 325 self.writeout_cache()
317 326 self.init_db() # Get new session number
318 327
319 328 # To match, e.g. ~5/8-~2/3
320 329 range_re = re.compile(r"""
321 330 ((?P<startsess>~?\d+)/)?
322 331 (?P<start>\d+) # Only the start line num is compulsory
323 332 ((?P<sep>[\-:])
324 333 ((?P<endsess>~?\d+)/)?
325 334 (?P<end>\d+))?
326 335 """, re.VERBOSE)
327 336
328 337 def extract_hist_ranges(ranges_str):
329 338 """Turn a string of history ranges into 3-tuples of (session, start, stop).
330 339
331 340 Examples
332 341 --------
333 342 list(extract_input_ranges("~8/5-~7/4 2"))
334 343 [(-8, 5, None), (-7, 1, 4), (0, 2, 3)]
335 344 """
336 345 for range_str in ranges_str.split():
337 346 rmatch = range_re.match(range_str)
338 347 start = int(rmatch.group("start"))
339 348 end = rmatch.group("end")
340 349 end = int(end) if end else start+1 # If no end specified, get (a, a+1)
341 350 if rmatch.group("sep") == "-": # 1-3 == 1:4 --> [1, 2, 3]
342 351 end += 1
343 352 startsess = rmatch.group("startsess") or "0"
344 353 endsess = rmatch.group("endsess") or startsess
345 354 startsess = int(startsess.replace("~","-"))
346 355 endsess = int(endsess.replace("~","-"))
347 356 assert endsess >= startsess
348 357
349 358 if endsess == startsess:
350 359 yield (startsess, start, end)
351 360 continue
352 361 # Multiple sessions in one range:
353 362 yield (startsess, start, None)
354 363 for sess in range(startsess+1, endsess):
355 364 yield (sess, 1, None)
356 365 yield (endsess, 1, end)
357 366
358 367 def _format_lineno(session, line):
359 368 """Helper function to format line numbers properly."""
360 369 if session == 0:
361 370 return str(line)
362 371 return "%s#%s" % (session, line)
363 372
364 373 @testdec.skip_doctest
365 374 def magic_history(self, parameter_s = ''):
366 375 """Print input history (_i<n> variables), with most recent last.
367 376
368 377 %history -> print at most 40 inputs (some may be multi-line)\\
369 378 %history n -> print at most n inputs\\
370 379 %history n1 n2 -> print inputs between n1 and n2 (n2 not included)\\
371 380
372 381 By default, input history is printed without line numbers so it can be
373 382 directly pasted into an editor.
374 383
375 384 With -n, each input's number <n> is shown, and is accessible as the
376 385 automatically generated variable _i<n> as well as In[<n>]. Multi-line
377 386 statements are printed starting at a new line for easy copy/paste.
378 387
379 388 Options:
380 389
381 390 -n: print line numbers for each input.
382 391 This feature is only available if numbered prompts are in use.
383 392
384 393 -o: also print outputs for each input.
385 394
386 395 -p: print classic '>>>' python prompts before each input. This is useful
387 396 for making documentation, and in conjunction with -o, for producing
388 397 doctest-ready output.
389 398
390 399 -r: (default) print the 'raw' history, i.e. the actual commands you typed.
391 400
392 401 -t: print the 'translated' history, as IPython understands it. IPython
393 402 filters your input and converts it all into valid Python source before
394 403 executing it (things like magics or aliases are turned into function
395 404 calls, for example). With this option, you'll see the native history
396 405 instead of the user-entered version: '%cd /' will be seen as
397 406 'get_ipython().magic("%cd /")' instead of '%cd /'.
398 407
399 408 -g: treat the arg as a pattern to grep for in (full) history.
400 409 This includes the saved history (almost all commands ever written).
401 410 Use '%hist -g' to show full saved history (may be very long).
402 411
403 412 -l: get the last n lines from all sessions. Specify n as a single arg, or
404 413 the default is the last 10 lines.
405 414
406 415 -f FILENAME: instead of printing the output to the screen, redirect it to
407 416 the given file. The file is always overwritten, though IPython asks for
408 417 confirmation first if it already exists.
409 418
410 419 Examples
411 420 --------
412 421 ::
413 422
414 423 In [6]: %hist -n 4 6
415 424 4:a = 12
416 425 5:print a**2
417 426
418 427 """
419 428
420 429 if not self.shell.displayhook.do_full_cache:
421 430 print('This feature is only available if numbered prompts are in use.')
422 431 return
423 432 opts,args = self.parse_options(parameter_s,'noprtglf:',mode='string')
424 433
425 434 # For brevity
426 435 history_manager = self.shell.history_manager
427 436
428 437 def _format_lineno(session, line):
429 438 """Helper function to format line numbers properly."""
430 439 if session in (0, history_manager.session_number):
431 440 return str(line)
432 441 return "%s/%s" % (session, line)
433 442
434 443 # Check if output to specific file was requested.
435 444 try:
436 445 outfname = opts['f']
437 446 except KeyError:
438 447 outfile = IPython.utils.io.Term.cout # default
439 448 # We don't want to close stdout at the end!
440 449 close_at_end = False
441 450 else:
442 451 if os.path.exists(outfname):
443 452 if not ask_yes_no("File %r exists. Overwrite?" % outfname):
444 453 print('Aborting.')
445 454 return
446 455
447 456 outfile = open(outfname,'w')
448 457 close_at_end = True
449 458
450 459 print_nums = 'n' in opts
451 460 get_output = 'o' in opts
452 461 pyprompts = 'p' in opts
453 462 # Raw history is the default
454 463 raw = not('t' in opts)
455 464
456 465 default_length = 40
457 466 pattern = None
458 467
459 # Glob search:
460 if 'g' in opts:
468 if 'g' in opts: # Glob search
461 469 pattern = "*" + args + "*" if args else "*"
462
463 # Display:
464 matches_current_session = []
465 for session, line, s in history_manager.get_hist_search(pattern, raw):
466 if session == history_manager.session_number:
467 matches_current_session.append((line, s))
468 continue
469 print("%d/%d: %s" %(session, line, s.expandtabs(4)), file=outfile)
470 if matches_current_session:
471 print("=== Current session: ===", file=outfile)
472 for line, s in matches_current_session:
473 print("%d: %s" %(line, s.expandtabs(4)), file=outfile)
474 return
475
476 if 'l' in opts: # Get 'tail'
470 hist = history_manager.get_hist_search(pattern, raw=raw,
471 output=get_output)
472 elif 'l' in opts: # Get 'tail'
477 473 try:
478 474 n = int(args)
479 475 except ValueError, IndexError:
480 476 n = 10
481 477 hist = history_manager.get_hist_tail(n, raw=raw, output=get_output)
482 478 else:
483 479 if args: # Get history by ranges
484 480 hist = history_manager.get_hist_from_rangestr(args, raw, get_output)
485 481 else: # Just get history for the current session
486 482 hist = history_manager.get_history(raw=raw, output=get_output)
487 # Pull hist into a list, so we can get the widest number in it.
488 hist = list(hist)
489 if not hist:
490 return
491 483
492 width = max(len(_format_lineno(s, l)) for s, l, _ in hist)
484 # We could be displaying the entire history, so let's not try to pull it
485 # into a list in memory. Anything that needs more space will just misalign.
486 width = 4
493 487
494 488 for session, lineno, inline in hist:
495 489 # Print user history with tabs expanded to 4 spaces. The GUI clients
496 490 # use hard tabs for easier usability in auto-indented code, but we want
497 491 # to produce PEP-8 compliant history for safe pasting into an editor.
498 492 if get_output:
499 493 inline, output = inline
500 494 inline = inline.expandtabs(4).rstrip()
501
502 if pattern is not None and not fnmatch.fnmatch(inline, pattern):
503 continue
504 495
505 496 multiline = "\n" in inline
506 line_sep = '\n' if multiline else ''
497 line_sep = '\n' if multiline else ' '
507 498 if print_nums:
508 499 print('%s:%s' % (_format_lineno(session, lineno).rjust(width),
509 500 line_sep), file=outfile, end='')
510 501 if pyprompts:
511 502 print(">>> ", end="", file=outfile)
512 503 if multiline:
513 504 inline = "\n... ".join(inline.splitlines()) + "\n..."
514 505 print(inline, file=outfile)
515 506 if get_output and output:
516 507 print(output, file=outfile)
517 508
518 509 if close_at_end:
519 510 outfile.close()
520 511
521 512 # %hist is an alternative name
522 513 magic_hist = magic_history
523 514
524 515
525 516 def rep_f(self, arg):
526 517 r""" Repeat a command, or get command to input line for editing
527 518
528 519 - %rep (no arguments):
529 520
530 521 Place a string version of last computation result (stored in the special '_'
531 522 variable) to the next input prompt. Allows you to create elaborate command
532 523 lines without using copy-paste::
533 524
534 525 $ l = ["hei", "vaan"]
535 526 $ "".join(l)
536 527 ==> heivaan
537 528 $ %rep
538 529 $ heivaan_ <== cursor blinking
539 530
540 531 %rep 45
541 532
542 533 Place history line 45 to next input prompt. Use %hist to find out the
543 534 number.
544 535
545 536 %rep 1-4 6-7 3
546 537
547 538 Repeat the specified lines immediately. Input slice syntax is the same as
548 539 in %macro and %save.
549 540
550 541 %rep foo
551 542
552 543 Place the most recent line that has the substring "foo" to next input.
553 544 (e.g. 'svn ci -m foobar').
554 545 """
555 546
556 547 opts,args = self.parse_options(arg,'',mode='list')
557 548 if not args:
558 549 self.set_next_input(str(self.shell.user_ns["_"]))
559 550 return
560 551
561 552 if len(args) == 1 and not '-' in args[0]:
562 553 arg = args[0]
563 554 if len(arg) > 1 and arg.startswith('0'):
564 555 # get from shadow hist
565 556 num = int(arg[1:])
566 557 line = self.shell.shadowhist.get(num)
567 558 self.set_next_input(str(line))
568 559 return
569 560 try:
570 561 num = int(args[0])
571 562 self.set_next_input(str(self.shell.input_hist_raw[num]).rstrip())
572 563 return
573 564 except ValueError:
574 565 pass
575 566
576 567 for h in reversed(self.shell.input_hist_raw):
577 568 if 'rep' in h:
578 569 continue
579 570 if fnmatch.fnmatch(h,'*' + arg + '*'):
580 571 self.set_next_input(str(h).rstrip())
581 572 return
582 573
583 574 try:
584 575 lines = self.extract_input_slices(args, True)
585 576 print("lines", lines)
586 577 self.run_cell(lines)
587 578 except ValueError:
588 579 print("Not found in recent history:", args)
589 580
590 581
591 582 def init_ipython(ip):
592 583 ip.define_magic("rep",rep_f)
593 584 ip.define_magic("hist",magic_hist)
594 585 ip.define_magic("history",magic_history)
595 586
596 587 # XXX - ipy_completers are in quarantine, need to be updated to new apis
597 588 #import ipy_completers
598 589 #ipy_completers.quick_completer('%hist' ,'-g -t -r -n')
@@ -1,74 +1,92 b''
1 1 """Tests for the IPython tab-completion machinery.
2 2 """
3 3 #-----------------------------------------------------------------------------
4 4 # Module imports
5 5 #-----------------------------------------------------------------------------
6 6
7 7 # stdlib
8 8 import os
9 9 import sys
10 10 import unittest
11 11
12 12 # third party
13 13 import nose.tools as nt
14 14
15 15 # our own packages
16 16 from IPython.utils.tempdir import TemporaryDirectory
17 17 from IPython.core.history import HistoryManager, extract_hist_ranges
18 18
19 19 def test_history():
20 20
21 21 ip = get_ipython()
22 22 with TemporaryDirectory() as tmpdir:
23 23 #tmpdir = '/software/temp'
24 24 histfile = os.path.realpath(os.path.join(tmpdir, 'history.sqlite'))
25 25 # Ensure that we restore the history management that we mess with in
26 26 # this test doesn't affect the IPython instance used by the test suite
27 27 # beyond this test.
28 28 hist_manager_ori = ip.history_manager
29 29 try:
30 30 ip.history_manager = HistoryManager(shell=ip)
31 31 ip.history_manager.hist_file = histfile
32 32 ip.history_manager.init_db() # Has to be called after changing file
33 33 print 'test',histfile
34 34 hist = ['a=1', 'def f():\n test = 1\n return test', 'b=2']
35 35 for i, h in enumerate(hist, start=1):
36 36 ip.history_manager.store_inputs(i, h)
37 37
38 ip.history_manager.db_log_output = True
39 # Doesn't match the input, but we'll just check it's stored.
40 ip.history_manager.store_output(3, "spam")
41
38 42 nt.assert_equal(ip.history_manager.input_hist_raw, [''] + hist)
39 43
40 44 # Check lines were written to DB
41 45 c = ip.history_manager.db.execute("SELECT source_raw FROM history")
42 46 nt.assert_equal([x for x, in c], hist)
43 47
44 48 # New session
45 49 ip.history_manager.reset()
46 50 newcmds = ["z=5","class X(object):\n pass", "k='p'"]
47 for i, cmd in enumerate(newcmds):
51 for i, cmd in enumerate(newcmds, start=1):
48 52 ip.history_manager.store_inputs(i, cmd)
49 53 gothist = ip.history_manager.get_history(start=1, stop=4)
50 54 nt.assert_equal(list(gothist), zip([0,0,0],[1,2,3], newcmds))
51 55 # Previous session:
52 56 gothist = ip.history_manager.get_history(-1, 1, 4)
53 57 nt.assert_equal(list(gothist), zip([1,1,1],[1,2,3], hist))
54 58
59 # Check get_hist_tail
60 gothist = ip.history_manager.get_hist_tail(4, output=True)
61 expected = [(1, 3, (hist[-1], "spam")),
62 (2, 1, (newcmds[0], None)),
63 (2, 2, (newcmds[1], None)),
64 (2, 3, (newcmds[2], None)),]
65 nt.assert_equal(list(gothist), expected)
66
67 # Check get_hist_search
68 gothist = ip.history_manager.get_hist_search("*test*")
69 nt.assert_equal(list(gothist), [(1,2,hist[1])] )
70 gothist = ip.history_manager.get_hist_search("b*", output=True)
71 nt.assert_equal(list(gothist), [(1,3,(hist[2],"spam"))] )
72
55 73 # Cross testing: check that magic %save can get previous session.
56 74 testfilename = os.path.realpath(os.path.join(tmpdir, "test.py"))
57 75 ip.magic_save(testfilename + " ~1/1-3")
58 76 testfile = open(testfilename, "r")
59 77 nt.assert_equal(testfile.read(), "\n".join(hist))
60 78 finally:
61 79 # Restore history manager
62 80 ip.history_manager = hist_manager_ori
63 81
64 82 def test_extract_hist_ranges():
65 83 instr = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5"
66 84 expected = [(0, 1, 2), # 0 == current session
67 85 (2, 3, 4),
68 86 (-4, 5, 7),
69 87 (-4, 7, 10),
70 88 (-9, 2, None), # None == to end
71 89 (-8, 1, None),
72 90 (-7, 1, 6)]
73 91 actual = list(extract_hist_ranges(instr))
74 92 nt.assert_equal(actual, expected)
General Comments 0
You need to be logged in to leave comments. Login now