##// END OF EJS Templates
Fix almost all IPython.core tests for Python 3.
Thomas Kluyver -
Show More
@@ -1,772 +1,771 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Tools for inspecting Python objects.
3 3
4 4 Uses syntax highlighting for presenting the various information elements.
5 5
6 6 Similar in spirit to the inspect module, but all calls take a name argument to
7 7 reference the name under which an object is being read.
8 8 """
9 9
10 10 #*****************************************************************************
11 11 # Copyright (C) 2001-2004 Fernando Perez <fperez@colorado.edu>
12 12 #
13 13 # Distributed under the terms of the BSD License. The full license is in
14 14 # the file COPYING, distributed as part of this software.
15 15 #*****************************************************************************
16 16
17 17 __all__ = ['Inspector','InspectColors']
18 18
19 19 # stdlib modules
20 20 import __builtin__
21 21 import inspect
22 22 import linecache
23 23 import os
24 24 import sys
25 25 import types
26 26 from collections import namedtuple
27 27 try:
28 28 from itertools import izip_longest
29 29 except ImportError:
30 30 from itertools import zip_longest as izip_longest
31 31
32 32 # IPython's own
33 33 from IPython.core import page
34 34 from IPython.utils import PyColorize
35 35 from IPython.utils import io
36 36 from IPython.utils import py3compat
37 37 from IPython.utils.text import indent
38 38 from IPython.utils.wildcard import list_namespace
39 39 from IPython.utils.coloransi import *
40 40
41 41 #****************************************************************************
42 42 # Builtin color schemes
43 43
44 44 Colors = TermColors # just a shorthand
45 45
46 46 # Build a few color schemes
47 47 NoColor = ColorScheme(
48 48 'NoColor',{
49 49 'header' : Colors.NoColor,
50 50 'normal' : Colors.NoColor # color off (usu. Colors.Normal)
51 51 } )
52 52
53 53 LinuxColors = ColorScheme(
54 54 'Linux',{
55 55 'header' : Colors.LightRed,
56 56 'normal' : Colors.Normal # color off (usu. Colors.Normal)
57 57 } )
58 58
59 59 LightBGColors = ColorScheme(
60 60 'LightBG',{
61 61 'header' : Colors.Red,
62 62 'normal' : Colors.Normal # color off (usu. Colors.Normal)
63 63 } )
64 64
65 65 # Build table of color schemes (needed by the parser)
66 66 InspectColors = ColorSchemeTable([NoColor,LinuxColors,LightBGColors],
67 67 'Linux')
68 68
69 69 #****************************************************************************
70 70 # Auxiliary functions and objects
71 71
72 72 # See the messaging spec for the definition of all these fields. This list
73 73 # effectively defines the order of display
74 74 info_fields = ['type_name', 'base_class', 'string_form', 'namespace',
75 75 'length', 'file', 'definition', 'docstring', 'source',
76 76 'init_definition', 'class_docstring', 'init_docstring',
77 77 'call_def', 'call_docstring',
78 78 # These won't be printed but will be used to determine how to
79 79 # format the object
80 80 'ismagic', 'isalias', 'isclass', 'argspec', 'found', 'name'
81 81 ]
82 82
83 83
84 84 def object_info(**kw):
85 85 """Make an object info dict with all fields present."""
86 86 infodict = dict(izip_longest(info_fields, [None]))
87 87 infodict.update(kw)
88 88 return infodict
89 89
90 90
91 91 def getdoc(obj):
92 92 """Stable wrapper around inspect.getdoc.
93 93
94 94 This can't crash because of attribute problems.
95 95
96 96 It also attempts to call a getdoc() method on the given object. This
97 97 allows objects which provide their docstrings via non-standard mechanisms
98 98 (like Pyro proxies) to still be inspected by ipython's ? system."""
99 99
100 100 ds = None # default return value
101 101 try:
102 102 ds = inspect.getdoc(obj)
103 103 except:
104 104 # Harden against an inspect failure, which can occur with
105 105 # SWIG-wrapped extensions.
106 106 pass
107 107 # Allow objects to offer customized documentation via a getdoc method:
108 108 try:
109 109 ds2 = obj.getdoc()
110 110 except:
111 111 pass
112 112 else:
113 113 # if we get extra info, we add it to the normal docstring.
114 114 if ds is None:
115 115 ds = ds2
116 116 else:
117 117 ds = '%s\n%s' % (ds,ds2)
118 118 return ds
119 119
120 120
121 121 def getsource(obj,is_binary=False):
122 122 """Wrapper around inspect.getsource.
123 123
124 124 This can be modified by other projects to provide customized source
125 125 extraction.
126 126
127 127 Inputs:
128 128
129 129 - obj: an object whose source code we will attempt to extract.
130 130
131 131 Optional inputs:
132 132
133 133 - is_binary: whether the object is known to come from a binary source.
134 134 This implementation will skip returning any output for binary objects, but
135 135 custom extractors may know how to meaningfully process them."""
136 136
137 137 if is_binary:
138 138 return None
139 139 else:
140 140 # get source if obj was decorated with @decorator
141 141 if hasattr(obj,"__wrapped__"):
142 142 obj = obj.__wrapped__
143 143 try:
144 144 src = inspect.getsource(obj)
145 145 except TypeError:
146 146 if hasattr(obj,'__class__'):
147 147 src = inspect.getsource(obj.__class__)
148 148 return src
149 149
150 150 def getargspec(obj):
151 151 """Get the names and default values of a function's arguments.
152 152
153 153 A tuple of four things is returned: (args, varargs, varkw, defaults).
154 154 'args' is a list of the argument names (it may contain nested lists).
155 155 'varargs' and 'varkw' are the names of the * and ** arguments or None.
156 156 'defaults' is an n-tuple of the default values of the last n arguments.
157 157
158 158 Modified version of inspect.getargspec from the Python Standard
159 159 Library."""
160 160
161 161 if inspect.isfunction(obj):
162 162 func_obj = obj
163 163 elif inspect.ismethod(obj):
164 164 func_obj = obj.im_func
165 165 elif hasattr(obj, '__call__'):
166 166 func_obj = obj.__call__
167 167 else:
168 168 raise TypeError('arg is not a Python function')
169 169 args, varargs, varkw = inspect.getargs(func_obj.func_code)
170 170 return args, varargs, varkw, func_obj.func_defaults
171 171
172 172
173 173 def format_argspec(argspec):
174 174 """Format argspect, convenience wrapper around inspect's.
175 175
176 176 This takes a dict instead of ordered arguments and calls
177 177 inspect.format_argspec with the arguments in the necessary order.
178 178 """
179 179 return inspect.formatargspec(argspec['args'], argspec['varargs'],
180 180 argspec['varkw'], argspec['defaults'])
181 181
182 182
183 183 def call_tip(oinfo, format_call=True):
184 184 """Extract call tip data from an oinfo dict.
185 185
186 186 Parameters
187 187 ----------
188 188 oinfo : dict
189 189
190 190 format_call : bool, optional
191 191 If True, the call line is formatted and returned as a string. If not, a
192 192 tuple of (name, argspec) is returned.
193 193
194 194 Returns
195 195 -------
196 196 call_info : None, str or (str, dict) tuple.
197 197 When format_call is True, the whole call information is formattted as a
198 198 single string. Otherwise, the object's name and its argspec dict are
199 199 returned. If no call information is available, None is returned.
200 200
201 201 docstring : str or None
202 202 The most relevant docstring for calling purposes is returned, if
203 203 available. The priority is: call docstring for callable instances, then
204 204 constructor docstring for classes, then main object's docstring otherwise
205 205 (regular functions).
206 206 """
207 207 # Get call definition
208 208 argspec = oinfo.get('argspec')
209 209 if argspec is None:
210 210 call_line = None
211 211 else:
212 212 # Callable objects will have 'self' as their first argument, prune
213 213 # it out if it's there for clarity (since users do *not* pass an
214 214 # extra first argument explicitly).
215 215 try:
216 216 has_self = argspec['args'][0] == 'self'
217 217 except (KeyError, IndexError):
218 218 pass
219 219 else:
220 220 if has_self:
221 221 argspec['args'] = argspec['args'][1:]
222 222
223 223 call_line = oinfo['name']+format_argspec(argspec)
224 224
225 225 # Now get docstring.
226 226 # The priority is: call docstring, constructor docstring, main one.
227 227 doc = oinfo.get('call_docstring')
228 228 if doc is None:
229 229 doc = oinfo.get('init_docstring')
230 230 if doc is None:
231 231 doc = oinfo.get('docstring','')
232 232
233 233 return call_line, doc
234 234
235 235
236 236 class Inspector:
237 237 def __init__(self, color_table=InspectColors,
238 238 code_color_table=PyColorize.ANSICodeColors,
239 239 scheme='NoColor',
240 240 str_detail_level=0):
241 241 self.color_table = color_table
242 242 self.parser = PyColorize.Parser(code_color_table,out='str')
243 243 self.format = self.parser.format
244 244 self.str_detail_level = str_detail_level
245 245 self.set_active_scheme(scheme)
246 246
247 247 def _getdef(self,obj,oname=''):
248 248 """Return the definition header for any callable object.
249 249
250 250 If any exception is generated, None is returned instead and the
251 251 exception is suppressed."""
252 252
253 253 try:
254 254 # We need a plain string here, NOT unicode!
255 255 hdef = oname + inspect.formatargspec(*getargspec(obj))
256 256 return py3compat.unicode_to_str(hdef, 'ascii')
257 257 except:
258 258 return None
259 259
260 260 def __head(self,h):
261 261 """Return a header string with proper colors."""
262 262 return '%s%s%s' % (self.color_table.active_colors.header,h,
263 263 self.color_table.active_colors.normal)
264 264
265 265 def set_active_scheme(self,scheme):
266 266 self.color_table.set_active_scheme(scheme)
267 267 self.parser.color_table.set_active_scheme(scheme)
268 268
269 269 def noinfo(self,msg,oname):
270 270 """Generic message when no information is found."""
271 271 print 'No %s found' % msg,
272 272 if oname:
273 273 print 'for %s' % oname
274 274 else:
275 275 print
276 276
277 277 def pdef(self,obj,oname=''):
278 278 """Print the definition header for any callable object.
279 279
280 280 If the object is a class, print the constructor information."""
281 281
282 282 if not callable(obj):
283 283 print 'Object is not callable.'
284 284 return
285 285
286 286 header = ''
287 287
288 288 if inspect.isclass(obj):
289 289 header = self.__head('Class constructor information:\n')
290 290 obj = obj.__init__
291 291 elif type(obj) is types.InstanceType:
292 292 obj = obj.__call__
293 293
294 294 output = self._getdef(obj,oname)
295 295 if output is None:
296 296 self.noinfo('definition header',oname)
297 297 else:
298 298 print >>io.stdout, header,self.format(output),
299 299
300 300 def pdoc(self,obj,oname='',formatter = None):
301 301 """Print the docstring for any object.
302 302
303 303 Optional:
304 304 -formatter: a function to run the docstring through for specially
305 305 formatted docstrings.
306 306
307 307 Examples
308 308 --------
309 309
310 310 In [1]: class NoInit:
311 311 ...: pass
312 312
313 313 In [2]: class NoDoc:
314 314 ...: def __init__(self):
315 315 ...: pass
316 316
317 317 In [3]: %pdoc NoDoc
318 318 No documentation found for NoDoc
319 319
320 320 In [4]: %pdoc NoInit
321 321 No documentation found for NoInit
322 322
323 323 In [5]: obj = NoInit()
324 324
325 325 In [6]: %pdoc obj
326 326 No documentation found for obj
327 327
328 328 In [5]: obj2 = NoDoc()
329 329
330 330 In [6]: %pdoc obj2
331 331 No documentation found for obj2
332 332 """
333 333
334 334 head = self.__head # For convenience
335 335 lines = []
336 336 ds = getdoc(obj)
337 337 if formatter:
338 338 ds = formatter(ds)
339 339 if ds:
340 340 lines.append(head("Class Docstring:"))
341 341 lines.append(indent(ds))
342 342 if inspect.isclass(obj) and hasattr(obj, '__init__'):
343 343 init_ds = getdoc(obj.__init__)
344 344 if init_ds is not None:
345 345 lines.append(head("Constructor Docstring:"))
346 346 lines.append(indent(init_ds))
347 elif (type(obj) is types.InstanceType or isinstance(obj,object)) \
348 and hasattr(obj,'__call__'):
347 elif hasattr(obj,'__call__'):
349 348 call_ds = getdoc(obj.__call__)
350 349 if call_ds:
351 350 lines.append(head("Calling Docstring:"))
352 351 lines.append(indent(call_ds))
353 352
354 353 if not lines:
355 354 self.noinfo('documentation',oname)
356 355 else:
357 356 page.page('\n'.join(lines))
358 357
359 358 def psource(self,obj,oname=''):
360 359 """Print the source code for an object."""
361 360
362 361 # Flush the source cache because inspect can return out-of-date source
363 362 linecache.checkcache()
364 363 try:
365 364 src = getsource(obj)
366 365 except:
367 366 self.noinfo('source',oname)
368 367 else:
369 368 page.page(self.format(py3compat.unicode_to_str(src)))
370 369
371 370 def pfile(self,obj,oname=''):
372 371 """Show the whole file where an object was defined."""
373 372
374 373 try:
375 374 try:
376 375 lineno = inspect.getsourcelines(obj)[1]
377 376 except TypeError:
378 377 # For instances, try the class object like getsource() does
379 378 if hasattr(obj,'__class__'):
380 379 lineno = inspect.getsourcelines(obj.__class__)[1]
381 380 # Adjust the inspected object so getabsfile() below works
382 381 obj = obj.__class__
383 382 except:
384 383 self.noinfo('file',oname)
385 384 return
386 385
387 386 # We only reach this point if object was successfully queried
388 387
389 388 # run contents of file through pager starting at line
390 389 # where the object is defined
391 390 ofile = inspect.getabsfile(obj)
392 391
393 392 if ofile.endswith(('.so', '.dll', '.pyd')):
394 393 print 'File %r is binary, not printing.' % ofile
395 394 elif not os.path.isfile(ofile):
396 395 print 'File %r does not exist, not printing.' % ofile
397 396 else:
398 397 # Print only text files, not extension binaries. Note that
399 398 # getsourcelines returns lineno with 1-offset and page() uses
400 399 # 0-offset, so we must adjust.
401 400 page.page(self.format(open(ofile).read()),lineno-1)
402 401
403 402 def _format_fields(self, fields, title_width=12):
404 403 """Formats a list of fields for display.
405 404
406 405 Parameters
407 406 ----------
408 407 fields : list
409 408 A list of 2-tuples: (field_title, field_content)
410 409 title_width : int
411 410 How many characters to pad titles to. Default 12.
412 411 """
413 412 out = []
414 413 header = self.__head
415 414 for title, content in fields:
416 415 if len(content.splitlines()) > 1:
417 416 title = header(title + ":") + "\n"
418 417 else:
419 418 title = header((title+":").ljust(title_width))
420 419 out.append(title + content)
421 420 return "\n".join(out)
422 421
423 422 # The fields to be displayed by pinfo: (fancy_name, key_in_info_dict)
424 423 pinfo_fields1 = [("Type", "type_name"),
425 424 ("Base Class", "base_class"),
426 425 ("String Form", "string_form"),
427 426 ("Namespace", "namespace"),
428 427 ("Length", "length"),
429 428 ("File", "file"),
430 429 ("Definition", "definition")]
431 430
432 431 pinfo_fields_obj = [("Class Docstring", "class_docstring"),
433 432 ("Constructor Docstring","init_docstring"),
434 433 ("Call def", "call_def"),
435 434 ("Call docstring", "call_docstring")]
436 435
437 436 def pinfo(self,obj,oname='',formatter=None,info=None,detail_level=0):
438 437 """Show detailed information about an object.
439 438
440 439 Optional arguments:
441 440
442 441 - oname: name of the variable pointing to the object.
443 442
444 443 - formatter: special formatter for docstrings (see pdoc)
445 444
446 445 - info: a structure with some information fields which may have been
447 446 precomputed already.
448 447
449 448 - detail_level: if set to 1, more information is given.
450 449 """
451 450 info = self.info(obj, oname=oname, formatter=formatter,
452 451 info=info, detail_level=detail_level)
453 452 displayfields = []
454 453 for title, key in self.pinfo_fields1:
455 454 field = info[key]
456 455 if field is not None:
457 456 displayfields.append((title, field.rstrip()))
458 457
459 458 # Source or docstring, depending on detail level and whether
460 459 # source found.
461 460 if detail_level > 0 and info['source'] is not None:
462 461 displayfields.append(("Source", self.format(py3compat.unicode_to_str(info['source']))))
463 462 elif info['docstring'] is not None:
464 463 displayfields.append(("Docstring", info["docstring"]))
465 464
466 465 # Constructor info for classes
467 466 if info['isclass']:
468 467 if info['init_definition'] or info['init_docstring']:
469 468 displayfields.append(("Constructor information", ""))
470 469 if info['init_definition'] is not None:
471 470 displayfields.append((" Definition",
472 471 info['init_definition'].rstrip()))
473 472 if info['init_docstring'] is not None:
474 473 displayfields.append((" Docstring",
475 474 indent(info['init_docstring'])))
476 475
477 476 # Info for objects:
478 477 else:
479 478 for title, key in self.pinfo_fields_obj:
480 479 field = info[key]
481 480 if field is not None:
482 481 displayfields.append((title, field.rstrip()))
483 482
484 483 # Finally send to printer/pager:
485 484 if displayfields:
486 485 page.page(self._format_fields(displayfields))
487 486
488 487 def info(self, obj, oname='', formatter=None, info=None, detail_level=0):
489 488 """Compute a dict with detailed information about an object.
490 489
491 490 Optional arguments:
492 491
493 492 - oname: name of the variable pointing to the object.
494 493
495 494 - formatter: special formatter for docstrings (see pdoc)
496 495
497 496 - info: a structure with some information fields which may have been
498 497 precomputed already.
499 498
500 499 - detail_level: if set to 1, more information is given.
501 500 """
502 501
503 502 obj_type = type(obj)
504 503
505 504 header = self.__head
506 505 if info is None:
507 506 ismagic = 0
508 507 isalias = 0
509 508 ospace = ''
510 509 else:
511 510 ismagic = info.ismagic
512 511 isalias = info.isalias
513 512 ospace = info.namespace
514 513
515 514 # Get docstring, special-casing aliases:
516 515 if isalias:
517 516 if not callable(obj):
518 517 try:
519 518 ds = "Alias to the system command:\n %s" % obj[1]
520 519 except:
521 520 ds = "Alias: " + str(obj)
522 521 else:
523 522 ds = "Alias to " + str(obj)
524 523 if obj.__doc__:
525 524 ds += "\nDocstring:\n" + obj.__doc__
526 525 else:
527 526 ds = getdoc(obj)
528 527 if ds is None:
529 528 ds = '<no docstring>'
530 529 if formatter is not None:
531 530 ds = formatter(ds)
532 531
533 532 # store output in a dict, we initialize it here and fill it as we go
534 533 out = dict(name=oname, found=True, isalias=isalias, ismagic=ismagic)
535 534
536 535 string_max = 200 # max size of strings to show (snipped if longer)
537 536 shalf = int((string_max -5)/2)
538 537
539 538 if ismagic:
540 539 obj_type_name = 'Magic function'
541 540 elif isalias:
542 541 obj_type_name = 'System alias'
543 542 else:
544 543 obj_type_name = obj_type.__name__
545 544 out['type_name'] = obj_type_name
546 545
547 546 try:
548 547 bclass = obj.__class__
549 548 out['base_class'] = str(bclass)
550 549 except: pass
551 550
552 551 # String form, but snip if too long in ? form (full in ??)
553 552 if detail_level >= self.str_detail_level:
554 553 try:
555 554 ostr = str(obj)
556 555 str_head = 'string_form'
557 556 if not detail_level and len(ostr)>string_max:
558 557 ostr = ostr[:shalf] + ' <...> ' + ostr[-shalf:]
559 558 ostr = ("\n" + " " * len(str_head.expandtabs())).\
560 559 join(q.strip() for q in ostr.split("\n"))
561 560 out[str_head] = ostr
562 561 except:
563 562 pass
564 563
565 564 if ospace:
566 565 out['namespace'] = ospace
567 566
568 567 # Length (for strings and lists)
569 568 try:
570 569 out['length'] = str(len(obj))
571 570 except: pass
572 571
573 572 # Filename where object was defined
574 573 binary_file = False
575 574 try:
576 575 try:
577 576 fname = inspect.getabsfile(obj)
578 577 except TypeError:
579 578 # For an instance, the file that matters is where its class was
580 579 # declared.
581 580 if hasattr(obj,'__class__'):
582 581 fname = inspect.getabsfile(obj.__class__)
583 582 if fname.endswith('<string>'):
584 583 fname = 'Dynamically generated function. No source code available.'
585 584 if fname.endswith(('.so', '.dll', '.pyd')):
586 585 binary_file = True
587 586 out['file'] = fname
588 587 except:
589 588 # if anything goes wrong, we don't want to show source, so it's as
590 589 # if the file was binary
591 590 binary_file = True
592 591
593 592 # reconstruct the function definition and print it:
594 593 defln = self._getdef(obj, oname)
595 594 if defln:
596 595 out['definition'] = self.format(defln)
597 596
598 597 # Docstrings only in detail 0 mode, since source contains them (we
599 598 # avoid repetitions). If source fails, we add them back, see below.
600 599 if ds and detail_level == 0:
601 600 out['docstring'] = ds
602 601
603 602 # Original source code for any callable
604 603 if detail_level:
605 604 # Flush the source cache because inspect can return out-of-date
606 605 # source
607 606 linecache.checkcache()
608 607 source = None
609 608 try:
610 609 try:
611 610 source = getsource(obj,binary_file)
612 611 except TypeError:
613 612 if hasattr(obj,'__class__'):
614 613 source = getsource(obj.__class__,binary_file)
615 614 if source is not None:
616 615 out['source'] = source.rstrip()
617 616 except Exception:
618 617 pass
619 618
620 619 if ds and source is None:
621 620 out['docstring'] = ds
622 621
623 622
624 623 # Constructor docstring for classes
625 624 if inspect.isclass(obj):
626 625 out['isclass'] = True
627 626 # reconstruct the function definition and print it:
628 627 try:
629 628 obj_init = obj.__init__
630 629 except AttributeError:
631 630 init_def = init_ds = None
632 631 else:
633 632 init_def = self._getdef(obj_init,oname)
634 633 init_ds = getdoc(obj_init)
635 634 # Skip Python's auto-generated docstrings
636 635 if init_ds and \
637 636 init_ds.startswith('x.__init__(...) initializes'):
638 637 init_ds = None
639 638
640 639 if init_def or init_ds:
641 640 if init_def:
642 641 out['init_definition'] = self.format(init_def)
643 642 if init_ds:
644 643 out['init_docstring'] = init_ds
645 644
646 645 # and class docstring for instances:
647 646 else:
648 647 # First, check whether the instance docstring is identical to the
649 648 # class one, and print it separately if they don't coincide. In
650 649 # most cases they will, but it's nice to print all the info for
651 650 # objects which use instance-customized docstrings.
652 651 if ds:
653 652 try:
654 653 cls = getattr(obj,'__class__')
655 654 except:
656 655 class_ds = None
657 656 else:
658 657 class_ds = getdoc(cls)
659 658 # Skip Python's auto-generated docstrings
660 659 if class_ds and \
661 660 (class_ds.startswith('function(code, globals[,') or \
662 661 class_ds.startswith('instancemethod(function, instance,') or \
663 662 class_ds.startswith('module(name[,') ):
664 663 class_ds = None
665 664 if class_ds and ds != class_ds:
666 665 out['class_docstring'] = class_ds
667 666
668 667 # Next, try to show constructor docstrings
669 668 try:
670 669 init_ds = getdoc(obj.__init__)
671 670 # Skip Python's auto-generated docstrings
672 671 if init_ds and \
673 672 init_ds.startswith('x.__init__(...) initializes'):
674 673 init_ds = None
675 674 except AttributeError:
676 675 init_ds = None
677 676 if init_ds:
678 677 out['init_docstring'] = init_ds
679 678
680 679 # Call form docstring for callable instances
681 680 if hasattr(obj, '__call__'):
682 681 call_def = self._getdef(obj.__call__, oname)
683 682 if call_def is not None:
684 683 out['call_def'] = self.format(call_def)
685 684 call_ds = getdoc(obj.__call__)
686 685 # Skip Python's auto-generated docstrings
687 686 if call_ds and call_ds.startswith('x.__call__(...) <==> x(...)'):
688 687 call_ds = None
689 688 if call_ds:
690 689 out['call_docstring'] = call_ds
691 690
692 691 # Compute the object's argspec as a callable. The key is to decide
693 692 # whether to pull it from the object itself, from its __init__ or
694 693 # from its __call__ method.
695 694
696 695 if inspect.isclass(obj):
697 696 # Old-style classes need not have an __init__
698 697 callable_obj = getattr(obj, "__init__", None)
699 698 elif callable(obj):
700 699 callable_obj = obj
701 700 else:
702 701 callable_obj = None
703 702
704 703 if callable_obj:
705 704 try:
706 705 args, varargs, varkw, defaults = getargspec(callable_obj)
707 706 except (TypeError, AttributeError):
708 707 # For extensions/builtins we can't retrieve the argspec
709 708 pass
710 709 else:
711 710 out['argspec'] = dict(args=args, varargs=varargs,
712 711 varkw=varkw, defaults=defaults)
713 712
714 713 return object_info(**out)
715 714
716 715
717 716 def psearch(self,pattern,ns_table,ns_search=[],
718 717 ignore_case=False,show_all=False):
719 718 """Search namespaces with wildcards for objects.
720 719
721 720 Arguments:
722 721
723 722 - pattern: string containing shell-like wildcards to use in namespace
724 723 searches and optionally a type specification to narrow the search to
725 724 objects of that type.
726 725
727 726 - ns_table: dict of name->namespaces for search.
728 727
729 728 Optional arguments:
730 729
731 730 - ns_search: list of namespace names to include in search.
732 731
733 732 - ignore_case(False): make the search case-insensitive.
734 733
735 734 - show_all(False): show all names, including those starting with
736 735 underscores.
737 736 """
738 737 #print 'ps pattern:<%r>' % pattern # dbg
739 738
740 739 # defaults
741 740 type_pattern = 'all'
742 741 filter = ''
743 742
744 743 cmds = pattern.split()
745 744 len_cmds = len(cmds)
746 745 if len_cmds == 1:
747 746 # Only filter pattern given
748 747 filter = cmds[0]
749 748 elif len_cmds == 2:
750 749 # Both filter and type specified
751 750 filter,type_pattern = cmds
752 751 else:
753 752 raise ValueError('invalid argument string for psearch: <%s>' %
754 753 pattern)
755 754
756 755 # filter search namespaces
757 756 for name in ns_search:
758 757 if name not in ns_table:
759 758 raise ValueError('invalid namespace <%s>. Valid names: %s' %
760 759 (name,ns_table.keys()))
761 760
762 761 #print 'type_pattern:',type_pattern # dbg
763 762 search_result = []
764 763 for ns_name in ns_search:
765 764 ns = ns_table[ns_name]
766 765 tmp_res = list(list_namespace(ns,type_pattern,filter,
767 766 ignore_case=ignore_case,
768 767 show_all=show_all))
769 768 search_result.extend(tmp_res)
770 769 search_result.sort()
771 770
772 771 page.page('\n'.join(search_result))
@@ -1,167 +1,168 b''
1 1 """Tests for input handlers.
2 2 """
3 3 #-----------------------------------------------------------------------------
4 4 # Module imports
5 5 #-----------------------------------------------------------------------------
6 6
7 7 # third party
8 8 import nose.tools as nt
9 9
10 10 # our own packages
11 11 from IPython.core import autocall
12 12 from IPython.testing import decorators as dec
13 13 from IPython.testing import tools as tt
14 14 from IPython.testing.globalipapp import get_ipython
15 from IPython.utils import py3compat
15 16
16 17 #-----------------------------------------------------------------------------
17 18 # Globals
18 19 #-----------------------------------------------------------------------------
19 20
20 21 # Get the public instance of IPython
21 22 ip = get_ipython()
22 23
23 24 failures = []
24 25 num_tests = 0
25 26
26 27 #-----------------------------------------------------------------------------
27 28 # Test functions
28 29 #-----------------------------------------------------------------------------
29 30
30 31 class CallableIndexable(object):
31 32 def __getitem__(self, idx): return True
32 33 def __call__(self, *args, **kws): return True
33 34
34 35
35 36 class Autocallable(autocall.IPyAutocall):
36 37 def __call__(self):
37 38 return "called"
38 39
39 40
40 41 def run(tests):
41 42 """Loop through a list of (pre, post) inputs, where pre is the string
42 43 handed to ipython, and post is how that string looks after it's been
43 44 transformed (i.e. ipython's notion of _i)"""
44 45 tt.check_pairs(ip.prefilter_manager.prefilter_lines, tests)
45 46
46 47
47 48 def test_handlers():
48 49 # alias expansion
49 50
50 51 # We're using 'true' as our syscall of choice because it doesn't
51 52 # write anything to stdout.
52 53
53 54 # Turn off actual execution of aliases, because it's noisy
54 55 old_system_cmd = ip.system
55 56 ip.system = lambda cmd: None
56 57
57 58
58 59 ip.alias_manager.alias_table['an_alias'] = (0, 'true')
59 60 # These are useful for checking a particular recursive alias issue
60 61 ip.alias_manager.alias_table['top'] = (0, 'd:/cygwin/top')
61 62 ip.alias_manager.alias_table['d'] = (0, 'true')
62 63 run([(i,py3compat.u_format(o)) for i,o in \
63 64 [("an_alias", 'get_ipython().system({u}"true ")'), # alias
64 65 # Below: recursive aliases should expand whitespace-surrounded
65 66 # chars, *not* initial chars which happen to be aliases:
66 67 ("top", 'get_ipython().system({u}"d:/cygwin/top ")'),
67 68 ]])
68 69 ip.system = old_system_cmd
69 70
70 71 call_idx = CallableIndexable()
71 72 ip.user_ns['call_idx'] = call_idx
72 73
73 74 # For many of the below, we're also checking that leading whitespace
74 75 # turns off the esc char, which it should unless there is a continuation
75 76 # line.
76 77 run([(i,py3compat.u_format(o)) for i,o in \
77 78 [('"no change"', '"no change"'), # normal
78 79 ("!true", 'get_ipython().system({u}"true")'), # shell_escapes
79 80 ("!! true", 'get_ipython().magic({u}"sx true")'), # shell_escapes + magic
80 81 ("!!true", 'get_ipython().magic({u}"sx true")'), # shell_escapes + magic
81 82 ("%lsmagic", 'get_ipython().magic({u}"lsmagic ")'), # magic
82 83 ("lsmagic", 'get_ipython().magic({u}"lsmagic ")'), # magic
83 84 #("a = b # PYTHON-MODE", '_i'), # emacs -- avoids _in cache
84 85
85 86 # post-esc-char whitespace goes inside
86 87 ("! true", 'get_ipython().system({u}" true")'),
87 88
88 89 # handle_help
89 90
90 91 # These are weak tests -- just looking at what the help handlers
91 92 # logs, which is not how it really does its work. But it still
92 93 # lets us check the key paths through the handler.
93 94
94 95 ("x=1 # what?", "x=1 # what?"), # no help if valid python
95 96 ]])
96 97
97 98 # multi_line_specials
98 99 ip.prefilter_manager.multi_line_specials = False
99 100 # W/ multi_line_specials off, leading ws kills esc chars/autoexpansion
100 101 run([
101 102 ('if 1:\n !true', 'if 1:\n !true'),
102 103 ('if 1:\n lsmagic', 'if 1:\n lsmagic'),
103 104 ('if 1:\n an_alias', 'if 1:\n an_alias'),
104 105 ])
105 106
106 107 ip.prefilter_manager.multi_line_specials = True
107 108 # initial indents must be preserved.
108 109 run([(i,py3compat.u_format(o)) for i,o in \
109 110 [('if 1:\n !true', 'if 1:\n get_ipython().system({u}"true")'),
110 111 ('if 2:\n lsmagic', 'if 2:\n get_ipython().magic({u}"lsmagic ")'),
111 112 ('if 1:\n an_alias', 'if 1:\n get_ipython().system({u}"true ")'),
112 113 # Weird one
113 114 ('if 1:\n !!true', 'if 1:\n get_ipython().magic({u}"sx true")'),
114 115
115 116 # Even with m_l_s on, autocall is off even with special chars
116 117 ('if 1:\n /fun 1 2', 'if 1:\n /fun 1 2'),
117 118 ('if 1:\n ;fun 1 2', 'if 1:\n ;fun 1 2'),
118 119 ('if 1:\n ,fun 1 2', 'if 1:\n ,fun 1 2'),
119 120 ('if 1:\n ?fun 1 2', 'if 1:\n ?fun 1 2'),
120 121 # What about !!
121 122 ]])
122 123
123 124 # Objects which are instances of IPyAutocall are *always* autocalled
124 125 autocallable = Autocallable()
125 126 ip.user_ns['autocallable'] = autocallable
126 127
127 128 # auto
128 129 ip.magic('autocall 0')
129 130 # Only explicit escapes or instances of IPyAutocallable should get
130 131 # expanded
131 132 run([
132 133 ('len "abc"', 'len "abc"'),
133 134 ('autocallable', 'autocallable()'),
134 135 (",list 1 2 3", 'list("1", "2", "3")'),
135 136 (";list 1 2 3", 'list("1 2 3")'),
136 137 ("/len range(1,4)", 'len(range(1,4))'),
137 138 ])
138 139 ip.magic('autocall 1')
139 140 run([
140 141 (",list 1 2 3", 'list("1", "2", "3")'),
141 142 (";list 1 2 3", 'list("1 2 3")'),
142 143 ("/len range(1,4)", 'len(range(1,4))'),
143 144 ('len "abc"', 'len("abc")'),
144 145 ('len "abc";', 'len("abc");'), # ; is special -- moves out of parens
145 146 # Autocall is turned off if first arg is [] and the object
146 147 # is both callable and indexable. Like so:
147 148 ('len [1,2]', 'len([1,2])'), # len doesn't support __getitem__...
148 149 ('call_idx [1]', 'call_idx [1]'), # call_idx *does*..
149 150 ('call_idx 1', 'call_idx(1)'),
150 151 ('len', 'len '), # only at 2 does it auto-call on single args
151 152 ])
152 153 ip.magic('autocall 2')
153 154 run([
154 155 (",list 1 2 3", 'list("1", "2", "3")'),
155 156 (";list 1 2 3", 'list("1 2 3")'),
156 157 ("/len range(1,4)", 'len(range(1,4))'),
157 158 ('len "abc"', 'len("abc")'),
158 159 ('len "abc";', 'len("abc");'),
159 160 ('len [1,2]', 'len([1,2])'),
160 161 ('call_idx [1]', 'call_idx [1]'),
161 162 ('call_idx 1', 'call_idx(1)'),
162 163 # This is what's different:
163 164 ('len', 'len()'), # only at 2 does it auto-call on single args
164 165 ])
165 166 ip.magic('autocall 1')
166 167
167 168 nt.assert_equals(failures, [])
@@ -1,118 +1,118 b''
1 1 # coding: utf-8
2 2 """Tests for the IPython tab-completion machinery.
3 3 """
4 4 #-----------------------------------------------------------------------------
5 5 # Module imports
6 6 #-----------------------------------------------------------------------------
7 7
8 8 # stdlib
9 9 import os
10 10 import sys
11 11 import unittest
12 12 from datetime import datetime
13 13 # third party
14 14 import nose.tools as nt
15 15
16 16 # our own packages
17 17 from IPython.utils.tempdir import TemporaryDirectory
18 18 from IPython.core.history import HistoryManager, extract_hist_ranges
19 19 from IPython.utils import py3compat
20 20
21 21 def setUp():
22 22 nt.assert_equal(sys.getdefaultencoding(), "utf-8" if py3compat.PY3 else "ascii")
23 23
24 24 def test_history():
25 25 ip = get_ipython()
26 26 with TemporaryDirectory() as tmpdir:
27 27 hist_manager_ori = ip.history_manager
28 28 hist_file = os.path.join(tmpdir, 'history.sqlite')
29 29 try:
30 30 ip.history_manager = HistoryManager(shell=ip, hist_file=hist_file)
31 31 hist = ['a=1', 'def f():\n test = 1\n return test', u"b='β‚¬Γ†ΒΎΓ·ΓŸ'"]
32 32 for i, h in enumerate(hist, start=1):
33 33 ip.history_manager.store_inputs(i, h)
34 34
35 35 ip.history_manager.db_log_output = True
36 36 # Doesn't match the input, but we'll just check it's stored.
37 37 ip.history_manager.output_hist_reprs[3] = "spam"
38 38 ip.history_manager.store_output(3)
39 39
40 40 nt.assert_equal(ip.history_manager.input_hist_raw, [''] + hist)
41 41
42 42 # Check whether specifying a range beyond the end of the current
43 43 # session results in an error (gh-804)
44 44 ip.magic('%hist 2-500')
45 45
46 46 # New session
47 47 ip.history_manager.reset()
48 48 newcmds = ["z=5","class X(object):\n pass", "k='p'"]
49 49 for i, cmd in enumerate(newcmds, start=1):
50 50 ip.history_manager.store_inputs(i, cmd)
51 51 gothist = ip.history_manager.get_range(start=1, stop=4)
52 52 nt.assert_equal(list(gothist), zip([0,0,0],[1,2,3], newcmds))
53 53 # Previous session:
54 54 gothist = ip.history_manager.get_range(-1, 1, 4)
55 55 nt.assert_equal(list(gothist), zip([1,1,1],[1,2,3], hist))
56 56
57 57 # Check get_hist_tail
58 58 gothist = ip.history_manager.get_tail(4, output=True,
59 59 include_latest=True)
60 60 expected = [(1, 3, (hist[-1], "spam")),
61 61 (2, 1, (newcmds[0], None)),
62 62 (2, 2, (newcmds[1], None)),
63 63 (2, 3, (newcmds[2], None)),]
64 64 nt.assert_equal(list(gothist), expected)
65 65
66 66 gothist = ip.history_manager.get_tail(2)
67 67 expected = [(2, 1, newcmds[0]),
68 68 (2, 2, newcmds[1])]
69 69 nt.assert_equal(list(gothist), expected)
70 70
71 71 # Check get_hist_search
72 72 gothist = ip.history_manager.search("*test*")
73 73 nt.assert_equal(list(gothist), [(1,2,hist[1])] )
74 74 gothist = ip.history_manager.search("b*", output=True)
75 75 nt.assert_equal(list(gothist), [(1,3,(hist[2],"spam"))] )
76 76
77 77 # Cross testing: check that magic %save can get previous session.
78 78 testfilename = os.path.realpath(os.path.join(tmpdir, "test.py"))
79 79 ip.magic_save(testfilename + " ~1/1-3")
80 testfile = open(testfilename, "r")
81 nt.assert_equal(testfile.read().decode("utf-8"),
82 "# coding: utf-8\n" + "\n".join(hist))
80 with py3compat.open(testfilename) as testfile:
81 nt.assert_equal(testfile.read(),
82 u"# coding: utf-8\n" + u"\n".join(hist))
83 83
84 84 # Duplicate line numbers - check that it doesn't crash, and
85 85 # gets a new session
86 86 ip.history_manager.store_inputs(1, "rogue")
87 87 ip.history_manager.writeout_cache()
88 88 nt.assert_equal(ip.history_manager.session_number, 3)
89 89 finally:
90 90 # Restore history manager
91 91 ip.history_manager = hist_manager_ori
92 92
93 93
94 94 def test_extract_hist_ranges():
95 95 instr = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5"
96 96 expected = [(0, 1, 2), # 0 == current session
97 97 (2, 3, 4),
98 98 (-4, 5, 7),
99 99 (-4, 7, 10),
100 100 (-9, 2, None), # None == to end
101 101 (-8, 1, None),
102 102 (-7, 1, 6)]
103 103 actual = list(extract_hist_ranges(instr))
104 104 nt.assert_equal(actual, expected)
105 105
106 106 def test_magic_rerun():
107 107 """Simple test for %rerun (no args -> rerun last line)"""
108 108 ip = get_ipython()
109 109 ip.run_cell("a = 10")
110 110 ip.run_cell("a += 1")
111 111 nt.assert_equal(ip.user_ns["a"], 11)
112 112 ip.run_cell("%rerun")
113 113 nt.assert_equal(ip.user_ns["a"], 12)
114 114
115 115 def test_timestamp_type():
116 116 ip = get_ipython()
117 117 info = ip.history_manager.get_session_info()
118 118 nt.assert_true(isinstance(info[1], datetime))
@@ -1,271 +1,272 b''
1 1 """Tests for the key interactiveshell module, where the main ipython class is defined.
2 2 """
3 3 #-----------------------------------------------------------------------------
4 4 # Module imports
5 5 #-----------------------------------------------------------------------------
6 6
7 7 # stdlib
8 8 import os
9 9 import shutil
10 10 import tempfile
11 11
12 12 # third party
13 13 import nose.tools as nt
14 14
15 15 # our own packages
16 16 from IPython.testing import decorators as dec
17 17 from IPython.testing.globalipapp import get_ipython
18 from IPython.utils import py3compat
18 19
19 20 #-----------------------------------------------------------------------------
20 21 # Globals
21 22 #-----------------------------------------------------------------------------
22 23
23 24 # Get the public instance of IPython
24 25 ip = get_ipython()
25 26
26 27 #-----------------------------------------------------------------------------
27 28 # Test functions
28 29 #-----------------------------------------------------------------------------
29 30
30 31 @dec.parametric
31 32 def test_reset():
32 33 """reset must clear most namespaces."""
33 34 # The number of variables in the private user_ns_hidden is not zero, but it
34 35 # should be constant regardless of what we do
35 36 nvars_config_ns = len(ip.user_ns_hidden)
36 37
37 38 # Check that reset runs without error
38 39 ip.reset()
39 40
40 41 # Once we've reset it (to clear of any junk that might have been there from
41 42 # other tests, we can count how many variables are in the user's namespace
42 43 nvars_user_ns = len(ip.user_ns)
43 44
44 45 # Now add a few variables to user_ns, and check that reset clears them
45 46 ip.user_ns['x'] = 1
46 47 ip.user_ns['y'] = 1
47 48 ip.reset()
48 49
49 50 # Finally, check that all namespaces have only as many variables as we
50 51 # expect to find in them:
51 52 for ns in ip.ns_refs_table:
52 53 if ns is ip.user_ns:
53 54 nvars_expected = nvars_user_ns
54 55 elif ns is ip.user_ns_hidden:
55 56 nvars_expected = nvars_config_ns
56 57 else:
57 58 nvars_expected = 0
58 59
59 60 yield nt.assert_equals(len(ns), nvars_expected)
60 61
61 62
62 63 # Tests for reporting of exceptions in various modes, handling of SystemExit,
63 64 # and %tb functionality. This is really a mix of testing ultraTB and interactiveshell.
64 65
65 66 def doctest_tb_plain():
66 67 """
67 68 In [18]: xmode plain
68 69 Exception reporting mode: Plain
69 70
70 71 In [19]: run simpleerr.py
71 72 Traceback (most recent call last):
72 73 ...line 32, in <module>
73 74 bar(mode)
74 75 ...line 16, in bar
75 76 div0()
76 77 ...line 8, in div0
77 78 x/y
78 79 ZeroDivisionError: ...
79 80 """
80 81
81 82
82 83 def doctest_tb_context():
83 84 """
84 85 In [3]: xmode context
85 86 Exception reporting mode: Context
86 87
87 88 In [4]: run simpleerr.py
88 89 ---------------------------------------------------------------------------
89 90 ZeroDivisionError Traceback (most recent call last)
90 91 <BLANKLINE>
91 92 ... in <module>()
92 93 30 mode = 'div'
93 94 31
94 95 ---> 32 bar(mode)
95 96 <BLANKLINE>
96 97 ... in bar(mode)
97 98 14 "bar"
98 99 15 if mode=='div':
99 100 ---> 16 div0()
100 101 17 elif mode=='exit':
101 102 18 try:
102 103 <BLANKLINE>
103 104 ... in div0()
104 105 6 x = 1
105 106 7 y = 0
106 107 ----> 8 x/y
107 108 9
108 109 10 def sysexit(stat, mode):
109 110 <BLANKLINE>
110 111 ZeroDivisionError: ...
111 112 """
112 113
113 114
114 115 def doctest_tb_verbose():
115 116 """
116 117 In [5]: xmode verbose
117 118 Exception reporting mode: Verbose
118 119
119 120 In [6]: run simpleerr.py
120 121 ---------------------------------------------------------------------------
121 122 ZeroDivisionError Traceback (most recent call last)
122 123 <BLANKLINE>
123 124 ... in <module>()
124 125 30 mode = 'div'
125 126 31
126 127 ---> 32 bar(mode)
127 128 global bar = <function bar at ...>
128 129 global mode = 'div'
129 130 <BLANKLINE>
130 131 ... in bar(mode='div')
131 132 14 "bar"
132 133 15 if mode=='div':
133 134 ---> 16 div0()
134 135 global div0 = <function div0 at ...>
135 136 17 elif mode=='exit':
136 137 18 try:
137 138 <BLANKLINE>
138 139 ... in div0()
139 140 6 x = 1
140 141 7 y = 0
141 142 ----> 8 x/y
142 143 x = 1
143 144 y = 0
144 145 9
145 146 10 def sysexit(stat, mode):
146 147 <BLANKLINE>
147 148 ZeroDivisionError: ...
148 149 """
149 150
150 151 @py3compat.u_format
151 152 def doctest_tb_sysexit():
152 153 """
153 154 In [17]: %xmode plain
154 155 Exception reporting mode: Plain
155 156
156 157 In [18]: %run simpleerr.py exit
157 158 An exception has occurred, use %tb to see the full traceback.
158 159 SystemExit: (1, {u}'Mode = exit')
159 160
160 161 In [19]: %run simpleerr.py exit 2
161 162 An exception has occurred, use %tb to see the full traceback.
162 163 SystemExit: (2, {u}'Mode = exit')
163 164
164 165 In [20]: %tb
165 166 Traceback (most recent call last):
166 167 File ... in <module>
167 168 bar(mode)
168 169 File ... line 22, in bar
169 170 sysexit(stat, mode)
170 171 File ... line 11, in sysexit
171 172 raise SystemExit(stat, 'Mode = %s' % mode)
172 173 SystemExit: (2, {u}'Mode = exit')
173 174
174 175 In [21]: %xmode context
175 176 Exception reporting mode: Context
176 177
177 178 In [22]: %tb
178 179 ---------------------------------------------------------------------------
179 180 SystemExit Traceback (most recent call last)
180 181 <BLANKLINE>
181 182 ...<module>()
182 183 30 mode = 'div'
183 184 31
184 185 ---> 32 bar(mode)
185 186 <BLANKLINE>
186 187 ...bar(mode)
187 188 20 except:
188 189 21 stat = 1
189 190 ---> 22 sysexit(stat, mode)
190 191 23 else:
191 192 24 raise ValueError('Unknown mode')
192 193 <BLANKLINE>
193 194 ...sysexit(stat, mode)
194 195 9
195 196 10 def sysexit(stat, mode):
196 197 ---> 11 raise SystemExit(stat, 'Mode = %s' % mode)
197 198 12
198 199 13 def bar(mode):
199 200 <BLANKLINE>
200 201 SystemExit: (2, {u}'Mode = exit')
201 202
202 203 In [23]: %xmode verbose
203 204 Exception reporting mode: Verbose
204 205
205 206 In [24]: %tb
206 207 ---------------------------------------------------------------------------
207 208 SystemExit Traceback (most recent call last)
208 209 <BLANKLINE>
209 210 ... in <module>()
210 211 30 mode = 'div'
211 212 31
212 213 ---> 32 bar(mode)
213 214 global bar = <function bar at ...>
214 215 global mode = {u}'exit'
215 216 <BLANKLINE>
216 217 ... in bar(mode={u}'exit')
217 218 20 except:
218 219 21 stat = 1
219 220 ---> 22 sysexit(stat, mode)
220 221 global sysexit = <function sysexit at ...>
221 222 stat = 2
222 223 mode = {u}'exit'
223 224 23 else:
224 225 24 raise ValueError('Unknown mode')
225 226 <BLANKLINE>
226 227 ... in sysexit(stat=2, mode={u}'exit')
227 228 9
228 229 10 def sysexit(stat, mode):
229 230 ---> 11 raise SystemExit(stat, 'Mode = %s' % mode)
230 231 global SystemExit = undefined
231 232 stat = 2
232 233 mode = {u}'exit'
233 234 12
234 235 13 def bar(mode):
235 236 <BLANKLINE>
236 237 SystemExit: (2, {u}'Mode = exit')
237 238 """
238 239
239 240
240 241 def test_run_cell():
241 242 import textwrap
242 243 ip.run_cell('a = 10\na+=1')
243 244 ip.run_cell('assert a == 11\nassert 1')
244 245
245 246 nt.assert_equals(ip.user_ns['a'], 11)
246 247 complex = textwrap.dedent("""
247 248 if 1:
248 249 print "hello"
249 250 if 1:
250 251 print "world"
251 252
252 253 if 2:
253 254 print "foo"
254 255
255 256 if 3:
256 257 print "bar"
257 258
258 259 if 4:
259 260 print "bar"
260 261
261 262 """)
262 263 # Simply verifies that this kind of input is run
263 264 ip.run_cell(complex)
264 265
265 266
266 267 def test_db():
267 268 """Test the internal database used for variable persistence."""
268 269 ip.db['__unittest_'] = 12
269 270 nt.assert_equals(ip.db['__unittest_'], 12)
270 271 del ip.db['__unittest_']
271 272 assert '__unittest_' not in ip.db
@@ -1,467 +1,461 b''
1 1 """Tests for various magic functions.
2 2
3 3 Needs to be run by nose (to make ipython session available).
4 4 """
5 5 from __future__ import absolute_import
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Imports
9 9 #-----------------------------------------------------------------------------
10 10
11 11 import os
12 12 import sys
13 13 import tempfile
14 14 import types
15 15 from StringIO import StringIO
16 16
17 17 import nose.tools as nt
18 18
19 19 from IPython.utils.path import get_long_path_name
20 20 from IPython.testing import decorators as dec
21 21 from IPython.testing import tools as tt
22 22 from IPython.utils import py3compat
23 23
24 24 #-----------------------------------------------------------------------------
25 25 # Test functions begin
26 26 #-----------------------------------------------------------------------------
27 27 def test_rehashx():
28 28 # clear up everything
29 29 _ip = get_ipython()
30 30 _ip.alias_manager.alias_table.clear()
31 31 del _ip.db['syscmdlist']
32 32
33 33 _ip.magic('rehashx')
34 34 # Practically ALL ipython development systems will have more than 10 aliases
35 35
36 36 yield (nt.assert_true, len(_ip.alias_manager.alias_table) > 10)
37 37 for key, val in _ip.alias_manager.alias_table.iteritems():
38 38 # we must strip dots from alias names
39 39 nt.assert_true('.' not in key)
40 40
41 41 # rehashx must fill up syscmdlist
42 42 scoms = _ip.db['syscmdlist']
43 43 yield (nt.assert_true, len(scoms) > 10)
44 44
45 45
46 46 def test_magic_parse_options():
47 47 """Test that we don't mangle paths when parsing magic options."""
48 48 ip = get_ipython()
49 49 path = 'c:\\x'
50 50 opts = ip.parse_options('-f %s' % path,'f:')[0]
51 51 # argv splitting is os-dependent
52 52 if os.name == 'posix':
53 53 expected = 'c:x'
54 54 else:
55 55 expected = path
56 56 nt.assert_equals(opts['f'], expected)
57 57
58 58
59 59 def doctest_hist_f():
60 60 """Test %hist -f with temporary filename.
61 61
62 62 In [9]: import tempfile
63 63
64 64 In [10]: tfile = tempfile.mktemp('.py','tmp-ipython-')
65 65
66 66 In [11]: %hist -nl -f $tfile 3
67 67
68 68 In [13]: import os; os.unlink(tfile)
69 69 """
70 70
71 71
72 72 def doctest_hist_r():
73 73 """Test %hist -r
74 74
75 75 XXX - This test is not recording the output correctly. For some reason, in
76 76 testing mode the raw history isn't getting populated. No idea why.
77 77 Disabling the output checking for now, though at least we do run it.
78 78
79 79 In [1]: 'hist' in _ip.lsmagic()
80 80 Out[1]: True
81 81
82 82 In [2]: x=1
83 83
84 84 In [3]: %hist -rl 2
85 85 x=1 # random
86 86 %hist -r 2
87 87 """
88 88
89 89 def doctest_hist_op():
90 90 """Test %hist -op
91 91
92 92 In [1]: class b(float):
93 93 ...: pass
94 94 ...:
95 95
96 96 In [2]: class s(object):
97 97 ...: def __str__(self):
98 98 ...: return 's'
99 99 ...:
100 100
101 101 In [3]:
102 102
103 103 In [4]: class r(b):
104 104 ...: def __repr__(self):
105 105 ...: return 'r'
106 106 ...:
107 107
108 108 In [5]: class sr(s,r): pass
109 109 ...:
110 110
111 111 In [6]:
112 112
113 113 In [7]: bb=b()
114 114
115 115 In [8]: ss=s()
116 116
117 117 In [9]: rr=r()
118 118
119 119 In [10]: ssrr=sr()
120 120
121 121 In [11]: 4.5
122 122 Out[11]: 4.5
123 123
124 124 In [12]: str(ss)
125 125 Out[12]: 's'
126 126
127 127 In [13]:
128 128
129 129 In [14]: %hist -op
130 130 >>> class b:
131 131 ... pass
132 132 ...
133 133 >>> class s(b):
134 134 ... def __str__(self):
135 135 ... return 's'
136 136 ...
137 137 >>>
138 138 >>> class r(b):
139 139 ... def __repr__(self):
140 140 ... return 'r'
141 141 ...
142 142 >>> class sr(s,r): pass
143 143 >>>
144 144 >>> bb=b()
145 145 >>> ss=s()
146 146 >>> rr=r()
147 147 >>> ssrr=sr()
148 148 >>> 4.5
149 149 4.5
150 150 >>> str(ss)
151 151 's'
152 152 >>>
153 153 """
154 154
155 155 def test_macro():
156 156 ip = get_ipython()
157 157 ip.history_manager.reset() # Clear any existing history.
158 158 cmds = ["a=1", "def b():\n return a**2", "print(a,b())"]
159 159 for i, cmd in enumerate(cmds, start=1):
160 160 ip.history_manager.store_inputs(i, cmd)
161 161 ip.magic("macro test 1-3")
162 162 nt.assert_equal(ip.user_ns["test"].value, "\n".join(cmds)+"\n")
163 163
164 164 # List macros.
165 165 assert "test" in ip.magic("macro")
166 166
167 167 def test_macro_run():
168 168 """Test that we can run a multi-line macro successfully."""
169 169 ip = get_ipython()
170 170 ip.history_manager.reset()
171 171 cmds = ["a=10", "a+=1", py3compat.doctest_refactor_print("print a"),
172 172 "%macro test 2-3"]
173 173 for cmd in cmds:
174 174 ip.run_cell(cmd)
175 175 nt.assert_equal(ip.user_ns["test"].value,
176 176 py3compat.doctest_refactor_print("a+=1\nprint a\n"))
177 177 original_stdout = sys.stdout
178 178 new_stdout = StringIO()
179 179 sys.stdout = new_stdout
180 180 try:
181 181 ip.run_cell("test")
182 182 nt.assert_true("12" in new_stdout.getvalue())
183 183 ip.run_cell("test")
184 184 nt.assert_true("13" in new_stdout.getvalue())
185 185 finally:
186 186 sys.stdout = original_stdout
187 187 new_stdout.close()
188 188
189 189
190 190 # XXX failing for now, until we get clearcmd out of quarantine. But we should
191 191 # fix this and revert the skip to happen only if numpy is not around.
192 192 #@dec.skipif_not_numpy
193 193 @dec.skip_known_failure
194 194 def test_numpy_clear_array_undec():
195 195 from IPython.extensions import clearcmd
196 196
197 197 _ip.ex('import numpy as np')
198 198 _ip.ex('a = np.empty(2)')
199 199 yield (nt.assert_true, 'a' in _ip.user_ns)
200 200 _ip.magic('clear array')
201 201 yield (nt.assert_false, 'a' in _ip.user_ns)
202 202
203 203
204 204 # Multiple tests for clipboard pasting
205 205 @dec.parametric
206 206 def test_paste():
207 207 _ip = get_ipython()
208 208 def paste(txt, flags='-q'):
209 209 """Paste input text, by default in quiet mode"""
210 210 hooks.clipboard_get = lambda : txt
211 211 _ip.magic('paste '+flags)
212 212
213 213 # Inject fake clipboard hook but save original so we can restore it later
214 214 hooks = _ip.hooks
215 215 user_ns = _ip.user_ns
216 216 original_clip = hooks.clipboard_get
217 217
218 218 try:
219 # This try/except with an emtpy except clause is here only because
220 # try/yield/finally is invalid syntax in Python 2.4. This will be
221 # removed when we drop 2.4-compatibility, and the emtpy except below
222 # will be changed to a finally.
223
224 219 # Run tests with fake clipboard function
225 220 user_ns.pop('x', None)
226 221 paste('x=1')
227 222 yield nt.assert_equal(user_ns['x'], 1)
228 223
229 224 user_ns.pop('x', None)
230 225 paste('>>> x=2')
231 226 yield nt.assert_equal(user_ns['x'], 2)
232 227
233 228 paste("""
234 229 >>> x = [1,2,3]
235 230 >>> y = []
236 231 >>> for i in x:
237 232 ... y.append(i**2)
238 233 ...
239 234 """)
240 235 yield nt.assert_equal(user_ns['x'], [1,2,3])
241 236 yield nt.assert_equal(user_ns['y'], [1,4,9])
242 237
243 238 # Now, test that paste -r works
244 239 user_ns.pop('x', None)
245 240 yield nt.assert_false('x' in user_ns)
246 241 _ip.magic('paste -r')
247 242 yield nt.assert_equal(user_ns['x'], [1,2,3])
248 243
249 244 # Also test paste echoing, by temporarily faking the writer
250 245 w = StringIO()
251 246 writer = _ip.write
252 247 _ip.write = w.write
253 248 code = """
254 249 a = 100
255 250 b = 200"""
256 251 try:
257 252 paste(code,'')
258 253 out = w.getvalue()
259 254 finally:
260 255 _ip.write = writer
261 256 yield nt.assert_equal(user_ns['a'], 100)
262 257 yield nt.assert_equal(user_ns['b'], 200)
263 258 yield nt.assert_equal(out, code+"\n## -- End pasted text --\n")
264 259
265 260 finally:
266 # This should be in a finally clause, instead of the bare except above.
267 261 # Restore original hook
268 262 hooks.clipboard_get = original_clip
269 263
270 264
271 265 def test_time():
272 266 _ip.magic('time None')
273 267
274 268
275 269 @py3compat.doctest_refactor_print
276 270 def doctest_time():
277 271 """
278 272 In [10]: %time None
279 273 CPU times: user 0.00 s, sys: 0.00 s, total: 0.00 s
280 274 Wall time: 0.00 s
281 275
282 276 In [11]: def f(kmjy):
283 277 ....: %time print 2*kmjy
284 278
285 279 In [12]: f(3)
286 280 6
287 281 CPU times: user 0.00 s, sys: 0.00 s, total: 0.00 s
288 282 Wall time: 0.00 s
289 283 """
290 284
291 285
292 286 def test_doctest_mode():
293 287 "Toggle doctest_mode twice, it should be a no-op and run without error"
294 288 _ip.magic('doctest_mode')
295 289 _ip.magic('doctest_mode')
296 290
297 291
298 292 def test_parse_options():
299 293 """Tests for basic options parsing in magics."""
300 294 # These are only the most minimal of tests, more should be added later. At
301 295 # the very least we check that basic text/unicode calls work OK.
302 296 nt.assert_equal(_ip.parse_options('foo', '')[1], 'foo')
303 297 nt.assert_equal(_ip.parse_options(u'foo', '')[1], u'foo')
304 298
305 299
306 300 def test_dirops():
307 301 """Test various directory handling operations."""
308 302 # curpath = lambda :os.path.splitdrive(os.getcwdu())[1].replace('\\','/')
309 303 curpath = os.getcwdu
310 304 startdir = os.getcwdu()
311 305 ipdir = _ip.ipython_dir
312 306 try:
313 307 _ip.magic('cd "%s"' % ipdir)
314 308 nt.assert_equal(curpath(), ipdir)
315 309 _ip.magic('cd -')
316 310 nt.assert_equal(curpath(), startdir)
317 311 _ip.magic('pushd "%s"' % ipdir)
318 312 nt.assert_equal(curpath(), ipdir)
319 313 _ip.magic('popd')
320 314 nt.assert_equal(curpath(), startdir)
321 315 finally:
322 316 os.chdir(startdir)
323 317
324 318
325 319 def check_cpaste(code, should_fail=False):
326 320 """Execute code via 'cpaste' and ensure it was executed, unless
327 321 should_fail is set.
328 322 """
329 323 _ip.user_ns['code_ran'] = False
330 324
331 325 src = StringIO()
332 326 src.write('\n')
333 327 src.write(code)
334 328 src.write('\n--\n')
335 329 src.seek(0)
336 330
337 331 stdin_save = sys.stdin
338 332 sys.stdin = src
339 333
340 334 try:
341 335 _ip.magic('cpaste')
342 336 except:
343 337 if not should_fail:
344 338 raise AssertionError("Failure not expected : '%s'" %
345 339 code)
346 340 else:
347 341 assert _ip.user_ns['code_ran']
348 342 if should_fail:
349 343 raise AssertionError("Failure expected : '%s'" % code)
350 344 finally:
351 345 sys.stdin = stdin_save
352 346
353 347
354 348 def test_cpaste():
355 349 """Test cpaste magic"""
356 350
357 351 def run():
358 352 """Marker function: sets a flag when executed.
359 353 """
360 354 _ip.user_ns['code_ran'] = True
361 355 return 'run' # return string so '+ run()' doesn't result in success
362 356
363 357 tests = {'pass': ["> > > run()",
364 358 ">>> > run()",
365 359 "+++ run()",
366 360 "++ run()",
367 361 " >>> run()"],
368 362
369 363 'fail': ["+ + run()",
370 364 " ++ run()"]}
371 365
372 366 _ip.user_ns['run'] = run
373 367
374 368 for code in tests['pass']:
375 369 check_cpaste(code)
376 370
377 371 for code in tests['fail']:
378 372 check_cpaste(code, should_fail=True)
379 373
380 374 def test_xmode():
381 375 # Calling xmode three times should be a no-op
382 376 xmode = _ip.InteractiveTB.mode
383 377 for i in range(3):
384 378 _ip.magic("xmode")
385 379 nt.assert_equal(_ip.InteractiveTB.mode, xmode)
386 380
387 381 def test_reset_hard():
388 382 monitor = []
389 383 class A(object):
390 384 def __del__(self):
391 385 monitor.append(1)
392 386 def __repr__(self):
393 387 return "<A instance>"
394 388
395 389 _ip.user_ns["a"] = A()
396 390 _ip.run_cell("a")
397 391
398 392 nt.assert_equal(monitor, [])
399 393 _ip.magic_reset("-f")
400 394 nt.assert_equal(monitor, [1])
401 395
402 396 class TestXdel(tt.TempFileMixin):
403 397 def test_xdel(self):
404 398 """Test that references from %run are cleared by xdel."""
405 399 src = ("class A(object):\n"
406 400 " monitor = []\n"
407 401 " def __del__(self):\n"
408 402 " self.monitor.append(1)\n"
409 403 "a = A()\n")
410 404 self.mktmp(src)
411 405 # %run creates some hidden references...
412 406 _ip.magic("run %s" % self.fname)
413 407 # ... as does the displayhook.
414 408 _ip.run_cell("a")
415 409
416 410 monitor = _ip.user_ns["A"].monitor
417 411 nt.assert_equal(monitor, [])
418 412
419 413 _ip.magic("xdel a")
420 414
421 415 # Check that a's __del__ method has been called.
422 416 nt.assert_equal(monitor, [1])
423 417
424 418 def doctest_who():
425 419 """doctest for %who
426 420
427 421 In [1]: %reset -f
428 422
429 423 In [2]: alpha = 123
430 424
431 425 In [3]: beta = 'beta'
432 426
433 427 In [4]: %who int
434 428 alpha
435 429
436 430 In [5]: %who str
437 431 beta
438 432
439 433 In [6]: %whos
440 434 Variable Type Data/Info
441 435 ----------------------------
442 436 alpha int 123
443 437 beta str beta
444 438
445 439 In [7]: %who_ls
446 440 Out[7]: ['alpha', 'beta']
447 441 """
448 442
449 443 @py3compat.u_format
450 444 def doctest_precision():
451 445 """doctest for %precision
452 446
453 447 In [1]: f = get_ipython().shell.display_formatter.formatters['text/plain']
454 448
455 449 In [2]: %precision 5
456 450 Out[2]: {u}'%.5f'
457 451
458 452 In [3]: f.float_format
459 453 Out[3]: {u}'%.5f'
460 454
461 455 In [4]: %precision %e
462 456 Out[4]: {u}'%e'
463 457
464 458 In [5]: f(3.1415927)
465 459 Out[5]: {u}'3.141593e+00'
466 460 """
467 461
@@ -1,210 +1,210 b''
1 1 """Tests for code execution (%run and related), which is particularly tricky.
2 2
3 3 Because of how %run manages namespaces, and the fact that we are trying here to
4 4 verify subtle object deletion and reference counting issues, the %run tests
5 5 will be kept in this separate file. This makes it easier to aggregate in one
6 6 place the tricks needed to handle it; most other magics are much easier to test
7 7 and we do so in a common test_magic file.
8 8 """
9 9 from __future__ import absolute_import
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 15 import os
16 16 import sys
17 17 import tempfile
18 18
19 19 import nose.tools as nt
20 20 from nose import SkipTest
21 21
22 22 from IPython.testing import decorators as dec
23 23 from IPython.testing import tools as tt
24 from IPython.utils.py3compat import doctest_refactor_print
24 from IPython.utils import py3compat
25 25
26 26 #-----------------------------------------------------------------------------
27 27 # Test functions begin
28 28 #-----------------------------------------------------------------------------
29 29
30 30 def doctest_refbug():
31 31 """Very nasty problem with references held by multiple runs of a script.
32 32 See: https://github.com/ipython/ipython/issues/141
33 33
34 34 In [1]: _ip.clear_main_mod_cache()
35 35 # random
36 36
37 37 In [2]: %run refbug
38 38
39 39 In [3]: call_f()
40 40 lowercased: hello
41 41
42 42 In [4]: %run refbug
43 43
44 44 In [5]: call_f()
45 45 lowercased: hello
46 46 lowercased: hello
47 47 """
48 48
49 49
50 50 def doctest_run_builtins():
51 51 r"""Check that %run doesn't damage __builtins__.
52 52
53 53 In [1]: import tempfile
54 54
55 55 In [2]: bid1 = id(__builtins__)
56 56
57 57 In [3]: fname = tempfile.mkstemp('.py')[1]
58 58
59 59 In [3]: f = open(fname,'w')
60 60
61 61 In [4]: dummy= f.write('pass\n')
62 62
63 63 In [5]: f.flush()
64 64
65 65 In [6]: t1 = type(__builtins__)
66 66
67 67 In [7]: %run $fname
68 68
69 69 In [7]: f.close()
70 70
71 71 In [8]: bid2 = id(__builtins__)
72 72
73 73 In [9]: t2 = type(__builtins__)
74 74
75 75 In [10]: t1 == t2
76 76 Out[10]: True
77 77
78 78 In [10]: bid1 == bid2
79 79 Out[10]: True
80 80
81 81 In [12]: try:
82 82 ....: os.unlink(fname)
83 83 ....: except:
84 84 ....: pass
85 85 ....:
86 86 """
87 87
88 @doctest_refactor_print
88 @py3compat.doctest_refactor_print
89 89 def doctest_reset_del():
90 90 """Test that resetting doesn't cause errors in __del__ methods.
91 91
92 92 In [2]: class A(object):
93 93 ...: def __del__(self):
94 94 ...: print str("Hi")
95 95 ...:
96 96
97 97 In [3]: a = A()
98 98
99 99 In [4]: get_ipython().reset()
100 100 Hi
101 101
102 102 In [5]: 1+1
103 103 Out[5]: 2
104 104 """
105 105
106 106 # For some tests, it will be handy to organize them in a class with a common
107 107 # setup that makes a temp file
108 108
109 109 class TestMagicRunPass(tt.TempFileMixin):
110 110
111 111 def setup(self):
112 112 """Make a valid python temp file."""
113 113 self.mktmp('pass\n')
114 114
115 115 def run_tmpfile(self):
116 116 _ip = get_ipython()
117 117 # This fails on Windows if self.tmpfile.name has spaces or "~" in it.
118 118 # See below and ticket https://bugs.launchpad.net/bugs/366353
119 119 _ip.magic('run %s' % self.fname)
120 120
121 121 def test_builtins_id(self):
122 122 """Check that %run doesn't damage __builtins__ """
123 123 _ip = get_ipython()
124 124 # Test that the id of __builtins__ is not modified by %run
125 125 bid1 = id(_ip.user_ns['__builtins__'])
126 126 self.run_tmpfile()
127 127 bid2 = id(_ip.user_ns['__builtins__'])
128 128 tt.assert_equals(bid1, bid2)
129 129
130 130 def test_builtins_type(self):
131 131 """Check that the type of __builtins__ doesn't change with %run.
132 132
133 133 However, the above could pass if __builtins__ was already modified to
134 134 be a dict (it should be a module) by a previous use of %run. So we
135 135 also check explicitly that it really is a module:
136 136 """
137 137 _ip = get_ipython()
138 138 self.run_tmpfile()
139 139 tt.assert_equals(type(_ip.user_ns['__builtins__']),type(sys))
140 140
141 141 def test_prompts(self):
142 142 """Test that prompts correctly generate after %run"""
143 143 self.run_tmpfile()
144 144 _ip = get_ipython()
145 145 p2 = str(_ip.displayhook.prompt2).strip()
146 146 nt.assert_equals(p2[:3], '...')
147 147
148 148
149 149 class TestMagicRunSimple(tt.TempFileMixin):
150 150
151 151 def test_simpledef(self):
152 152 """Test that simple class definitions work."""
153 153 src = ("class foo: pass\n"
154 154 "def f(): return foo()")
155 155 self.mktmp(src)
156 156 _ip.magic('run %s' % self.fname)
157 157 _ip.run_cell('t = isinstance(f(), foo)')
158 158 nt.assert_true(_ip.user_ns['t'])
159 159
160 160 def test_obj_del(self):
161 161 """Test that object's __del__ methods are called on exit."""
162 162 if sys.platform == 'win32':
163 163 try:
164 164 import win32api
165 165 except ImportError:
166 166 raise SkipTest("Test requires pywin32")
167 167 src = ("class A(object):\n"
168 168 " def __del__(self):\n"
169 169 " print 'object A deleted'\n"
170 170 "a = A()\n")
171 self.mktmp(src)
171 self.mktmp(py3compat.doctest_refactor_print(src))
172 172 tt.ipexec_validate(self.fname, 'object A deleted')
173 173
174 174 @dec.skip_known_failure
175 175 def test_aggressive_namespace_cleanup(self):
176 176 """Test that namespace cleanup is not too aggressive GH-238
177 177
178 178 Returning from another run magic deletes the namespace"""
179 179 # see ticket https://github.com/ipython/ipython/issues/238
180 180 class secondtmp(tt.TempFileMixin): pass
181 181 empty = secondtmp()
182 182 empty.mktmp('')
183 183 src = ("ip = get_ipython()\n"
184 184 "for i in range(5):\n"
185 185 " try:\n"
186 186 " ip.magic('run %s')\n"
187 187 " except NameError, e:\n"
188 188 " print i;break\n" % empty.fname)
189 self.mktmp(src)
189 self.mktmp(py3compat.doctest_refactor_print(src))
190 190 _ip.magic('run %s' % self.fname)
191 191 _ip.run_cell('ip == get_ipython()')
192 192 tt.assert_equals(_ip.user_ns['i'], 5)
193 193
194 194 @dec.skip_win32
195 195 def test_tclass(self):
196 196 mydir = os.path.dirname(__file__)
197 197 tc = os.path.join(mydir, 'tclass')
198 198 src = ("%%run '%s' C-first\n"
199 199 "%%run '%s' C-second\n"
200 200 "%%run '%s' C-third\n") % (tc, tc, tc)
201 201 self.mktmp(src, '.ipy')
202 202 out = """\
203 ARGV 1-: [u'C-first']
204 ARGV 1-: [u'C-second']
203 ARGV 1-: [{u}'C-first']
204 ARGV 1-: [{u}'C-second']
205 205 tclass.py: deleting object: C-first
206 ARGV 1-: [u'C-third']
206 ARGV 1-: [{u}'C-third']
207 207 tclass.py: deleting object: C-second
208 208 tclass.py: deleting object: C-third
209 209 """
210 tt.ipexec_validate(self.fname, out)
210 tt.ipexec_validate(self.fname, py3compat.u_format(out))
@@ -1,69 +1,71 b''
1 1 """Implementation of the parametric test support for Python 3.x.
2 2
3 3 Thanks for the py3 version to Robert Collins, from the Testing in Python
4 4 mailing list.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import unittest
19 19 from unittest import TestSuite
20 20
21 21 #-----------------------------------------------------------------------------
22 22 # Classes and functions
23 23 #-----------------------------------------------------------------------------
24 24
25 25
26 26 def isgenerator(func):
27 27 return hasattr(func,'_generator')
28 28
29 29
30 30 class IterCallableSuite(TestSuite):
31 31 def __init__(self, iterator, adapter):
32 32 self._iter = iterator
33 33 self._adapter = adapter
34 34 def __iter__(self):
35 35 yield self._adapter(self._iter.__next__)
36 36
37 37 class ParametricTestCase(unittest.TestCase):
38 38 """Write parametric tests in normal unittest testcase form.
39 39
40 40 Limitations: the last iteration misses printing out a newline when
41 41 running in verbose mode.
42 42 """
43 43
44 44 def run(self, result=None):
45 45 testMethod = getattr(self, self._testMethodName)
46 46 # For normal tests, we just call the base class and return that
47 47 if isgenerator(testMethod):
48 48 def adapter(next_test):
49 return unittest.FunctionTestCase(next_test,
50 self.setUp,
51 self.tearDown)
49 ftc = unittest.FunctionTestCase(next_test,
50 self.setUp,
51 self.tearDown)
52 self._nose_case = ftc # Nose 1.0 rejects the test without this
53 return ftc
52 54
53 55 return IterCallableSuite(testMethod(),adapter).run(result)
54 56 else:
55 57 return super(ParametricTestCase, self).run(result)
56 58
57 59
58 60 def parametric(func):
59 61 """Decorator to make a simple function into a normal test via
60 62 unittest."""
61 63 # Hack, until I figure out how to write isgenerator() for python3!!
62 64 func._generator = True
63 65
64 66 class Tester(ParametricTestCase):
65 67 test = staticmethod(func)
66 68
67 69 Tester.__name__ = func.__name__
68 70
69 71 return Tester
@@ -1,345 +1,346 b''
1 1 """Generic testing tools that do NOT depend on Twisted.
2 2
3 3 In particular, this module exposes a set of top-level assert* functions that
4 4 can be used in place of nose.tools.assert* in method generators (the ones in
5 5 nose can not, at least as of nose 0.10.4).
6 6
7 7 Note: our testing package contains testing.util, which does depend on Twisted
8 8 and provides utilities for tests that manage Deferreds. All testing support
9 9 tools that only depend on nose, IPython or the standard library should go here
10 10 instead.
11 11
12 12
13 13 Authors
14 14 -------
15 15 - Fernando Perez <Fernando.Perez@berkeley.edu>
16 16 """
17 17
18 18 from __future__ import absolute_import
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Copyright (C) 2009 The IPython Development Team
22 22 #
23 23 # Distributed under the terms of the BSD License. The full license is in
24 24 # the file COPYING, distributed as part of this software.
25 25 #-----------------------------------------------------------------------------
26 26
27 27 #-----------------------------------------------------------------------------
28 28 # Imports
29 29 #-----------------------------------------------------------------------------
30 30
31 31 import os
32 32 import re
33 33 import sys
34 34 import tempfile
35 35
36 36 from contextlib import contextmanager
37 37
38 38 try:
39 39 # These tools are used by parts of the runtime, so we make the nose
40 40 # dependency optional at this point. Nose is a hard dependency to run the
41 41 # test suite, but NOT to use ipython itself.
42 42 import nose.tools as nt
43 43 has_nose = True
44 44 except ImportError:
45 45 has_nose = False
46 46
47 47 from IPython.config.loader import Config
48 48 from IPython.utils.process import find_cmd, getoutputerror
49 49 from IPython.utils.text import list_strings
50 50 from IPython.utils.io import temp_pyfile
51 from IPython.utils.py3compat import PY3
51 52
52 53 from . import decorators as dec
53 54 from . import skipdoctest
54 55
55 56 #-----------------------------------------------------------------------------
56 57 # Globals
57 58 #-----------------------------------------------------------------------------
58 59
59 60 # Make a bunch of nose.tools assert wrappers that can be used in test
60 61 # generators. This will expose an assert* function for each one in nose.tools.
61 62
62 63 _tpl = """
63 64 def %(name)s(*a,**kw):
64 65 return nt.%(name)s(*a,**kw)
65 66 """
66 67
67 68 if has_nose:
68 69 for _x in [a for a in dir(nt) if a.startswith('assert')]:
69 70 exec _tpl % dict(name=_x)
70 71
71 72 #-----------------------------------------------------------------------------
72 73 # Functions and classes
73 74 #-----------------------------------------------------------------------------
74 75
75 76 # The docstring for full_path doctests differently on win32 (different path
76 77 # separator) so just skip the doctest there. The example remains informative.
77 78 doctest_deco = skipdoctest.skip_doctest if sys.platform == 'win32' else dec.null_deco
78 79
79 80 @doctest_deco
80 81 def full_path(startPath,files):
81 82 """Make full paths for all the listed files, based on startPath.
82 83
83 84 Only the base part of startPath is kept, since this routine is typically
84 85 used with a script's __file__ variable as startPath. The base of startPath
85 86 is then prepended to all the listed files, forming the output list.
86 87
87 88 Parameters
88 89 ----------
89 90 startPath : string
90 91 Initial path to use as the base for the results. This path is split
91 92 using os.path.split() and only its first component is kept.
92 93
93 94 files : string or list
94 95 One or more files.
95 96
96 97 Examples
97 98 --------
98 99
99 100 >>> full_path('/foo/bar.py',['a.txt','b.txt'])
100 101 ['/foo/a.txt', '/foo/b.txt']
101 102
102 103 >>> full_path('/foo',['a.txt','b.txt'])
103 104 ['/a.txt', '/b.txt']
104 105
105 106 If a single file is given, the output is still a list:
106 107 >>> full_path('/foo','a.txt')
107 108 ['/a.txt']
108 109 """
109 110
110 111 files = list_strings(files)
111 112 base = os.path.split(startPath)[0]
112 113 return [ os.path.join(base,f) for f in files ]
113 114
114 115
115 116 def parse_test_output(txt):
116 117 """Parse the output of a test run and return errors, failures.
117 118
118 119 Parameters
119 120 ----------
120 121 txt : str
121 122 Text output of a test run, assumed to contain a line of one of the
122 123 following forms::
123 124 'FAILED (errors=1)'
124 125 'FAILED (failures=1)'
125 126 'FAILED (errors=1, failures=1)'
126 127
127 128 Returns
128 129 -------
129 130 nerr, nfail: number of errors and failures.
130 131 """
131 132
132 133 err_m = re.search(r'^FAILED \(errors=(\d+)\)', txt, re.MULTILINE)
133 134 if err_m:
134 135 nerr = int(err_m.group(1))
135 136 nfail = 0
136 137 return nerr, nfail
137 138
138 139 fail_m = re.search(r'^FAILED \(failures=(\d+)\)', txt, re.MULTILINE)
139 140 if fail_m:
140 141 nerr = 0
141 142 nfail = int(fail_m.group(1))
142 143 return nerr, nfail
143 144
144 145 both_m = re.search(r'^FAILED \(errors=(\d+), failures=(\d+)\)', txt,
145 146 re.MULTILINE)
146 147 if both_m:
147 148 nerr = int(both_m.group(1))
148 149 nfail = int(both_m.group(2))
149 150 return nerr, nfail
150 151
151 152 # If the input didn't match any of these forms, assume no error/failures
152 153 return 0, 0
153 154
154 155
155 156 # So nose doesn't think this is a test
156 157 parse_test_output.__test__ = False
157 158
158 159
159 160 def default_argv():
160 161 """Return a valid default argv for creating testing instances of ipython"""
161 162
162 163 return ['--quick', # so no config file is loaded
163 164 # Other defaults to minimize side effects on stdout
164 165 '--colors=NoColor', '--no-term-title','--no-banner',
165 166 '--autocall=0']
166 167
167 168
168 169 def default_config():
169 170 """Return a config object with good defaults for testing."""
170 171 config = Config()
171 172 config.TerminalInteractiveShell.colors = 'NoColor'
172 173 config.TerminalTerminalInteractiveShell.term_title = False,
173 174 config.TerminalInteractiveShell.autocall = 0
174 175 config.HistoryManager.hist_file = tempfile.mktemp(u'test_hist.sqlite')
175 176 config.HistoryManager.db_cache_size = 10000
176 177 return config
177 178
178 179
179 180 def ipexec(fname, options=None):
180 181 """Utility to call 'ipython filename'.
181 182
182 183 Starts IPython witha minimal and safe configuration to make startup as fast
183 184 as possible.
184 185
185 186 Note that this starts IPython in a subprocess!
186 187
187 188 Parameters
188 189 ----------
189 190 fname : str
190 191 Name of file to be executed (should have .py or .ipy extension).
191 192
192 193 options : optional, list
193 194 Extra command-line flags to be passed to IPython.
194 195
195 196 Returns
196 197 -------
197 198 (stdout, stderr) of ipython subprocess.
198 199 """
199 200 if options is None: options = []
200 201
201 202 # For these subprocess calls, eliminate all prompt printing so we only see
202 203 # output from script execution
203 204 prompt_opts = [ '--InteractiveShell.prompt_in1=""',
204 205 '--InteractiveShell.prompt_in2=""',
205 206 '--InteractiveShell.prompt_out=""'
206 207 ]
207 208 cmdargs = ' '.join(default_argv() + prompt_opts + options)
208 209
209 210 _ip = get_ipython()
210 211 test_dir = os.path.dirname(__file__)
211 212
212 ipython_cmd = find_cmd('ipython')
213 ipython_cmd = find_cmd('ipython3' if PY3 else 'ipython')
213 214 # Absolute path for filename
214 215 full_fname = os.path.join(test_dir, fname)
215 216 full_cmd = '%s %s %s' % (ipython_cmd, cmdargs, full_fname)
216 217 #print >> sys.stderr, 'FULL CMD:', full_cmd # dbg
217 218 out = getoutputerror(full_cmd)
218 219 # `import readline` causes 'ESC[?1034h' to be the first output sometimes,
219 220 # so strip that off the front of the first line if it is found
220 221 if out:
221 222 first = out[0]
222 223 m = re.match(r'\x1b\[[^h]+h', first)
223 224 if m:
224 225 # strip initial readline escape
225 226 out = list(out)
226 227 out[0] = first[len(m.group()):]
227 228 out = tuple(out)
228 229 return out
229 230
230 231
231 232 def ipexec_validate(fname, expected_out, expected_err='',
232 233 options=None):
233 234 """Utility to call 'ipython filename' and validate output/error.
234 235
235 236 This function raises an AssertionError if the validation fails.
236 237
237 238 Note that this starts IPython in a subprocess!
238 239
239 240 Parameters
240 241 ----------
241 242 fname : str
242 243 Name of the file to be executed (should have .py or .ipy extension).
243 244
244 245 expected_out : str
245 246 Expected stdout of the process.
246 247
247 248 expected_err : optional, str
248 249 Expected stderr of the process.
249 250
250 251 options : optional, list
251 252 Extra command-line flags to be passed to IPython.
252 253
253 254 Returns
254 255 -------
255 256 None
256 257 """
257 258
258 259 import nose.tools as nt
259 260
260 261 out, err = ipexec(fname)
261 262 #print 'OUT', out # dbg
262 263 #print 'ERR', err # dbg
263 264 # If there are any errors, we must check those befor stdout, as they may be
264 265 # more informative than simply having an empty stdout.
265 266 if err:
266 267 if expected_err:
267 268 nt.assert_equals(err.strip(), expected_err.strip())
268 269 else:
269 270 raise ValueError('Running file %r produced error: %r' %
270 271 (fname, err))
271 272 # If no errors or output on stderr was expected, match stdout
272 273 nt.assert_equals(out.strip(), expected_out.strip())
273 274
274 275
275 276 class TempFileMixin(object):
276 277 """Utility class to create temporary Python/IPython files.
277 278
278 279 Meant as a mixin class for test cases."""
279 280
280 281 def mktmp(self, src, ext='.py'):
281 282 """Make a valid python temp file."""
282 283 fname, f = temp_pyfile(src, ext)
283 284 self.tmpfile = f
284 285 self.fname = fname
285 286
286 287 def tearDown(self):
287 288 if hasattr(self, 'tmpfile'):
288 289 # If the tmpfile wasn't made because of skipped tests, like in
289 290 # win32, there's nothing to cleanup.
290 291 self.tmpfile.close()
291 292 try:
292 293 os.unlink(self.fname)
293 294 except:
294 295 # On Windows, even though we close the file, we still can't
295 296 # delete it. I have no clue why
296 297 pass
297 298
298 299 pair_fail_msg = ("Testing {0}\n\n"
299 300 "In:\n"
300 301 " {1!r}\n"
301 302 "Expected:\n"
302 303 " {2!r}\n"
303 304 "Got:\n"
304 305 " {3!r}\n")
305 306 def check_pairs(func, pairs):
306 307 """Utility function for the common case of checking a function with a
307 308 sequence of input/output pairs.
308 309
309 310 Parameters
310 311 ----------
311 312 func : callable
312 313 The function to be tested. Should accept a single argument.
313 314 pairs : iterable
314 315 A list of (input, expected_output) tuples.
315 316
316 317 Returns
317 318 -------
318 319 None. Raises an AssertionError if any output does not match the expected
319 320 value.
320 321 """
321 322 name = getattr(func, "func_name", getattr(func, "__name__", "<unknown>"))
322 323 for inp, expected in pairs:
323 324 out = func(inp)
324 325 assert out == expected, pair_fail_msg.format(name, inp, expected, out)
325 326
326 327 @contextmanager
327 328 def mute_warn():
328 329 from IPython.utils import warn
329 330 save_warn = warn.warn
330 331 warn.warn = lambda *a, **kw: None
331 332 try:
332 333 yield
333 334 finally:
334 335 warn.warn = save_warn
335 336
336 337 @contextmanager
337 338 def make_tempfile(name):
338 339 """ Create an empty, named, temporary file for the duration of the context.
339 340 """
340 341 f = open(name, 'w')
341 342 f.close()
342 343 try:
343 344 yield
344 345 finally:
345 346 os.unlink(name)
@@ -1,191 +1,193 b''
1 1 """Posix-specific implementation of process utilities.
2 2
3 3 This file is only meant to be imported by process.py, not by end-users.
4 4 """
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Stdlib
19 19 import subprocess as sp
20 20 import sys
21 21
22 22 from IPython.external import pexpect
23 23
24 24 # Our own
25 25 from .autoattr import auto_attr
26 26 from ._process_common import getoutput
27 27 from IPython.utils import text
28 from IPython.utils import py3compat
28 29
29 30 #-----------------------------------------------------------------------------
30 31 # Function definitions
31 32 #-----------------------------------------------------------------------------
32 33
33 34 def _find_cmd(cmd):
34 35 """Find the full path to a command using which."""
35 36
36 return sp.Popen(['/usr/bin/env', 'which', cmd],
37 path = sp.Popen(['/usr/bin/env', 'which', cmd],
37 38 stdout=sp.PIPE).communicate()[0]
39 return py3compat.bytes_to_str(path)
38 40
39 41
40 42 class ProcessHandler(object):
41 43 """Execute subprocesses under the control of pexpect.
42 44 """
43 45 # Timeout in seconds to wait on each reading of the subprocess' output.
44 46 # This should not be set too low to avoid cpu overusage from our side,
45 47 # since we read in a loop whose period is controlled by this timeout.
46 48 read_timeout = 0.05
47 49
48 50 # Timeout to give a process if we receive SIGINT, between sending the
49 51 # SIGINT to the process and forcefully terminating it.
50 52 terminate_timeout = 0.2
51 53
52 54 # File object where stdout and stderr of the subprocess will be written
53 55 logfile = None
54 56
55 57 # Shell to call for subprocesses to execute
56 58 sh = None
57 59
58 60 @auto_attr
59 61 def sh(self):
60 62 sh = pexpect.which('sh')
61 63 if sh is None:
62 64 raise OSError('"sh" shell not found')
63 65 return sh
64 66
65 67 def __init__(self, logfile=None, read_timeout=None, terminate_timeout=None):
66 68 """Arguments are used for pexpect calls."""
67 69 self.read_timeout = (ProcessHandler.read_timeout if read_timeout is
68 70 None else read_timeout)
69 71 self.terminate_timeout = (ProcessHandler.terminate_timeout if
70 72 terminate_timeout is None else
71 73 terminate_timeout)
72 74 self.logfile = sys.stdout if logfile is None else logfile
73 75
74 76 def getoutput(self, cmd):
75 77 """Run a command and return its stdout/stderr as a string.
76 78
77 79 Parameters
78 80 ----------
79 81 cmd : str
80 82 A command to be executed in the system shell.
81 83
82 84 Returns
83 85 -------
84 86 output : str
85 87 A string containing the combination of stdout and stderr from the
86 88 subprocess, in whatever order the subprocess originally wrote to its
87 89 file descriptors (so the order of the information in this string is the
88 90 correct order as would be seen if running the command in a terminal).
89 91 """
90 92 pcmd = self._make_cmd(cmd)
91 93 try:
92 94 return pexpect.run(pcmd).replace('\r\n', '\n')
93 95 except KeyboardInterrupt:
94 96 print('^C', file=sys.stderr, end='')
95 97
96 98 def getoutput_pexpect(self, cmd):
97 99 """Run a command and return its stdout/stderr as a string.
98 100
99 101 Parameters
100 102 ----------
101 103 cmd : str
102 104 A command to be executed in the system shell.
103 105
104 106 Returns
105 107 -------
106 108 output : str
107 109 A string containing the combination of stdout and stderr from the
108 110 subprocess, in whatever order the subprocess originally wrote to its
109 111 file descriptors (so the order of the information in this string is the
110 112 correct order as would be seen if running the command in a terminal).
111 113 """
112 114 pcmd = self._make_cmd(cmd)
113 115 try:
114 116 return pexpect.run(pcmd).replace('\r\n', '\n')
115 117 except KeyboardInterrupt:
116 118 print('^C', file=sys.stderr, end='')
117 119
118 120 def system(self, cmd):
119 121 """Execute a command in a subshell.
120 122
121 123 Parameters
122 124 ----------
123 125 cmd : str
124 126 A command to be executed in the system shell.
125 127
126 128 Returns
127 129 -------
128 130 int : child's exitstatus
129 131 """
130 132 # Get likely encoding for the output.
131 133 enc = text.getdefaultencoding()
132 134
133 135 pcmd = self._make_cmd(cmd)
134 136 # Patterns to match on the output, for pexpect. We read input and
135 137 # allow either a short timeout or EOF
136 138 patterns = [pexpect.TIMEOUT, pexpect.EOF]
137 139 # the index of the EOF pattern in the list.
138 140 EOF_index = 1 # Fix this index if you change the list!!
139 141 # The size of the output stored so far in the process output buffer.
140 142 # Since pexpect only appends to this buffer, each time we print we
141 143 # record how far we've printed, so that next time we only print *new*
142 144 # content from the buffer.
143 145 out_size = 0
144 146 try:
145 147 # Since we're not really searching the buffer for text patterns, we
146 148 # can set pexpect's search window to be tiny and it won't matter.
147 149 # We only search for the 'patterns' timeout or EOF, which aren't in
148 150 # the text itself.
149 151 #child = pexpect.spawn(pcmd, searchwindowsize=1)
150 152 child = pexpect.spawn(pcmd)
151 153 flush = sys.stdout.flush
152 154 while True:
153 155 # res is the index of the pattern that caused the match, so we
154 156 # know whether we've finished (if we matched EOF) or not
155 157 res_idx = child.expect_list(patterns, self.read_timeout)
156 158 print(child.before[out_size:].decode(enc, 'replace'), end='')
157 159 flush()
158 160 if res_idx==EOF_index:
159 161 break
160 162 # Update the pointer to what we've already printed
161 163 out_size = len(child.before)
162 164 except KeyboardInterrupt:
163 165 # We need to send ^C to the process. The ascii code for '^C' is 3
164 166 # (the character is known as ETX for 'End of Text', see
165 167 # curses.ascii.ETX).
166 168 child.sendline(chr(3))
167 169 # Read and print any more output the program might produce on its
168 170 # way out.
169 171 try:
170 172 out_size = len(child.before)
171 173 child.expect_list(patterns, self.terminate_timeout)
172 174 print(child.before[out_size:].decode(enc, 'replace'), end='')
173 175 sys.stdout.flush()
174 176 except KeyboardInterrupt:
175 177 # Impatient users tend to type it multiple times
176 178 pass
177 179 finally:
178 180 # Ensure the subprocess really is terminated
179 181 child.terminate(force=True)
180 182 return child.exitstatus
181 183
182 184 def _make_cmd(self, cmd):
183 185 return '%s -c "%s"' % (self.sh, cmd)
184 186
185 187
186 188 # Make system() with a functional interface for outside use. Note that we use
187 189 # getoutput() from the _common utils, which is built on top of popen(). Using
188 190 # pexpect to get subprocess output produces difficult to parse output, since
189 191 # programs think they are talking to a tty and produce highly formatted output
190 192 # (ls is a good example) that makes them hard.
191 193 system = ProcessHandler().system
@@ -1,362 +1,362 b''
1 1 #!/usr/bin/env python
2 2
3 3 """ PickleShare - a small 'shelve' like datastore with concurrency support
4 4
5 5 Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
6 6 shelve, many processes can access the database simultaneously. Changing a
7 7 value in database is immediately visible to other processes accessing the
8 8 same database.
9 9
10 10 Concurrency is possible because the values are stored in separate files. Hence
11 11 the "database" is a directory where *all* files are governed by PickleShare.
12 12
13 13 Example usage::
14 14
15 15 from pickleshare import *
16 16 db = PickleShareDB('~/testpickleshare')
17 17 db.clear()
18 18 print "Should be empty:",db.items()
19 19 db['hello'] = 15
20 20 db['aku ankka'] = [1,2,313]
21 21 db['paths/are/ok/key'] = [1,(5,46)]
22 22 print db.keys()
23 23 del db['aku ankka']
24 24
25 25 This module is certainly not ZODB, but can be used for low-load
26 26 (non-mission-critical) situations where tiny code size trumps the
27 27 advanced features of a "real" object database.
28 28
29 29 Installation guide: easy_install pickleshare
30 30
31 31 Author: Ville Vainio <vivainio@gmail.com>
32 32 License: MIT open source license.
33 33
34 34 """
35 35
36 36 from IPython.external.path import path as Path
37 37 import os,stat,time
38 38 import collections
39 39 import cPickle as pickle
40 40 import glob
41 41
42 42 def gethashfile(key):
43 43 return ("%02x" % abs(hash(key) % 256))[-2:]
44 44
45 45 _sentinel = object()
46 46
47 47 class PickleShareDB(collections.MutableMapping):
48 48 """ The main 'connection' object for PickleShare database """
49 49 def __init__(self,root):
50 50 """ Return a db object that will manage the specied directory"""
51 51 self.root = Path(root).expanduser().abspath()
52 52 if not self.root.isdir():
53 53 self.root.makedirs()
54 54 # cache has { 'key' : (obj, orig_mod_time) }
55 55 self.cache = {}
56 56
57 57
58 58 def __getitem__(self,key):
59 59 """ db['key'] reading """
60 60 fil = self.root / key
61 61 try:
62 62 mtime = (fil.stat()[stat.ST_MTIME])
63 63 except OSError:
64 64 raise KeyError(key)
65 65
66 66 if fil in self.cache and mtime == self.cache[fil][1]:
67 67 return self.cache[fil][0]
68 68 try:
69 69 # The cached item has expired, need to read
70 obj = pickle.load(fil.open())
70 obj = pickle.load(fil.open("rb"))
71 71 except:
72 72 raise KeyError(key)
73 73
74 74 self.cache[fil] = (obj,mtime)
75 75 return obj
76 76
77 77 def __setitem__(self,key,value):
78 78 """ db['key'] = 5 """
79 79 fil = self.root / key
80 80 parent = fil.parent
81 81 if parent and not parent.isdir():
82 82 parent.makedirs()
83 pickled = pickle.dump(value,fil.open('w'))
83 pickled = pickle.dump(value,fil.open('wb'))
84 84 try:
85 85 self.cache[fil] = (value,fil.mtime)
86 86 except OSError,e:
87 87 if e.errno != 2:
88 88 raise
89 89
90 90 def hset(self, hashroot, key, value):
91 91 """ hashed set """
92 92 hroot = self.root / hashroot
93 93 if not hroot.isdir():
94 94 hroot.makedirs()
95 95 hfile = hroot / gethashfile(key)
96 96 d = self.get(hfile, {})
97 97 d.update( {key : value})
98 98 self[hfile] = d
99 99
100 100
101 101
102 102 def hget(self, hashroot, key, default = _sentinel, fast_only = True):
103 103 """ hashed get """
104 104 hroot = self.root / hashroot
105 105 hfile = hroot / gethashfile(key)
106 106
107 107 d = self.get(hfile, _sentinel )
108 108 #print "got dict",d,"from",hfile
109 109 if d is _sentinel:
110 110 if fast_only:
111 111 if default is _sentinel:
112 112 raise KeyError(key)
113 113
114 114 return default
115 115
116 116 # slow mode ok, works even after hcompress()
117 117 d = self.hdict(hashroot)
118 118
119 119 return d.get(key, default)
120 120
121 121 def hdict(self, hashroot):
122 122 """ Get all data contained in hashed category 'hashroot' as dict """
123 123 hfiles = self.keys(hashroot + "/*")
124 124 hfiles.sort()
125 125 last = len(hfiles) and hfiles[-1] or ''
126 126 if last.endswith('xx'):
127 127 # print "using xx"
128 128 hfiles = [last] + hfiles[:-1]
129 129
130 130 all = {}
131 131
132 132 for f in hfiles:
133 133 # print "using",f
134 134 try:
135 135 all.update(self[f])
136 136 except KeyError:
137 137 print "Corrupt",f,"deleted - hset is not threadsafe!"
138 138 del self[f]
139 139
140 140 self.uncache(f)
141 141
142 142 return all
143 143
144 144 def hcompress(self, hashroot):
145 145 """ Compress category 'hashroot', so hset is fast again
146 146
147 147 hget will fail if fast_only is True for compressed items (that were
148 148 hset before hcompress).
149 149
150 150 """
151 151 hfiles = self.keys(hashroot + "/*")
152 152 all = {}
153 153 for f in hfiles:
154 154 # print "using",f
155 155 all.update(self[f])
156 156 self.uncache(f)
157 157
158 158 self[hashroot + '/xx'] = all
159 159 for f in hfiles:
160 160 p = self.root / f
161 161 if p.basename() == 'xx':
162 162 continue
163 163 p.remove()
164 164
165 165
166 166
167 167 def __delitem__(self,key):
168 168 """ del db["key"] """
169 169 fil = self.root / key
170 170 self.cache.pop(fil,None)
171 171 try:
172 172 fil.remove()
173 173 except OSError:
174 174 # notfound and permission denied are ok - we
175 175 # lost, the other process wins the conflict
176 176 pass
177 177
178 178 def _normalized(self, p):
179 179 """ Make a key suitable for user's eyes """
180 180 return str(self.root.relpathto(p)).replace('\\','/')
181 181
182 182 def keys(self, globpat = None):
183 183 """ All keys in DB, or all keys matching a glob"""
184 184
185 185 if globpat is None:
186 186 files = self.root.walkfiles()
187 187 else:
188 188 files = [Path(p) for p in glob.glob(self.root/globpat)]
189 189 return [self._normalized(p) for p in files if p.isfile()]
190 190
191 191 def __iter__(self):
192 192 return iter(keys)
193 193
194 194 def __len__(self):
195 195 return len(keys)
196 196
197 197 def uncache(self,*items):
198 198 """ Removes all, or specified items from cache
199 199
200 200 Use this after reading a large amount of large objects
201 201 to free up memory, when you won't be needing the objects
202 202 for a while.
203 203
204 204 """
205 205 if not items:
206 206 self.cache = {}
207 207 for it in items:
208 208 self.cache.pop(it,None)
209 209
210 210 def waitget(self,key, maxwaittime = 60 ):
211 211 """ Wait (poll) for a key to get a value
212 212
213 213 Will wait for `maxwaittime` seconds before raising a KeyError.
214 214 The call exits normally if the `key` field in db gets a value
215 215 within the timeout period.
216 216
217 217 Use this for synchronizing different processes or for ensuring
218 218 that an unfortunately timed "db['key'] = newvalue" operation
219 219 in another process (which causes all 'get' operation to cause a
220 220 KeyError for the duration of pickling) won't screw up your program
221 221 logic.
222 222 """
223 223
224 224 wtimes = [0.2] * 3 + [0.5] * 2 + [1]
225 225 tries = 0
226 226 waited = 0
227 227 while 1:
228 228 try:
229 229 val = self[key]
230 230 return val
231 231 except KeyError:
232 232 pass
233 233
234 234 if waited > maxwaittime:
235 235 raise KeyError(key)
236 236
237 237 time.sleep(wtimes[tries])
238 238 waited+=wtimes[tries]
239 239 if tries < len(wtimes) -1:
240 240 tries+=1
241 241
242 242 def getlink(self,folder):
243 243 """ Get a convenient link for accessing items """
244 244 return PickleShareLink(self, folder)
245 245
246 246 def __repr__(self):
247 247 return "PickleShareDB('%s')" % self.root
248 248
249 249
250 250
251 251 class PickleShareLink:
252 252 """ A shortdand for accessing nested PickleShare data conveniently.
253 253
254 254 Created through PickleShareDB.getlink(), example::
255 255
256 256 lnk = db.getlink('myobjects/test')
257 257 lnk.foo = 2
258 258 lnk.bar = lnk.foo + 5
259 259
260 260 """
261 261 def __init__(self, db, keydir ):
262 262 self.__dict__.update(locals())
263 263
264 264 def __getattr__(self,key):
265 265 return self.__dict__['db'][self.__dict__['keydir']+'/' + key]
266 266 def __setattr__(self,key,val):
267 267 self.db[self.keydir+'/' + key] = val
268 268 def __repr__(self):
269 269 db = self.__dict__['db']
270 270 keys = db.keys( self.__dict__['keydir'] +"/*")
271 271 return "<PickleShareLink '%s': %s>" % (
272 272 self.__dict__['keydir'],
273 273 ";".join([Path(k).basename() for k in keys]))
274 274
275 275
276 276 def test():
277 277 db = PickleShareDB('~/testpickleshare')
278 278 db.clear()
279 279 print "Should be empty:",db.items()
280 280 db['hello'] = 15
281 281 db['aku ankka'] = [1,2,313]
282 282 db['paths/nest/ok/keyname'] = [1,(5,46)]
283 283 db.hset('hash', 'aku', 12)
284 284 db.hset('hash', 'ankka', 313)
285 285 print "12 =",db.hget('hash','aku')
286 286 print "313 =",db.hget('hash','ankka')
287 287 print "all hashed",db.hdict('hash')
288 288 print db.keys()
289 289 print db.keys('paths/nest/ok/k*')
290 290 print dict(db) # snapsot of whole db
291 291 db.uncache() # frees memory, causes re-reads later
292 292
293 293 # shorthand for accessing deeply nested files
294 294 lnk = db.getlink('myobjects/test')
295 295 lnk.foo = 2
296 296 lnk.bar = lnk.foo + 5
297 297 print lnk.bar # 7
298 298
299 299 def stress():
300 300 db = PickleShareDB('~/fsdbtest')
301 301 import time,sys
302 302 for i in range(1000):
303 303 for j in range(1000):
304 304 if i % 15 == 0 and i < 200:
305 305 if str(j) in db:
306 306 del db[str(j)]
307 307 continue
308 308
309 309 if j%33 == 0:
310 310 time.sleep(0.02)
311 311
312 312 db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
313 313 db.hset('hash',j, db.hget('hash',j,15) + 1 )
314 314
315 315 print i,
316 316 sys.stdout.flush()
317 317 if i % 10 == 0:
318 318 db.uncache()
319 319
320 320 def main():
321 321 import textwrap
322 322 usage = textwrap.dedent("""\
323 323 pickleshare - manage PickleShare databases
324 324
325 325 Usage:
326 326
327 327 pickleshare dump /path/to/db > dump.txt
328 328 pickleshare load /path/to/db < dump.txt
329 329 pickleshare test /path/to/db
330 330 """)
331 331 DB = PickleShareDB
332 332 import sys
333 333 if len(sys.argv) < 2:
334 334 print usage
335 335 return
336 336
337 337 cmd = sys.argv[1]
338 338 args = sys.argv[2:]
339 339 if cmd == 'dump':
340 340 if not args: args= ['.']
341 341 db = DB(args[0])
342 342 import pprint
343 343 pprint.pprint(db.items())
344 344 elif cmd == 'load':
345 345 cont = sys.stdin.read()
346 346 db = DB(args[0])
347 347 data = eval(cont)
348 348 db.clear()
349 349 for k,v in db.items():
350 350 db[k] = v
351 351 elif cmd == 'testwait':
352 352 db = DB(args[0])
353 353 db.clear()
354 354 print db.waitget('250')
355 355 elif cmd == 'test':
356 356 test()
357 357 stress()
358 358
359 359 if __name__== "__main__":
360 360 main()
361 361
362 362
General Comments 0
You need to be logged in to leave comments. Login now