##// END OF EJS Templates
Fixing misc testing related things.
Brian Granger -
Show More
@@ -1,76 +1,77 b''
1 1 """
2 2 Base front end class for all async frontends.
3 3 """
4 4 __docformat__ = "restructuredtext en"
5 5
6 6 #-------------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-------------------------------------------------------------------------------
12 12
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 from IPython.external import guid
18 17
18 from IPython.external import guid
19 19
20 20 from zope.interface import Interface, Attribute, implements, classProvides
21 21 from twisted.python.failure import Failure
22 from IPython.frontend.frontendbase import FrontEndBase, IFrontEnd, IFrontEndFactory
22 from IPython.frontend.frontendbase import (
23 FrontEndBase, IFrontEnd, IFrontEndFactory)
23 24 from IPython.kernel.core.history import FrontEndHistory
24 25 from IPython.kernel.engineservice import IEngineCore
25 26
26 27
27 28 class AsyncFrontEndBase(FrontEndBase):
28 29 """
29 30 Overrides FrontEndBase to wrap execute in a deferred result.
30 31 All callbacks are made as callbacks on the deferred result.
31 32 """
32 33
33 34 implements(IFrontEnd)
34 35 classProvides(IFrontEndFactory)
35 36
36 37 def __init__(self, engine=None, history=None):
37 38 assert(engine==None or IEngineCore.providedBy(engine))
38 39 self.engine = IEngineCore(engine)
39 40 if history is None:
40 41 self.history = FrontEndHistory(input_cache=[''])
41 42 else:
42 43 self.history = history
43 44
44 45
45 46 def execute(self, block, blockID=None):
46 47 """Execute the block and return the deferred result.
47 48
48 49 Parameters:
49 50 block : {str, AST}
50 51 blockID : any
51 52 Caller may provide an ID to identify this block.
52 53 result['blockID'] := blockID
53 54
54 55 Result:
55 56 Deferred result of self.interpreter.execute
56 57 """
57 58
58 59 if(not self.is_complete(block)):
59 60 return Failure(Exception("Block is not compilable"))
60 61
61 62 if(blockID == None):
62 63 blockID = guid.generate()
63 64
64 65 d = self.engine.execute(block)
65 66 d.addCallback(self._add_history, block=block)
66 67 d.addCallbacks(self._add_block_id_for_result,
67 68 errback=self._add_block_id_for_failure,
68 69 callbackArgs=(blockID,),
69 70 errbackArgs=(blockID,))
70 71 d.addBoth(self.update_cell_prompt, blockID=blockID)
71 72 d.addCallbacks(self.render_result,
72 73 errback=self.render_error)
73 74
74 75 return d
75 76
76 77
@@ -1,95 +1,100 b''
1 1 # encoding: utf-8
2 2 """This file contains unittests for the
3 3 IPython.frontend.cocoa.cocoa_frontend module.
4 4 """
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #---------------------------------------------------------------------------
8 8 # Copyright (C) 2005 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #---------------------------------------------------------------------------
13 13
14 14 #---------------------------------------------------------------------------
15 15 # Imports
16 16 #---------------------------------------------------------------------------
17 17
18 # Tell nose to skip this module
19 __test__ = {}
20
21 from twisted.trial import unittest
22 from twisted.internet.defer import succeed
23
24 from IPython.kernel.core.interpreter import Interpreter
25 import IPython.kernel.engineservice as es
18 26
19 27 try:
20 from IPython.kernel.core.interpreter import Interpreter
21 import IPython.kernel.engineservice as es
22 from IPython.testing.util import DeferredTestCase
23 from twisted.internet.defer import succeed
24 from IPython.frontend.cocoa.cocoa_frontend import IPythonCocoaController
28 from IPython.frontend.cocoa.cocoa_frontend import IPythonCocoaController
25 29 from Foundation import NSMakeRect
26 30 from AppKit import NSTextView, NSScrollView
27 31 except ImportError:
28 import nose
29 raise nose.SkipTest("This test requires zope.interface, Twisted, Foolscap and PyObjC")
32 # This tells twisted.trial to skip this module if PyObjC is not found
33 skip = True
30 34
31 class TestIPythonCocoaControler(DeferredTestCase):
35 #---------------------------------------------------------------------------
36 # Tests
37 #---------------------------------------------------------------------------
38 class TestIPythonCocoaControler(unittest.TestCase):
32 39 """Tests for IPythonCocoaController"""
33
40
34 41 def setUp(self):
35 42 self.controller = IPythonCocoaController.alloc().init()
36 43 self.engine = es.EngineService()
37 44 self.engine.startService()
38
45
39 46 def tearDown(self):
40 47 self.controller = None
41 48 self.engine.stopService()
42
49
43 50 def testControllerExecutesCode(self):
44 51 code ="""5+5"""
45 52 expected = Interpreter().execute(code)
46 53 del expected['number']
47 54 def removeNumberAndID(result):
48 55 del result['number']
49 56 del result['id']
50 57 return result
51 self.assertDeferredEquals(
52 self.controller.execute(code).addCallback(removeNumberAndID),
53 expected)
54
58 d = self.controller.execute(code)
59 d.addCallback(removeNumberAndID)
60 d.addCallback(lambda r: self.assertEquals(r, expected))
61
55 62 def testControllerMirrorsUserNSWithValuesAsStrings(self):
56 63 code = """userns1=1;userns2=2"""
57 64 def testControllerUserNS(result):
58 65 self.assertEquals(self.controller.userNS['userns1'], 1)
59 66 self.assertEquals(self.controller.userNS['userns2'], 2)
60
61 67 self.controller.execute(code).addCallback(testControllerUserNS)
62
63
68
64 69 def testControllerInstantiatesIEngine(self):
65 70 self.assert_(es.IEngineBase.providedBy(self.controller.engine))
66
71
67 72 def testControllerCompletesToken(self):
68 73 code = """longNameVariable=10"""
69 74 def testCompletes(result):
70 75 self.assert_("longNameVariable" in result)
71
76
72 77 def testCompleteToken(result):
73 78 self.controller.complete("longNa").addCallback(testCompletes)
74
79
75 80 self.controller.execute(code).addCallback(testCompletes)
76
77
81
82
78 83 def testCurrentIndent(self):
79 84 """test that current_indent_string returns current indent or None.
80 85 Uses _indent_for_block for direct unit testing.
81 86 """
82
87
83 88 self.controller.tabUsesSpaces = True
84 89 self.assert_(self.controller._indent_for_block("""a=3""") == None)
85 90 self.assert_(self.controller._indent_for_block("") == None)
86 91 block = """def test():\n a=3"""
87 92 self.assert_(self.controller._indent_for_block(block) == \
88 93 ' ' * self.controller.tabSpaces)
89
94
90 95 block = """if(True):\n%sif(False):\n%spass""" % \
91 96 (' '*self.controller.tabSpaces,
92 97 2*' '*self.controller.tabSpaces)
93 98 self.assert_(self.controller._indent_for_block(block) == \
94 99 2*(' '*self.controller.tabSpaces))
95
100
@@ -1,111 +1,109 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the asyncfrontendbase module."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #---------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #---------------------------------------------------------------------------
13 13
14 14 #---------------------------------------------------------------------------
15 15 # Imports
16 16 #---------------------------------------------------------------------------
17 17
18 # Tell nose to skip this module
19 __test__ = {}
18 20
19 try:
20 from twisted.trial import unittest
21 from IPython.frontend.asyncfrontendbase import AsyncFrontEndBase
22 from IPython.frontend import frontendbase
23 from IPython.kernel.engineservice import EngineService
24 from IPython.testing.parametric import Parametric, parametric
25 except ImportError:
26 import nose
27 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
21 from twisted.trial import unittest
22 from IPython.frontend.asyncfrontendbase import AsyncFrontEndBase
23 from IPython.frontend import frontendbase
24 from IPython.kernel.engineservice import EngineService
25 from IPython.testing.parametric import Parametric, parametric
28 26
29 27
30 28 class FrontEndCallbackChecker(AsyncFrontEndBase):
31 29 """FrontEndBase subclass for checking callbacks"""
32 30 def __init__(self, engine=None, history=None):
33 31 super(FrontEndCallbackChecker, self).__init__(engine=engine,
34 32 history=history)
35 33 self.updateCalled = False
36 34 self.renderResultCalled = False
37 35 self.renderErrorCalled = False
38 36
39 37 def update_cell_prompt(self, result, blockID=None):
40 38 self.updateCalled = True
41 39 return result
42 40
43 41 def render_result(self, result):
44 42 self.renderResultCalled = True
45 43 return result
46 44
47 45 def render_error(self, failure):
48 46 self.renderErrorCalled = True
49 47 return failure
50 48
51 49
52 50 class TestAsyncFrontendBase(unittest.TestCase):
53 51 def setUp(self):
54 52 """Setup the EngineService and FrontEndBase"""
55 53
56 54 self.fb = FrontEndCallbackChecker(engine=EngineService())
57 55
58 56 def test_implements_IFrontEnd(self):
59 57 self.assert_(frontendbase.IFrontEnd.implementedBy(
60 58 AsyncFrontEndBase))
61 59
62 60 def test_is_complete_returns_False_for_incomplete_block(self):
63 61 block = """def test(a):"""
64 62 self.assert_(self.fb.is_complete(block) == False)
65 63
66 64 def test_is_complete_returns_True_for_complete_block(self):
67 65 block = """def test(a): pass"""
68 66 self.assert_(self.fb.is_complete(block))
69 67 block = """a=3"""
70 68 self.assert_(self.fb.is_complete(block))
71 69
72 70 def test_blockID_added_to_result(self):
73 71 block = """3+3"""
74 72 d = self.fb.execute(block, blockID='TEST_ID')
75 73 d.addCallback(lambda r: self.assert_(r['blockID']=='TEST_ID'))
76 74 return d
77 75
78 76 def test_blockID_added_to_failure(self):
79 77 block = "raise Exception()"
80 78 d = self.fb.execute(block,blockID='TEST_ID')
81 79 d.addErrback(lambda f: self.assert_(f.blockID=='TEST_ID'))
82 80 return d
83 81
84 82 def test_callbacks_added_to_execute(self):
85 83 d = self.fb.execute("10+10")
86 84 d.addCallback(lambda r: self.assert_(self.fb.updateCalled and self.fb.renderResultCalled))
87 85 return d
88 86
89 87 def test_error_callback_added_to_execute(self):
90 88 """Test that render_error called on execution error."""
91 89
92 90 d = self.fb.execute("raise Exception()")
93 91 d.addErrback(lambda f: self.assert_(self.fb.renderErrorCalled))
94 92 return d
95 93
96 94 def test_history_returns_expected_block(self):
97 95 """Make sure history browsing doesn't fail."""
98 96
99 97 blocks = ["a=1","a=2","a=3"]
100 98 d = self.fb.execute(blocks[0])
101 99 d.addCallback(lambda _: self.fb.execute(blocks[1]))
102 100 d.addCallback(lambda _: self.fb.execute(blocks[2]))
103 101 d.addCallback(lambda _: self.assert_(self.fb.get_history_previous("")==blocks[-2]))
104 102 d.addCallback(lambda _: self.assert_(self.fb.get_history_previous("")==blocks[-3]))
105 103 d.addCallback(lambda _: self.assert_(self.fb.get_history_next()==blocks[-2]))
106 104 return d
107 105
108 106 def test_history_returns_none_at_startup(self):
109 107 self.assert_(self.fb.get_history_previous("")==None)
110 108 self.assert_(self.fb.get_history_next()==None)
111 109
@@ -1,67 +1,67 b''
1 1 # encoding: utf-8
2 2 """
3 3 Test process execution and IO redirection.
4 4 """
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2009 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is
12 12 # in the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 14
15 15 from cStringIO import StringIO
16 16 from time import sleep
17 17 import sys
18 18
19 19 from IPython.frontend.process import PipedProcess
20 20 from IPython.testing import decorators as testdec
21 21
22 22
23 23 def test_capture_out():
24 24 """ A simple test to see if we can execute a process and get the output.
25 25 """
26 26 s = StringIO()
27 27 p = PipedProcess('echo 1', out_callback=s.write, )
28 28 p.start()
29 29 p.join()
30 30 result = s.getvalue().rstrip()
31 31 assert result == '1'
32 32
33 33
34 34 def test_io():
35 35 """ Checks that we can send characters on stdin to the process.
36 36 """
37 37 s = StringIO()
38 38 p = PipedProcess(sys.executable + ' -c "a = raw_input(); print a"',
39 39 out_callback=s.write, )
40 40 p.start()
41 41 test_string = '12345\n'
42 42 while not hasattr(p, 'process'):
43 43 sleep(0.1)
44 44 p.process.stdin.write(test_string)
45 45 p.join()
46 46 result = s.getvalue()
47 47 assert result == test_string
48 48
49 49
50 50 def test_kill():
51 51 """ Check that we can kill a process, and its subprocess.
52 52 """
53 53 s = StringIO()
54 54 p = PipedProcess(sys.executable + ' -c "a = raw_input();"',
55 55 out_callback=s.write, )
56 56 p.start()
57 57 while not hasattr(p, 'process'):
58 58 sleep(0.1)
59 59 p.process.kill()
60 60 assert p.process.poll() is not None
61 61
62 62
63 63 if __name__ == '__main__':
64 64 test_capture_out()
65 65 test_io()
66 66 test_kill()
67 67
@@ -1,61 +1,64 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the interpreter.py module."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import unittest
19 19 from IPython.kernel.core.interpreter import Interpreter
20 20
21 21 #-----------------------------------------------------------------------------
22 22 # Tests
23 23 #-----------------------------------------------------------------------------
24 24
25 # Tell nose to skip this module
26 __test__ = {}
27
25 28 class TestInterpreter(unittest.TestCase):
26 29
27 30 def test_unicode(self):
28 31 """ Test unicode handling with the interpreter."""
29 32 i = Interpreter()
30 33 i.execute_python(u'print "ΓΉ"')
31 34 i.execute_python('print "ΓΉ"')
32 35
33 36 def test_ticket266993(self):
34 37 """ Test for ticket 266993."""
35 38 i = Interpreter()
36 39 i.execute('str("""a\nb""")')
37 40
38 41 def test_ticket364347(self):
39 42 """Test for ticket 364347."""
40 43 i = Interpreter()
41 44 i.split_commands('str("a\\nb")')
42 45
43 46 def test_split_commands(self):
44 47 """ Test that commands are indeed individually split."""
45 48 i = Interpreter()
46 49 test_atoms = [('(1\n + 1)', ),
47 50 ('1', '1', ),
48 51 ]
49 52 for atoms in test_atoms:
50 53 atoms = [atom.rstrip() + '\n' for atom in atoms]
51 54 self.assertEquals(i.split_commands(''.join(atoms)),atoms)
52 55
53 56 def test_long_lines(self):
54 57 """ Test for spurious syntax error created by the interpreter."""
55 58 test_strings = [u'( 1 +\n 1\n )\n\n',
56 59 u'(1 \n + 1\n )\n\n',
57 60 ]
58 61 i = Interpreter()
59 62 for s in test_strings:
60 63 i.execute(s)
61 64
@@ -1,171 +1,174 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the notification.py module."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 # Tell nose to skip this module
19 __test__ = {}
20
18 21 import unittest
19 22 import IPython.kernel.core.notification as notification
20 23 from nose.tools import timed
21 24
22 25 #
23 26 # Supporting test classes
24 27 #
25 28
26 29 class Observer(object):
27 30 """docstring for Observer"""
28 31 def __init__(self, expectedType, expectedSender,
29 32 center=notification.sharedCenter, **kwargs):
30 33 super(Observer, self).__init__()
31 34 self.expectedType = expectedType
32 35 self.expectedSender = expectedSender
33 36 self.expectedKwArgs = kwargs
34 37 self.recieved = False
35 38 center.add_observer(self.callback,
36 39 self.expectedType,
37 40 self.expectedSender)
38 41
39 42
40 43 def callback(self, theType, sender, args={}):
41 44 """callback"""
42 45
43 46 assert(theType == self.expectedType or
44 47 self.expectedType == None)
45 48 assert(sender == self.expectedSender or
46 49 self.expectedSender == None)
47 50 assert(args == self.expectedKwArgs)
48 51 self.recieved = True
49 52
50 53
51 54 def verify(self):
52 55 """verify"""
53 56
54 57 assert(self.recieved)
55 58
56 59 def reset(self):
57 60 """reset"""
58 61
59 62 self.recieved = False
60 63
61 64
62 65
63 66 class Notifier(object):
64 67 """docstring for Notifier"""
65 68 def __init__(self, theType, **kwargs):
66 69 super(Notifier, self).__init__()
67 70 self.theType = theType
68 71 self.kwargs = kwargs
69 72
70 73 def post(self, center=notification.sharedCenter):
71 74 """fire"""
72 75
73 76 center.post_notification(self.theType, self,
74 77 **self.kwargs)
75 78
76 79
77 80 #
78 81 # Test Cases
79 82 #
80 83
81 84 class NotificationTests(unittest.TestCase):
82 85 """docstring for NotificationTests"""
83 86
84 87 def tearDown(self):
85 88 notification.sharedCenter.remove_all_observers()
86 89
87 90 def test_notification_delivered(self):
88 91 """Test that notifications are delivered"""
89 92 expectedType = 'EXPECTED_TYPE'
90 93 sender = Notifier(expectedType)
91 94 observer = Observer(expectedType, sender)
92 95
93 96 sender.post()
94 97
95 98 observer.verify()
96 99
97 100
98 101 def test_type_specificity(self):
99 102 """Test that observers are registered by type"""
100 103
101 104 expectedType = 1
102 105 unexpectedType = "UNEXPECTED_TYPE"
103 106 sender = Notifier(expectedType)
104 107 unexpectedSender = Notifier(unexpectedType)
105 108 observer = Observer(expectedType, sender)
106 109
107 110 sender.post()
108 111 unexpectedSender.post()
109 112
110 113 observer.verify()
111 114
112 115
113 116 def test_sender_specificity(self):
114 117 """Test that observers are registered by sender"""
115 118
116 119 expectedType = "EXPECTED_TYPE"
117 120 sender1 = Notifier(expectedType)
118 121 sender2 = Notifier(expectedType)
119 122 observer = Observer(expectedType, sender1)
120 123
121 124 sender1.post()
122 125 sender2.post()
123 126
124 127 observer.verify()
125 128
126 129
127 130 def test_remove_all_observers(self):
128 131 """White-box test for remove_all_observers"""
129 132
130 133 for i in xrange(10):
131 134 Observer('TYPE', None, center=notification.sharedCenter)
132 135
133 136 self.assert_(len(notification.sharedCenter.observers[('TYPE',None)]) >= 10,
134 137 "observers registered")
135 138
136 139 notification.sharedCenter.remove_all_observers()
137 140
138 141 self.assert_(len(notification.sharedCenter.observers) == 0, "observers removed")
139 142
140 143
141 144 def test_any_sender(self):
142 145 """test_any_sender"""
143 146
144 147 expectedType = "EXPECTED_TYPE"
145 148 sender1 = Notifier(expectedType)
146 149 sender2 = Notifier(expectedType)
147 150 observer = Observer(expectedType, None)
148 151
149 152
150 153 sender1.post()
151 154 observer.verify()
152 155
153 156 observer.reset()
154 157 sender2.post()
155 158 observer.verify()
156 159
157 160
158 161 @timed(.01)
159 162 def test_post_performance(self):
160 163 """Test that post_notification, even with many registered irrelevant
161 164 observers is fast"""
162 165
163 166 for i in xrange(10):
164 167 Observer("UNRELATED_TYPE", None)
165 168
166 169 o = Observer('EXPECTED_TYPE', None)
167 170
168 171 notification.sharedCenter.post_notification('EXPECTED_TYPE', self)
169 172
170 173 o.verify()
171 174
@@ -1,70 +1,70 b''
1 1 # encoding: utf-8
2 2 """
3 3 Test the output capture at the OS level, using file descriptors.
4 4 """
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is
12 12 # in the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 # Tell nose to skip this module
16 __test__ = {}
15 17
16 # Stdlib imports
17 18 import os
18 19 from cStringIO import StringIO
19 20
20 # Our own imports
21 21 from IPython.testing import decorators as dec
22 22
23 23 #-----------------------------------------------------------------------------
24 24 # Test functions
25 25
26 26 @dec.skip_win32
27 27 def test_redirector():
28 28 """ Checks that the redirector can be used to do synchronous capture.
29 29 """
30 30 from IPython.kernel.core.fd_redirector import FDRedirector
31 31 r = FDRedirector()
32 32 out = StringIO()
33 33 try:
34 34 r.start()
35 35 for i in range(10):
36 36 os.system('echo %ic' % i)
37 37 print >>out, r.getvalue(),
38 38 print >>out, i
39 39 except:
40 40 r.stop()
41 41 raise
42 42 r.stop()
43 43 result1 = out.getvalue()
44 44 result2 = "".join("%ic\n%i\n" %(i, i) for i in range(10))
45 45 assert result1 == result2
46 46
47 47
48 48 @dec.skip_win32
49 49 def test_redirector_output_trap():
50 50 """ This test check not only that the redirector_output_trap does
51 51 trap the output, but also that it does it in a gready way, that
52 52 is by calling the callback ASAP.
53 53 """
54 54 from IPython.kernel.core.redirector_output_trap import RedirectorOutputTrap
55 55 out = StringIO()
56 56 trap = RedirectorOutputTrap(out.write, out.write)
57 57 try:
58 58 trap.set()
59 59 for i in range(10):
60 60 os.system('echo %ic' % i)
61 61 print "%ip" % i
62 62 print >>out, i
63 63 except:
64 64 trap.unset()
65 65 raise
66 66 trap.unset()
67 67 result1 = out.getvalue()
68 68 result2 = "".join("%ic\n%ip\n%i\n" %(i, i, i) for i in range(10))
69 69 assert result1 == result2
70 70
@@ -1,43 +1,46 b''
1 # Tell nose to skip this module
2 __test__ = {}
3
1 4 #from __future__ import with_statement
2 5
3 6 # XXX This file is currently disabled to preserve 2.4 compatibility.
4 7
5 8 #def test_simple():
6 9 if 0:
7 10
8 11 # XXX - for now, we need a running cluster to be started separately. The
9 12 # daemon work is almost finished, and will make much of this unnecessary.
10 13 from IPython.kernel import client
11 14 mec = client.MultiEngineClient(('127.0.0.1',10105))
12 15
13 16 try:
14 17 mec.get_ids()
15 18 except ConnectionRefusedError:
16 19 import os, time
17 20 os.system('ipcluster -n 2 &')
18 21 time.sleep(2)
19 22 mec = client.MultiEngineClient(('127.0.0.1',10105))
20 23
21 24 mec.block = False
22 25
23 26 import itertools
24 27 c = itertools.count()
25 28
26 29 parallel = RemoteMultiEngine(mec)
27 30
28 31 mec.pushAll()
29 32
30 33 ## with parallel as pr:
31 34 ## # A comment
32 35 ## remote() # this means the code below only runs remotely
33 36 ## print 'Hello remote world'
34 37 ## x = range(10)
35 38 ## # Comments are OK
36 39 ## # Even misindented.
37 40 ## y = x+1
38 41
39 42
40 43 ## with pfor('i',sequence) as pr:
41 44 ## print x[i]
42 45
43 46 print pr.x + pr.y
@@ -1,44 +1,44 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the kernel.engineservice.py module.
4 4
5 5 Things that should be tested:
6 6
7 7 - Should the EngineService return Deferred objects?
8 8 - Run the same tests that are run in shell.py.
9 9 - Make sure that the Interface is really implemented.
10 10 - The startService and stopService methods.
11 11 """
12 12
13 13 __docformat__ = "restructuredtext en"
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Copyright (C) 2008 The IPython Development Team
17 17 #
18 18 # Distributed under the terms of the BSD License. The full license is in
19 19 # the file COPYING, distributed as part of this software.
20 20 #-------------------------------------------------------------------------------
21 21
22 22 #-------------------------------------------------------------------------------
23 23 # Imports
24 24 #-------------------------------------------------------------------------------
25 25
26 try:
27 from twisted.application.service import IService
28 from IPython.kernel.controllerservice import ControllerService
29 from IPython.kernel.tests import multienginetest as met
30 from controllertest import IControllerCoreTestCase
31 from IPython.testing.util import DeferredTestCase
32 except ImportError:
33 import nose
34 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
26 # Tell nose to skip this module
27 __test__ = {}
28
29 from twisted.application.service import IService
30 from IPython.kernel.controllerservice import ControllerService
31 from IPython.kernel.tests import multienginetest as met
32 from controllertest import IControllerCoreTestCase
33 from IPython.testing.util import DeferredTestCase
34
35 35
36 36 class BasicControllerServiceTest(DeferredTestCase,
37 37 IControllerCoreTestCase):
38 38
39 39 def setUp(self):
40 40 self.controller = ControllerService()
41 41 self.controller.startService()
42 42
43 43 def tearDown(self):
44 44 self.controller.stopService()
@@ -1,93 +1,92 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the enginepb.py module."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-------------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-------------------------------------------------------------------------------
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 try:
19 from twisted.python import components
20 from twisted.internet import reactor, defer
21 from twisted.spread import pb
22 from twisted.internet.base import DelayedCall
23 DelayedCall.debug = True
18 # Tell nose to skip this module
19 __test__ = {}
24 20
25 import zope.interface as zi
21 from twisted.python import components
22 from twisted.internet import reactor, defer
23 from twisted.spread import pb
24 from twisted.internet.base import DelayedCall
25 DelayedCall.debug = True
26 26
27 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
28 from IPython.kernel import engineservice as es
29 from IPython.testing.util import DeferredTestCase
30 from IPython.kernel.controllerservice import IControllerBase
31 from IPython.kernel.enginefc import FCRemoteEngineRefFromService, IEngineBase
32 from IPython.kernel.engineservice import IEngineQueued
33 from IPython.kernel.engineconnector import EngineConnector
34
35 from IPython.kernel.tests.engineservicetest import \
36 IEngineCoreTestCase, \
37 IEngineSerializedTestCase, \
38 IEngineQueuedTestCase
39 except ImportError:
40 import nose
41 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
27 import zope.interface as zi
28
29 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
30 from IPython.kernel import engineservice as es
31 from IPython.testing.util import DeferredTestCase
32 from IPython.kernel.controllerservice import IControllerBase
33 from IPython.kernel.enginefc import FCRemoteEngineRefFromService, IEngineBase
34 from IPython.kernel.engineservice import IEngineQueued
35 from IPython.kernel.engineconnector import EngineConnector
36
37 from IPython.kernel.tests.engineservicetest import \
38 IEngineCoreTestCase, \
39 IEngineSerializedTestCase, \
40 IEngineQueuedTestCase
42 41
43 42
44 43 class EngineFCTest(DeferredTestCase,
45 44 IEngineCoreTestCase,
46 45 IEngineSerializedTestCase,
47 46 IEngineQueuedTestCase
48 47 ):
49 48
50 49 zi.implements(IControllerBase)
51 50
52 51 def setUp(self):
53 52
54 53 # Start a server and append to self.servers
55 54 self.controller_reference = FCRemoteEngineRefFromService(self)
56 55 self.controller_tub = Tub()
57 56 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
58 57 self.controller_tub.setLocation('127.0.0.1:10105')
59 58
60 59 furl = self.controller_tub.registerReference(self.controller_reference)
61 60 self.controller_tub.startService()
62 61
63 62 # Start an EngineService and append to services/client
64 63 self.engine_service = es.EngineService()
65 64 self.engine_service.startService()
66 65 self.engine_tub = Tub()
67 66 self.engine_tub.startService()
68 67 engine_connector = EngineConnector(self.engine_tub)
69 68 d = engine_connector.connect_to_controller(self.engine_service, furl)
70 69 # This deferred doesn't fire until after register_engine has returned and
71 70 # thus, self.engine has been defined and the tets can proceed.
72 71 return d
73 72
74 73 def tearDown(self):
75 74 dlist = []
76 75 # Shut down the engine
77 76 d = self.engine_tub.stopService()
78 77 dlist.append(d)
79 78 # Shut down the controller
80 79 d = self.controller_tub.stopService()
81 80 dlist.append(d)
82 81 return defer.DeferredList(dlist)
83 82
84 83 #---------------------------------------------------------------------------
85 84 # Make me look like a basic controller
86 85 #---------------------------------------------------------------------------
87 86
88 87 def register_engine(self, engine_ref, id=None, ip=None, port=None, pid=None):
89 88 self.engine = IEngineQueued(IEngineBase(engine_ref))
90 89 return {'id':id}
91 90
92 91 def unregister_engine(self, id):
93 92 pass No newline at end of file
@@ -1,80 +1,79 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the kernel.engineservice.py module.
4 4
5 5 Things that should be tested:
6 6
7 7 - Should the EngineService return Deferred objects?
8 8 - Run the same tests that are run in shell.py.
9 9 - Make sure that the Interface is really implemented.
10 10 - The startService and stopService methods.
11 11 """
12 12
13 13 __docformat__ = "restructuredtext en"
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Copyright (C) 2008 The IPython Development Team
17 17 #
18 18 # Distributed under the terms of the BSD License. The full license is in
19 19 # the file COPYING, distributed as part of this software.
20 20 #-------------------------------------------------------------------------------
21 21
22 22 #-------------------------------------------------------------------------------
23 23 # Imports
24 24 #-------------------------------------------------------------------------------
25 25
26 try:
27 from twisted.internet import defer
28 from twisted.application.service import IService
29
30 from IPython.kernel import engineservice as es
31 from IPython.testing.util import DeferredTestCase
32 from IPython.kernel.tests.engineservicetest import \
33 IEngineCoreTestCase, \
34 IEngineSerializedTestCase, \
35 IEngineQueuedTestCase, \
36 IEnginePropertiesTestCase
37 except ImportError:
38 import nose
39 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
26 # Tell nose to skip this module
27 __test__ = {}
28
29 from twisted.internet import defer
30 from twisted.application.service import IService
31
32 from IPython.kernel import engineservice as es
33 from IPython.testing.util import DeferredTestCase
34 from IPython.kernel.tests.engineservicetest import \
35 IEngineCoreTestCase, \
36 IEngineSerializedTestCase, \
37 IEngineQueuedTestCase, \
38 IEnginePropertiesTestCase
40 39
41 40
42 41 class BasicEngineServiceTest(DeferredTestCase,
43 42 IEngineCoreTestCase,
44 43 IEngineSerializedTestCase,
45 44 IEnginePropertiesTestCase):
46 45
47 46 def setUp(self):
48 47 self.engine = es.EngineService()
49 48 self.engine.startService()
50 49
51 50 def tearDown(self):
52 51 return self.engine.stopService()
53 52
54 53 class ThreadedEngineServiceTest(DeferredTestCase,
55 54 IEngineCoreTestCase,
56 55 IEngineSerializedTestCase,
57 56 IEnginePropertiesTestCase):
58 57
59 58 def setUp(self):
60 59 self.engine = es.ThreadedEngineService()
61 60 self.engine.startService()
62 61
63 62 def tearDown(self):
64 63 return self.engine.stopService()
65 64
66 65 class QueuedEngineServiceTest(DeferredTestCase,
67 66 IEngineCoreTestCase,
68 67 IEngineSerializedTestCase,
69 68 IEnginePropertiesTestCase,
70 69 IEngineQueuedTestCase):
71 70
72 71 def setUp(self):
73 72 self.rawEngine = es.EngineService()
74 73 self.rawEngine.startService()
75 74 self.engine = es.IEngineQueued(self.rawEngine)
76 75
77 76 def tearDown(self):
78 77 return self.rawEngine.stopService()
79 78
80 79
@@ -1,56 +1,55 b''
1 1 # encoding: utf-8
2 2
3 3 """"""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 try:
19 from twisted.internet import defer
20 from IPython.testing.util import DeferredTestCase
21 from IPython.kernel.controllerservice import ControllerService
22 from IPython.kernel import multiengine as me
23 from IPython.kernel.tests.multienginetest import (IMultiEngineTestCase,
24 ISynchronousMultiEngineTestCase)
25 except ImportError:
26 import nose
27 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
28
29
18 # Tell nose to skip this module
19 __test__ = {}
20
21 from twisted.internet import defer
22 from IPython.testing.util import DeferredTestCase
23 from IPython.kernel.controllerservice import ControllerService
24 from IPython.kernel import multiengine as me
25 from IPython.kernel.tests.multienginetest import (IMultiEngineTestCase,
26 ISynchronousMultiEngineTestCase)
27
28
30 29 class BasicMultiEngineTestCase(DeferredTestCase, IMultiEngineTestCase):
31 30
32 31 def setUp(self):
33 32 self.controller = ControllerService()
34 33 self.controller.startService()
35 34 self.multiengine = me.IMultiEngine(self.controller)
36 35 self.engines = []
37 36
38 37 def tearDown(self):
39 38 self.controller.stopService()
40 39 for e in self.engines:
41 40 e.stopService()
42 41
43 42
44 43 class SynchronousMultiEngineTestCase(DeferredTestCase, ISynchronousMultiEngineTestCase):
45 44
46 45 def setUp(self):
47 46 self.controller = ControllerService()
48 47 self.controller.startService()
49 48 self.multiengine = me.ISynchronousMultiEngine(me.IMultiEngine(self.controller))
50 49 self.engines = []
51 50
52 51 def tearDown(self):
53 52 self.controller.stopService()
54 53 for e in self.engines:
55 54 e.stopService()
56 55
@@ -1,144 +1,144 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 __docformat__ = "restructuredtext en"
5 5
6 6 #-------------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-------------------------------------------------------------------------------
12 12
13 13 #-------------------------------------------------------------------------------
14 14 # Imports
15 15 #-------------------------------------------------------------------------------
16 16
17 try:
18 from twisted.internet import defer, reactor
17 # Tell nose to skip this module
18 __test__ = {}
19
20 from twisted.internet import defer, reactor
21
22 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
23
24 from IPython.testing.util import DeferredTestCase
25 from IPython.kernel.controllerservice import ControllerService
26 from IPython.kernel.multiengine import IMultiEngine
27 from IPython.kernel.tests.multienginetest import IFullSynchronousMultiEngineTestCase
28 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
29 from IPython.kernel import multiengine as me
30 from IPython.kernel.clientconnector import ClientConnector
31 from IPython.kernel.parallelfunction import ParallelFunction
32 from IPython.kernel.error import CompositeError
33 from IPython.kernel.util import printer
19 34
20 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
21 35
22 from IPython.testing.util import DeferredTestCase
23 from IPython.kernel.controllerservice import ControllerService
24 from IPython.kernel.multiengine import IMultiEngine
25 from IPython.kernel.tests.multienginetest import IFullSynchronousMultiEngineTestCase
26 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
27 from IPython.kernel import multiengine as me
28 from IPython.kernel.clientconnector import ClientConnector
29 from IPython.kernel.parallelfunction import ParallelFunction
30 from IPython.kernel.error import CompositeError
31 from IPython.kernel.util import printer
32 except ImportError:
33 import nose
34 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
35
36 36 def _raise_it(f):
37 37 try:
38 38 f.raiseException()
39 39 except CompositeError, e:
40 40 e.raise_exception()
41 41
42 42
43 43 class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMultiEngineTestCase):
44 44
45 45 def setUp(self):
46 46
47 47 self.engines = []
48 48
49 49 self.controller = ControllerService()
50 50 self.controller.startService()
51 51 self.imultiengine = IMultiEngine(self.controller)
52 52 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
53 53
54 54 self.controller_tub = Tub()
55 55 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
56 56 self.controller_tub.setLocation('127.0.0.1:10105')
57 57
58 58 furl = self.controller_tub.registerReference(self.mec_referenceable)
59 59 self.controller_tub.startService()
60 60
61 61 self.client_tub = ClientConnector()
62 62 d = self.client_tub.get_multiengine_client(furl)
63 63 d.addCallback(self.handle_got_client)
64 64 return d
65 65
66 66 def handle_got_client(self, client):
67 67 self.multiengine = client
68 68
69 69 def tearDown(self):
70 70 dlist = []
71 71 # Shut down the multiengine client
72 72 d = self.client_tub.tub.stopService()
73 73 dlist.append(d)
74 74 # Shut down the engines
75 75 for e in self.engines:
76 76 e.stopService()
77 77 # Shut down the controller
78 78 d = self.controller_tub.stopService()
79 79 d.addBoth(lambda _: self.controller.stopService())
80 80 dlist.append(d)
81 81 return defer.DeferredList(dlist)
82 82
83 83 def test_mapper(self):
84 84 self.addEngine(4)
85 85 m = self.multiengine.mapper()
86 86 self.assertEquals(m.multiengine,self.multiengine)
87 87 self.assertEquals(m.dist,'b')
88 88 self.assertEquals(m.targets,'all')
89 89 self.assertEquals(m.block,True)
90 90
91 91 def test_map_default(self):
92 92 self.addEngine(4)
93 93 m = self.multiengine.mapper()
94 94 d = m.map(lambda x: 2*x, range(10))
95 95 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
96 96 d.addCallback(lambda _: self.multiengine.map(lambda x: 2*x, range(10)))
97 97 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
98 98 return d
99 99
100 100 def test_map_noblock(self):
101 101 self.addEngine(4)
102 102 m = self.multiengine.mapper(block=False)
103 103 d = m.map(lambda x: 2*x, range(10))
104 104 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
105 105 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
106 106 return d
107 107
108 108 def test_mapper_fail(self):
109 109 self.addEngine(4)
110 110 m = self.multiengine.mapper()
111 111 d = m.map(lambda x: 1/0, range(10))
112 112 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
113 113 return d
114 114
115 115 def test_parallel(self):
116 116 self.addEngine(4)
117 117 p = self.multiengine.parallel()
118 118 self.assert_(isinstance(p, ParallelFunction))
119 119 @p
120 120 def f(x): return 2*x
121 121 d = f(range(10))
122 122 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
123 123 return d
124 124
125 125 def test_parallel_noblock(self):
126 126 self.addEngine(1)
127 127 p = self.multiengine.parallel(block=False)
128 128 self.assert_(isinstance(p, ParallelFunction))
129 129 @p
130 130 def f(x): return 2*x
131 131 d = f(range(10))
132 132 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
133 133 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
134 134 return d
135 135
136 136 def test_parallel_fail(self):
137 137 self.addEngine(4)
138 138 p = self.multiengine.parallel()
139 139 self.assert_(isinstance(p, ParallelFunction))
140 140 @p
141 141 def f(x): return 1/0
142 142 d = f(range(10))
143 143 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
144 144 return d No newline at end of file
@@ -1,102 +1,102 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the shell.py module."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 try:
19 import zope.interface as zi
20 from twisted.trial import unittest
21 from IPython.testing.util import DeferredTestCase
18 # Tell nose to skip this module
19 __test__ = {}
20
21 import zope.interface as zi
22 from twisted.trial import unittest
23 from IPython.testing.util import DeferredTestCase
24
25 from IPython.kernel.newserialized import \
26 ISerialized, \
27 IUnSerialized, \
28 Serialized, \
29 UnSerialized, \
30 SerializeIt, \
31 UnSerializeIt
22 32
23 from IPython.kernel.newserialized import \
24 ISerialized, \
25 IUnSerialized, \
26 Serialized, \
27 UnSerialized, \
28 SerializeIt, \
29 UnSerializeIt
30 except ImportError:
31 import nose
32 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
33 33
34 34 #-----------------------------------------------------------------------------
35 35 # Tests
36 36 #-----------------------------------------------------------------------------
37 37
38 38 class SerializedTestCase(unittest.TestCase):
39 39
40 40 def setUp(self):
41 41 pass
42 42
43 43 def tearDown(self):
44 44 pass
45 45
46 46 def testSerializedInterfaces(self):
47 47
48 48 us = UnSerialized({'a':10, 'b':range(10)})
49 49 s = ISerialized(us)
50 50 uss = IUnSerialized(s)
51 51 self.assert_(ISerialized.providedBy(s))
52 52 self.assert_(IUnSerialized.providedBy(us))
53 53 self.assert_(IUnSerialized.providedBy(uss))
54 54 for m in list(ISerialized):
55 55 self.assert_(hasattr(s, m))
56 56 for m in list(IUnSerialized):
57 57 self.assert_(hasattr(us, m))
58 58 for m in list(IUnSerialized):
59 59 self.assert_(hasattr(uss, m))
60 60
61 61 def testPickleSerialized(self):
62 62 obj = {'a':1.45345, 'b':'asdfsdf', 'c':10000L}
63 63 original = UnSerialized(obj)
64 64 originalSer = ISerialized(original)
65 65 firstData = originalSer.getData()
66 66 firstTD = originalSer.getTypeDescriptor()
67 67 firstMD = originalSer.getMetadata()
68 68 self.assert_(firstTD == 'pickle')
69 69 self.assert_(firstMD == {})
70 70 unSerialized = IUnSerialized(originalSer)
71 71 secondObj = unSerialized.getObject()
72 72 for k, v in secondObj.iteritems():
73 73 self.assert_(obj[k] == v)
74 74 secondSer = ISerialized(UnSerialized(secondObj))
75 75 self.assert_(firstData == secondSer.getData())
76 76 self.assert_(firstTD == secondSer.getTypeDescriptor() )
77 77 self.assert_(firstMD == secondSer.getMetadata())
78 78
79 79 def testNDArraySerialized(self):
80 80 try:
81 81 import numpy
82 82 except ImportError:
83 83 pass
84 84 else:
85 85 a = numpy.linspace(0.0, 1.0, 1000)
86 86 unSer1 = UnSerialized(a)
87 87 ser1 = ISerialized(unSer1)
88 88 td = ser1.getTypeDescriptor()
89 89 self.assert_(td == 'ndarray')
90 90 md = ser1.getMetadata()
91 91 self.assert_(md['shape'] == a.shape)
92 92 self.assert_(md['dtype'] == a.dtype.str)
93 93 buff = ser1.getData()
94 94 self.assert_(buff == numpy.getbuffer(a))
95 95 s = Serialized(buff, td, md)
96 96 us = IUnSerialized(s)
97 97 final = us.getObject()
98 98 self.assert_(numpy.getbuffer(a) == numpy.getbuffer(final))
99 99 self.assert_(a.dtype.str == final.dtype.str)
100 100 self.assert_(a.shape == final.shape)
101 101
102 102
@@ -1,186 +1,186 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Tests for pendingdeferred.py"""
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 try:
20 from twisted.internet import defer
21 from twisted.python import failure
22
23 from IPython.testing.util import DeferredTestCase
24 import IPython.kernel.pendingdeferred as pd
25 from IPython.kernel import error
26 from IPython.kernel.util import printer
27 except ImportError:
28 import nose
29 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
30
19 # Tell nose to skip this module
20 __test__ = {}
21
22 from twisted.internet import defer
23 from twisted.python import failure
24
25 from IPython.testing.util import DeferredTestCase
26 import IPython.kernel.pendingdeferred as pd
27 from IPython.kernel import error
28 from IPython.kernel.util import printer
29
30
31 31 class Foo(object):
32 32
33 33 def bar(self, bahz):
34 34 return defer.succeed('blahblah: %s' % bahz)
35 35
36 36 class TwoPhaseFoo(pd.PendingDeferredManager):
37 37
38 38 def __init__(self, foo):
39 39 self.foo = foo
40 40 pd.PendingDeferredManager.__init__(self)
41 41
42 42 @pd.two_phase
43 43 def bar(self, bahz):
44 44 return self.foo.bar(bahz)
45 45
46 46 class PendingDeferredManagerTest(DeferredTestCase):
47 47
48 48 def setUp(self):
49 49 self.pdm = pd.PendingDeferredManager()
50 50
51 51 def tearDown(self):
52 52 pass
53 53
54 54 def testBasic(self):
55 55 dDict = {}
56 56 # Create 10 deferreds and save them
57 57 for i in range(10):
58 58 d = defer.Deferred()
59 59 did = self.pdm.save_pending_deferred(d)
60 60 dDict[did] = d
61 61 # Make sure they are begin saved
62 62 for k in dDict.keys():
63 63 self.assert_(self.pdm.quick_has_id(k))
64 64 # Get the pending deferred (block=True), then callback with 'foo' and compare
65 65 for did in dDict.keys()[0:5]:
66 66 d = self.pdm.get_pending_deferred(did,block=True)
67 67 dDict[did].callback('foo')
68 68 d.addCallback(lambda r: self.assert_(r=='foo'))
69 69 # Get the pending deferreds with (block=False) and make sure ResultNotCompleted is raised
70 70 for did in dDict.keys()[5:10]:
71 71 d = self.pdm.get_pending_deferred(did,block=False)
72 72 d.addErrback(lambda f: self.assertRaises(error.ResultNotCompleted, f.raiseException))
73 73 # Now callback the last 5, get them and compare.
74 74 for did in dDict.keys()[5:10]:
75 75 dDict[did].callback('foo')
76 76 d = self.pdm.get_pending_deferred(did,block=False)
77 77 d.addCallback(lambda r: self.assert_(r=='foo'))
78 78
79 79 def test_save_then_delete(self):
80 80 d = defer.Deferred()
81 81 did = self.pdm.save_pending_deferred(d)
82 82 self.assert_(self.pdm.quick_has_id(did))
83 83 self.pdm.delete_pending_deferred(did)
84 84 self.assert_(not self.pdm.quick_has_id(did))
85 85
86 86 def test_save_get_delete(self):
87 87 d = defer.Deferred()
88 88 did = self.pdm.save_pending_deferred(d)
89 89 d2 = self.pdm.get_pending_deferred(did,True)
90 90 d2.addErrback(lambda f: self.assertRaises(error.AbortedPendingDeferredError, f.raiseException))
91 91 self.pdm.delete_pending_deferred(did)
92 92 return d2
93 93
94 94 def test_double_get(self):
95 95 d = defer.Deferred()
96 96 did = self.pdm.save_pending_deferred(d)
97 97 d2 = self.pdm.get_pending_deferred(did,True)
98 98 d3 = self.pdm.get_pending_deferred(did,True)
99 99 d3.addErrback(lambda f: self.assertRaises(error.InvalidDeferredID, f.raiseException))
100 100
101 101 def test_get_after_callback(self):
102 102 d = defer.Deferred()
103 103 did = self.pdm.save_pending_deferred(d)
104 104 d.callback('foo')
105 105 d2 = self.pdm.get_pending_deferred(did,True)
106 106 d2.addCallback(lambda r: self.assertEquals(r,'foo'))
107 107 self.assert_(not self.pdm.quick_has_id(did))
108 108
109 109 def test_get_before_callback(self):
110 110 d = defer.Deferred()
111 111 did = self.pdm.save_pending_deferred(d)
112 112 d2 = self.pdm.get_pending_deferred(did,True)
113 113 d.callback('foo')
114 114 d2.addCallback(lambda r: self.assertEquals(r,'foo'))
115 115 self.assert_(not self.pdm.quick_has_id(did))
116 116 d = defer.Deferred()
117 117 did = self.pdm.save_pending_deferred(d)
118 118 d2 = self.pdm.get_pending_deferred(did,True)
119 119 d2.addCallback(lambda r: self.assertEquals(r,'foo'))
120 120 d.callback('foo')
121 121 self.assert_(not self.pdm.quick_has_id(did))
122 122
123 123 def test_get_after_errback(self):
124 124 class MyError(Exception):
125 125 pass
126 126 d = defer.Deferred()
127 127 did = self.pdm.save_pending_deferred(d)
128 128 d.errback(failure.Failure(MyError('foo')))
129 129 d2 = self.pdm.get_pending_deferred(did,True)
130 130 d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException))
131 131 self.assert_(not self.pdm.quick_has_id(did))
132 132
133 133 def test_get_before_errback(self):
134 134 class MyError(Exception):
135 135 pass
136 136 d = defer.Deferred()
137 137 did = self.pdm.save_pending_deferred(d)
138 138 d2 = self.pdm.get_pending_deferred(did,True)
139 139 d.errback(failure.Failure(MyError('foo')))
140 140 d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException))
141 141 self.assert_(not self.pdm.quick_has_id(did))
142 142 d = defer.Deferred()
143 143 did = self.pdm.save_pending_deferred(d)
144 144 d2 = self.pdm.get_pending_deferred(did,True)
145 145 d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException))
146 146 d.errback(failure.Failure(MyError('foo')))
147 147 self.assert_(not self.pdm.quick_has_id(did))
148 148
149 149 def test_noresult_noblock(self):
150 150 d = defer.Deferred()
151 151 did = self.pdm.save_pending_deferred(d)
152 152 d2 = self.pdm.get_pending_deferred(did,False)
153 153 d2.addErrback(lambda f: self.assertRaises(error.ResultNotCompleted, f.raiseException))
154 154
155 155 def test_with_callbacks(self):
156 156 d = defer.Deferred()
157 157 d.addCallback(lambda r: r+' foo')
158 158 d.addCallback(lambda r: r+' bar')
159 159 did = self.pdm.save_pending_deferred(d)
160 160 d2 = self.pdm.get_pending_deferred(did,True)
161 161 d.callback('bam')
162 162 d2.addCallback(lambda r: self.assertEquals(r,'bam foo bar'))
163 163
164 164 def test_with_errbacks(self):
165 165 class MyError(Exception):
166 166 pass
167 167 d = defer.Deferred()
168 168 d.addCallback(lambda r: 'foo')
169 169 d.addErrback(lambda f: 'caught error')
170 170 did = self.pdm.save_pending_deferred(d)
171 171 d2 = self.pdm.get_pending_deferred(did,True)
172 172 d.errback(failure.Failure(MyError('bam')))
173 173 d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException))
174 174
175 175 def test_nested_deferreds(self):
176 176 d = defer.Deferred()
177 177 d2 = defer.Deferred()
178 178 d.addCallback(lambda r: d2)
179 179 did = self.pdm.save_pending_deferred(d)
180 180 d.callback('foo')
181 181 d3 = self.pdm.get_pending_deferred(did,False)
182 182 d3.addErrback(lambda f: self.assertRaises(error.ResultNotCompleted, f.raiseException))
183 183 d2.callback('bar')
184 184 d3 = self.pdm.get_pending_deferred(did,False)
185 185 d3.addCallback(lambda r: self.assertEquals(r,'bar'))
186 186
@@ -1,51 +1,51 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the kernel.task.py module."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-------------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-------------------------------------------------------------------------------
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 try:
19 import time
20
21 from twisted.internet import defer
22 from twisted.trial import unittest
23
24 from IPython.kernel import task, controllerservice as cs, engineservice as es
25 from IPython.kernel.multiengine import IMultiEngine
26 from IPython.testing.util import DeferredTestCase
27 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
28 except ImportError:
29 import nose
30 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
18 # Tell nose to skip this module
19 __test__ = {}
20
21 import time
22
23 from twisted.internet import defer
24 from twisted.trial import unittest
25
26 from IPython.kernel import task, controllerservice as cs, engineservice as es
27 from IPython.kernel.multiengine import IMultiEngine
28 from IPython.testing.util import DeferredTestCase
29 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
30
31 31
32 32 #-------------------------------------------------------------------------------
33 33 # Tests
34 34 #-------------------------------------------------------------------------------
35 35
36 36 class BasicTaskControllerTestCase(DeferredTestCase, ITaskControllerTestCase):
37 37
38 38 def setUp(self):
39 39 self.controller = cs.ControllerService()
40 40 self.controller.startService()
41 41 self.multiengine = IMultiEngine(self.controller)
42 42 self.tc = task.ITaskController(self.controller)
43 43 self.tc.failurePenalty = 0
44 44 self.engines=[]
45 45
46 46 def tearDown(self):
47 47 self.controller.stopService()
48 48 for e in self.engines:
49 49 e.stopService()
50 50
51 51
@@ -1,162 +1,161 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 __docformat__ = "restructuredtext en"
5 5
6 6 #-------------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-------------------------------------------------------------------------------
12 12
13 13 #-------------------------------------------------------------------------------
14 14 # Imports
15 15 #-------------------------------------------------------------------------------
16 16
17 try:
18 import time
17 # Tell nose to skip this module
18 __test__ = {}
19 19
20 from twisted.internet import defer, reactor
20 import time
21 21
22 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
22 from twisted.internet import defer, reactor
23 23
24 from IPython.kernel import task as taskmodule
25 from IPython.kernel import controllerservice as cs
26 import IPython.kernel.multiengine as me
27 from IPython.testing.util import DeferredTestCase
28 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
29 from IPython.kernel.taskfc import IFCTaskController
30 from IPython.kernel.util import printer
31 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
32 from IPython.kernel.clientconnector import ClientConnector
33 from IPython.kernel.error import CompositeError
34 from IPython.kernel.parallelfunction import ParallelFunction
35 except ImportError:
36 import nose
37 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
24 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
25
26 from IPython.kernel import task as taskmodule
27 from IPython.kernel import controllerservice as cs
28 import IPython.kernel.multiengine as me
29 from IPython.testing.util import DeferredTestCase
30 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
31 from IPython.kernel.taskfc import IFCTaskController
32 from IPython.kernel.util import printer
33 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
34 from IPython.kernel.clientconnector import ClientConnector
35 from IPython.kernel.error import CompositeError
36 from IPython.kernel.parallelfunction import ParallelFunction
38 37
39 38
40 39 #-------------------------------------------------------------------------------
41 40 # Tests
42 41 #-------------------------------------------------------------------------------
43 42
44 43 def _raise_it(f):
45 44 try:
46 45 f.raiseException()
47 46 except CompositeError, e:
48 47 e.raise_exception()
49 48
50 49 class TaskTest(DeferredTestCase, ITaskControllerTestCase):
51 50
52 51 def setUp(self):
53 52
54 53 self.engines = []
55 54
56 55 self.controller = cs.ControllerService()
57 56 self.controller.startService()
58 57 self.imultiengine = me.IMultiEngine(self.controller)
59 58 self.itc = taskmodule.ITaskController(self.controller)
60 59 self.itc.failurePenalty = 0
61 60
62 61 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
63 62 self.tc_referenceable = IFCTaskController(self.itc)
64 63
65 64 self.controller_tub = Tub()
66 65 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
67 66 self.controller_tub.setLocation('127.0.0.1:10105')
68 67
69 68 mec_furl = self.controller_tub.registerReference(self.mec_referenceable)
70 69 tc_furl = self.controller_tub.registerReference(self.tc_referenceable)
71 70 self.controller_tub.startService()
72 71
73 72 self.client_tub = ClientConnector()
74 73 d = self.client_tub.get_multiengine_client(mec_furl)
75 74 d.addCallback(self.handle_mec_client)
76 75 d.addCallback(lambda _: self.client_tub.get_task_client(tc_furl))
77 76 d.addCallback(self.handle_tc_client)
78 77 return d
79 78
80 79 def handle_mec_client(self, client):
81 80 self.multiengine = client
82 81
83 82 def handle_tc_client(self, client):
84 83 self.tc = client
85 84
86 85 def tearDown(self):
87 86 dlist = []
88 87 # Shut down the multiengine client
89 88 d = self.client_tub.tub.stopService()
90 89 dlist.append(d)
91 90 # Shut down the engines
92 91 for e in self.engines:
93 92 e.stopService()
94 93 # Shut down the controller
95 94 d = self.controller_tub.stopService()
96 95 d.addBoth(lambda _: self.controller.stopService())
97 96 dlist.append(d)
98 97 return defer.DeferredList(dlist)
99 98
100 99 def test_mapper(self):
101 100 self.addEngine(1)
102 101 m = self.tc.mapper()
103 102 self.assertEquals(m.task_controller,self.tc)
104 103 self.assertEquals(m.clear_before,False)
105 104 self.assertEquals(m.clear_after,False)
106 105 self.assertEquals(m.retries,0)
107 106 self.assertEquals(m.recovery_task,None)
108 107 self.assertEquals(m.depend,None)
109 108 self.assertEquals(m.block,True)
110 109
111 110 def test_map_default(self):
112 111 self.addEngine(1)
113 112 m = self.tc.mapper()
114 113 d = m.map(lambda x: 2*x, range(10))
115 114 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
116 115 d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10)))
117 116 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
118 117 return d
119 118
120 119 def test_map_noblock(self):
121 120 self.addEngine(1)
122 121 m = self.tc.mapper(block=False)
123 122 d = m.map(lambda x: 2*x, range(10))
124 123 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
125 124 return d
126 125
127 126 def test_mapper_fail(self):
128 127 self.addEngine(1)
129 128 m = self.tc.mapper()
130 129 d = m.map(lambda x: 1/0, range(10))
131 130 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
132 131 return d
133 132
134 133 def test_parallel(self):
135 134 self.addEngine(1)
136 135 p = self.tc.parallel()
137 136 self.assert_(isinstance(p, ParallelFunction))
138 137 @p
139 138 def f(x): return 2*x
140 139 d = f(range(10))
141 140 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
142 141 return d
143 142
144 143 def test_parallel_noblock(self):
145 144 self.addEngine(1)
146 145 p = self.tc.parallel(block=False)
147 146 self.assert_(isinstance(p, ParallelFunction))
148 147 @p
149 148 def f(x): return 2*x
150 149 d = f(range(10))
151 150 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
152 151 return d
153 152
154 153 def test_parallel_fail(self):
155 154 self.addEngine(1)
156 155 p = self.tc.parallel()
157 156 self.assert_(isinstance(p, ParallelFunction))
158 157 @p
159 158 def f(x): return 1/0
160 159 d = f(range(10))
161 160 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
162 161 return d No newline at end of file
@@ -1,48 +1,51 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2008 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 # Tell nose to skip this module
16 __test__ = {}
17
15 18 import tempfile
16 19 import os, sys
17 20
18 21 from twisted.internet import reactor
19 22 from twisted.trial import unittest
20 23
21 24 from IPython.kernel.error import FileTimeoutError
22 25 from IPython.kernel.twistedutil import wait_for_file
23 26
24 27 #-----------------------------------------------------------------------------
25 28 # Tests
26 29 #-----------------------------------------------------------------------------
27 30
28 31 class TestWaitForFile(unittest.TestCase):
29 32
30 33 def test_delay(self):
31 34 filename = tempfile.mktemp()
32 35 def _create_file():
33 36 open(filename,'w').write('####')
34 37 dcall = reactor.callLater(0.5, _create_file)
35 38 d = wait_for_file(filename,delay=0.1)
36 39 d.addCallback(lambda r: self.assert_(r))
37 40 def _cancel_dcall(r):
38 41 if dcall.active():
39 42 dcall.cancel()
40 43 d.addCallback(_cancel_dcall)
41 44 return d
42 45
43 46 def test_timeout(self):
44 47 filename = tempfile.mktemp()
45 48 d = wait_for_file(filename,delay=0.1,max_tries=1)
46 49 d.addErrback(lambda f: self.assertRaises(FileTimeoutError,f.raiseException))
47 50 return d
48 51 No newline at end of file
@@ -1,254 +1,254 b''
1 1 """Decorators for labeling test objects.
2 2
3 3 Decorators that merely return a modified version of the original function
4 4 object are straightforward. Decorators that return a new function object need
5 5 to use nose.tools.make_decorator(original_function)(decorator) in returning the
6 6 decorator, in order to preserve metadata such as function name, setup and
7 7 teardown functions and so on - see nose.tools for more information.
8 8
9 9 This module provides a set of useful decorators meant to be ready to use in
10 10 your own tests. See the bottom of the file for the ready-made ones, and if you
11 11 find yourself writing a new one that may be of generic use, add it here.
12 12
13 13 NOTE: This file contains IPython-specific decorators and imports the
14 14 numpy.testing.decorators file, which we've copied verbatim. Any of our own
15 15 code will be added at the bottom if we end up extending this.
16 16 """
17 17
18 18 # Stdlib imports
19 19 import inspect
20 20 import sys
21 21
22 22 # Third-party imports
23 23
24 24 # This is Michele Simionato's decorator module, also kept verbatim.
25 25 from decorator_msim import decorator, update_wrapper
26 26
27 27 # Grab the numpy-specific decorators which we keep in a file that we
28 28 # occasionally update from upstream: decorators_numpy.py is an IDENTICAL copy
29 29 # of numpy.testing.decorators.
30 30 from decorators_numpy import *
31 31
32 32 ##############################################################################
33 33 # Local code begins
34 34
35 35 # Utility functions
36 36
37 37 def apply_wrapper(wrapper,func):
38 38 """Apply a wrapper to a function for decoration.
39 39
40 40 This mixes Michele Simionato's decorator tool with nose's make_decorator,
41 41 to apply a wrapper in a decorator so that all nose attributes, as well as
42 42 function signature and other properties, survive the decoration cleanly.
43 43 This will ensure that wrapped functions can still be well introspected via
44 44 IPython, for example.
45 45 """
46 46 import nose.tools
47 47
48 48 return decorator(wrapper,nose.tools.make_decorator(func)(wrapper))
49 49
50 50
51 51 def make_label_dec(label,ds=None):
52 52 """Factory function to create a decorator that applies one or more labels.
53 53
54 54 :Parameters:
55 55 label : string or sequence
56 56 One or more labels that will be applied by the decorator to the functions
57 57 it decorates. Labels are attributes of the decorated function with their
58 58 value set to True.
59 59
60 60 :Keywords:
61 61 ds : string
62 62 An optional docstring for the resulting decorator. If not given, a
63 63 default docstring is auto-generated.
64 64
65 65 :Returns:
66 66 A decorator.
67 67
68 68 :Examples:
69 69
70 70 A simple labeling decorator:
71 71 >>> slow = make_label_dec('slow')
72 72 >>> print slow.__doc__
73 73 Labels a test as 'slow'.
74 74
75 75 And one that uses multiple labels and a custom docstring:
76 76 >>> rare = make_label_dec(['slow','hard'],
77 77 ... "Mix labels 'slow' and 'hard' for rare tests.")
78 78 >>> print rare.__doc__
79 79 Mix labels 'slow' and 'hard' for rare tests.
80 80
81 81 Now, let's test using this one:
82 82 >>> @rare
83 83 ... def f(): pass
84 84 ...
85 85 >>>
86 86 >>> f.slow
87 87 True
88 88 >>> f.hard
89 89 True
90 90 """
91 91
92 92 if isinstance(label,basestring):
93 93 labels = [label]
94 94 else:
95 95 labels = label
96 96
97 97 # Validate that the given label(s) are OK for use in setattr() by doing a
98 98 # dry run on a dummy function.
99 99 tmp = lambda : None
100 100 for label in labels:
101 101 setattr(tmp,label,True)
102 102
103 103 # This is the actual decorator we'll return
104 104 def decor(f):
105 105 for label in labels:
106 106 setattr(f,label,True)
107 107 return f
108 108
109 109 # Apply the user's docstring, or autogenerate a basic one
110 110 if ds is None:
111 111 ds = "Labels a test as %r." % label
112 112 decor.__doc__ = ds
113 113
114 114 return decor
115 115
116 116
117 117 # Inspired by numpy's skipif, but uses the full apply_wrapper utility to
118 118 # preserve function metadata better and allows the skip condition to be a
119 119 # callable.
120 120 def skipif(skip_condition, msg=None):
121 121 ''' Make function raise SkipTest exception if skip_condition is true
122 122
123 123 Parameters
124 124 ----------
125 125 skip_condition : bool or callable.
126 Flag to determine whether to skip test. If the condition is a
127 callable, it is used at runtime to dynamically make the decision. This
128 is useful for tests that may require costly imports, to delay the cost
129 until the test suite is actually executed.
126 Flag to determine whether to skip test. If the condition is a
127 callable, it is used at runtime to dynamically make the decision. This
128 is useful for tests that may require costly imports, to delay the cost
129 until the test suite is actually executed.
130 130 msg : string
131 131 Message to give on raising a SkipTest exception
132 132
133 133 Returns
134 134 -------
135 135 decorator : function
136 136 Decorator, which, when applied to a function, causes SkipTest
137 137 to be raised when the skip_condition was True, and the function
138 138 to be called normally otherwise.
139 139
140 140 Notes
141 141 -----
142 142 You will see from the code that we had to further decorate the
143 143 decorator with the nose.tools.make_decorator function in order to
144 144 transmit function name, and various other metadata.
145 145 '''
146 146
147 147 def skip_decorator(f):
148 148 # Local import to avoid a hard nose dependency and only incur the
149 149 # import time overhead at actual test-time.
150 150 import nose
151 151
152 152 # Allow for both boolean or callable skip conditions.
153 153 if callable(skip_condition):
154 154 skip_val = lambda : skip_condition()
155 155 else:
156 156 skip_val = lambda : skip_condition
157 157
158 158 def get_msg(func,msg=None):
159 159 """Skip message with information about function being skipped."""
160 160 if msg is None: out = 'Test skipped due to test condition.'
161 161 else: out = msg
162 162 return "Skipping test: %s. %s" % (func.__name__,out)
163 163
164 164 # We need to define *two* skippers because Python doesn't allow both
165 165 # return with value and yield inside the same function.
166 166 def skipper_func(*args, **kwargs):
167 167 """Skipper for normal test functions."""
168 168 if skip_val():
169 169 raise nose.SkipTest(get_msg(f,msg))
170 170 else:
171 171 return f(*args, **kwargs)
172 172
173 173 def skipper_gen(*args, **kwargs):
174 174 """Skipper for test generators."""
175 175 if skip_val():
176 176 raise nose.SkipTest(get_msg(f,msg))
177 177 else:
178 178 for x in f(*args, **kwargs):
179 179 yield x
180 180
181 181 # Choose the right skipper to use when building the actual generator.
182 182 if nose.util.isgenerator(f):
183 183 skipper = skipper_gen
184 184 else:
185 185 skipper = skipper_func
186 186
187 187 return nose.tools.make_decorator(f)(skipper)
188 188
189 189 return skip_decorator
190 190
191 191 # A version with the condition set to true, common case just to attacha message
192 192 # to a skip decorator
193 193 def skip(msg=None):
194 194 """Decorator factory - mark a test function for skipping from test suite.
195 195
196 196 :Parameters:
197 197 msg : string
198 198 Optional message to be added.
199 199
200 200 :Returns:
201 201 decorator : function
202 202 Decorator, which, when applied to a function, causes SkipTest
203 203 to be raised, with the optional message added.
204 204 """
205 205
206 206 return skipif(True,msg)
207 207
208 208
209 209 #-----------------------------------------------------------------------------
210 210 # Utility functions for decorators
211 211 def numpy_not_available():
212 212 """Can numpy be imported? Returns true if numpy does NOT import.
213 213
214 214 This is used to make a decorator to skip tests that require numpy to be
215 215 available, but delay the 'import numpy' to test execution time.
216 216 """
217 217 try:
218 218 import numpy
219 219 np_not_avail = False
220 220 except ImportError:
221 221 np_not_avail = True
222 222
223 223 return np_not_avail
224 224
225 225 #-----------------------------------------------------------------------------
226 226 # Decorators for public use
227 227
228 228 skip_doctest = make_label_dec('skip_doctest',
229 229 """Decorator - mark a function or method for skipping its doctest.
230 230
231 231 This decorator allows you to mark a function whose docstring you wish to
232 232 omit from testing, while preserving the docstring for introspection, help,
233 233 etc.""")
234 234
235 235 # Decorators to skip certain tests on specific platforms.
236 236 skip_win32 = skipif(sys.platform == 'win32',
237 237 "This test does not run under Windows")
238 238 skip_linux = skipif(sys.platform == 'linux2',
239 239 "This test does not run under Linux")
240 240 skip_osx = skipif(sys.platform == 'darwin',"This test does not run under OS X")
241 241
242 242
243 243 # Decorators to skip tests if not on specific platforms.
244 244 skip_if_not_win32 = skipif(sys.platform != 'win32',
245 245 "This test only runs under Windows")
246 246 skip_if_not_linux = skipif(sys.platform != 'linux2',
247 247 "This test only runs under Linux")
248 248 skip_if_not_osx = skipif(sys.platform != 'darwin',
249 249 "This test only runs under OSX")
250 250
251 251 # Other skip decorators
252 252 skipif_not_numpy = skipif(numpy_not_available,"This test requires numpy")
253 253
254 254 skipknownfailure = skip('This test is known to fail')
General Comments 0
You need to be logged in to leave comments. Login now