##// END OF EJS Templates
Merging in vvatsa's ssh mode for ipcluster with some changes....
Merging in vvatsa's ssh mode for ipcluster with some changes. We now have a fully working ssh mode for the new ipcluster. It should work well on Unix, Linux and OS X.

File last commit:

r1555:ea082057
r1832:34a51241
Show More
test_multienginefc.py
143 lines | 5.1 KiB | text/x-python | PythonLexer
/ IPython / kernel / tests / test_multienginefc.py
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 #!/usr/bin/env python
# encoding: utf-8
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
Brian E Granger
Finished initial reworking and updating of setup.py and friends, including the MANIFEST.in. Everything seems...
r1244 try:
from twisted.internet import defer, reactor
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian E Granger
Finished initial reworking and updating of setup.py and friends, including the MANIFEST.in. Everything seems...
r1244 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian E Granger
Finished initial reworking and updating of setup.py and friends, including the MANIFEST.in. Everything seems...
r1244 from IPython.testing.util import DeferredTestCase
from IPython.kernel.controllerservice import ControllerService
from IPython.kernel.multiengine import IMultiEngine
from IPython.kernel.tests.multienginetest import IFullSynchronousMultiEngineTestCase
from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
from IPython.kernel import multiengine as me
from IPython.kernel.clientconnector import ClientConnector
Brian E Granger
Fixing more tests and examples after the task, map and @parallel work.
r1396 from IPython.kernel.parallelfunction import ParallelFunction
from IPython.kernel.error import CompositeError
from IPython.kernel.util import printer
Brian E Granger
Finished initial reworking and updating of setup.py and friends, including the MANIFEST.in. Everything seems...
r1244 except ImportError:
Brian Granger
Removed some tabs and added a new way of skipping tests that have...
r1555 import nose
raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
Brian E Granger
Fixing more tests and examples after the task, map and @parallel work.
r1396
Brian Granger
Removed some tabs and added a new way of skipping tests that have...
r1555 def _raise_it(f):
try:
f.raiseException()
except CompositeError, e:
e.raise_exception()
class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMultiEngineTestCase):
def setUp(self):
Brian E Granger
Fixing more tests and examples after the task, map and @parallel work.
r1396
Brian Granger
Removed some tabs and added a new way of skipping tests that have...
r1555 self.engines = []
self.controller = ControllerService()
self.controller.startService()
self.imultiengine = IMultiEngine(self.controller)
self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
self.controller_tub = Tub()
self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
self.controller_tub.setLocation('127.0.0.1:10105')
Brian E Granger
Fixing more tests and examples after the task, map and @parallel work.
r1396
Brian Granger
Removed some tabs and added a new way of skipping tests that have...
r1555 furl = self.controller_tub.registerReference(self.mec_referenceable)
self.controller_tub.startService()
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Removed some tabs and added a new way of skipping tests that have...
r1555 self.client_tub = ClientConnector()
d = self.client_tub.get_multiengine_client(furl)
d.addCallback(self.handle_got_client)
return d
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian Granger
Removed some tabs and added a new way of skipping tests that have...
r1555 def handle_got_client(self, client):
self.multiengine = client
def tearDown(self):
dlist = []
# Shut down the multiengine client
d = self.client_tub.tub.stopService()
dlist.append(d)
# Shut down the engines
for e in self.engines:
e.stopService()
# Shut down the controller
d = self.controller_tub.stopService()
d.addBoth(lambda _: self.controller.stopService())
dlist.append(d)
return defer.DeferredList(dlist)
Brian E Granger
Fixing more tests and examples after the task, map and @parallel work.
r1396
Brian Granger
Removed some tabs and added a new way of skipping tests that have...
r1555 def test_mapper(self):
self.addEngine(4)
m = self.multiengine.mapper()
self.assertEquals(m.multiengine,self.multiengine)
self.assertEquals(m.dist,'b')
self.assertEquals(m.targets,'all')
self.assertEquals(m.block,True)
def test_map_default(self):
self.addEngine(4)
m = self.multiengine.mapper()
d = m.map(lambda x: 2*x, range(10))
d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
d.addCallback(lambda _: self.multiengine.map(lambda x: 2*x, range(10)))
d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
return d
def test_map_noblock(self):
self.addEngine(4)
m = self.multiengine.mapper(block=False)
d = m.map(lambda x: 2*x, range(10))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
return d
def test_mapper_fail(self):
self.addEngine(4)
m = self.multiengine.mapper()
d = m.map(lambda x: 1/0, range(10))
d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
return d
def test_parallel(self):
self.addEngine(4)
p = self.multiengine.parallel()
self.assert_(isinstance(p, ParallelFunction))
@p
def f(x): return 2*x
d = f(range(10))
d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
return d
def test_parallel_noblock(self):
self.addEngine(1)
p = self.multiengine.parallel(block=False)
self.assert_(isinstance(p, ParallelFunction))
@p
def f(x): return 2*x
d = f(range(10))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
return d
def test_parallel_fail(self):
self.addEngine(4)
p = self.multiengine.parallel()
self.assert_(isinstance(p, ParallelFunction))
@p
def f(x): return 1/0
d = f(range(10))
d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
return d