# encoding: utf-8 """""" __docformat__ = "restructuredtext en" #------------------------------------------------------------------------------- # Copyright (C) 2008 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- # Imports #------------------------------------------------------------------------------- from twisted.internet import defer from IPython.kernel import engineservice as es from IPython.kernel import multiengine as me from IPython.kernel import newserialized from IPython.kernel.error import NotDefined from IPython.testing import util from IPython.testing.parametric import parametric, Parametric from IPython.kernel import newserialized from IPython.kernel.util import printer from IPython.kernel.error import (InvalidEngineID, NoEnginesRegistered, CompositeError, InvalidDeferredID) from IPython.kernel.tests.engineservicetest import validCommands, invalidCommands from IPython.kernel.core.interpreter import Interpreter #------------------------------------------------------------------------------- # Base classes and utilities #------------------------------------------------------------------------------- class IMultiEngineBaseTestCase(object): """Basic utilities for working with multiengine tests. Some subclass should define: * self.multiengine * self.engines to keep track of engines for clean up""" def createShell(self): return Interpreter() def addEngine(self, n=1): for i in range(n): e = es.EngineService() e.startService() regDict = self.controller.register_engine(es.QueuedEngine(e), None) e.id = regDict['id'] self.engines.append(e) def testf(x): return 2.0*x globala = 99 def testg(x): return globala*x def isdid(did): if not isinstance(did, str): return False if not len(did)==40: return False return True def _raise_it(f): try: f.raiseException() except CompositeError, e: e.raise_exception() #------------------------------------------------------------------------------- # IMultiEngineTestCase #------------------------------------------------------------------------------- class IMultiEngineTestCase(IMultiEngineBaseTestCase): """A test for any object that implements IEngineMultiplexer. self.multiengine must be defined and implement IEngineMultiplexer. """ def testIMultiEngineInterface(self): """Does self.engine claim to implement IEngineCore?""" self.assert_(me.IEngineMultiplexer.providedBy(self.multiengine)) self.assert_(me.IMultiEngine.providedBy(self.multiengine)) def testIEngineMultiplexerInterfaceMethods(self): """Does self.engine have the methods and attributes in IEngineCore.""" for m in list(me.IEngineMultiplexer): self.assert_(hasattr(self.multiengine, m)) def testIEngineMultiplexerDeferreds(self): self.addEngine(1) d= self.multiengine.execute('a=5', targets=0) d.addCallback(lambda _: self.multiengine.push(dict(a=5),targets=0)) d.addCallback(lambda _: self.multiengine.push(dict(a=5, b='asdf', c=[1,2,3]),targets=0)) d.addCallback(lambda _: self.multiengine.pull(('a','b','c'),targets=0)) d.addCallback(lambda _: self.multiengine.get_result(targets=0)) d.addCallback(lambda _: self.multiengine.reset(targets=0)) d.addCallback(lambda _: self.multiengine.keys(targets=0)) d.addCallback(lambda _: self.multiengine.push_serialized(dict(a=newserialized.serialize(10)),targets=0)) d.addCallback(lambda _: self.multiengine.pull_serialized('a',targets=0)) d.addCallback(lambda _: self.multiengine.clear_queue(targets=0)) d.addCallback(lambda _: self.multiengine.queue_status(targets=0)) return d def testInvalidEngineID(self): self.addEngine(1) badID = 100 d = self.multiengine.execute('a=5', targets=badID) d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException)) d.addCallback(lambda _: self.multiengine.push(dict(a=5), targets=badID)) d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException)) d.addCallback(lambda _: self.multiengine.pull('a', targets=badID)) d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException)) d.addCallback(lambda _: self.multiengine.reset(targets=badID)) d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException)) d.addCallback(lambda _: self.multiengine.keys(targets=badID)) d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException)) d.addCallback(lambda _: self.multiengine.push_serialized(dict(a=newserialized.serialize(10)), targets=badID)) d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException)) d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=badID)) d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException)) d.addCallback(lambda _: self.multiengine.queue_status(targets=badID)) d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException)) return d def testNoEnginesRegistered(self): badID = 'all' d= self.multiengine.execute('a=5', targets=badID) d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException)) d.addCallback(lambda _: self.multiengine.push(dict(a=5), targets=badID)) d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException)) d.addCallback(lambda _: self.multiengine.pull('a', targets=badID)) d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException)) d.addCallback(lambda _: self.multiengine.get_result(targets=badID)) d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException)) d.addCallback(lambda _: self.multiengine.reset(targets=badID)) d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException)) d.addCallback(lambda _: self.multiengine.keys(targets=badID)) d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException)) d.addCallback(lambda _: self.multiengine.push_serialized(dict(a=newserialized.serialize(10)), targets=badID)) d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException)) d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=badID)) d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException)) d.addCallback(lambda _: self.multiengine.queue_status(targets=badID)) d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException)) return d def runExecuteAll(self, d, cmd, shell): actual = shell.execute(cmd) d.addCallback(lambda _: self.multiengine.execute(cmd)) def compare(result): for r in result: actual['id'] = r['id'] self.assertEquals(r, actual) d.addCallback(compare) def testExecuteAll(self): self.addEngine(4) d= defer.Deferred() shell = Interpreter() for cmd in validCommands: self.runExecuteAll(d, cmd, shell) d.callback(None) return d # The following two methods show how to do parametrized # tests. This is really slick! Same is used above. def runExecuteFailures(self, cmd, exc): self.addEngine(4) d= self.multiengine.execute(cmd) d.addErrback(lambda f: self.assertRaises(exc, _raise_it, f)) return d @parametric def testExecuteFailuresMultiEng(cls): return [(cls.runExecuteFailures,cmd,exc) for cmd,exc in invalidCommands] def testPushPull(self): self.addEngine(1) objs = [10,"hi there",1.2342354,{"p":(1,2)}] d= self.multiengine.push(dict(key=objs[0]), targets=0) d.addCallback(lambda _: self.multiengine.pull('key', targets=0)) d.addCallback(lambda r: self.assertEquals(r, [objs[0]])) d.addCallback(lambda _: self.multiengine.push(dict(key=objs[1]), targets=0)) d.addCallback(lambda _: self.multiengine.pull('key', targets=0)) d.addCallback(lambda r: self.assertEquals(r, [objs[1]])) d.addCallback(lambda _: self.multiengine.push(dict(key=objs[2]), targets=0)) d.addCallback(lambda _: self.multiengine.pull('key', targets=0)) d.addCallback(lambda r: self.assertEquals(r, [objs[2]])) d.addCallback(lambda _: self.multiengine.push(dict(key=objs[3]), targets=0)) d.addCallback(lambda _: self.multiengine.pull('key', targets=0)) d.addCallback(lambda r: self.assertEquals(r, [objs[3]])) d.addCallback(lambda _: self.multiengine.reset(targets=0)) d.addCallback(lambda _: self.multiengine.pull('a', targets=0)) d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f)) d.addCallback(lambda _: self.multiengine.push(dict(a=10,b=20))) d.addCallback(lambda _: self.multiengine.pull(('a','b'))) d.addCallback(lambda r: self.assertEquals(r, [[10,20]])) return d def testPushPullAll(self): self.addEngine(4) d= self.multiengine.push(dict(a=10)) d.addCallback(lambda _: self.multiengine.pull('a')) d.addCallback(lambda r: self.assert_(r==[10,10,10,10])) d.addCallback(lambda _: self.multiengine.push(dict(a=10, b=20))) d.addCallback(lambda _: self.multiengine.pull(('a','b'))) d.addCallback(lambda r: self.assert_(r==4*[[10,20]])) d.addCallback(lambda _: self.multiengine.push(dict(a=10, b=20), targets=0)) d.addCallback(lambda _: self.multiengine.pull(('a','b'), targets=0)) d.addCallback(lambda r: self.assert_(r==[[10,20]])) d.addCallback(lambda _: self.multiengine.push(dict(a=None, b=None), targets=0)) d.addCallback(lambda _: self.multiengine.pull(('a','b'), targets=0)) d.addCallback(lambda r: self.assert_(r==[[None,None]])) return d def testPushPullSerialized(self): self.addEngine(1) objs = [10,"hi there",1.2342354,{"p":(1,2)}] d= self.multiengine.push_serialized(dict(key=newserialized.serialize(objs[0])), targets=0) d.addCallback(lambda _: self.multiengine.pull_serialized('key', targets=0)) d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject()) d.addCallback(lambda r: self.assertEquals(r, objs[0])) d.addCallback(lambda _: self.multiengine.push_serialized(dict(key=newserialized.serialize(objs[1])), targets=0)) d.addCallback(lambda _: self.multiengine.pull_serialized('key', targets=0)) d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject()) d.addCallback(lambda r: self.assertEquals(r, objs[1])) d.addCallback(lambda _: self.multiengine.push_serialized(dict(key=newserialized.serialize(objs[2])), targets=0)) d.addCallback(lambda _: self.multiengine.pull_serialized('key', targets=0)) d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject()) d.addCallback(lambda r: self.assertEquals(r, objs[2])) d.addCallback(lambda _: self.multiengine.push_serialized(dict(key=newserialized.serialize(objs[3])), targets=0)) d.addCallback(lambda _: self.multiengine.pull_serialized('key', targets=0)) d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject()) d.addCallback(lambda r: self.assertEquals(r, objs[3])) d.addCallback(lambda _: self.multiengine.push(dict(a=10,b=range(5)), targets=0)) d.addCallback(lambda _: self.multiengine.pull_serialized(('a','b'), targets=0)) d.addCallback(lambda serial: [newserialized.IUnSerialized(s).getObject() for s in serial[0]]) d.addCallback(lambda r: self.assertEquals(r, [10, range(5)])) d.addCallback(lambda _: self.multiengine.reset(targets=0)) d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0)) d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f)) return d objs = [10,"hi there",1.2342354,{"p":(1,2)}] d= defer.succeed(None) for o in objs: self.multiengine.push_serialized(0, key=newserialized.serialize(o)) value = self.multiengine.pull_serialized(0, 'key') value.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject()) d = self.assertDeferredEquals(value,o,d) return d def runGetResultAll(self, d, cmd, shell): actual = shell.execute(cmd) d.addCallback(lambda _: self.multiengine.execute(cmd)) d.addCallback(lambda _: self.multiengine.get_result()) def compare(result): for r in result: actual['id'] = r['id'] self.assertEquals(r, actual) d.addCallback(compare) def testGetResultAll(self): self.addEngine(4) d= defer.Deferred() shell = Interpreter() for cmd in validCommands: self.runGetResultAll(d, cmd, shell) d.callback(None) return d def testGetResultDefault(self): self.addEngine(1) target = 0 cmd = 'a=5' shell = self.createShell() shellResult = shell.execute(cmd) def popit(dikt, key): dikt.pop(key) return dikt d= self.multiengine.execute(cmd, targets=target) d.addCallback(lambda _: self.multiengine.get_result(targets=target)) d.addCallback(lambda r: self.assertEquals(shellResult, popit(r[0],'id'))) return d def testGetResultFailure(self): self.addEngine(1) d= self.multiengine.get_result(None, targets=0) d.addErrback(lambda f: self.assertRaises(IndexError, _raise_it, f)) d.addCallback(lambda _: self.multiengine.get_result(10, targets=0)) d.addErrback(lambda f: self.assertRaises(IndexError, _raise_it, f)) return d def testPushFunction(self): self.addEngine(1) d= self.multiengine.push_function(dict(f=testf), targets=0) d.addCallback(lambda _: self.multiengine.execute('result = f(10)', targets=0)) d.addCallback(lambda _: self.multiengine.pull('result', targets=0)) d.addCallback(lambda r: self.assertEquals(r[0], testf(10))) d.addCallback(lambda _: self.multiengine.push(dict(globala=globala), targets=0)) d.addCallback(lambda _: self.multiengine.push_function(dict(g=testg), targets=0)) d.addCallback(lambda _: self.multiengine.execute('result = g(10)', targets=0)) d.addCallback(lambda _: self.multiengine.pull('result', targets=0)) d.addCallback(lambda r: self.assertEquals(r[0], testg(10))) return d def testPullFunction(self): self.addEngine(1) d= self.multiengine.push(dict(a=globala), targets=0) d.addCallback(lambda _: self.multiengine.push_function(dict(f=testf), targets=0)) d.addCallback(lambda _: self.multiengine.pull_function('f', targets=0)) d.addCallback(lambda r: self.assertEquals(r[0](10), testf(10))) d.addCallback(lambda _: self.multiengine.execute("def g(x): return x*x", targets=0)) d.addCallback(lambda _: self.multiengine.pull_function(('f','g'),targets=0)) d.addCallback(lambda r: self.assertEquals((r[0][0](10),r[0][1](10)), (testf(10), 100))) return d def testPushFunctionAll(self): self.addEngine(4) d= self.multiengine.push_function(dict(f=testf)) d.addCallback(lambda _: self.multiengine.execute('result = f(10)')) d.addCallback(lambda _: self.multiengine.pull('result')) d.addCallback(lambda r: self.assertEquals(r, 4*[testf(10)])) d.addCallback(lambda _: self.multiengine.push(dict(globala=globala))) d.addCallback(lambda _: self.multiengine.push_function(dict(testg=testg))) d.addCallback(lambda _: self.multiengine.execute('result = testg(10)')) d.addCallback(lambda _: self.multiengine.pull('result')) d.addCallback(lambda r: self.assertEquals(r, 4*[testg(10)])) return d def testPullFunctionAll(self): self.addEngine(4) d= self.multiengine.push_function(dict(f=testf)) d.addCallback(lambda _: self.multiengine.pull_function('f')) d.addCallback(lambda r: self.assertEquals([func(10) for func in r], 4*[testf(10)])) return d def testGetIDs(self): self.addEngine(1) d= self.multiengine.get_ids() d.addCallback(lambda r: self.assertEquals(r, [0])) d.addCallback(lambda _: self.addEngine(3)) d.addCallback(lambda _: self.multiengine.get_ids()) d.addCallback(lambda r: self.assertEquals(r, [0,1,2,3])) return d def testClearQueue(self): self.addEngine(4) d= self.multiengine.clear_queue() d.addCallback(lambda r: self.assertEquals(r,4*[None])) return d def testQueueStatus(self): self.addEngine(4) d= self.multiengine.queue_status(targets=0) d.addCallback(lambda r: self.assert_(isinstance(r[0],tuple))) return d def testGetSetProperties(self): self.addEngine(4) dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5)) d= self.multiengine.set_properties(dikt) d.addCallback(lambda r: self.multiengine.get_properties()) d.addCallback(lambda r: self.assertEquals(r, 4*[dikt])) d.addCallback(lambda r: self.multiengine.get_properties(('c',))) d.addCallback(lambda r: self.assertEquals(r, 4*[{'c': dikt['c']}])) d.addCallback(lambda r: self.multiengine.set_properties(dict(c=False))) d.addCallback(lambda r: self.multiengine.get_properties(('c', 'd'))) d.addCallback(lambda r: self.assertEquals(r, 4*[dict(c=False, d=None)])) return d def testClearProperties(self): self.addEngine(4) dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5)) d= self.multiengine.set_properties(dikt) d.addCallback(lambda r: self.multiengine.clear_properties()) d.addCallback(lambda r: self.multiengine.get_properties()) d.addCallback(lambda r: self.assertEquals(r, 4*[{}])) return d def testDelHasProperties(self): self.addEngine(4) dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5)) d= self.multiengine.set_properties(dikt) d.addCallback(lambda r: self.multiengine.del_properties(('b','e'))) d.addCallback(lambda r: self.multiengine.has_properties(('a','b','c','d','e'))) d.addCallback(lambda r: self.assertEquals(r, 4*[[True, False, True, True, False]])) return d Parametric(IMultiEngineTestCase) #------------------------------------------------------------------------------- # ISynchronousMultiEngineTestCase #------------------------------------------------------------------------------- class ISynchronousMultiEngineTestCase(IMultiEngineBaseTestCase): def testISynchronousMultiEngineInterface(self): """Does self.engine claim to implement IEngineCore?""" self.assert_(me.ISynchronousEngineMultiplexer.providedBy(self.multiengine)) self.assert_(me.ISynchronousMultiEngine.providedBy(self.multiengine)) def testExecute(self): self.addEngine(4) execute = self.multiengine.execute d= execute('a=5', targets=0, block=True) d.addCallback(lambda r: self.assert_(len(r)==1)) d.addCallback(lambda _: execute('b=10')) d.addCallback(lambda r: self.assert_(len(r)==4)) d.addCallback(lambda _: execute('c=30', block=False)) d.addCallback(lambda did: self.assert_(isdid(did))) d.addCallback(lambda _: execute('d=[0,1,2]', block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assert_(len(r)==4)) return d def testPushPull(self): data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'}) self.addEngine(4) push = self.multiengine.push pull = self.multiengine.pull d= push({'data':data}, targets=0) d.addCallback(lambda r: pull('data', targets=0)) d.addCallback(lambda r: self.assertEqual(r,[data])) d.addCallback(lambda _: push({'data':data})) d.addCallback(lambda r: pull('data')) d.addCallback(lambda r: self.assertEqual(r,4*[data])) d.addCallback(lambda _: push({'data':data}, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda _: pull('data', block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assertEqual(r,4*[data])) d.addCallback(lambda _: push(dict(a=10,b=20))) d.addCallback(lambda _: pull(('a','b'))) d.addCallback(lambda r: self.assertEquals(r, 4*[[10,20]])) return d def testPushPullFunction(self): self.addEngine(4) pushf = self.multiengine.push_function pullf = self.multiengine.pull_function push = self.multiengine.push pull = self.multiengine.pull execute = self.multiengine.execute d= pushf({'testf':testf}, targets=0) d.addCallback(lambda r: pullf('testf', targets=0)) d.addCallback(lambda r: self.assertEqual(r[0](1.0), testf(1.0))) d.addCallback(lambda _: execute('r = testf(10)', targets=0)) d.addCallback(lambda _: pull('r', targets=0)) d.addCallback(lambda r: self.assertEquals(r[0], testf(10))) d.addCallback(lambda _: pushf({'testf':testf}, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda _: pullf('testf', block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assertEqual(r[0](1.0), testf(1.0))) d.addCallback(lambda _: execute("def g(x): return x*x", targets=0)) d.addCallback(lambda _: pullf(('testf','g'),targets=0)) d.addCallback(lambda r: self.assertEquals((r[0][0](10),r[0][1](10)), (testf(10), 100))) return d def testGetResult(self): shell = Interpreter() result1 = shell.execute('a=10') result1['id'] = 0 result2 = shell.execute('b=20') result2['id'] = 0 execute= self.multiengine.execute get_result = self.multiengine.get_result self.addEngine(1) d= execute('a=10') d.addCallback(lambda _: get_result()) d.addCallback(lambda r: self.assertEquals(r[0], result1)) d.addCallback(lambda _: execute('b=20')) d.addCallback(lambda _: get_result(1)) d.addCallback(lambda r: self.assertEquals(r[0], result1)) d.addCallback(lambda _: get_result(2, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assertEquals(r[0], result2)) return d def testResetAndKeys(self): self.addEngine(1) #Blocking mode d= self.multiengine.push(dict(a=10, b=20, c=range(10)), targets=0) d.addCallback(lambda _: self.multiengine.keys(targets=0)) def keys_found(keys): self.assert_('a' in keys[0]) self.assert_('b' in keys[0]) self.assert_('b' in keys[0]) d.addCallback(keys_found) d.addCallback(lambda _: self.multiengine.reset(targets=0)) d.addCallback(lambda _: self.multiengine.keys(targets=0)) def keys_not_found(keys): self.assert_('a' not in keys[0]) self.assert_('b' not in keys[0]) self.assert_('b' not in keys[0]) d.addCallback(keys_not_found) #Non-blocking mode d.addCallback(lambda _: self.multiengine.push(dict(a=10, b=20, c=range(10)), targets=0)) d.addCallback(lambda _: self.multiengine.keys(targets=0, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) def keys_found(keys): self.assert_('a' in keys[0]) self.assert_('b' in keys[0]) self.assert_('b' in keys[0]) d.addCallback(keys_found) d.addCallback(lambda _: self.multiengine.reset(targets=0, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda _: self.multiengine.keys(targets=0, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) def keys_not_found(keys): self.assert_('a' not in keys[0]) self.assert_('b' not in keys[0]) self.assert_('b' not in keys[0]) d.addCallback(keys_not_found) return d def testPushPullSerialized(self): self.addEngine(1) dikt = dict(a=10,b='hi there',c=1.2345,d={'p':(1,2)}) sdikt = {} for k,v in dikt.iteritems(): sdikt[k] = newserialized.serialize(v) d= self.multiengine.push_serialized(dict(a=sdikt['a']), targets=0) d.addCallback(lambda _: self.multiengine.pull('a',targets=0)) d.addCallback(lambda r: self.assertEquals(r[0], dikt['a'])) d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0)) d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject()) d.addCallback(lambda r: self.assertEquals(r, dikt['a'])) d.addCallback(lambda _: self.multiengine.push_serialized(sdikt, targets=0)) d.addCallback(lambda _: self.multiengine.pull_serialized(sdikt.keys(), targets=0)) d.addCallback(lambda serial: [newserialized.IUnSerialized(s).getObject() for s in serial[0]]) d.addCallback(lambda r: self.assertEquals(r, dikt.values())) d.addCallback(lambda _: self.multiengine.reset(targets=0)) d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0)) d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f)) #Non-blocking mode d.addCallback(lambda r: self.multiengine.push_serialized(dict(a=sdikt['a']), targets=0, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda _: self.multiengine.pull('a',targets=0)) d.addCallback(lambda r: self.assertEquals(r[0], dikt['a'])) d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject()) d.addCallback(lambda r: self.assertEquals(r, dikt['a'])) d.addCallback(lambda _: self.multiengine.push_serialized(sdikt, targets=0, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda _: self.multiengine.pull_serialized(sdikt.keys(), targets=0, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda serial: [newserialized.IUnSerialized(s).getObject() for s in serial[0]]) d.addCallback(lambda r: self.assertEquals(r, dikt.values())) d.addCallback(lambda _: self.multiengine.reset(targets=0)) d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f)) return d def testClearQueue(self): self.addEngine(4) d= self.multiengine.clear_queue() d.addCallback(lambda r: self.multiengine.clear_queue(block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assertEquals(r,4*[None])) return d def testQueueStatus(self): self.addEngine(4) d= self.multiengine.queue_status(targets=0) d.addCallback(lambda r: self.assert_(isinstance(r[0],tuple))) d.addCallback(lambda r: self.multiengine.queue_status(targets=0, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assert_(isinstance(r[0],tuple))) return d def testGetIDs(self): self.addEngine(1) d= self.multiengine.get_ids() d.addCallback(lambda r: self.assertEquals(r, [0])) d.addCallback(lambda _: self.addEngine(3)) d.addCallback(lambda _: self.multiengine.get_ids()) d.addCallback(lambda r: self.assertEquals(r, [0,1,2,3])) return d def testGetSetProperties(self): self.addEngine(4) dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5)) d= self.multiengine.set_properties(dikt) d.addCallback(lambda r: self.multiengine.get_properties()) d.addCallback(lambda r: self.assertEquals(r, 4*[dikt])) d.addCallback(lambda r: self.multiengine.get_properties(('c',))) d.addCallback(lambda r: self.assertEquals(r, 4*[{'c': dikt['c']}])) d.addCallback(lambda r: self.multiengine.set_properties(dict(c=False))) d.addCallback(lambda r: self.multiengine.get_properties(('c', 'd'))) d.addCallback(lambda r: self.assertEquals(r, 4*[dict(c=False, d=None)])) #Non-blocking d.addCallback(lambda r: self.multiengine.set_properties(dikt, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.multiengine.get_properties(block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assertEquals(r, 4*[dikt])) d.addCallback(lambda r: self.multiengine.get_properties(('c',), block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assertEquals(r, 4*[{'c': dikt['c']}])) d.addCallback(lambda r: self.multiengine.set_properties(dict(c=False), block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.multiengine.get_properties(('c', 'd'), block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assertEquals(r, 4*[dict(c=False, d=None)])) return d def testClearProperties(self): self.addEngine(4) dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5)) d= self.multiengine.set_properties(dikt) d.addCallback(lambda r: self.multiengine.clear_properties()) d.addCallback(lambda r: self.multiengine.get_properties()) d.addCallback(lambda r: self.assertEquals(r, 4*[{}])) #Non-blocking d.addCallback(lambda r: self.multiengine.set_properties(dikt, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.multiengine.clear_properties(block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.multiengine.get_properties(block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assertEquals(r, 4*[{}])) return d def testDelHasProperties(self): self.addEngine(4) dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5)) d= self.multiengine.set_properties(dikt) d.addCallback(lambda r: self.multiengine.del_properties(('b','e'))) d.addCallback(lambda r: self.multiengine.has_properties(('a','b','c','d','e'))) d.addCallback(lambda r: self.assertEquals(r, 4*[[True, False, True, True, False]])) #Non-blocking d.addCallback(lambda r: self.multiengine.set_properties(dikt, block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.multiengine.del_properties(('b','e'), block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.multiengine.has_properties(('a','b','c','d','e'), block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assertEquals(r, 4*[[True, False, True, True, False]])) return d def test_clear_pending_deferreds(self): self.addEngine(4) did_list = [] d= self.multiengine.execute('a=10',block=False) d.addCallback(lambda did: did_list.append(did)) d.addCallback(lambda _: self.multiengine.push(dict(b=10),block=False)) d.addCallback(lambda did: did_list.append(did)) d.addCallback(lambda _: self.multiengine.pull(('a','b'),block=False)) d.addCallback(lambda did: did_list.append(did)) d.addCallback(lambda _: self.multiengine.clear_pending_deferreds()) d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[0],True)) d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException)) d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[1],True)) d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException)) d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[2],True)) d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException)) return d #------------------------------------------------------------------------------- # Coordinator test cases #------------------------------------------------------------------------------- class IMultiEngineCoordinatorTestCase(object): def testScatterGather(self): self.addEngine(4) d= self.multiengine.scatter('a', range(16)) d.addCallback(lambda r: self.multiengine.gather('a')) d.addCallback(lambda r: self.assertEquals(r, range(16))) d.addCallback(lambda _: self.multiengine.gather('asdf')) d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f)) return d def testScatterGatherNumpy(self): try: import numpy from numpy.testing.utils import assert_array_equal, assert_array_almost_equal except: return else: self.addEngine(4) a = numpy.arange(16) d = self.multiengine.scatter('a', a) d.addCallback(lambda r: self.multiengine.gather('a')) d.addCallback(lambda r: assert_array_equal(r, a)) return d def testMap(self): self.addEngine(4) def f(x): return x**2 data = range(16) d= self.multiengine.map(f, data) d.addCallback(lambda r: self.assertEquals(r,[f(x) for x in data])) return d class ISynchronousMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase): def testScatterGatherNonblocking(self): self.addEngine(4) d= self.multiengine.scatter('a', range(16), block=False) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.multiengine.gather('a', block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assertEquals(r, range(16))) return d def testScatterGatherNumpyNonblocking(self): try: import numpy from numpy.testing.utils import assert_array_equal, assert_array_almost_equal except: return else: self.addEngine(4) a = numpy.arange(16) d = self.multiengine.scatter('a', a, block=False) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.multiengine.gather('a', block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: assert_array_equal(r, a)) return d def test_clear_pending_deferreds(self): self.addEngine(4) did_list = [] d= self.multiengine.scatter('a',range(16),block=False) d.addCallback(lambda did: did_list.append(did)) d.addCallback(lambda _: self.multiengine.gather('a',block=False)) d.addCallback(lambda did: did_list.append(did)) d.addCallback(lambda _: self.multiengine.map(lambda x: x, range(16),block=False)) d.addCallback(lambda did: did_list.append(did)) d.addCallback(lambda _: self.multiengine.clear_pending_deferreds()) d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[0],True)) d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException)) d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[1],True)) d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException)) d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[2],True)) d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException)) return d #------------------------------------------------------------------------------- # Extras test cases #------------------------------------------------------------------------------- class IMultiEngineExtrasTestCase(object): def testZipPull(self): self.addEngine(4) d= self.multiengine.push(dict(a=10,b=20)) d.addCallback(lambda r: self.multiengine.zip_pull(('a','b'))) d.addCallback(lambda r: self.assert_(r, [4*[10],4*[20]])) return d def testRun(self): self.addEngine(4) import tempfile fname = tempfile.mktemp('foo.py') f= open(fname, 'w') f.write('a = 10\nb=30') f.close() d= self.multiengine.run(fname) d.addCallback(lambda r: self.multiengine.pull(('a','b'))) d.addCallback(lambda r: self.assertEquals(r, 4*[[10,30]])) return d class ISynchronousMultiEngineExtrasTestCase(IMultiEngineExtrasTestCase): def testZipPullNonblocking(self): self.addEngine(4) d= self.multiengine.push(dict(a=10,b=20)) d.addCallback(lambda r: self.multiengine.zip_pull(('a','b'), block=False)) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assert_(r, [4*[10],4*[20]])) return d def testRunNonblocking(self): self.addEngine(4) import tempfile fname = tempfile.mktemp('foo.py') f= open(fname, 'w') f.write('a = 10\nb=30') f.close() d= self.multiengine.run(fname, block=False) d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.multiengine.pull(('a','b'))) d.addCallback(lambda r: self.assertEquals(r, 4*[[10,30]])) return d #------------------------------------------------------------------------------- # IFullSynchronousMultiEngineTestCase #------------------------------------------------------------------------------- class IFullSynchronousMultiEngineTestCase(ISynchronousMultiEngineTestCase, ISynchronousMultiEngineCoordinatorTestCase, ISynchronousMultiEngineExtrasTestCase): pass