##// END OF EJS Templates
Fixing misc testing related things.
Brian Granger -
Show More
@@ -1,76 +1,77 b''
1 1 """
2 2 Base front end class for all async frontends.
3 3 """
4 4 __docformat__ = "restructuredtext en"
5 5
6 6 #-------------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-------------------------------------------------------------------------------
12 12
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 from IPython.external import guid
18 17
18 from IPython.external import guid
19 19
20 20 from zope.interface import Interface, Attribute, implements, classProvides
21 21 from twisted.python.failure import Failure
22 from IPython.frontend.frontendbase import FrontEndBase, IFrontEnd, IFrontEndFactory
22 from IPython.frontend.frontendbase import (
23 FrontEndBase, IFrontEnd, IFrontEndFactory)
23 24 from IPython.kernel.core.history import FrontEndHistory
24 25 from IPython.kernel.engineservice import IEngineCore
25 26
26 27
27 28 class AsyncFrontEndBase(FrontEndBase):
28 29 """
29 30 Overrides FrontEndBase to wrap execute in a deferred result.
30 31 All callbacks are made as callbacks on the deferred result.
31 32 """
32 33
33 34 implements(IFrontEnd)
34 35 classProvides(IFrontEndFactory)
35 36
36 37 def __init__(self, engine=None, history=None):
37 38 assert(engine==None or IEngineCore.providedBy(engine))
38 39 self.engine = IEngineCore(engine)
39 40 if history is None:
40 41 self.history = FrontEndHistory(input_cache=[''])
41 42 else:
42 43 self.history = history
43 44
44 45
45 46 def execute(self, block, blockID=None):
46 47 """Execute the block and return the deferred result.
47 48
48 49 Parameters:
49 50 block : {str, AST}
50 51 blockID : any
51 52 Caller may provide an ID to identify this block.
52 53 result['blockID'] := blockID
53 54
54 55 Result:
55 56 Deferred result of self.interpreter.execute
56 57 """
57 58
58 59 if(not self.is_complete(block)):
59 60 return Failure(Exception("Block is not compilable"))
60 61
61 62 if(blockID == None):
62 63 blockID = guid.generate()
63 64
64 65 d = self.engine.execute(block)
65 66 d.addCallback(self._add_history, block=block)
66 67 d.addCallbacks(self._add_block_id_for_result,
67 68 errback=self._add_block_id_for_failure,
68 69 callbackArgs=(blockID,),
69 70 errbackArgs=(blockID,))
70 71 d.addBoth(self.update_cell_prompt, blockID=blockID)
71 72 d.addCallbacks(self.render_result,
72 73 errback=self.render_error)
73 74
74 75 return d
75 76
76 77
@@ -1,95 +1,100 b''
1 1 # encoding: utf-8
2 2 """This file contains unittests for the
3 3 IPython.frontend.cocoa.cocoa_frontend module.
4 4 """
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #---------------------------------------------------------------------------
8 8 # Copyright (C) 2005 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #---------------------------------------------------------------------------
13 13
14 14 #---------------------------------------------------------------------------
15 15 # Imports
16 16 #---------------------------------------------------------------------------
17 17
18 # Tell nose to skip this module
19 __test__ = {}
20
21 from twisted.trial import unittest
22 from twisted.internet.defer import succeed
18 23
19 try:
20 24 from IPython.kernel.core.interpreter import Interpreter
21 25 import IPython.kernel.engineservice as es
22 from IPython.testing.util import DeferredTestCase
23 from twisted.internet.defer import succeed
26
27 try:
24 28 from IPython.frontend.cocoa.cocoa_frontend import IPythonCocoaController
25 29 from Foundation import NSMakeRect
26 30 from AppKit import NSTextView, NSScrollView
27 31 except ImportError:
28 import nose
29 raise nose.SkipTest("This test requires zope.interface, Twisted, Foolscap and PyObjC")
32 # This tells twisted.trial to skip this module if PyObjC is not found
33 skip = True
30 34
31 class TestIPythonCocoaControler(DeferredTestCase):
35 #---------------------------------------------------------------------------
36 # Tests
37 #---------------------------------------------------------------------------
38 class TestIPythonCocoaControler(unittest.TestCase):
32 39 """Tests for IPythonCocoaController"""
33 40
34 41 def setUp(self):
35 42 self.controller = IPythonCocoaController.alloc().init()
36 43 self.engine = es.EngineService()
37 44 self.engine.startService()
38 45
39 46 def tearDown(self):
40 47 self.controller = None
41 48 self.engine.stopService()
42 49
43 50 def testControllerExecutesCode(self):
44 51 code ="""5+5"""
45 52 expected = Interpreter().execute(code)
46 53 del expected['number']
47 54 def removeNumberAndID(result):
48 55 del result['number']
49 56 del result['id']
50 57 return result
51 self.assertDeferredEquals(
52 self.controller.execute(code).addCallback(removeNumberAndID),
53 expected)
58 d = self.controller.execute(code)
59 d.addCallback(removeNumberAndID)
60 d.addCallback(lambda r: self.assertEquals(r, expected))
54 61
55 62 def testControllerMirrorsUserNSWithValuesAsStrings(self):
56 63 code = """userns1=1;userns2=2"""
57 64 def testControllerUserNS(result):
58 65 self.assertEquals(self.controller.userNS['userns1'], 1)
59 66 self.assertEquals(self.controller.userNS['userns2'], 2)
60
61 67 self.controller.execute(code).addCallback(testControllerUserNS)
62 68
63
64 69 def testControllerInstantiatesIEngine(self):
65 70 self.assert_(es.IEngineBase.providedBy(self.controller.engine))
66 71
67 72 def testControllerCompletesToken(self):
68 73 code = """longNameVariable=10"""
69 74 def testCompletes(result):
70 75 self.assert_("longNameVariable" in result)
71 76
72 77 def testCompleteToken(result):
73 78 self.controller.complete("longNa").addCallback(testCompletes)
74 79
75 80 self.controller.execute(code).addCallback(testCompletes)
76 81
77 82
78 83 def testCurrentIndent(self):
79 84 """test that current_indent_string returns current indent or None.
80 85 Uses _indent_for_block for direct unit testing.
81 86 """
82 87
83 88 self.controller.tabUsesSpaces = True
84 89 self.assert_(self.controller._indent_for_block("""a=3""") == None)
85 90 self.assert_(self.controller._indent_for_block("") == None)
86 91 block = """def test():\n a=3"""
87 92 self.assert_(self.controller._indent_for_block(block) == \
88 93 ' ' * self.controller.tabSpaces)
89 94
90 95 block = """if(True):\n%sif(False):\n%spass""" % \
91 96 (' '*self.controller.tabSpaces,
92 97 2*' '*self.controller.tabSpaces)
93 98 self.assert_(self.controller._indent_for_block(block) == \
94 99 2*(' '*self.controller.tabSpaces))
95 100
@@ -1,111 +1,109 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the asyncfrontendbase module."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #---------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #---------------------------------------------------------------------------
13 13
14 14 #---------------------------------------------------------------------------
15 15 # Imports
16 16 #---------------------------------------------------------------------------
17 17
18 # Tell nose to skip this module
19 __test__ = {}
18 20
19 try:
20 21 from twisted.trial import unittest
21 22 from IPython.frontend.asyncfrontendbase import AsyncFrontEndBase
22 23 from IPython.frontend import frontendbase
23 24 from IPython.kernel.engineservice import EngineService
24 25 from IPython.testing.parametric import Parametric, parametric
25 except ImportError:
26 import nose
27 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
28 26
29 27
30 28 class FrontEndCallbackChecker(AsyncFrontEndBase):
31 29 """FrontEndBase subclass for checking callbacks"""
32 30 def __init__(self, engine=None, history=None):
33 31 super(FrontEndCallbackChecker, self).__init__(engine=engine,
34 32 history=history)
35 33 self.updateCalled = False
36 34 self.renderResultCalled = False
37 35 self.renderErrorCalled = False
38 36
39 37 def update_cell_prompt(self, result, blockID=None):
40 38 self.updateCalled = True
41 39 return result
42 40
43 41 def render_result(self, result):
44 42 self.renderResultCalled = True
45 43 return result
46 44
47 45 def render_error(self, failure):
48 46 self.renderErrorCalled = True
49 47 return failure
50 48
51 49
52 50 class TestAsyncFrontendBase(unittest.TestCase):
53 51 def setUp(self):
54 52 """Setup the EngineService and FrontEndBase"""
55 53
56 54 self.fb = FrontEndCallbackChecker(engine=EngineService())
57 55
58 56 def test_implements_IFrontEnd(self):
59 57 self.assert_(frontendbase.IFrontEnd.implementedBy(
60 58 AsyncFrontEndBase))
61 59
62 60 def test_is_complete_returns_False_for_incomplete_block(self):
63 61 block = """def test(a):"""
64 62 self.assert_(self.fb.is_complete(block) == False)
65 63
66 64 def test_is_complete_returns_True_for_complete_block(self):
67 65 block = """def test(a): pass"""
68 66 self.assert_(self.fb.is_complete(block))
69 67 block = """a=3"""
70 68 self.assert_(self.fb.is_complete(block))
71 69
72 70 def test_blockID_added_to_result(self):
73 71 block = """3+3"""
74 72 d = self.fb.execute(block, blockID='TEST_ID')
75 73 d.addCallback(lambda r: self.assert_(r['blockID']=='TEST_ID'))
76 74 return d
77 75
78 76 def test_blockID_added_to_failure(self):
79 77 block = "raise Exception()"
80 78 d = self.fb.execute(block,blockID='TEST_ID')
81 79 d.addErrback(lambda f: self.assert_(f.blockID=='TEST_ID'))
82 80 return d
83 81
84 82 def test_callbacks_added_to_execute(self):
85 83 d = self.fb.execute("10+10")
86 84 d.addCallback(lambda r: self.assert_(self.fb.updateCalled and self.fb.renderResultCalled))
87 85 return d
88 86
89 87 def test_error_callback_added_to_execute(self):
90 88 """Test that render_error called on execution error."""
91 89
92 90 d = self.fb.execute("raise Exception()")
93 91 d.addErrback(lambda f: self.assert_(self.fb.renderErrorCalled))
94 92 return d
95 93
96 94 def test_history_returns_expected_block(self):
97 95 """Make sure history browsing doesn't fail."""
98 96
99 97 blocks = ["a=1","a=2","a=3"]
100 98 d = self.fb.execute(blocks[0])
101 99 d.addCallback(lambda _: self.fb.execute(blocks[1]))
102 100 d.addCallback(lambda _: self.fb.execute(blocks[2]))
103 101 d.addCallback(lambda _: self.assert_(self.fb.get_history_previous("")==blocks[-2]))
104 102 d.addCallback(lambda _: self.assert_(self.fb.get_history_previous("")==blocks[-3]))
105 103 d.addCallback(lambda _: self.assert_(self.fb.get_history_next()==blocks[-2]))
106 104 return d
107 105
108 106 def test_history_returns_none_at_startup(self):
109 107 self.assert_(self.fb.get_history_previous("")==None)
110 108 self.assert_(self.fb.get_history_next()==None)
111 109
@@ -1,67 +1,67 b''
1 1 # encoding: utf-8
2 2 """
3 3 Test process execution and IO redirection.
4 4 """
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2009 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is
12 12 # in the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 14
15 15 from cStringIO import StringIO
16 16 from time import sleep
17 17 import sys
18 18
19 19 from IPython.frontend.process import PipedProcess
20 20 from IPython.testing import decorators as testdec
21 21
22 22
23 23 def test_capture_out():
24 24 """ A simple test to see if we can execute a process and get the output.
25 25 """
26 26 s = StringIO()
27 27 p = PipedProcess('echo 1', out_callback=s.write, )
28 28 p.start()
29 29 p.join()
30 30 result = s.getvalue().rstrip()
31 31 assert result == '1'
32 32
33 33
34 34 def test_io():
35 35 """ Checks that we can send characters on stdin to the process.
36 36 """
37 37 s = StringIO()
38 38 p = PipedProcess(sys.executable + ' -c "a = raw_input(); print a"',
39 39 out_callback=s.write, )
40 40 p.start()
41 41 test_string = '12345\n'
42 42 while not hasattr(p, 'process'):
43 43 sleep(0.1)
44 44 p.process.stdin.write(test_string)
45 45 p.join()
46 46 result = s.getvalue()
47 47 assert result == test_string
48 48
49 49
50 50 def test_kill():
51 51 """ Check that we can kill a process, and its subprocess.
52 52 """
53 53 s = StringIO()
54 54 p = PipedProcess(sys.executable + ' -c "a = raw_input();"',
55 55 out_callback=s.write, )
56 56 p.start()
57 57 while not hasattr(p, 'process'):
58 58 sleep(0.1)
59 59 p.process.kill()
60 60 assert p.process.poll() is not None
61 61
62 62
63 63 if __name__ == '__main__':
64 64 test_capture_out()
65 65 test_io()
66 66 test_kill()
67 67
@@ -1,61 +1,64 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the interpreter.py module."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import unittest
19 19 from IPython.kernel.core.interpreter import Interpreter
20 20
21 21 #-----------------------------------------------------------------------------
22 22 # Tests
23 23 #-----------------------------------------------------------------------------
24 24
25 # Tell nose to skip this module
26 __test__ = {}
27
25 28 class TestInterpreter(unittest.TestCase):
26 29
27 30 def test_unicode(self):
28 31 """ Test unicode handling with the interpreter."""
29 32 i = Interpreter()
30 33 i.execute_python(u'print "ù"')
31 34 i.execute_python('print "ù"')
32 35
33 36 def test_ticket266993(self):
34 37 """ Test for ticket 266993."""
35 38 i = Interpreter()
36 39 i.execute('str("""a\nb""")')
37 40
38 41 def test_ticket364347(self):
39 42 """Test for ticket 364347."""
40 43 i = Interpreter()
41 44 i.split_commands('str("a\\nb")')
42 45
43 46 def test_split_commands(self):
44 47 """ Test that commands are indeed individually split."""
45 48 i = Interpreter()
46 49 test_atoms = [('(1\n + 1)', ),
47 50 ('1', '1', ),
48 51 ]
49 52 for atoms in test_atoms:
50 53 atoms = [atom.rstrip() + '\n' for atom in atoms]
51 54 self.assertEquals(i.split_commands(''.join(atoms)),atoms)
52 55
53 56 def test_long_lines(self):
54 57 """ Test for spurious syntax error created by the interpreter."""
55 58 test_strings = [u'( 1 +\n 1\n )\n\n',
56 59 u'(1 \n + 1\n )\n\n',
57 60 ]
58 61 i = Interpreter()
59 62 for s in test_strings:
60 63 i.execute(s)
61 64
@@ -1,171 +1,174 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the notification.py module."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 # Tell nose to skip this module
19 __test__ = {}
20
18 21 import unittest
19 22 import IPython.kernel.core.notification as notification
20 23 from nose.tools import timed
21 24
22 25 #
23 26 # Supporting test classes
24 27 #
25 28
26 29 class Observer(object):
27 30 """docstring for Observer"""
28 31 def __init__(self, expectedType, expectedSender,
29 32 center=notification.sharedCenter, **kwargs):
30 33 super(Observer, self).__init__()
31 34 self.expectedType = expectedType
32 35 self.expectedSender = expectedSender
33 36 self.expectedKwArgs = kwargs
34 37 self.recieved = False
35 38 center.add_observer(self.callback,
36 39 self.expectedType,
37 40 self.expectedSender)
38 41
39 42
40 43 def callback(self, theType, sender, args={}):
41 44 """callback"""
42 45
43 46 assert(theType == self.expectedType or
44 47 self.expectedType == None)
45 48 assert(sender == self.expectedSender or
46 49 self.expectedSender == None)
47 50 assert(args == self.expectedKwArgs)
48 51 self.recieved = True
49 52
50 53
51 54 def verify(self):
52 55 """verify"""
53 56
54 57 assert(self.recieved)
55 58
56 59 def reset(self):
57 60 """reset"""
58 61
59 62 self.recieved = False
60 63
61 64
62 65
63 66 class Notifier(object):
64 67 """docstring for Notifier"""
65 68 def __init__(self, theType, **kwargs):
66 69 super(Notifier, self).__init__()
67 70 self.theType = theType
68 71 self.kwargs = kwargs
69 72
70 73 def post(self, center=notification.sharedCenter):
71 74 """fire"""
72 75
73 76 center.post_notification(self.theType, self,
74 77 **self.kwargs)
75 78
76 79
77 80 #
78 81 # Test Cases
79 82 #
80 83
81 84 class NotificationTests(unittest.TestCase):
82 85 """docstring for NotificationTests"""
83 86
84 87 def tearDown(self):
85 88 notification.sharedCenter.remove_all_observers()
86 89
87 90 def test_notification_delivered(self):
88 91 """Test that notifications are delivered"""
89 92 expectedType = 'EXPECTED_TYPE'
90 93 sender = Notifier(expectedType)
91 94 observer = Observer(expectedType, sender)
92 95
93 96 sender.post()
94 97
95 98 observer.verify()
96 99
97 100
98 101 def test_type_specificity(self):
99 102 """Test that observers are registered by type"""
100 103
101 104 expectedType = 1
102 105 unexpectedType = "UNEXPECTED_TYPE"
103 106 sender = Notifier(expectedType)
104 107 unexpectedSender = Notifier(unexpectedType)
105 108 observer = Observer(expectedType, sender)
106 109
107 110 sender.post()
108 111 unexpectedSender.post()
109 112
110 113 observer.verify()
111 114
112 115
113 116 def test_sender_specificity(self):
114 117 """Test that observers are registered by sender"""
115 118
116 119 expectedType = "EXPECTED_TYPE"
117 120 sender1 = Notifier(expectedType)
118 121 sender2 = Notifier(expectedType)
119 122 observer = Observer(expectedType, sender1)
120 123
121 124 sender1.post()
122 125 sender2.post()
123 126
124 127 observer.verify()
125 128
126 129
127 130 def test_remove_all_observers(self):
128 131 """White-box test for remove_all_observers"""
129 132
130 133 for i in xrange(10):
131 134 Observer('TYPE', None, center=notification.sharedCenter)
132 135
133 136 self.assert_(len(notification.sharedCenter.observers[('TYPE',None)]) >= 10,
134 137 "observers registered")
135 138
136 139 notification.sharedCenter.remove_all_observers()
137 140
138 141 self.assert_(len(notification.sharedCenter.observers) == 0, "observers removed")
139 142
140 143
141 144 def test_any_sender(self):
142 145 """test_any_sender"""
143 146
144 147 expectedType = "EXPECTED_TYPE"
145 148 sender1 = Notifier(expectedType)
146 149 sender2 = Notifier(expectedType)
147 150 observer = Observer(expectedType, None)
148 151
149 152
150 153 sender1.post()
151 154 observer.verify()
152 155
153 156 observer.reset()
154 157 sender2.post()
155 158 observer.verify()
156 159
157 160
158 161 @timed(.01)
159 162 def test_post_performance(self):
160 163 """Test that post_notification, even with many registered irrelevant
161 164 observers is fast"""
162 165
163 166 for i in xrange(10):
164 167 Observer("UNRELATED_TYPE", None)
165 168
166 169 o = Observer('EXPECTED_TYPE', None)
167 170
168 171 notification.sharedCenter.post_notification('EXPECTED_TYPE', self)
169 172
170 173 o.verify()
171 174
@@ -1,70 +1,70 b''
1 1 # encoding: utf-8
2 2 """
3 3 Test the output capture at the OS level, using file descriptors.
4 4 """
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is
12 12 # in the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 # Tell nose to skip this module
16 __test__ = {}
15 17
16 # Stdlib imports
17 18 import os
18 19 from cStringIO import StringIO
19 20
20 # Our own imports
21 21 from IPython.testing import decorators as dec
22 22
23 23 #-----------------------------------------------------------------------------
24 24 # Test functions
25 25
26 26 @dec.skip_win32
27 27 def test_redirector():
28 28 """ Checks that the redirector can be used to do synchronous capture.
29 29 """
30 30 from IPython.kernel.core.fd_redirector import FDRedirector
31 31 r = FDRedirector()
32 32 out = StringIO()
33 33 try:
34 34 r.start()
35 35 for i in range(10):
36 36 os.system('echo %ic' % i)
37 37 print >>out, r.getvalue(),
38 38 print >>out, i
39 39 except:
40 40 r.stop()
41 41 raise
42 42 r.stop()
43 43 result1 = out.getvalue()
44 44 result2 = "".join("%ic\n%i\n" %(i, i) for i in range(10))
45 45 assert result1 == result2
46 46
47 47
48 48 @dec.skip_win32
49 49 def test_redirector_output_trap():
50 50 """ This test check not only that the redirector_output_trap does
51 51 trap the output, but also that it does it in a gready way, that
52 52 is by calling the callback ASAP.
53 53 """
54 54 from IPython.kernel.core.redirector_output_trap import RedirectorOutputTrap
55 55 out = StringIO()
56 56 trap = RedirectorOutputTrap(out.write, out.write)
57 57 try:
58 58 trap.set()
59 59 for i in range(10):
60 60 os.system('echo %ic' % i)
61 61 print "%ip" % i
62 62 print >>out, i
63 63 except:
64 64 trap.unset()
65 65 raise
66 66 trap.unset()
67 67 result1 = out.getvalue()
68 68 result2 = "".join("%ic\n%ip\n%i\n" %(i, i, i) for i in range(10))
69 69 assert result1 == result2
70 70
@@ -1,43 +1,46 b''
1 # Tell nose to skip this module
2 __test__ = {}
3
1 4 #from __future__ import with_statement
2 5
3 6 # XXX This file is currently disabled to preserve 2.4 compatibility.
4 7
5 8 #def test_simple():
6 9 if 0:
7 10
8 11 # XXX - for now, we need a running cluster to be started separately. The
9 12 # daemon work is almost finished, and will make much of this unnecessary.
10 13 from IPython.kernel import client
11 14 mec = client.MultiEngineClient(('127.0.0.1',10105))
12 15
13 16 try:
14 17 mec.get_ids()
15 18 except ConnectionRefusedError:
16 19 import os, time
17 20 os.system('ipcluster -n 2 &')
18 21 time.sleep(2)
19 22 mec = client.MultiEngineClient(('127.0.0.1',10105))
20 23
21 24 mec.block = False
22 25
23 26 import itertools
24 27 c = itertools.count()
25 28
26 29 parallel = RemoteMultiEngine(mec)
27 30
28 31 mec.pushAll()
29 32
30 33 ## with parallel as pr:
31 34 ## # A comment
32 35 ## remote() # this means the code below only runs remotely
33 36 ## print 'Hello remote world'
34 37 ## x = range(10)
35 38 ## # Comments are OK
36 39 ## # Even misindented.
37 40 ## y = x+1
38 41
39 42
40 43 ## with pfor('i',sequence) as pr:
41 44 ## print x[i]
42 45
43 46 print pr.x + pr.y
@@ -1,44 +1,44 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the kernel.engineservice.py module.
4 4
5 5 Things that should be tested:
6 6
7 7 - Should the EngineService return Deferred objects?
8 8 - Run the same tests that are run in shell.py.
9 9 - Make sure that the Interface is really implemented.
10 10 - The startService and stopService methods.
11 11 """
12 12
13 13 __docformat__ = "restructuredtext en"
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Copyright (C) 2008 The IPython Development Team
17 17 #
18 18 # Distributed under the terms of the BSD License. The full license is in
19 19 # the file COPYING, distributed as part of this software.
20 20 #-------------------------------------------------------------------------------
21 21
22 22 #-------------------------------------------------------------------------------
23 23 # Imports
24 24 #-------------------------------------------------------------------------------
25 25
26 try:
26 # Tell nose to skip this module
27 __test__ = {}
28
27 29 from twisted.application.service import IService
28 30 from IPython.kernel.controllerservice import ControllerService
29 31 from IPython.kernel.tests import multienginetest as met
30 32 from controllertest import IControllerCoreTestCase
31 33 from IPython.testing.util import DeferredTestCase
32 except ImportError:
33 import nose
34 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
34
35 35
36 36 class BasicControllerServiceTest(DeferredTestCase,
37 37 IControllerCoreTestCase):
38 38
39 39 def setUp(self):
40 40 self.controller = ControllerService()
41 41 self.controller.startService()
42 42
43 43 def tearDown(self):
44 44 self.controller.stopService()
@@ -1,93 +1,92 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the enginepb.py module."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-------------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-------------------------------------------------------------------------------
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 try:
18 # Tell nose to skip this module
19 __test__ = {}
20
19 21 from twisted.python import components
20 22 from twisted.internet import reactor, defer
21 23 from twisted.spread import pb
22 24 from twisted.internet.base import DelayedCall
23 25 DelayedCall.debug = True
24 26
25 27 import zope.interface as zi
26 28
27 29 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
28 30 from IPython.kernel import engineservice as es
29 31 from IPython.testing.util import DeferredTestCase
30 32 from IPython.kernel.controllerservice import IControllerBase
31 33 from IPython.kernel.enginefc import FCRemoteEngineRefFromService, IEngineBase
32 34 from IPython.kernel.engineservice import IEngineQueued
33 35 from IPython.kernel.engineconnector import EngineConnector
34 36
35 37 from IPython.kernel.tests.engineservicetest import \
36 38 IEngineCoreTestCase, \
37 39 IEngineSerializedTestCase, \
38 40 IEngineQueuedTestCase
39 except ImportError:
40 import nose
41 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
42 41
43 42
44 43 class EngineFCTest(DeferredTestCase,
45 44 IEngineCoreTestCase,
46 45 IEngineSerializedTestCase,
47 46 IEngineQueuedTestCase
48 47 ):
49 48
50 49 zi.implements(IControllerBase)
51 50
52 51 def setUp(self):
53 52
54 53 # Start a server and append to self.servers
55 54 self.controller_reference = FCRemoteEngineRefFromService(self)
56 55 self.controller_tub = Tub()
57 56 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
58 57 self.controller_tub.setLocation('127.0.0.1:10105')
59 58
60 59 furl = self.controller_tub.registerReference(self.controller_reference)
61 60 self.controller_tub.startService()
62 61
63 62 # Start an EngineService and append to services/client
64 63 self.engine_service = es.EngineService()
65 64 self.engine_service.startService()
66 65 self.engine_tub = Tub()
67 66 self.engine_tub.startService()
68 67 engine_connector = EngineConnector(self.engine_tub)
69 68 d = engine_connector.connect_to_controller(self.engine_service, furl)
70 69 # This deferred doesn't fire until after register_engine has returned and
71 70 # thus, self.engine has been defined and the tets can proceed.
72 71 return d
73 72
74 73 def tearDown(self):
75 74 dlist = []
76 75 # Shut down the engine
77 76 d = self.engine_tub.stopService()
78 77 dlist.append(d)
79 78 # Shut down the controller
80 79 d = self.controller_tub.stopService()
81 80 dlist.append(d)
82 81 return defer.DeferredList(dlist)
83 82
84 83 #---------------------------------------------------------------------------
85 84 # Make me look like a basic controller
86 85 #---------------------------------------------------------------------------
87 86
88 87 def register_engine(self, engine_ref, id=None, ip=None, port=None, pid=None):
89 88 self.engine = IEngineQueued(IEngineBase(engine_ref))
90 89 return {'id':id}
91 90
92 91 def unregister_engine(self, id):
93 92 pass No newline at end of file
@@ -1,80 +1,79 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the kernel.engineservice.py module.
4 4
5 5 Things that should be tested:
6 6
7 7 - Should the EngineService return Deferred objects?
8 8 - Run the same tests that are run in shell.py.
9 9 - Make sure that the Interface is really implemented.
10 10 - The startService and stopService methods.
11 11 """
12 12
13 13 __docformat__ = "restructuredtext en"
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Copyright (C) 2008 The IPython Development Team
17 17 #
18 18 # Distributed under the terms of the BSD License. The full license is in
19 19 # the file COPYING, distributed as part of this software.
20 20 #-------------------------------------------------------------------------------
21 21
22 22 #-------------------------------------------------------------------------------
23 23 # Imports
24 24 #-------------------------------------------------------------------------------
25 25
26 try:
26 # Tell nose to skip this module
27 __test__ = {}
28
27 29 from twisted.internet import defer
28 30 from twisted.application.service import IService
29 31
30 32 from IPython.kernel import engineservice as es
31 33 from IPython.testing.util import DeferredTestCase
32 34 from IPython.kernel.tests.engineservicetest import \
33 35 IEngineCoreTestCase, \
34 36 IEngineSerializedTestCase, \
35 37 IEngineQueuedTestCase, \
36 38 IEnginePropertiesTestCase
37 except ImportError:
38 import nose
39 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
40 39
41 40
42 41 class BasicEngineServiceTest(DeferredTestCase,
43 42 IEngineCoreTestCase,
44 43 IEngineSerializedTestCase,
45 44 IEnginePropertiesTestCase):
46 45
47 46 def setUp(self):
48 47 self.engine = es.EngineService()
49 48 self.engine.startService()
50 49
51 50 def tearDown(self):
52 51 return self.engine.stopService()
53 52
54 53 class ThreadedEngineServiceTest(DeferredTestCase,
55 54 IEngineCoreTestCase,
56 55 IEngineSerializedTestCase,
57 56 IEnginePropertiesTestCase):
58 57
59 58 def setUp(self):
60 59 self.engine = es.ThreadedEngineService()
61 60 self.engine.startService()
62 61
63 62 def tearDown(self):
64 63 return self.engine.stopService()
65 64
66 65 class QueuedEngineServiceTest(DeferredTestCase,
67 66 IEngineCoreTestCase,
68 67 IEngineSerializedTestCase,
69 68 IEnginePropertiesTestCase,
70 69 IEngineQueuedTestCase):
71 70
72 71 def setUp(self):
73 72 self.rawEngine = es.EngineService()
74 73 self.rawEngine.startService()
75 74 self.engine = es.IEngineQueued(self.rawEngine)
76 75
77 76 def tearDown(self):
78 77 return self.rawEngine.stopService()
79 78
80 79
@@ -1,56 +1,55 b''
1 1 # encoding: utf-8
2 2
3 3 """"""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 try:
18 # Tell nose to skip this module
19 __test__ = {}
20
19 21 from twisted.internet import defer
20 22 from IPython.testing.util import DeferredTestCase
21 23 from IPython.kernel.controllerservice import ControllerService
22 24 from IPython.kernel import multiengine as me
23 25 from IPython.kernel.tests.multienginetest import (IMultiEngineTestCase,
24 26 ISynchronousMultiEngineTestCase)
25 except ImportError:
26 import nose
27 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
28 27
29 28
30 29 class BasicMultiEngineTestCase(DeferredTestCase, IMultiEngineTestCase):
31 30
32 31 def setUp(self):
33 32 self.controller = ControllerService()
34 33 self.controller.startService()
35 34 self.multiengine = me.IMultiEngine(self.controller)
36 35 self.engines = []
37 36
38 37 def tearDown(self):
39 38 self.controller.stopService()
40 39 for e in self.engines:
41 40 e.stopService()
42 41
43 42
44 43 class SynchronousMultiEngineTestCase(DeferredTestCase, ISynchronousMultiEngineTestCase):
45 44
46 45 def setUp(self):
47 46 self.controller = ControllerService()
48 47 self.controller.startService()
49 48 self.multiengine = me.ISynchronousMultiEngine(me.IMultiEngine(self.controller))
50 49 self.engines = []
51 50
52 51 def tearDown(self):
53 52 self.controller.stopService()
54 53 for e in self.engines:
55 54 e.stopService()
56 55
@@ -1,144 +1,144 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 __docformat__ = "restructuredtext en"
5 5
6 6 #-------------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-------------------------------------------------------------------------------
12 12
13 13 #-------------------------------------------------------------------------------
14 14 # Imports
15 15 #-------------------------------------------------------------------------------
16 16
17 try:
17 # Tell nose to skip this module
18 __test__ = {}
19
18 20 from twisted.internet import defer, reactor
19 21
20 22 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
21 23
22 24 from IPython.testing.util import DeferredTestCase
23 25 from IPython.kernel.controllerservice import ControllerService
24 26 from IPython.kernel.multiengine import IMultiEngine
25 27 from IPython.kernel.tests.multienginetest import IFullSynchronousMultiEngineTestCase
26 28 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
27 29 from IPython.kernel import multiengine as me
28 30 from IPython.kernel.clientconnector import ClientConnector
29 31 from IPython.kernel.parallelfunction import ParallelFunction
30 32 from IPython.kernel.error import CompositeError
31 33 from IPython.kernel.util import printer
32 except ImportError:
33 import nose
34 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
34
35 35
36 36 def _raise_it(f):
37 37 try:
38 38 f.raiseException()
39 39 except CompositeError, e:
40 40 e.raise_exception()
41 41
42 42
43 43 class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMultiEngineTestCase):
44 44
45 45 def setUp(self):
46 46
47 47 self.engines = []
48 48
49 49 self.controller = ControllerService()
50 50 self.controller.startService()
51 51 self.imultiengine = IMultiEngine(self.controller)
52 52 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
53 53
54 54 self.controller_tub = Tub()
55 55 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
56 56 self.controller_tub.setLocation('127.0.0.1:10105')
57 57
58 58 furl = self.controller_tub.registerReference(self.mec_referenceable)
59 59 self.controller_tub.startService()
60 60
61 61 self.client_tub = ClientConnector()
62 62 d = self.client_tub.get_multiengine_client(furl)
63 63 d.addCallback(self.handle_got_client)
64 64 return d
65 65
66 66 def handle_got_client(self, client):
67 67 self.multiengine = client
68 68
69 69 def tearDown(self):
70 70 dlist = []
71 71 # Shut down the multiengine client
72 72 d = self.client_tub.tub.stopService()
73 73 dlist.append(d)
74 74 # Shut down the engines
75 75 for e in self.engines:
76 76 e.stopService()
77 77 # Shut down the controller
78 78 d = self.controller_tub.stopService()
79 79 d.addBoth(lambda _: self.controller.stopService())
80 80 dlist.append(d)
81 81 return defer.DeferredList(dlist)
82 82
83 83 def test_mapper(self):
84 84 self.addEngine(4)
85 85 m = self.multiengine.mapper()
86 86 self.assertEquals(m.multiengine,self.multiengine)
87 87 self.assertEquals(m.dist,'b')
88 88 self.assertEquals(m.targets,'all')
89 89 self.assertEquals(m.block,True)
90 90
91 91 def test_map_default(self):
92 92 self.addEngine(4)
93 93 m = self.multiengine.mapper()
94 94 d = m.map(lambda x: 2*x, range(10))
95 95 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
96 96 d.addCallback(lambda _: self.multiengine.map(lambda x: 2*x, range(10)))
97 97 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
98 98 return d
99 99
100 100 def test_map_noblock(self):
101 101 self.addEngine(4)
102 102 m = self.multiengine.mapper(block=False)
103 103 d = m.map(lambda x: 2*x, range(10))
104 104 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
105 105 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
106 106 return d
107 107
108 108 def test_mapper_fail(self):
109 109 self.addEngine(4)
110 110 m = self.multiengine.mapper()
111 111 d = m.map(lambda x: 1/0, range(10))
112 112 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
113 113 return d
114 114
115 115 def test_parallel(self):
116 116 self.addEngine(4)
117 117 p = self.multiengine.parallel()
118 118 self.assert_(isinstance(p, ParallelFunction))
119 119 @p
120 120 def f(x): return 2*x
121 121 d = f(range(10))
122 122 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
123 123 return d
124 124
125 125 def test_parallel_noblock(self):
126 126 self.addEngine(1)
127 127 p = self.multiengine.parallel(block=False)
128 128 self.assert_(isinstance(p, ParallelFunction))
129 129 @p
130 130 def f(x): return 2*x
131 131 d = f(range(10))
132 132 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
133 133 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
134 134 return d
135 135
136 136 def test_parallel_fail(self):
137 137 self.addEngine(4)
138 138 p = self.multiengine.parallel()
139 139 self.assert_(isinstance(p, ParallelFunction))
140 140 @p
141 141 def f(x): return 1/0
142 142 d = f(range(10))
143 143 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
144 144 return d No newline at end of file
@@ -1,102 +1,102 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the shell.py module."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 try:
18 # Tell nose to skip this module
19 __test__ = {}
20
19 21 import zope.interface as zi
20 22 from twisted.trial import unittest
21 23 from IPython.testing.util import DeferredTestCase
22 24
23 25 from IPython.kernel.newserialized import \
24 26 ISerialized, \
25 27 IUnSerialized, \
26 28 Serialized, \
27 29 UnSerialized, \
28 30 SerializeIt, \
29 31 UnSerializeIt
30 except ImportError:
31 import nose
32 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
32
33 33
34 34 #-----------------------------------------------------------------------------
35 35 # Tests
36 36 #-----------------------------------------------------------------------------
37 37
38 38 class SerializedTestCase(unittest.TestCase):
39 39
40 40 def setUp(self):
41 41 pass
42 42
43 43 def tearDown(self):
44 44 pass
45 45
46 46 def testSerializedInterfaces(self):
47 47
48 48 us = UnSerialized({'a':10, 'b':range(10)})
49 49 s = ISerialized(us)
50 50 uss = IUnSerialized(s)
51 51 self.assert_(ISerialized.providedBy(s))
52 52 self.assert_(IUnSerialized.providedBy(us))
53 53 self.assert_(IUnSerialized.providedBy(uss))
54 54 for m in list(ISerialized):
55 55 self.assert_(hasattr(s, m))
56 56 for m in list(IUnSerialized):
57 57 self.assert_(hasattr(us, m))
58 58 for m in list(IUnSerialized):
59 59 self.assert_(hasattr(uss, m))
60 60
61 61 def testPickleSerialized(self):
62 62 obj = {'a':1.45345, 'b':'asdfsdf', 'c':10000L}
63 63 original = UnSerialized(obj)
64 64 originalSer = ISerialized(original)
65 65 firstData = originalSer.getData()
66 66 firstTD = originalSer.getTypeDescriptor()
67 67 firstMD = originalSer.getMetadata()
68 68 self.assert_(firstTD == 'pickle')
69 69 self.assert_(firstMD == {})
70 70 unSerialized = IUnSerialized(originalSer)
71 71 secondObj = unSerialized.getObject()
72 72 for k, v in secondObj.iteritems():
73 73 self.assert_(obj[k] == v)
74 74 secondSer = ISerialized(UnSerialized(secondObj))
75 75 self.assert_(firstData == secondSer.getData())
76 76 self.assert_(firstTD == secondSer.getTypeDescriptor() )
77 77 self.assert_(firstMD == secondSer.getMetadata())
78 78
79 79 def testNDArraySerialized(self):
80 80 try:
81 81 import numpy
82 82 except ImportError:
83 83 pass
84 84 else:
85 85 a = numpy.linspace(0.0, 1.0, 1000)
86 86 unSer1 = UnSerialized(a)
87 87 ser1 = ISerialized(unSer1)
88 88 td = ser1.getTypeDescriptor()
89 89 self.assert_(td == 'ndarray')
90 90 md = ser1.getMetadata()
91 91 self.assert_(md['shape'] == a.shape)
92 92 self.assert_(md['dtype'] == a.dtype.str)
93 93 buff = ser1.getData()
94 94 self.assert_(buff == numpy.getbuffer(a))
95 95 s = Serialized(buff, td, md)
96 96 us = IUnSerialized(s)
97 97 final = us.getObject()
98 98 self.assert_(numpy.getbuffer(a) == numpy.getbuffer(final))
99 99 self.assert_(a.dtype.str == final.dtype.str)
100 100 self.assert_(a.shape == final.shape)
101 101
102 102
@@ -1,186 +1,186 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Tests for pendingdeferred.py"""
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 try:
19 # Tell nose to skip this module
20 __test__ = {}
21
20 22 from twisted.internet import defer
21 23 from twisted.python import failure
22 24
23 25 from IPython.testing.util import DeferredTestCase
24 26 import IPython.kernel.pendingdeferred as pd
25 27 from IPython.kernel import error
26 28 from IPython.kernel.util import printer
27 except ImportError:
28 import nose
29 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
29
30 30
31 31 class Foo(object):
32 32
33 33 def bar(self, bahz):
34 34 return defer.succeed('blahblah: %s' % bahz)
35 35
36 36 class TwoPhaseFoo(pd.PendingDeferredManager):
37 37
38 38 def __init__(self, foo):
39 39 self.foo = foo
40 40 pd.PendingDeferredManager.__init__(self)
41 41
42 42 @pd.two_phase
43 43 def bar(self, bahz):
44 44 return self.foo.bar(bahz)
45 45
46 46 class PendingDeferredManagerTest(DeferredTestCase):
47 47
48 48 def setUp(self):
49 49 self.pdm = pd.PendingDeferredManager()
50 50
51 51 def tearDown(self):
52 52 pass
53 53
54 54 def testBasic(self):
55 55 dDict = {}
56 56 # Create 10 deferreds and save them
57 57 for i in range(10):
58 58 d = defer.Deferred()
59 59 did = self.pdm.save_pending_deferred(d)
60 60 dDict[did] = d
61 61 # Make sure they are begin saved
62 62 for k in dDict.keys():
63 63 self.assert_(self.pdm.quick_has_id(k))
64 64 # Get the pending deferred (block=True), then callback with 'foo' and compare
65 65 for did in dDict.keys()[0:5]:
66 66 d = self.pdm.get_pending_deferred(did,block=True)
67 67 dDict[did].callback('foo')
68 68 d.addCallback(lambda r: self.assert_(r=='foo'))
69 69 # Get the pending deferreds with (block=False) and make sure ResultNotCompleted is raised
70 70 for did in dDict.keys()[5:10]:
71 71 d = self.pdm.get_pending_deferred(did,block=False)
72 72 d.addErrback(lambda f: self.assertRaises(error.ResultNotCompleted, f.raiseException))
73 73 # Now callback the last 5, get them and compare.
74 74 for did in dDict.keys()[5:10]:
75 75 dDict[did].callback('foo')
76 76 d = self.pdm.get_pending_deferred(did,block=False)
77 77 d.addCallback(lambda r: self.assert_(r=='foo'))
78 78
79 79 def test_save_then_delete(self):
80 80 d = defer.Deferred()
81 81 did = self.pdm.save_pending_deferred(d)
82 82 self.assert_(self.pdm.quick_has_id(did))
83 83 self.pdm.delete_pending_deferred(did)
84 84 self.assert_(not self.pdm.quick_has_id(did))
85 85
86 86 def test_save_get_delete(self):
87 87 d = defer.Deferred()
88 88 did = self.pdm.save_pending_deferred(d)
89 89 d2 = self.pdm.get_pending_deferred(did,True)
90 90 d2.addErrback(lambda f: self.assertRaises(error.AbortedPendingDeferredError, f.raiseException))
91 91 self.pdm.delete_pending_deferred(did)
92 92 return d2
93 93
94 94 def test_double_get(self):
95 95 d = defer.Deferred()
96 96 did = self.pdm.save_pending_deferred(d)
97 97 d2 = self.pdm.get_pending_deferred(did,True)
98 98 d3 = self.pdm.get_pending_deferred(did,True)
99 99 d3.addErrback(lambda f: self.assertRaises(error.InvalidDeferredID, f.raiseException))
100 100
101 101 def test_get_after_callback(self):
102 102 d = defer.Deferred()
103 103 did = self.pdm.save_pending_deferred(d)
104 104 d.callback('foo')
105 105 d2 = self.pdm.get_pending_deferred(did,True)
106 106 d2.addCallback(lambda r: self.assertEquals(r,'foo'))
107 107 self.assert_(not self.pdm.quick_has_id(did))
108 108
109 109 def test_get_before_callback(self):
110 110 d = defer.Deferred()
111 111 did = self.pdm.save_pending_deferred(d)
112 112 d2 = self.pdm.get_pending_deferred(did,True)
113 113 d.callback('foo')
114 114 d2.addCallback(lambda r: self.assertEquals(r,'foo'))
115 115 self.assert_(not self.pdm.quick_has_id(did))
116 116 d = defer.Deferred()
117 117 did = self.pdm.save_pending_deferred(d)
118 118 d2 = self.pdm.get_pending_deferred(did,True)
119 119 d2.addCallback(lambda r: self.assertEquals(r,'foo'))
120 120 d.callback('foo')
121 121 self.assert_(not self.pdm.quick_has_id(did))
122 122
123 123 def test_get_after_errback(self):
124 124 class MyError(Exception):
125 125 pass
126 126 d = defer.Deferred()
127 127 did = self.pdm.save_pending_deferred(d)
128 128 d.errback(failure.Failure(MyError('foo')))
129 129 d2 = self.pdm.get_pending_deferred(did,True)
130 130 d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException))
131 131 self.assert_(not self.pdm.quick_has_id(did))
132 132
133 133 def test_get_before_errback(self):
134 134 class MyError(Exception):
135 135 pass
136 136 d = defer.Deferred()
137 137 did = self.pdm.save_pending_deferred(d)
138 138 d2 = self.pdm.get_pending_deferred(did,True)
139 139 d.errback(failure.Failure(MyError('foo')))
140 140 d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException))
141 141 self.assert_(not self.pdm.quick_has_id(did))
142 142 d = defer.Deferred()
143 143 did = self.pdm.save_pending_deferred(d)
144 144 d2 = self.pdm.get_pending_deferred(did,True)
145 145 d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException))
146 146 d.errback(failure.Failure(MyError('foo')))
147 147 self.assert_(not self.pdm.quick_has_id(did))
148 148
149 149 def test_noresult_noblock(self):
150 150 d = defer.Deferred()
151 151 did = self.pdm.save_pending_deferred(d)
152 152 d2 = self.pdm.get_pending_deferred(did,False)
153 153 d2.addErrback(lambda f: self.assertRaises(error.ResultNotCompleted, f.raiseException))
154 154
155 155 def test_with_callbacks(self):
156 156 d = defer.Deferred()
157 157 d.addCallback(lambda r: r+' foo')
158 158 d.addCallback(lambda r: r+' bar')
159 159 did = self.pdm.save_pending_deferred(d)
160 160 d2 = self.pdm.get_pending_deferred(did,True)
161 161 d.callback('bam')
162 162 d2.addCallback(lambda r: self.assertEquals(r,'bam foo bar'))
163 163
164 164 def test_with_errbacks(self):
165 165 class MyError(Exception):
166 166 pass
167 167 d = defer.Deferred()
168 168 d.addCallback(lambda r: 'foo')
169 169 d.addErrback(lambda f: 'caught error')
170 170 did = self.pdm.save_pending_deferred(d)
171 171 d2 = self.pdm.get_pending_deferred(did,True)
172 172 d.errback(failure.Failure(MyError('bam')))
173 173 d2.addErrback(lambda f: self.assertRaises(MyError, f.raiseException))
174 174
175 175 def test_nested_deferreds(self):
176 176 d = defer.Deferred()
177 177 d2 = defer.Deferred()
178 178 d.addCallback(lambda r: d2)
179 179 did = self.pdm.save_pending_deferred(d)
180 180 d.callback('foo')
181 181 d3 = self.pdm.get_pending_deferred(did,False)
182 182 d3.addErrback(lambda f: self.assertRaises(error.ResultNotCompleted, f.raiseException))
183 183 d2.callback('bar')
184 184 d3 = self.pdm.get_pending_deferred(did,False)
185 185 d3.addCallback(lambda r: self.assertEquals(r,'bar'))
186 186
@@ -1,51 +1,51 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the kernel.task.py module."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-------------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-------------------------------------------------------------------------------
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 try:
18 # Tell nose to skip this module
19 __test__ = {}
20
19 21 import time
20 22
21 23 from twisted.internet import defer
22 24 from twisted.trial import unittest
23 25
24 26 from IPython.kernel import task, controllerservice as cs, engineservice as es
25 27 from IPython.kernel.multiengine import IMultiEngine
26 28 from IPython.testing.util import DeferredTestCase
27 29 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
28 except ImportError:
29 import nose
30 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
30
31 31
32 32 #-------------------------------------------------------------------------------
33 33 # Tests
34 34 #-------------------------------------------------------------------------------
35 35
36 36 class BasicTaskControllerTestCase(DeferredTestCase, ITaskControllerTestCase):
37 37
38 38 def setUp(self):
39 39 self.controller = cs.ControllerService()
40 40 self.controller.startService()
41 41 self.multiengine = IMultiEngine(self.controller)
42 42 self.tc = task.ITaskController(self.controller)
43 43 self.tc.failurePenalty = 0
44 44 self.engines=[]
45 45
46 46 def tearDown(self):
47 47 self.controller.stopService()
48 48 for e in self.engines:
49 49 e.stopService()
50 50
51 51
@@ -1,162 +1,161 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 __docformat__ = "restructuredtext en"
5 5
6 6 #-------------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-------------------------------------------------------------------------------
12 12
13 13 #-------------------------------------------------------------------------------
14 14 # Imports
15 15 #-------------------------------------------------------------------------------
16 16
17 try:
17 # Tell nose to skip this module
18 __test__ = {}
19
18 20 import time
19 21
20 22 from twisted.internet import defer, reactor
21 23
22 24 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
23 25
24 26 from IPython.kernel import task as taskmodule
25 27 from IPython.kernel import controllerservice as cs
26 28 import IPython.kernel.multiengine as me
27 29 from IPython.testing.util import DeferredTestCase
28 30 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
29 31 from IPython.kernel.taskfc import IFCTaskController
30 32 from IPython.kernel.util import printer
31 33 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
32 34 from IPython.kernel.clientconnector import ClientConnector
33 35 from IPython.kernel.error import CompositeError
34 36 from IPython.kernel.parallelfunction import ParallelFunction
35 except ImportError:
36 import nose
37 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
38 37
39 38
40 39 #-------------------------------------------------------------------------------
41 40 # Tests
42 41 #-------------------------------------------------------------------------------
43 42
44 43 def _raise_it(f):
45 44 try:
46 45 f.raiseException()
47 46 except CompositeError, e:
48 47 e.raise_exception()
49 48
50 49 class TaskTest(DeferredTestCase, ITaskControllerTestCase):
51 50
52 51 def setUp(self):
53 52
54 53 self.engines = []
55 54
56 55 self.controller = cs.ControllerService()
57 56 self.controller.startService()
58 57 self.imultiengine = me.IMultiEngine(self.controller)
59 58 self.itc = taskmodule.ITaskController(self.controller)
60 59 self.itc.failurePenalty = 0
61 60
62 61 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
63 62 self.tc_referenceable = IFCTaskController(self.itc)
64 63
65 64 self.controller_tub = Tub()
66 65 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
67 66 self.controller_tub.setLocation('127.0.0.1:10105')
68 67
69 68 mec_furl = self.controller_tub.registerReference(self.mec_referenceable)
70 69 tc_furl = self.controller_tub.registerReference(self.tc_referenceable)
71 70 self.controller_tub.startService()
72 71
73 72 self.client_tub = ClientConnector()
74 73 d = self.client_tub.get_multiengine_client(mec_furl)
75 74 d.addCallback(self.handle_mec_client)
76 75 d.addCallback(lambda _: self.client_tub.get_task_client(tc_furl))
77 76 d.addCallback(self.handle_tc_client)
78 77 return d
79 78
80 79 def handle_mec_client(self, client):
81 80 self.multiengine = client
82 81
83 82 def handle_tc_client(self, client):
84 83 self.tc = client
85 84
86 85 def tearDown(self):
87 86 dlist = []
88 87 # Shut down the multiengine client
89 88 d = self.client_tub.tub.stopService()
90 89 dlist.append(d)
91 90 # Shut down the engines
92 91 for e in self.engines:
93 92 e.stopService()
94 93 # Shut down the controller
95 94 d = self.controller_tub.stopService()
96 95 d.addBoth(lambda _: self.controller.stopService())
97 96 dlist.append(d)
98 97 return defer.DeferredList(dlist)
99 98
100 99 def test_mapper(self):
101 100 self.addEngine(1)
102 101 m = self.tc.mapper()
103 102 self.assertEquals(m.task_controller,self.tc)
104 103 self.assertEquals(m.clear_before,False)
105 104 self.assertEquals(m.clear_after,False)
106 105 self.assertEquals(m.retries,0)
107 106 self.assertEquals(m.recovery_task,None)
108 107 self.assertEquals(m.depend,None)
109 108 self.assertEquals(m.block,True)
110 109
111 110 def test_map_default(self):
112 111 self.addEngine(1)
113 112 m = self.tc.mapper()
114 113 d = m.map(lambda x: 2*x, range(10))
115 114 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
116 115 d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10)))
117 116 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
118 117 return d
119 118
120 119 def test_map_noblock(self):
121 120 self.addEngine(1)
122 121 m = self.tc.mapper(block=False)
123 122 d = m.map(lambda x: 2*x, range(10))
124 123 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
125 124 return d
126 125
127 126 def test_mapper_fail(self):
128 127 self.addEngine(1)
129 128 m = self.tc.mapper()
130 129 d = m.map(lambda x: 1/0, range(10))
131 130 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
132 131 return d
133 132
134 133 def test_parallel(self):
135 134 self.addEngine(1)
136 135 p = self.tc.parallel()
137 136 self.assert_(isinstance(p, ParallelFunction))
138 137 @p
139 138 def f(x): return 2*x
140 139 d = f(range(10))
141 140 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
142 141 return d
143 142
144 143 def test_parallel_noblock(self):
145 144 self.addEngine(1)
146 145 p = self.tc.parallel(block=False)
147 146 self.assert_(isinstance(p, ParallelFunction))
148 147 @p
149 148 def f(x): return 2*x
150 149 d = f(range(10))
151 150 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
152 151 return d
153 152
154 153 def test_parallel_fail(self):
155 154 self.addEngine(1)
156 155 p = self.tc.parallel()
157 156 self.assert_(isinstance(p, ParallelFunction))
158 157 @p
159 158 def f(x): return 1/0
160 159 d = f(range(10))
161 160 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
162 161 return d No newline at end of file
@@ -1,48 +1,51 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2008 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 # Tell nose to skip this module
16 __test__ = {}
17
15 18 import tempfile
16 19 import os, sys
17 20
18 21 from twisted.internet import reactor
19 22 from twisted.trial import unittest
20 23
21 24 from IPython.kernel.error import FileTimeoutError
22 25 from IPython.kernel.twistedutil import wait_for_file
23 26
24 27 #-----------------------------------------------------------------------------
25 28 # Tests
26 29 #-----------------------------------------------------------------------------
27 30
28 31 class TestWaitForFile(unittest.TestCase):
29 32
30 33 def test_delay(self):
31 34 filename = tempfile.mktemp()
32 35 def _create_file():
33 36 open(filename,'w').write('####')
34 37 dcall = reactor.callLater(0.5, _create_file)
35 38 d = wait_for_file(filename,delay=0.1)
36 39 d.addCallback(lambda r: self.assert_(r))
37 40 def _cancel_dcall(r):
38 41 if dcall.active():
39 42 dcall.cancel()
40 43 d.addCallback(_cancel_dcall)
41 44 return d
42 45
43 46 def test_timeout(self):
44 47 filename = tempfile.mktemp()
45 48 d = wait_for_file(filename,delay=0.1,max_tries=1)
46 49 d.addErrback(lambda f: self.assertRaises(FileTimeoutError,f.raiseException))
47 50 return d
48 51 No newline at end of file
1 NO CONTENT: modified file
General Comments 0
You need to be logged in to leave comments. Login now