##// END OF EJS Templates
Fixed bugs in IPython.kernel....
Fixed bugs in IPython.kernel. * Removed :mod:`IPython.kernel.contexts` and :mod:`IPython.kernel.tests.test_contexts` and all code that used them. This was broken and needed to be removed. Also Enthought has a nice decorator that does this. * Fixed broken tests in :file:`test_multienginefc.py` and :file:`test_taskfc.py`. See https://bugs.launchpad.net/ipython/+bug/505358. * Changed ports number in :mod:`IPython.kernel.tests` to 10111 to avoid conflicts with users who are using the older 10105 ports. See https://bugs.launchpad.net/ipython/+bug/504515. * Fixed logic and bugs in :mod:`IPython.kernel.fcutils` and :mod:`IPython.kernel.clientconnector` related to how FURL files are validated and found. See https://bugs.launchpad.net/ipython/+bug/502790. * Fixed the unhandled :class:`DeadReferenceError`` exception that was happening in :mod:`IPython.kernel.multiengineclient' and :mod:`IPython.kernel.taskclient` when the controller died or restarted. Now we capture that error and raises a :class:`IPython.kernel.error.ConnectionError` that has a better error message. See https://bugs.launchpad.net/ipython/+bug/502796.

File last commit:

r2517:8185e15c
r2517:8185e15c
Show More
test_taskfc.py
162 lines | 5.5 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
# encoding: utf-8
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
# Tell nose to skip this module
__test__ = {}
import time
from twisted.internet import defer, reactor
from IPython.kernel.fcutil import Tub, UnauthenticatedTub
from IPython.kernel import task as taskmodule
from IPython.kernel import controllerservice as cs
import IPython.kernel.multiengine as me
from IPython.testing.util import DeferredTestCase
from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
from IPython.kernel.taskfc import IFCTaskController
from IPython.kernel.util import printer
from IPython.kernel.tests.tasktest import ITaskControllerTestCase
from IPython.kernel.clientconnector import AsyncClientConnector
from IPython.kernel.error import CompositeError
from IPython.kernel.parallelfunction import ParallelFunction
#-------------------------------------------------------------------------------
# Tests
#-------------------------------------------------------------------------------
def _raise_it(f):
try:
f.raiseException()
except CompositeError, e:
e.raise_exception()
class TaskTest(DeferredTestCase, ITaskControllerTestCase):
def setUp(self):
self.engines = []
self.controller = cs.ControllerService()
self.controller.startService()
self.imultiengine = me.IMultiEngine(self.controller)
self.itc = taskmodule.ITaskController(self.controller)
self.itc.failurePenalty = 0
self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
self.tc_referenceable = IFCTaskController(self.itc)
self.controller_tub = Tub()
self.controller_tub.listenOn('tcp:10111:interface=127.0.0.1')
self.controller_tub.setLocation('127.0.0.1:10111')
mec_furl = self.controller_tub.registerReference(self.mec_referenceable)
tc_furl = self.controller_tub.registerReference(self.tc_referenceable)
self.controller_tub.startService()
self.client_tub = AsyncClientConnector()
d = self.client_tub.get_multiengine_client(furl_or_file=mec_furl)
d.addCallback(self.handle_mec_client)
d.addCallback(lambda _: self.client_tub.get_task_client(furl_or_file=tc_furl))
d.addCallback(self.handle_tc_client)
return d
def handle_mec_client(self, client):
self.multiengine = client
def handle_tc_client(self, client):
self.tc = client
def tearDown(self):
dlist = []
# Shut down the multiengine client
d = self.client_tub.tub.stopService()
dlist.append(d)
# Shut down the engines
for e in self.engines:
e.stopService()
# Shut down the controller
d = self.controller_tub.stopService()
d.addBoth(lambda _: self.controller.stopService())
dlist.append(d)
return defer.DeferredList(dlist)
def test_mapper(self):
self.addEngine(1)
m = self.tc.mapper()
self.assertEquals(m.task_controller,self.tc)
self.assertEquals(m.clear_before,False)
self.assertEquals(m.clear_after,False)
self.assertEquals(m.retries,0)
self.assertEquals(m.recovery_task,None)
self.assertEquals(m.depend,None)
self.assertEquals(m.block,True)
def test_map_default(self):
self.addEngine(1)
m = self.tc.mapper()
d = m.map(lambda x: 2*x, range(10))
d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10)))
d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
return d
def test_map_noblock(self):
self.addEngine(1)
m = self.tc.mapper(block=False)
d = m.map(lambda x: 2*x, range(10))
d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
return d
def test_mapper_fail(self):
self.addEngine(1)
m = self.tc.mapper()
d = m.map(lambda x: 1/0, range(10))
d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
return d
def test_parallel(self):
self.addEngine(1)
p = self.tc.parallel()
self.assert_(isinstance(p, ParallelFunction))
@p
def f(x): return 2*x
d = f(range(10))
d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
return d
def test_parallel_noblock(self):
self.addEngine(1)
p = self.tc.parallel(block=False)
self.assert_(isinstance(p, ParallelFunction))
@p
def f(x): return 2*x
d = f(range(10))
d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
return d
def test_parallel_fail(self):
self.addEngine(1)
p = self.tc.parallel()
self.assert_(isinstance(p, ParallelFunction))
@p
def f(x): return 1/0
d = f(range(10))
d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
return d