##// END OF EJS Templates
BUG: use a :memory: history DB for testing. Refactor the initialization of the HistoryManager to support this.
Robert Kern -
Show More
@@ -1,740 +1,748 b''
1 1 """ History related magics and functionality """
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team.
4 4 #
5 5 # Distributed under the terms of the BSD License.
6 6 #
7 7 # The full license is in the file COPYING.txt, distributed with this software.
8 8 #-----------------------------------------------------------------------------
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Imports
12 12 #-----------------------------------------------------------------------------
13 13 from __future__ import print_function
14 14
15 15 # Stdlib imports
16 16 import datetime
17 17 import json
18 18 import os
19 19 import re
20 20 import sqlite3
21 21
22 22 from collections import defaultdict
23 23
24 24 # Our own packages
25 25 from IPython.config.configurable import Configurable
26 26 import IPython.utils.io
27 27
28 28 from IPython.testing import decorators as testdec
29 29 from IPython.utils.io import ask_yes_no
30 30 from IPython.utils.traitlets import Bool, Dict, Instance, Int, List, Unicode
31 31 from IPython.utils.warn import warn
32 32
33 33 #-----------------------------------------------------------------------------
34 34 # Classes and functions
35 35 #-----------------------------------------------------------------------------
36 36
37 37 class HistoryManager(Configurable):
38 38 """A class to organize all history-related functionality in one place.
39 39 """
40 40 # Public interface
41 41
42 42 # An instance of the IPython shell we are attached to
43 43 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
44 44 # Lists to hold processed and raw history. These start with a blank entry
45 45 # so that we can index them starting from 1
46 46 input_hist_parsed = List([""])
47 47 input_hist_raw = List([""])
48 48 # A list of directories visited during session
49 49 dir_hist = List()
50 def _dir_hist_default(self):
51 try:
52 return [os.getcwd()]
53 except OSError:
54 return []
55
50 56 # A dict of output history, keyed with ints from the shell's
51 57 # execution count. If there are several outputs from one command,
52 58 # only the last one is stored.
53 59 output_hist = Dict()
54 60 # Contains all outputs, in lists of reprs.
55 output_hist_reprs = Instance(defaultdict)
61 output_hist_reprs = Instance(defaultdict, args=(list,))
56 62
57 63 # String holding the path to the history file
58 hist_file = Unicode()
64 hist_file = Unicode(config=True)
65
59 66 # The SQLite database
60 67 db = Instance(sqlite3.Connection)
61 68 # The number of the current session in the history database
62 69 session_number = Int()
63 70 # Should we log output to the database? (default no)
64 71 db_log_output = Bool(False, config=True)
65 72 # Write to database every x commands (higher values save disk access & power)
66 73 # Values of 1 or less effectively disable caching.
67 74 db_cache_size = Int(0, config=True)
68 75 # The input and output caches
69 76 db_input_cache = List()
70 77 db_output_cache = List()
71 78
72 79 # Private interface
73 80 # Variables used to store the three last inputs from the user. On each new
74 81 # history update, we populate the user's namespace with these, shifted as
75 82 # necessary.
76 _i00, _i, _ii, _iii = '','','',''
83 _i00 = Unicode(u'')
84 _i = Unicode(u'')
85 _ii = Unicode(u'')
86 _iii = Unicode(u'')
77 87
78 88 # A set with all forms of the exit command, so that we don't store them in
79 89 # the history (it's annoying to rewind the first entry and land on an exit
80 90 # call).
81 _exit_commands = None
91 _exit_commands = Instance(set, args=(['Quit', 'quit', 'Exit', 'exit',
92 '%Quit', '%quit', '%Exit', '%exit'],))
82 93
83 def __init__(self, shell, config=None):
94 def __init__(self, shell, config=None, **traits):
84 95 """Create a new history manager associated with a shell instance.
85 96 """
86 97 # We need a pointer back to the shell for various tasks.
87 super(HistoryManager, self).__init__(shell=shell, config=config)
88
89 # list of visited directories
90 try:
91 self.dir_hist = [os.getcwd()]
92 except OSError:
93 self.dir_hist = []
98 super(HistoryManager, self).__init__(shell=shell, config=config,
99 **traits)
94 100
95 # Now the history file
101 if self.hist_file == u'':
102 # No one has set the hist_file, yet.
96 103 if shell.profile:
97 104 histfname = 'history-%s' % shell.profile
98 105 else:
99 106 histfname = 'history'
100 107 self.hist_file = os.path.join(shell.ipython_dir, histfname + '.sqlite')
108
101 109 try:
102 110 self.init_db()
103 111 except sqlite3.DatabaseError:
112 if os.path.isfile(self.hist_file):
113 # Try to move the file out of the way.
104 114 newpath = os.path.join(self.shell.ipython_dir, "hist-corrupt.sqlite")
105 115 os.rename(self.hist_file, newpath)
106 116 print("ERROR! History file wasn't a valid SQLite database.",
107 117 "It was moved to %s" % newpath, "and a new file created.")
108 118 self.init_db()
119 else:
120 # The hist_file is probably :memory: or something else.
121 raise
109 122
110 123 self.new_session()
111 124
112 self._i00, self._i, self._ii, self._iii = '','','',''
113 self.output_hist_reprs = defaultdict(list)
114
115 self._exit_commands = set(['Quit', 'quit', 'Exit', 'exit', '%Quit',
116 '%quit', '%Exit', '%exit'])
117 125
118 126 def init_db(self):
119 127 """Connect to the database, and create tables if necessary."""
120 128 self.db = sqlite3.connect(self.hist_file)
121 129 self.db.execute("""CREATE TABLE IF NOT EXISTS sessions (session integer
122 130 primary key autoincrement, start timestamp,
123 131 end timestamp, num_cmds integer, remark text)""")
124 132 self.db.execute("""CREATE TABLE IF NOT EXISTS history
125 133 (session integer, line integer, source text, source_raw text,
126 134 PRIMARY KEY (session, line))""")
127 135 # Output history is optional, but ensure the table's there so it can be
128 136 # enabled later.
129 137 self.db.execute("""CREATE TABLE IF NOT EXISTS output_history
130 138 (session integer, line integer, output text,
131 139 PRIMARY KEY (session, line))""")
132 140 self.db.commit()
133 141
134 142 def new_session(self):
135 143 """Get a new session number."""
136 144 with self.db:
137 145 cur = self.db.execute("""INSERT INTO sessions VALUES (NULL, ?, NULL,
138 146 NULL, "") """, (datetime.datetime.now(),))
139 147 self.session_number = cur.lastrowid
140 148
141 149 def end_session(self):
142 150 """Close the database session, filling in the end time and line count."""
143 151 self.writeout_cache()
144 152 with self.db:
145 153 self.db.execute("""UPDATE sessions SET end=?, num_cmds=? WHERE
146 154 session==?""", (datetime.datetime.now(),
147 155 len(self.input_hist_parsed)-1, self.session_number))
148 156 self.session_number = 0
149 157
150 158 def name_session(self, name):
151 159 """Give the current session a name in the history database."""
152 160 with self.db:
153 161 self.db.execute("UPDATE sessions SET remark=? WHERE session==?",
154 162 (name, self.session_number))
155 163
156 164 def reset(self, new_session=True):
157 165 """Clear the session history, releasing all object references, and
158 166 optionally open a new session."""
159 167 if self.session_number:
160 168 self.end_session()
161 169 self.input_hist_parsed[:] = [""]
162 170 self.input_hist_raw[:] = [""]
163 171 self.output_hist.clear()
164 172 # The directory history can't be completely empty
165 173 self.dir_hist[:] = [os.getcwd()]
166 174
167 175 if new_session:
168 176 self.new_session()
169 177
170 178 ## -------------------------------
171 179 ## Methods for retrieving history:
172 180 ## -------------------------------
173 181 def _run_sql(self, sql, params, raw=True, output=False):
174 182 """Prepares and runs an SQL query for the history database.
175 183
176 184 Parameters
177 185 ----------
178 186 sql : str
179 187 Any filtering expressions to go after SELECT ... FROM ...
180 188 params : tuple
181 189 Parameters passed to the SQL query (to replace "?")
182 190 raw, output : bool
183 191 See :meth:`get_range`
184 192
185 193 Returns
186 194 -------
187 195 Tuples as :meth:`get_range`
188 196 """
189 197 toget = 'source_raw' if raw else 'source'
190 198 sqlfrom = "history"
191 199 if output:
192 200 sqlfrom = "history LEFT JOIN output_history USING (session, line)"
193 201 toget = "history.%s, output_history.output" % toget
194 202 cur = self.db.execute("SELECT session, line, %s FROM %s " %\
195 203 (toget, sqlfrom) + sql, params)
196 204 if output: # Regroup into 3-tuples, and parse JSON
197 205 loads = lambda out: json.loads(out) if out else None
198 206 return ((ses, lin, (inp, loads(out))) \
199 207 for ses, lin, inp, out in cur)
200 208 return cur
201 209
202 210
203 211 def get_tail(self, n=10, raw=True, output=False, include_latest=False):
204 212 """Get the last n lines from the history database.
205 213
206 214 Parameters
207 215 ----------
208 216 n : int
209 217 The number of lines to get
210 218 raw, output : bool
211 219 See :meth:`get_range`
212 220 include_latest : bool
213 221 If False (default), n+1 lines are fetched, and the latest one
214 222 is discarded. This is intended to be used where the function
215 223 is called by a user command, which it should not return.
216 224
217 225 Returns
218 226 -------
219 227 Tuples as :meth:`get_range`
220 228 """
221 229 self.writeout_cache()
222 230 if not include_latest:
223 231 n += 1
224 232 cur = self._run_sql("ORDER BY session DESC, line DESC LIMIT ?",
225 233 (n,), raw=raw, output=output)
226 234 if not include_latest:
227 235 return reversed(list(cur)[1:])
228 236 return reversed(list(cur))
229 237
230 238 def search(self, pattern="*", raw=True, search_raw=True,
231 239 output=False):
232 240 """Search the database using unix glob-style matching (wildcards
233 241 * and ?).
234 242
235 243 Parameters
236 244 ----------
237 245 pattern : str
238 246 The wildcarded pattern to match when searching
239 247 search_raw : bool
240 248 If True, search the raw input, otherwise, the parsed input
241 249 raw, output : bool
242 250 See :meth:`get_range`
243 251
244 252 Returns
245 253 -------
246 254 Tuples as :meth:`get_range`
247 255 """
248 256 tosearch = "source_raw" if search_raw else "source"
249 257 if output:
250 258 tosearch = "history." + tosearch
251 259 self.writeout_cache()
252 260 return self._run_sql("WHERE %s GLOB ?" % tosearch, (pattern,),
253 261 raw=raw, output=output)
254 262
255 263 def _get_range_session(self, start=1, stop=None, raw=True, output=False):
256 264 """Get input and output history from the current session. Called by
257 265 get_range, and takes similar parameters."""
258 266 input_hist = self.input_hist_raw if raw else self.input_hist_parsed
259 267
260 268 n = len(input_hist)
261 269 if start < 0:
262 270 start += n
263 271 if not stop:
264 272 stop = n
265 273 elif stop < 0:
266 274 stop += n
267 275
268 276 for i in range(start, stop):
269 277 if output:
270 278 line = (input_hist[i], self.output_hist_reprs.get(i))
271 279 else:
272 280 line = input_hist[i]
273 281 yield (0, i, line)
274 282
275 283 def get_range(self, session=0, start=1, stop=None, raw=True,output=False):
276 284 """Retrieve input by session.
277 285
278 286 Parameters
279 287 ----------
280 288 session : int
281 289 Session number to retrieve. The current session is 0, and negative
282 290 numbers count back from current session, so -1 is previous session.
283 291 start : int
284 292 First line to retrieve.
285 293 stop : int
286 294 End of line range (excluded from output itself). If None, retrieve
287 295 to the end of the session.
288 296 raw : bool
289 297 If True, return untranslated input
290 298 output : bool
291 299 If True, attempt to include output. This will be 'real' Python
292 300 objects for the current session, or text reprs from previous
293 301 sessions if db_log_output was enabled at the time. Where no output
294 302 is found, None is used.
295 303
296 304 Returns
297 305 -------
298 306 An iterator over the desired lines. Each line is a 3-tuple, either
299 307 (session, line, input) if output is False, or
300 308 (session, line, (input, output)) if output is True.
301 309 """
302 310 if session == 0 or session==self.session_number: # Current session
303 311 return self._get_range_session(start, stop, raw, output)
304 312 if session < 0:
305 313 session += self.session_number
306 314
307 315 if stop:
308 316 lineclause = "line >= ? AND line < ?"
309 317 params = (session, start, stop)
310 318 else:
311 319 lineclause = "line>=?"
312 320 params = (session, start)
313 321
314 322 return self._run_sql("WHERE session==? AND %s""" % lineclause,
315 323 params, raw=raw, output=output)
316 324
317 325 def get_range_by_str(self, rangestr, raw=True, output=False):
318 326 """Get lines of history from a string of ranges, as used by magic
319 327 commands %hist, %save, %macro, etc.
320 328
321 329 Parameters
322 330 ----------
323 331 rangestr : str
324 332 A string specifying ranges, e.g. "5 ~2/1-4". See
325 333 :func:`magic_history` for full details.
326 334 raw, output : bool
327 335 As :meth:`get_range`
328 336
329 337 Returns
330 338 -------
331 339 Tuples as :meth:`get_range`
332 340 """
333 341 for sess, s, e in extract_hist_ranges(rangestr):
334 342 for line in self.get_range(sess, s, e, raw=raw, output=output):
335 343 yield line
336 344
337 345 ## ----------------------------
338 346 ## Methods for storing history:
339 347 ## ----------------------------
340 348 def store_inputs(self, line_num, source, source_raw=None):
341 349 """Store source and raw input in history and create input cache
342 350 variables _i*.
343 351
344 352 Parameters
345 353 ----------
346 354 line_num : int
347 355 The prompt number of this input.
348 356
349 357 source : str
350 358 Python input.
351 359
352 360 source_raw : str, optional
353 361 If given, this is the raw input without any IPython transformations
354 362 applied to it. If not given, ``source`` is used.
355 363 """
356 364 if source_raw is None:
357 365 source_raw = source
358 366 source = source.rstrip('\n')
359 367 source_raw = source_raw.rstrip('\n')
360 368
361 369 # do not store exit/quit commands
362 370 if source_raw.strip() in self._exit_commands:
363 371 return
364 372
365 373 self.input_hist_parsed.append(source)
366 374 self.input_hist_raw.append(source_raw)
367 375
368 376 self.db_input_cache.append((line_num, source, source_raw))
369 377 # Trigger to flush cache and write to DB.
370 378 if len(self.db_input_cache) >= self.db_cache_size:
371 379 self.writeout_cache()
372 380
373 381 # update the auto _i variables
374 382 self._iii = self._ii
375 383 self._ii = self._i
376 384 self._i = self._i00
377 385 self._i00 = source_raw
378 386
379 387 # hackish access to user namespace to create _i1,_i2... dynamically
380 388 new_i = '_i%s' % line_num
381 389 to_main = {'_i': self._i,
382 390 '_ii': self._ii,
383 391 '_iii': self._iii,
384 392 new_i : self._i00 }
385 393 self.shell.user_ns.update(to_main)
386 394
387 395 def store_output(self, line_num):
388 396 """If database output logging is enabled, this saves all the
389 397 outputs from the indicated prompt number to the database. It's
390 398 called by run_cell after code has been executed.
391 399
392 400 Parameters
393 401 ----------
394 402 line_num : int
395 403 The line number from which to save outputs
396 404 """
397 405 if (not self.db_log_output) or not self.output_hist_reprs[line_num]:
398 406 return
399 407 output = json.dumps(self.output_hist_reprs[line_num])
400 408
401 409 self.db_output_cache.append((line_num, output))
402 410 if self.db_cache_size <= 1:
403 411 self.writeout_cache()
404 412
405 413 def _writeout_input_cache(self):
406 414 for line in self.db_input_cache:
407 415 with self.db:
408 416 self.db.execute("INSERT INTO history VALUES (?, ?, ?, ?)",
409 417 (self.session_number,)+line)
410 418
411 419 def _writeout_output_cache(self):
412 420 for line in self.db_output_cache:
413 421 with self.db:
414 422 self.db.execute("INSERT INTO output_history VALUES (?, ?, ?)",
415 423 (self.session_number,)+line)
416 424
417 425 def writeout_cache(self):
418 426 """Write any entries in the cache to the database."""
419 427 try:
420 428 self._writeout_input_cache()
421 429 except sqlite3.IntegrityError:
422 430 self.new_session()
423 431 print("ERROR! Session/line number was not unique in",
424 432 "database. History logging moved to new session",
425 433 self.session_number)
426 434 try: # Try writing to the new session. If this fails, don't recurse
427 435 self.writeout_cache()
428 436 except sqlite3.IntegrityError:
429 437 pass
430 438 finally:
431 439 self.db_input_cache = []
432 440
433 441 try:
434 442 self._writeout_output_cache()
435 443 except sqlite3.IntegrityError:
436 444 print("!! Session/line number for output was not unique",
437 445 "in database. Output will not be stored.")
438 446 finally:
439 447 self.db_output_cache = []
440 448
441 449
442 450 # To match, e.g. ~5/8-~2/3
443 451 range_re = re.compile(r"""
444 452 ((?P<startsess>~?\d+)/)?
445 453 (?P<start>\d+) # Only the start line num is compulsory
446 454 ((?P<sep>[\-:])
447 455 ((?P<endsess>~?\d+)/)?
448 456 (?P<end>\d+))?
449 457 """, re.VERBOSE)
450 458
451 459 def extract_hist_ranges(ranges_str):
452 460 """Turn a string of history ranges into 3-tuples of (session, start, stop).
453 461
454 462 Examples
455 463 --------
456 464 list(extract_input_ranges("~8/5-~7/4 2"))
457 465 [(-8, 5, None), (-7, 1, 4), (0, 2, 3)]
458 466 """
459 467 for range_str in ranges_str.split():
460 468 rmatch = range_re.match(range_str)
461 469 if not rmatch:
462 470 continue
463 471 start = int(rmatch.group("start"))
464 472 end = rmatch.group("end")
465 473 end = int(end) if end else start+1 # If no end specified, get (a, a+1)
466 474 if rmatch.group("sep") == "-": # 1-3 == 1:4 --> [1, 2, 3]
467 475 end += 1
468 476 startsess = rmatch.group("startsess") or "0"
469 477 endsess = rmatch.group("endsess") or startsess
470 478 startsess = int(startsess.replace("~","-"))
471 479 endsess = int(endsess.replace("~","-"))
472 480 assert endsess >= startsess
473 481
474 482 if endsess == startsess:
475 483 yield (startsess, start, end)
476 484 continue
477 485 # Multiple sessions in one range:
478 486 yield (startsess, start, None)
479 487 for sess in range(startsess+1, endsess):
480 488 yield (sess, 1, None)
481 489 yield (endsess, 1, end)
482 490
483 491 def _format_lineno(session, line):
484 492 """Helper function to format line numbers properly."""
485 493 if session == 0:
486 494 return str(line)
487 495 return "%s#%s" % (session, line)
488 496
489 497 @testdec.skip_doctest
490 498 def magic_history(self, parameter_s = ''):
491 499 """Print input history (_i<n> variables), with most recent last.
492 500
493 501 %history -> print at most 40 inputs (some may be multi-line)\\
494 502 %history n -> print at most n inputs\\
495 503 %history n1 n2 -> print inputs between n1 and n2 (n2 not included)\\
496 504
497 505 By default, input history is printed without line numbers so it can be
498 506 directly pasted into an editor. Use -n to show them.
499 507
500 508 Ranges of history can be indicated using the syntax:
501 509 4 : Line 4, current session
502 510 4-6 : Lines 4-6, current session
503 511 243/1-5: Lines 1-5, session 243
504 512 ~2/7 : Line 7, session 2 before current
505 513 ~8/1-~6/5 : From the first line of 8 sessions ago, to the fifth line
506 514 of 6 sessions ago.
507 515 Multiple ranges can be entered, separated by spaces
508 516
509 517 The same syntax is used by %macro, %save, %edit, %rerun
510 518
511 519 Options:
512 520
513 521 -n: print line numbers for each input.
514 522 This feature is only available if numbered prompts are in use.
515 523
516 524 -o: also print outputs for each input.
517 525
518 526 -p: print classic '>>>' python prompts before each input. This is useful
519 527 for making documentation, and in conjunction with -o, for producing
520 528 doctest-ready output.
521 529
522 530 -r: (default) print the 'raw' history, i.e. the actual commands you typed.
523 531
524 532 -t: print the 'translated' history, as IPython understands it. IPython
525 533 filters your input and converts it all into valid Python source before
526 534 executing it (things like magics or aliases are turned into function
527 535 calls, for example). With this option, you'll see the native history
528 536 instead of the user-entered version: '%cd /' will be seen as
529 537 'get_ipython().magic("%cd /")' instead of '%cd /'.
530 538
531 539 -g: treat the arg as a pattern to grep for in (full) history.
532 540 This includes the saved history (almost all commands ever written).
533 541 Use '%hist -g' to show full saved history (may be very long).
534 542
535 543 -l: get the last n lines from all sessions. Specify n as a single arg, or
536 544 the default is the last 10 lines.
537 545
538 546 -f FILENAME: instead of printing the output to the screen, redirect it to
539 547 the given file. The file is always overwritten, though IPython asks for
540 548 confirmation first if it already exists.
541 549
542 550 Examples
543 551 --------
544 552 ::
545 553
546 554 In [6]: %hist -n 4 6
547 555 4:a = 12
548 556 5:print a**2
549 557
550 558 """
551 559
552 560 if not self.shell.displayhook.do_full_cache:
553 561 print('This feature is only available if numbered prompts are in use.')
554 562 return
555 563 opts,args = self.parse_options(parameter_s,'noprtglf:',mode='string')
556 564
557 565 # For brevity
558 566 history_manager = self.shell.history_manager
559 567
560 568 def _format_lineno(session, line):
561 569 """Helper function to format line numbers properly."""
562 570 if session in (0, history_manager.session_number):
563 571 return str(line)
564 572 return "%s/%s" % (session, line)
565 573
566 574 # Check if output to specific file was requested.
567 575 try:
568 576 outfname = opts['f']
569 577 except KeyError:
570 578 outfile = IPython.utils.io.Term.cout # default
571 579 # We don't want to close stdout at the end!
572 580 close_at_end = False
573 581 else:
574 582 if os.path.exists(outfname):
575 583 if not ask_yes_no("File %r exists. Overwrite?" % outfname):
576 584 print('Aborting.')
577 585 return
578 586
579 587 outfile = open(outfname,'w')
580 588 close_at_end = True
581 589
582 590 print_nums = 'n' in opts
583 591 get_output = 'o' in opts
584 592 pyprompts = 'p' in opts
585 593 # Raw history is the default
586 594 raw = not('t' in opts)
587 595
588 596 default_length = 40
589 597 pattern = None
590 598
591 599 if 'g' in opts: # Glob search
592 600 pattern = "*" + args + "*" if args else "*"
593 601 hist = history_manager.search(pattern, raw=raw, output=get_output)
594 602 elif 'l' in opts: # Get 'tail'
595 603 try:
596 604 n = int(args)
597 605 except ValueError, IndexError:
598 606 n = 10
599 607 hist = history_manager.get_tail(n, raw=raw, output=get_output)
600 608 else:
601 609 if args: # Get history by ranges
602 610 hist = history_manager.get_range_by_str(args, raw, get_output)
603 611 else: # Just get history for the current session
604 612 hist = history_manager.get_range(raw=raw, output=get_output)
605 613
606 614 # We could be displaying the entire history, so let's not try to pull it
607 615 # into a list in memory. Anything that needs more space will just misalign.
608 616 width = 4
609 617
610 618 for session, lineno, inline in hist:
611 619 # Print user history with tabs expanded to 4 spaces. The GUI clients
612 620 # use hard tabs for easier usability in auto-indented code, but we want
613 621 # to produce PEP-8 compliant history for safe pasting into an editor.
614 622 if get_output:
615 623 inline, output = inline
616 624 inline = inline.expandtabs(4).rstrip()
617 625
618 626 multiline = "\n" in inline
619 627 line_sep = '\n' if multiline else ' '
620 628 if print_nums:
621 629 print('%s:%s' % (_format_lineno(session, lineno).rjust(width),
622 630 line_sep), file=outfile, end='')
623 631 if pyprompts:
624 632 print(">>> ", end="", file=outfile)
625 633 if multiline:
626 634 inline = "\n... ".join(inline.splitlines()) + "\n..."
627 635 print(inline, file=outfile)
628 636 if get_output and output:
629 637 print("\n".join(output), file=outfile)
630 638
631 639 if close_at_end:
632 640 outfile.close()
633 641
634 642
635 643 def magic_rep(self, arg):
636 644 r""" Repeat a command, or get command to input line for editing
637 645
638 646 - %rep (no arguments):
639 647
640 648 Place a string version of last computation result (stored in the special '_'
641 649 variable) to the next input prompt. Allows you to create elaborate command
642 650 lines without using copy-paste::
643 651
644 652 In[1]: l = ["hei", "vaan"]
645 653 In[2]: "".join(l)
646 654 Out[2]: heivaan
647 655 In[3]: %rep
648 656 In[4]: heivaan_ <== cursor blinking
649 657
650 658 %rep 45
651 659
652 660 Place history line 45 on the next input prompt. Use %hist to find
653 661 out the number.
654 662
655 663 %rep 1-4
656 664
657 665 Combine the specified lines into one cell, and place it on the next
658 666 input prompt. See %history for the slice syntax.
659 667
660 668 %rep foo+bar
661 669
662 670 If foo+bar can be evaluated in the user namespace, the result is
663 671 placed at the next input prompt. Otherwise, the history is searched
664 672 for lines which contain that substring, and the most recent one is
665 673 placed at the next input prompt.
666 674 """
667 675 if not arg: # Last output
668 676 self.set_next_input(str(self.shell.user_ns["_"]))
669 677 return
670 678 # Get history range
671 679 histlines = self.history_manager.get_range_by_str(arg)
672 680 cmd = "\n".join(x[2] for x in histlines)
673 681 if cmd:
674 682 self.set_next_input(cmd.rstrip())
675 683 return
676 684
677 685 try: # Variable in user namespace
678 686 cmd = str(eval(arg, self.shell.user_ns))
679 687 except Exception: # Search for term in history
680 688 histlines = self.history_manager.search("*"+arg+"*")
681 689 for h in reversed([x[2] for x in histlines]):
682 690 if 'rep' in h:
683 691 continue
684 692 self.set_next_input(h.rstrip())
685 693 return
686 694 else:
687 695 self.set_next_input(cmd.rstrip())
688 696 print("Couldn't evaluate or find in history:", arg)
689 697
690 698 def magic_rerun(self, parameter_s=''):
691 699 """Re-run previous input
692 700
693 701 By default, you can specify ranges of input history to be repeated
694 702 (as with %history). With no arguments, it will repeat the last line.
695 703
696 704 Options:
697 705
698 706 -l <n> : Repeat the last n lines of input, not including the
699 707 current command.
700 708
701 709 -g foo : Repeat the most recent line which contains foo
702 710 """
703 711 opts, args = self.parse_options(parameter_s, 'l:g:', mode='string')
704 712 if "l" in opts: # Last n lines
705 713 n = int(opts['l'])
706 714 hist = self.history_manager.get_tail(n)
707 715 elif "g" in opts: # Search
708 716 p = "*"+opts['g']+"*"
709 717 hist = list(self.history_manager.search(p))
710 718 for l in reversed(hist):
711 719 if "rerun" not in l[2]:
712 720 hist = [l] # The last match which isn't a %rerun
713 721 break
714 722 else:
715 723 hist = [] # No matches except %rerun
716 724 elif args: # Specify history ranges
717 725 hist = self.history_manager.get_range_by_str(args)
718 726 else: # Last line
719 727 hist = self.history_manager.get_tail(1)
720 728 hist = [x[2] for x in hist]
721 729 if not hist:
722 730 print("No lines in history match specification")
723 731 return
724 732 histlines = "\n".join(hist)
725 733 print("=== Executing: ===")
726 734 print(histlines)
727 735 print("=== Output: ===")
728 736 self.run_cell("\n".join(hist), store_history=False)
729 737
730 738
731 739 def init_ipython(ip):
732 740 ip.define_magic("rep", magic_rep)
733 741 ip.define_magic("recall", magic_rep)
734 742 ip.define_magic("rerun", magic_rerun)
735 743 ip.define_magic("hist",magic_history) # Alternative name
736 744 ip.define_magic("history",magic_history)
737 745
738 746 # XXX - ipy_completers are in quarantine, need to be updated to new apis
739 747 #import ipy_completers
740 748 #ipy_completers.quick_completer('%hist' ,'-g -t -r -n')
@@ -1,119 +1,111 b''
1 1 # coding: utf-8
2 2 """Tests for the IPython tab-completion machinery.
3 3 """
4 4 #-----------------------------------------------------------------------------
5 5 # Module imports
6 6 #-----------------------------------------------------------------------------
7 7
8 8 # stdlib
9 9 import os
10 10 import sys
11 11 import unittest
12 12
13 13 # third party
14 14 import nose.tools as nt
15 15
16 16 # our own packages
17 17 from IPython.utils.tempdir import TemporaryDirectory
18 18 from IPython.core.history import HistoryManager, extract_hist_ranges
19 19
20 20 def setUp():
21 21 nt.assert_equal(sys.getdefaultencoding(), "ascii")
22 22
23 23 def test_history():
24 24 ip = get_ipython()
25 25 with TemporaryDirectory() as tmpdir:
26 #tmpdir = '/software/temp'
27 histfile = os.path.realpath(os.path.join(tmpdir, 'history.sqlite'))
28 # Ensure that we restore the history management that we mess with in
29 # this test doesn't affect the IPython instance used by the test suite
30 # beyond this test.
26 # Make a new :memory: DB.
31 27 hist_manager_ori = ip.history_manager
32 28 try:
33 ip.history_manager = HistoryManager(shell=ip)
34 ip.history_manager.hist_file = histfile
35 ip.history_manager.init_db() # Has to be called after changing file
36 ip.history_manager.reset()
37 print 'test',histfile
29 ip.history_manager = HistoryManager(shell=ip, hist_file=':memory:')
38 30 hist = ['a=1', 'def f():\n test = 1\n return test', u"b='β‚¬Γ†ΒΎΓ·ΓŸ'"]
39 31 for i, h in enumerate(hist, start=1):
40 32 ip.history_manager.store_inputs(i, h)
41 33
42 34 ip.history_manager.db_log_output = True
43 35 # Doesn't match the input, but we'll just check it's stored.
44 36 ip.history_manager.output_hist_reprs[3].append("spam")
45 37 ip.history_manager.store_output(3)
46 38
47 39 nt.assert_equal(ip.history_manager.input_hist_raw, [''] + hist)
48 40
49 41 # Check lines were written to DB
50 42 c = ip.history_manager.db.execute("SELECT source_raw FROM history")
51 43 nt.assert_equal([x for x, in c], hist)
52 44
53 45 # New session
54 46 ip.history_manager.reset()
55 47 newcmds = ["z=5","class X(object):\n pass", "k='p'"]
56 48 for i, cmd in enumerate(newcmds, start=1):
57 49 ip.history_manager.store_inputs(i, cmd)
58 50 gothist = ip.history_manager.get_range(start=1, stop=4)
59 51 nt.assert_equal(list(gothist), zip([0,0,0],[1,2,3], newcmds))
60 52 # Previous session:
61 53 gothist = ip.history_manager.get_range(-1, 1, 4)
62 54 nt.assert_equal(list(gothist), zip([1,1,1],[1,2,3], hist))
63 55
64 56 # Check get_hist_tail
65 57 gothist = ip.history_manager.get_tail(4, output=True,
66 58 include_latest=True)
67 59 expected = [(1, 3, (hist[-1], ["spam"])),
68 60 (2, 1, (newcmds[0], None)),
69 61 (2, 2, (newcmds[1], None)),
70 62 (2, 3, (newcmds[2], None)),]
71 63 nt.assert_equal(list(gothist), expected)
72 64
73 65 gothist = ip.history_manager.get_tail(2)
74 66 expected = [(2, 1, newcmds[0]),
75 67 (2, 2, newcmds[1])]
76 68 nt.assert_equal(list(gothist), expected)
77 69
78 70 # Check get_hist_search
79 71 gothist = ip.history_manager.search("*test*")
80 72 nt.assert_equal(list(gothist), [(1,2,hist[1])] )
81 73 gothist = ip.history_manager.search("b*", output=True)
82 74 nt.assert_equal(list(gothist), [(1,3,(hist[2],["spam"]))] )
83 75
84 76 # Cross testing: check that magic %save can get previous session.
85 77 testfilename = os.path.realpath(os.path.join(tmpdir, "test.py"))
86 78 ip.magic_save(testfilename + " ~1/1-3")
87 79 testfile = open(testfilename, "r")
88 80 nt.assert_equal(testfile.read().decode("utf-8"),
89 81 "# coding: utf-8\n" + "\n".join(hist))
90 82
91 83 # Duplicate line numbers - check that it doesn't crash, and
92 84 # gets a new session
93 85 ip.history_manager.store_inputs(1, "rogue")
94 86 nt.assert_equal(ip.history_manager.session_number, 3)
95 87 finally:
96 88 # Restore history manager
97 89 ip.history_manager = hist_manager_ori
98 90
99 91
100 92 def test_extract_hist_ranges():
101 93 instr = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5"
102 94 expected = [(0, 1, 2), # 0 == current session
103 95 (2, 3, 4),
104 96 (-4, 5, 7),
105 97 (-4, 7, 10),
106 98 (-9, 2, None), # None == to end
107 99 (-8, 1, None),
108 100 (-7, 1, 6)]
109 101 actual = list(extract_hist_ranges(instr))
110 102 nt.assert_equal(actual, expected)
111 103
112 104 def test_magic_rerun():
113 105 """Simple test for %rerun (no args -> rerun last line)"""
114 106 ip = get_ipython()
115 107 ip.run_cell("a = 10")
116 108 ip.run_cell("a += 1")
117 109 nt.assert_equal(ip.user_ns["a"], 11)
118 110 ip.run_cell("%rerun")
119 111 nt.assert_equal(ip.user_ns["a"], 12)
@@ -1,277 +1,278 b''
1 1 """Generic testing tools that do NOT depend on Twisted.
2 2
3 3 In particular, this module exposes a set of top-level assert* functions that
4 4 can be used in place of nose.tools.assert* in method generators (the ones in
5 5 nose can not, at least as of nose 0.10.4).
6 6
7 7 Note: our testing package contains testing.util, which does depend on Twisted
8 8 and provides utilities for tests that manage Deferreds. All testing support
9 9 tools that only depend on nose, IPython or the standard library should go here
10 10 instead.
11 11
12 12
13 13 Authors
14 14 -------
15 15 - Fernando Perez <Fernando.Perez@berkeley.edu>
16 16 """
17 17
18 18 from __future__ import absolute_import
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Copyright (C) 2009 The IPython Development Team
22 22 #
23 23 # Distributed under the terms of the BSD License. The full license is in
24 24 # the file COPYING, distributed as part of this software.
25 25 #-----------------------------------------------------------------------------
26 26
27 27 #-----------------------------------------------------------------------------
28 28 # Imports
29 29 #-----------------------------------------------------------------------------
30 30
31 31 import os
32 32 import re
33 33 import sys
34 34
35 35 try:
36 36 # These tools are used by parts of the runtime, so we make the nose
37 37 # dependency optional at this point. Nose is a hard dependency to run the
38 38 # test suite, but NOT to use ipython itself.
39 39 import nose.tools as nt
40 40 has_nose = True
41 41 except ImportError:
42 42 has_nose = False
43 43
44 44 from IPython.config.loader import Config
45 45 from IPython.utils.process import find_cmd, getoutputerror
46 46 from IPython.utils.text import list_strings
47 47 from IPython.utils.io import temp_pyfile
48 48
49 49 from . import decorators as dec
50 50
51 51 #-----------------------------------------------------------------------------
52 52 # Globals
53 53 #-----------------------------------------------------------------------------
54 54
55 55 # Make a bunch of nose.tools assert wrappers that can be used in test
56 56 # generators. This will expose an assert* function for each one in nose.tools.
57 57
58 58 _tpl = """
59 59 def %(name)s(*a,**kw):
60 60 return nt.%(name)s(*a,**kw)
61 61 """
62 62
63 63 if has_nose:
64 64 for _x in [a for a in dir(nt) if a.startswith('assert')]:
65 65 exec _tpl % dict(name=_x)
66 66
67 67 #-----------------------------------------------------------------------------
68 68 # Functions and classes
69 69 #-----------------------------------------------------------------------------
70 70
71 71 # The docstring for full_path doctests differently on win32 (different path
72 72 # separator) so just skip the doctest there. The example remains informative.
73 73 doctest_deco = dec.skip_doctest if sys.platform == 'win32' else dec.null_deco
74 74
75 75 @doctest_deco
76 76 def full_path(startPath,files):
77 77 """Make full paths for all the listed files, based on startPath.
78 78
79 79 Only the base part of startPath is kept, since this routine is typically
80 80 used with a script's __file__ variable as startPath. The base of startPath
81 81 is then prepended to all the listed files, forming the output list.
82 82
83 83 Parameters
84 84 ----------
85 85 startPath : string
86 86 Initial path to use as the base for the results. This path is split
87 87 using os.path.split() and only its first component is kept.
88 88
89 89 files : string or list
90 90 One or more files.
91 91
92 92 Examples
93 93 --------
94 94
95 95 >>> full_path('/foo/bar.py',['a.txt','b.txt'])
96 96 ['/foo/a.txt', '/foo/b.txt']
97 97
98 98 >>> full_path('/foo',['a.txt','b.txt'])
99 99 ['/a.txt', '/b.txt']
100 100
101 101 If a single file is given, the output is still a list:
102 102 >>> full_path('/foo','a.txt')
103 103 ['/a.txt']
104 104 """
105 105
106 106 files = list_strings(files)
107 107 base = os.path.split(startPath)[0]
108 108 return [ os.path.join(base,f) for f in files ]
109 109
110 110
111 111 def parse_test_output(txt):
112 112 """Parse the output of a test run and return errors, failures.
113 113
114 114 Parameters
115 115 ----------
116 116 txt : str
117 117 Text output of a test run, assumed to contain a line of one of the
118 118 following forms::
119 119 'FAILED (errors=1)'
120 120 'FAILED (failures=1)'
121 121 'FAILED (errors=1, failures=1)'
122 122
123 123 Returns
124 124 -------
125 125 nerr, nfail: number of errors and failures.
126 126 """
127 127
128 128 err_m = re.search(r'^FAILED \(errors=(\d+)\)', txt, re.MULTILINE)
129 129 if err_m:
130 130 nerr = int(err_m.group(1))
131 131 nfail = 0
132 132 return nerr, nfail
133 133
134 134 fail_m = re.search(r'^FAILED \(failures=(\d+)\)', txt, re.MULTILINE)
135 135 if fail_m:
136 136 nerr = 0
137 137 nfail = int(fail_m.group(1))
138 138 return nerr, nfail
139 139
140 140 both_m = re.search(r'^FAILED \(errors=(\d+), failures=(\d+)\)', txt,
141 141 re.MULTILINE)
142 142 if both_m:
143 143 nerr = int(both_m.group(1))
144 144 nfail = int(both_m.group(2))
145 145 return nerr, nfail
146 146
147 147 # If the input didn't match any of these forms, assume no error/failures
148 148 return 0, 0
149 149
150 150
151 151 # So nose doesn't think this is a test
152 152 parse_test_output.__test__ = False
153 153
154 154
155 155 def default_argv():
156 156 """Return a valid default argv for creating testing instances of ipython"""
157 157
158 158 return ['--quick', # so no config file is loaded
159 159 # Other defaults to minimize side effects on stdout
160 160 '--colors=NoColor', '--no-term-title','--no-banner',
161 161 '--autocall=0']
162 162
163 163
164 164 def default_config():
165 165 """Return a config object with good defaults for testing."""
166 166 config = Config()
167 167 config.TerminalInteractiveShell.colors = 'NoColor'
168 168 config.TerminalTerminalInteractiveShell.term_title = False,
169 169 config.TerminalInteractiveShell.autocall = 0
170 config.HistoryManager.hist_file = u':memory:'
170 171 return config
171 172
172 173
173 174 def ipexec(fname, options=None):
174 175 """Utility to call 'ipython filename'.
175 176
176 177 Starts IPython witha minimal and safe configuration to make startup as fast
177 178 as possible.
178 179
179 180 Note that this starts IPython in a subprocess!
180 181
181 182 Parameters
182 183 ----------
183 184 fname : str
184 185 Name of file to be executed (should have .py or .ipy extension).
185 186
186 187 options : optional, list
187 188 Extra command-line flags to be passed to IPython.
188 189
189 190 Returns
190 191 -------
191 192 (stdout, stderr) of ipython subprocess.
192 193 """
193 194 if options is None: options = []
194 195
195 196 # For these subprocess calls, eliminate all prompt printing so we only see
196 197 # output from script execution
197 198 prompt_opts = ['--prompt-in1=""', '--prompt-in2=""', '--prompt-out=""']
198 199 cmdargs = ' '.join(default_argv() + prompt_opts + options)
199 200
200 201 _ip = get_ipython()
201 202 test_dir = os.path.dirname(__file__)
202 203
203 204 ipython_cmd = find_cmd('ipython')
204 205 # Absolute path for filename
205 206 full_fname = os.path.join(test_dir, fname)
206 207 full_cmd = '%s %s %s' % (ipython_cmd, cmdargs, full_fname)
207 208 #print >> sys.stderr, 'FULL CMD:', full_cmd # dbg
208 209 return getoutputerror(full_cmd)
209 210
210 211
211 212 def ipexec_validate(fname, expected_out, expected_err='',
212 213 options=None):
213 214 """Utility to call 'ipython filename' and validate output/error.
214 215
215 216 This function raises an AssertionError if the validation fails.
216 217
217 218 Note that this starts IPython in a subprocess!
218 219
219 220 Parameters
220 221 ----------
221 222 fname : str
222 223 Name of the file to be executed (should have .py or .ipy extension).
223 224
224 225 expected_out : str
225 226 Expected stdout of the process.
226 227
227 228 expected_err : optional, str
228 229 Expected stderr of the process.
229 230
230 231 options : optional, list
231 232 Extra command-line flags to be passed to IPython.
232 233
233 234 Returns
234 235 -------
235 236 None
236 237 """
237 238
238 239 import nose.tools as nt
239 240
240 241 out, err = ipexec(fname)
241 242 #print 'OUT', out # dbg
242 243 #print 'ERR', err # dbg
243 244 # If there are any errors, we must check those befor stdout, as they may be
244 245 # more informative than simply having an empty stdout.
245 246 if err:
246 247 if expected_err:
247 248 nt.assert_equals(err.strip(), expected_err.strip())
248 249 else:
249 250 raise ValueError('Running file %r produced error: %r' %
250 251 (fname, err))
251 252 # If no errors or output on stderr was expected, match stdout
252 253 nt.assert_equals(out.strip(), expected_out.strip())
253 254
254 255
255 256 class TempFileMixin(object):
256 257 """Utility class to create temporary Python/IPython files.
257 258
258 259 Meant as a mixin class for test cases."""
259 260
260 261 def mktmp(self, src, ext='.py'):
261 262 """Make a valid python temp file."""
262 263 fname, f = temp_pyfile(src, ext)
263 264 self.tmpfile = f
264 265 self.fname = fname
265 266
266 267 def tearDown(self):
267 268 if hasattr(self, 'tmpfile'):
268 269 # If the tmpfile wasn't made because of skipped tests, like in
269 270 # win32, there's nothing to cleanup.
270 271 self.tmpfile.close()
271 272 try:
272 273 os.unlink(self.fname)
273 274 except:
274 275 # On Windows, even though we close the file, we still can't
275 276 # delete it. I have no clue why
276 277 pass
277 278
General Comments 0
You need to be logged in to leave comments. Login now