diff --git a/IPython/testing/decorators.py b/IPython/testing/decorators.py index 20d0be1..a15e1b3 100644 --- a/IPython/testing/decorators.py +++ b/IPython/testing/decorators.py @@ -60,9 +60,9 @@ import nosepatch # monkeypatch nose # We already have python3-compliant code for parametric tests if sys.version[0]=='2': - from _paramtestpy2 import parametric + from _paramtestpy2 import parametric, ParametricTestCase else: - from _paramtestpy3 import parametric + from _paramtestpy3 import parametric, ParametricTestCase # Expose the unittest-driven decorators from ipunittest import ipdoctest, ipdocstring diff --git a/IPython/utils/genutils.py b/IPython/utils/genutils.py index 6fc204f..f1c2d61 100644 --- a/IPython/utils/genutils.py +++ b/IPython/utils/genutils.py @@ -105,6 +105,65 @@ if sys.platform == 'win32' and readline.have_readline: Term = IOTerm(cout=readline._outputfile,cerr=readline._outputfile) +class Tee(object): + """A class to duplicate an output stream to stdout/err. + + This works in a manner very similar to the Unix 'tee' command. + + When the object is closed or deleted, it closes the original file given to + it for duplication. + """ + # Inspired by: + # http://mail.python.org/pipermail/python-list/2007-May/442737.html + + def __init__(self, file, mode=None, channel='stdout'): + """Construct a new Tee object. + + Parameters + ---------- + file : filename or open filehandle (writable) + File that will be duplicated + + mode : optional, valid mode for open(). + If a filename was give, open with this mode. + + channel : str, one of ['stdout', 'stderr'] + """ + if channel not in ['stdout', 'stderr']: + raise ValueError('Invalid channel spec %s' % channel) + + if hasattr(file, 'write') and hasattr(file, 'seek'): + self.file = file + else: + self.file = open(name, mode) + self.channel = channel + self.ostream = getattr(sys, channel) + setattr(sys, channel, self) + self._closed = False + + def close(self): + """Close the file and restore the channel.""" + self.flush() + setattr(sys, self.channel, self.ostream) + self.file.close() + self._closed = True + + def write(self, data): + """Write data to both channels.""" + self.file.write(data) + self.ostream.write(data) + self.ostream.flush() + + def flush(self): + """Flush both channels.""" + self.file.flush() + self.ostream.flush() + + def __del__(self): + if not self._closed: + self.close() + + #**************************************************************************** # Generic warning/error printer, used by everything else def warn(msg,level=2,exit_val=1): diff --git a/IPython/utils/tests/test_genutils.py b/IPython/utils/tests/test_genutils.py index ea6faf3..72afb83 100644 --- a/IPython/utils/tests/test_genutils.py +++ b/IPython/utils/tests/test_genutils.py @@ -20,7 +20,9 @@ import os import shutil import sys import tempfile +import unittest +from cStringIO import StringIO from os.path import join, abspath, split # third-party @@ -32,6 +34,7 @@ from nose.tools import raises # Our own import IPython from IPython.utils import genutils +from IPython.testing import decorators as dec from IPython.testing.decorators import skipif, skip_if_not_win32 # Platform-dependent imports @@ -107,7 +110,7 @@ def teardown_environment(): (wreg.OpenKey, wreg.QueryValueEx,) = platformstuff # Build decorator that uses the setup_environment/setup_environment -with_enivronment = with_setup(setup_environment, teardown_environment) +with_environment = with_setup(setup_environment, teardown_environment) # @@ -115,7 +118,7 @@ with_enivronment = with_setup(setup_environment, teardown_environment) # @skip_if_not_win32 -@with_enivronment +@with_environment def test_get_home_dir_1(): """Testcase for py2exe logic, un-compressed lib """ @@ -128,7 +131,7 @@ def test_get_home_dir_1(): nt.assert_equal(home_dir, abspath(HOME_TEST_DIR)) @skip_if_not_win32 -@with_enivronment +@with_environment def test_get_home_dir_2(): """Testcase for py2exe logic, compressed lib """ @@ -139,14 +142,14 @@ def test_get_home_dir_2(): home_dir = genutils.get_home_dir() nt.assert_equal(home_dir, abspath(HOME_TEST_DIR).lower()) -@with_enivronment +@with_environment def test_get_home_dir_3(): """Testcase $HOME is set, then use its value as home directory.""" env["HOME"] = HOME_TEST_DIR home_dir = genutils.get_home_dir() nt.assert_equal(home_dir, env["HOME"]) -@with_enivronment +@with_environment def test_get_home_dir_4(): """Testcase $HOME is not set, os=='poix'. This should fail with HomeDirError""" @@ -156,7 +159,7 @@ def test_get_home_dir_4(): nt.assert_raises(genutils.HomeDirError, genutils.get_home_dir) @skip_if_not_win32 -@with_enivronment +@with_environment def test_get_home_dir_5(): """Testcase $HOME is not set, os=='nt' env['HOMEDRIVE'],env['HOMEPATH'] points to path.""" @@ -169,7 +172,7 @@ def test_get_home_dir_5(): nt.assert_equal(home_dir, abspath(HOME_TEST_DIR)) @skip_if_not_win32 -@with_enivronment +@with_environment def test_get_home_dir_6(): """Testcase $HOME is not set, os=='nt' env['HOMEDRIVE'],env['HOMEPATH'] do not point to path. @@ -186,7 +189,7 @@ def test_get_home_dir_6(): # Should we stub wreg fully so we can run the test on all platforms? @skip_if_not_win32 -@with_enivronment +@with_environment def test_get_home_dir_7(): """Testcase $HOME is not set, os=='nt' env['HOMEDRIVE'],env['HOMEPATH'], env['USERPROFILE'] missing @@ -214,7 +217,7 @@ def test_get_home_dir_7(): # Tests for get_ipython_dir # -@with_enivronment +@with_environment def test_get_ipython_dir_1(): """test_get_ipython_dir_1, Testcase to see if we can call get_ipython_dir without Exceptions.""" env['IPYTHONDIR'] = "someplace/.ipython" @@ -222,7 +225,7 @@ def test_get_ipython_dir_1(): nt.assert_equal(ipdir, "someplace/.ipython") -@with_enivronment +@with_environment def test_get_ipython_dir_2(): """test_get_ipython_dir_2, Testcase to see if we can call get_ipython_dir without Exceptions.""" genutils.get_home_dir = lambda : "someplace" @@ -283,3 +286,38 @@ def test_filefind(): def test_get_ipython_package_dir(): ipdir = genutils.get_ipython_package_dir() nt.assert_true(os.path.isdir(ipdir)) + + +def test_tee_simple(): + "Very simple check with stdout only" + chan = StringIO() + text = 'Hello' + tee = genutils.Tee(chan, channel='stdout') + print >> chan, text, + nt.assert_equal(chan.getvalue(), text) + + +class TeeTestCase(dec.ParametricTestCase): + + def tchan(self, channel, check='close'): + trap = StringIO() + chan = StringIO() + text = 'Hello' + + std_ori = getattr(sys, channel) + setattr(sys, channel, trap) + + tee = genutils.Tee(chan, channel=channel) + print >> chan, text, + setattr(sys, channel, std_ori) + trap_val = trap.getvalue() + nt.assert_equals(chan.getvalue(), text) + if check=='close': + tee.close() + else: + del tee + + def test(self): + for chan in ['stdout', 'stderr']: + for check in ['close', 'del']: + yield self.tchan(chan, check)