##// END OF EJS Templates
Fixing minor bugs in tests.
Fixing minor bugs in tests.

File last commit:

r1551:d8f2aac4
r1559:2353c578 merge
Show More
multienginetest.py
827 lines | 39.2 KiB | text/x-python | PythonLexer
# encoding: utf-8
""""""
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
from twisted.internet import defer
from IPython.kernel import engineservice as es
from IPython.kernel import multiengine as me
from IPython.kernel import newserialized
from IPython.testing import util
from IPython.testing.parametric import parametric, Parametric
from IPython.kernel import newserialized
from IPython.kernel.util import printer
from IPython.kernel.error import (InvalidEngineID,
NoEnginesRegistered,
CompositeError,
InvalidDeferredID)
from IPython.kernel.tests.engineservicetest import validCommands, invalidCommands
from IPython.kernel.core.interpreter import Interpreter
#-------------------------------------------------------------------------------
# Base classes and utilities
#-------------------------------------------------------------------------------
class IMultiEngineBaseTestCase(object):
"""Basic utilities for working with multiengine tests.
Some subclass should define:
* self.multiengine
* self.engines to keep track of engines for clean up"""
def createShell(self):
return Interpreter()
def addEngine(self, n=1):
for i in range(n):
e = es.EngineService()
e.startService()
regDict = self.controller.register_engine(es.QueuedEngine(e), None)
e.id = regDict['id']
self.engines.append(e)
def testf(x):
return 2.0*x
globala = 99
def testg(x):
return globala*x
def isdid(did):
if not isinstance(did, str):
return False
if not len(did)==40:
return False
return True
def _raise_it(f):
try:
f.raiseException()
except CompositeError, e:
e.raise_exception()
#-------------------------------------------------------------------------------
# IMultiEngineTestCase
#-------------------------------------------------------------------------------
class IMultiEngineTestCase(IMultiEngineBaseTestCase):
"""A test for any object that implements IEngineMultiplexer.
self.multiengine must be defined and implement IEngineMultiplexer.
"""
def testIMultiEngineInterface(self):
"""Does self.engine claim to implement IEngineCore?"""
self.assert_(me.IEngineMultiplexer.providedBy(self.multiengine))
self.assert_(me.IMultiEngine.providedBy(self.multiengine))
def testIEngineMultiplexerInterfaceMethods(self):
"""Does self.engine have the methods and attributes in IEngineCore."""
for m in list(me.IEngineMultiplexer):
self.assert_(hasattr(self.multiengine, m))
def testIEngineMultiplexerDeferreds(self):
self.addEngine(1)
d= self.multiengine.execute('a=5', targets=0)
d.addCallback(lambda _: self.multiengine.push(dict(a=5),targets=0))
d.addCallback(lambda _: self.multiengine.push(dict(a=5, b='asdf', c=[1,2,3]),targets=0))
d.addCallback(lambda _: self.multiengine.pull(('a','b','c'),targets=0))
d.addCallback(lambda _: self.multiengine.get_result(targets=0))
d.addCallback(lambda _: self.multiengine.reset(targets=0))
d.addCallback(lambda _: self.multiengine.keys(targets=0))
d.addCallback(lambda _: self.multiengine.push_serialized(dict(a=newserialized.serialize(10)),targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized('a',targets=0))
d.addCallback(lambda _: self.multiengine.clear_queue(targets=0))
d.addCallback(lambda _: self.multiengine.queue_status(targets=0))
return d
def testInvalidEngineID(self):
self.addEngine(1)
badID = 100
d = self.multiengine.execute('a=5', targets=badID)
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
d.addCallback(lambda _: self.multiengine.push(dict(a=5), targets=badID))
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
d.addCallback(lambda _: self.multiengine.pull('a', targets=badID))
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
d.addCallback(lambda _: self.multiengine.reset(targets=badID))
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
d.addCallback(lambda _: self.multiengine.keys(targets=badID))
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
d.addCallback(lambda _: self.multiengine.push_serialized(dict(a=newserialized.serialize(10)), targets=badID))
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=badID))
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
d.addCallback(lambda _: self.multiengine.queue_status(targets=badID))
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
return d
def testNoEnginesRegistered(self):
badID = 'all'
d= self.multiengine.execute('a=5', targets=badID)
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.push(dict(a=5), targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.pull('a', targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.get_result(targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.reset(targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.keys(targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.push_serialized(dict(a=newserialized.serialize(10)), targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.queue_status(targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
return d
def runExecuteAll(self, d, cmd, shell):
actual = shell.execute(cmd)
d.addCallback(lambda _: self.multiengine.execute(cmd))
def compare(result):
for r in result:
actual['id'] = r['id']
self.assertEquals(r, actual)
d.addCallback(compare)
def testExecuteAll(self):
self.addEngine(4)
d= defer.Deferred()
shell = Interpreter()
for cmd in validCommands:
self.runExecuteAll(d, cmd, shell)
d.callback(None)
return d
# The following two methods show how to do parametrized
# tests. This is really slick! Same is used above.
def runExecuteFailures(self, cmd, exc):
self.addEngine(4)
d= self.multiengine.execute(cmd)
d.addErrback(lambda f: self.assertRaises(exc, _raise_it, f))
return d
@parametric
def testExecuteFailuresMultiEng(cls):
return [(cls.runExecuteFailures,cmd,exc) for
cmd,exc in invalidCommands]
def testPushPull(self):
self.addEngine(1)
objs = [10,"hi there",1.2342354,{"p":(1,2)}]
d= self.multiengine.push(dict(key=objs[0]), targets=0)
d.addCallback(lambda _: self.multiengine.pull('key', targets=0))
d.addCallback(lambda r: self.assertEquals(r, [objs[0]]))
d.addCallback(lambda _: self.multiengine.push(dict(key=objs[1]), targets=0))
d.addCallback(lambda _: self.multiengine.pull('key', targets=0))
d.addCallback(lambda r: self.assertEquals(r, [objs[1]]))
d.addCallback(lambda _: self.multiengine.push(dict(key=objs[2]), targets=0))
d.addCallback(lambda _: self.multiengine.pull('key', targets=0))
d.addCallback(lambda r: self.assertEquals(r, [objs[2]]))
d.addCallback(lambda _: self.multiengine.push(dict(key=objs[3]), targets=0))
d.addCallback(lambda _: self.multiengine.pull('key', targets=0))
d.addCallback(lambda r: self.assertEquals(r, [objs[3]]))
d.addCallback(lambda _: self.multiengine.reset(targets=0))
d.addCallback(lambda _: self.multiengine.pull('a', targets=0))
d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
d.addCallback(lambda _: self.multiengine.push(dict(a=10,b=20)))
d.addCallback(lambda _: self.multiengine.pull(('a','b')))
d.addCallback(lambda r: self.assertEquals(r, [[10,20]]))
return d
def testPushPullAll(self):
self.addEngine(4)
d= self.multiengine.push(dict(a=10))
d.addCallback(lambda _: self.multiengine.pull('a'))
d.addCallback(lambda r: self.assert_(r==[10,10,10,10]))
d.addCallback(lambda _: self.multiengine.push(dict(a=10, b=20)))
d.addCallback(lambda _: self.multiengine.pull(('a','b')))
d.addCallback(lambda r: self.assert_(r==4*[[10,20]]))
d.addCallback(lambda _: self.multiengine.push(dict(a=10, b=20), targets=0))
d.addCallback(lambda _: self.multiengine.pull(('a','b'), targets=0))
d.addCallback(lambda r: self.assert_(r==[[10,20]]))
d.addCallback(lambda _: self.multiengine.push(dict(a=None, b=None), targets=0))
d.addCallback(lambda _: self.multiengine.pull(('a','b'), targets=0))
d.addCallback(lambda r: self.assert_(r==[[None,None]]))
return d
def testPushPullSerialized(self):
self.addEngine(1)
objs = [10,"hi there",1.2342354,{"p":(1,2)}]
d= self.multiengine.push_serialized(dict(key=newserialized.serialize(objs[0])), targets=0)
d.addCallback(lambda _: self.multiengine.pull_serialized('key', targets=0))
d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
d.addCallback(lambda r: self.assertEquals(r, objs[0]))
d.addCallback(lambda _: self.multiengine.push_serialized(dict(key=newserialized.serialize(objs[1])), targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized('key', targets=0))
d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
d.addCallback(lambda r: self.assertEquals(r, objs[1]))
d.addCallback(lambda _: self.multiengine.push_serialized(dict(key=newserialized.serialize(objs[2])), targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized('key', targets=0))
d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
d.addCallback(lambda r: self.assertEquals(r, objs[2]))
d.addCallback(lambda _: self.multiengine.push_serialized(dict(key=newserialized.serialize(objs[3])), targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized('key', targets=0))
d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
d.addCallback(lambda r: self.assertEquals(r, objs[3]))
d.addCallback(lambda _: self.multiengine.push(dict(a=10,b=range(5)), targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized(('a','b'), targets=0))
d.addCallback(lambda serial: [newserialized.IUnSerialized(s).getObject() for s in serial[0]])
d.addCallback(lambda r: self.assertEquals(r, [10, range(5)]))
d.addCallback(lambda _: self.multiengine.reset(targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0))
d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
return d
objs = [10,"hi there",1.2342354,{"p":(1,2)}]
d= defer.succeed(None)
for o in objs:
self.multiengine.push_serialized(0, key=newserialized.serialize(o))
value = self.multiengine.pull_serialized(0, 'key')
value.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
d = self.assertDeferredEquals(value,o,d)
return d
def runGetResultAll(self, d, cmd, shell):
actual = shell.execute(cmd)
d.addCallback(lambda _: self.multiengine.execute(cmd))
d.addCallback(lambda _: self.multiengine.get_result())
def compare(result):
for r in result:
actual['id'] = r['id']
self.assertEquals(r, actual)
d.addCallback(compare)
def testGetResultAll(self):
self.addEngine(4)
d= defer.Deferred()
shell = Interpreter()
for cmd in validCommands:
self.runGetResultAll(d, cmd, shell)
d.callback(None)
return d
def testGetResultDefault(self):
self.addEngine(1)
target = 0
cmd = 'a=5'
shell = self.createShell()
shellResult = shell.execute(cmd)
def popit(dikt, key):
dikt.pop(key)
return dikt
d= self.multiengine.execute(cmd, targets=target)
d.addCallback(lambda _: self.multiengine.get_result(targets=target))
d.addCallback(lambda r: self.assertEquals(shellResult, popit(r[0],'id')))
return d
def testGetResultFailure(self):
self.addEngine(1)
d= self.multiengine.get_result(None, targets=0)
d.addErrback(lambda f: self.assertRaises(IndexError, _raise_it, f))
d.addCallback(lambda _: self.multiengine.get_result(10, targets=0))
d.addErrback(lambda f: self.assertRaises(IndexError, _raise_it, f))
return d
def testPushFunction(self):
self.addEngine(1)
d= self.multiengine.push_function(dict(f=testf), targets=0)
d.addCallback(lambda _: self.multiengine.execute('result = f(10)', targets=0))
d.addCallback(lambda _: self.multiengine.pull('result', targets=0))
d.addCallback(lambda r: self.assertEquals(r[0], testf(10)))
d.addCallback(lambda _: self.multiengine.push(dict(globala=globala), targets=0))
d.addCallback(lambda _: self.multiengine.push_function(dict(g=testg), targets=0))
d.addCallback(lambda _: self.multiengine.execute('result = g(10)', targets=0))
d.addCallback(lambda _: self.multiengine.pull('result', targets=0))
d.addCallback(lambda r: self.assertEquals(r[0], testg(10)))
return d
def testPullFunction(self):
self.addEngine(1)
d= self.multiengine.push(dict(a=globala), targets=0)
d.addCallback(lambda _: self.multiengine.push_function(dict(f=testf), targets=0))
d.addCallback(lambda _: self.multiengine.pull_function('f', targets=0))
d.addCallback(lambda r: self.assertEquals(r[0](10), testf(10)))
d.addCallback(lambda _: self.multiengine.execute("def g(x): return x*x", targets=0))
d.addCallback(lambda _: self.multiengine.pull_function(('f','g'),targets=0))
d.addCallback(lambda r: self.assertEquals((r[0][0](10),r[0][1](10)), (testf(10), 100)))
return d
def testPushFunctionAll(self):
self.addEngine(4)
d= self.multiengine.push_function(dict(f=testf))
d.addCallback(lambda _: self.multiengine.execute('result = f(10)'))
d.addCallback(lambda _: self.multiengine.pull('result'))
d.addCallback(lambda r: self.assertEquals(r, 4*[testf(10)]))
d.addCallback(lambda _: self.multiengine.push(dict(globala=globala)))
d.addCallback(lambda _: self.multiengine.push_function(dict(testg=testg)))
d.addCallback(lambda _: self.multiengine.execute('result = testg(10)'))
d.addCallback(lambda _: self.multiengine.pull('result'))
d.addCallback(lambda r: self.assertEquals(r, 4*[testg(10)]))
return d
def testPullFunctionAll(self):
self.addEngine(4)
d= self.multiengine.push_function(dict(f=testf))
d.addCallback(lambda _: self.multiengine.pull_function('f'))
d.addCallback(lambda r: self.assertEquals([func(10) for func in r], 4*[testf(10)]))
return d
def testGetIDs(self):
self.addEngine(1)
d= self.multiengine.get_ids()
d.addCallback(lambda r: self.assertEquals(r, [0]))
d.addCallback(lambda _: self.addEngine(3))
d.addCallback(lambda _: self.multiengine.get_ids())
d.addCallback(lambda r: self.assertEquals(r, [0,1,2,3]))
return d
def testClearQueue(self):
self.addEngine(4)
d= self.multiengine.clear_queue()
d.addCallback(lambda r: self.assertEquals(r,4*[None]))
return d
def testQueueStatus(self):
self.addEngine(4)
d= self.multiengine.queue_status(targets=0)
d.addCallback(lambda r: self.assert_(isinstance(r[0],tuple)))
return d
def testGetSetProperties(self):
self.addEngine(4)
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
d= self.multiengine.set_properties(dikt)
d.addCallback(lambda r: self.multiengine.get_properties())
d.addCallback(lambda r: self.assertEquals(r, 4*[dikt]))
d.addCallback(lambda r: self.multiengine.get_properties(('c',)))
d.addCallback(lambda r: self.assertEquals(r, 4*[{'c': dikt['c']}]))
d.addCallback(lambda r: self.multiengine.set_properties(dict(c=False)))
d.addCallback(lambda r: self.multiengine.get_properties(('c', 'd')))
d.addCallback(lambda r: self.assertEquals(r, 4*[dict(c=False, d=None)]))
return d
def testClearProperties(self):
self.addEngine(4)
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
d= self.multiengine.set_properties(dikt)
d.addCallback(lambda r: self.multiengine.clear_properties())
d.addCallback(lambda r: self.multiengine.get_properties())
d.addCallback(lambda r: self.assertEquals(r, 4*[{}]))
return d
def testDelHasProperties(self):
self.addEngine(4)
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
d= self.multiengine.set_properties(dikt)
d.addCallback(lambda r: self.multiengine.del_properties(('b','e')))
d.addCallback(lambda r: self.multiengine.has_properties(('a','b','c','d','e')))
d.addCallback(lambda r: self.assertEquals(r, 4*[[True, False, True, True, False]]))
return d
Parametric(IMultiEngineTestCase)
#-------------------------------------------------------------------------------
# ISynchronousMultiEngineTestCase
#-------------------------------------------------------------------------------
class ISynchronousMultiEngineTestCase(IMultiEngineBaseTestCase):
def testISynchronousMultiEngineInterface(self):
"""Does self.engine claim to implement IEngineCore?"""
self.assert_(me.ISynchronousEngineMultiplexer.providedBy(self.multiengine))
self.assert_(me.ISynchronousMultiEngine.providedBy(self.multiengine))
def testExecute(self):
self.addEngine(4)
execute = self.multiengine.execute
d= execute('a=5', targets=0, block=True)
d.addCallback(lambda r: self.assert_(len(r)==1))
d.addCallback(lambda _: execute('b=10'))
d.addCallback(lambda r: self.assert_(len(r)==4))
d.addCallback(lambda _: execute('c=30', block=False))
d.addCallback(lambda did: self.assert_(isdid(did)))
d.addCallback(lambda _: execute('d=[0,1,2]', block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assert_(len(r)==4))
return d
def testPushPull(self):
data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
self.addEngine(4)
push = self.multiengine.push
pull = self.multiengine.pull
d= push({'data':data}, targets=0)
d.addCallback(lambda r: pull('data', targets=0))
d.addCallback(lambda r: self.assertEqual(r,[data]))
d.addCallback(lambda _: push({'data':data}))
d.addCallback(lambda r: pull('data'))
d.addCallback(lambda r: self.assertEqual(r,4*[data]))
d.addCallback(lambda _: push({'data':data}, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda _: pull('data', block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEqual(r,4*[data]))
d.addCallback(lambda _: push(dict(a=10,b=20)))
d.addCallback(lambda _: pull(('a','b')))
d.addCallback(lambda r: self.assertEquals(r, 4*[[10,20]]))
return d
def testPushPullFunction(self):
self.addEngine(4)
pushf = self.multiengine.push_function
pullf = self.multiengine.pull_function
push = self.multiengine.push
pull = self.multiengine.pull
execute = self.multiengine.execute
d= pushf({'testf':testf}, targets=0)
d.addCallback(lambda r: pullf('testf', targets=0))
d.addCallback(lambda r: self.assertEqual(r[0](1.0), testf(1.0)))
d.addCallback(lambda _: execute('r = testf(10)', targets=0))
d.addCallback(lambda _: pull('r', targets=0))
d.addCallback(lambda r: self.assertEquals(r[0], testf(10)))
d.addCallback(lambda _: pushf({'testf':testf}, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda _: pullf('testf', block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEqual(r[0](1.0), testf(1.0)))
d.addCallback(lambda _: execute("def g(x): return x*x", targets=0))
d.addCallback(lambda _: pullf(('testf','g'),targets=0))
d.addCallback(lambda r: self.assertEquals((r[0][0](10),r[0][1](10)), (testf(10), 100)))
return d
def testGetResult(self):
shell = Interpreter()
result1 = shell.execute('a=10')
result1['id'] = 0
result2 = shell.execute('b=20')
result2['id'] = 0
execute= self.multiengine.execute
get_result = self.multiengine.get_result
self.addEngine(1)
d= execute('a=10')
d.addCallback(lambda _: get_result())
d.addCallback(lambda r: self.assertEquals(r[0], result1))
d.addCallback(lambda _: execute('b=20'))
d.addCallback(lambda _: get_result(1))
d.addCallback(lambda r: self.assertEquals(r[0], result1))
d.addCallback(lambda _: get_result(2, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r[0], result2))
return d
def testResetAndKeys(self):
self.addEngine(1)
#Blocking mode
d= self.multiengine.push(dict(a=10, b=20, c=range(10)), targets=0)
d.addCallback(lambda _: self.multiengine.keys(targets=0))
def keys_found(keys):
self.assert_('a' in keys[0])
self.assert_('b' in keys[0])
self.assert_('b' in keys[0])
d.addCallback(keys_found)
d.addCallback(lambda _: self.multiengine.reset(targets=0))
d.addCallback(lambda _: self.multiengine.keys(targets=0))
def keys_not_found(keys):
self.assert_('a' not in keys[0])
self.assert_('b' not in keys[0])
self.assert_('b' not in keys[0])
d.addCallback(keys_not_found)
#Non-blocking mode
d.addCallback(lambda _: self.multiengine.push(dict(a=10, b=20, c=range(10)), targets=0))
d.addCallback(lambda _: self.multiengine.keys(targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
def keys_found(keys):
self.assert_('a' in keys[0])
self.assert_('b' in keys[0])
self.assert_('b' in keys[0])
d.addCallback(keys_found)
d.addCallback(lambda _: self.multiengine.reset(targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda _: self.multiengine.keys(targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
def keys_not_found(keys):
self.assert_('a' not in keys[0])
self.assert_('b' not in keys[0])
self.assert_('b' not in keys[0])
d.addCallback(keys_not_found)
return d
def testPushPullSerialized(self):
self.addEngine(1)
dikt = dict(a=10,b='hi there',c=1.2345,d={'p':(1,2)})
sdikt = {}
for k,v in dikt.iteritems():
sdikt[k] = newserialized.serialize(v)
d= self.multiengine.push_serialized(dict(a=sdikt['a']), targets=0)
d.addCallback(lambda _: self.multiengine.pull('a',targets=0))
d.addCallback(lambda r: self.assertEquals(r[0], dikt['a']))
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0))
d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
d.addCallback(lambda r: self.assertEquals(r, dikt['a']))
d.addCallback(lambda _: self.multiengine.push_serialized(sdikt, targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized(sdikt.keys(), targets=0))
d.addCallback(lambda serial: [newserialized.IUnSerialized(s).getObject() for s in serial[0]])
d.addCallback(lambda r: self.assertEquals(r, dikt.values()))
d.addCallback(lambda _: self.multiengine.reset(targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0))
d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
#Non-blocking mode
d.addCallback(lambda r: self.multiengine.push_serialized(dict(a=sdikt['a']), targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda _: self.multiengine.pull('a',targets=0))
d.addCallback(lambda r: self.assertEquals(r[0], dikt['a']))
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
d.addCallback(lambda r: self.assertEquals(r, dikt['a']))
d.addCallback(lambda _: self.multiengine.push_serialized(sdikt, targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda _: self.multiengine.pull_serialized(sdikt.keys(), targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda serial: [newserialized.IUnSerialized(s).getObject() for s in serial[0]])
d.addCallback(lambda r: self.assertEquals(r, dikt.values()))
d.addCallback(lambda _: self.multiengine.reset(targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
return d
def testClearQueue(self):
self.addEngine(4)
d= self.multiengine.clear_queue()
d.addCallback(lambda r: self.multiengine.clear_queue(block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r,4*[None]))
return d
def testQueueStatus(self):
self.addEngine(4)
d= self.multiengine.queue_status(targets=0)
d.addCallback(lambda r: self.assert_(isinstance(r[0],tuple)))
d.addCallback(lambda r: self.multiengine.queue_status(targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assert_(isinstance(r[0],tuple)))
return d
def testGetIDs(self):
self.addEngine(1)
d= self.multiengine.get_ids()
d.addCallback(lambda r: self.assertEquals(r, [0]))
d.addCallback(lambda _: self.addEngine(3))
d.addCallback(lambda _: self.multiengine.get_ids())
d.addCallback(lambda r: self.assertEquals(r, [0,1,2,3]))
return d
def testGetSetProperties(self):
self.addEngine(4)
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
d= self.multiengine.set_properties(dikt)
d.addCallback(lambda r: self.multiengine.get_properties())
d.addCallback(lambda r: self.assertEquals(r, 4*[dikt]))
d.addCallback(lambda r: self.multiengine.get_properties(('c',)))
d.addCallback(lambda r: self.assertEquals(r, 4*[{'c': dikt['c']}]))
d.addCallback(lambda r: self.multiengine.set_properties(dict(c=False)))
d.addCallback(lambda r: self.multiengine.get_properties(('c', 'd')))
d.addCallback(lambda r: self.assertEquals(r, 4*[dict(c=False, d=None)]))
#Non-blocking
d.addCallback(lambda r: self.multiengine.set_properties(dikt, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.get_properties(block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r, 4*[dikt]))
d.addCallback(lambda r: self.multiengine.get_properties(('c',), block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r, 4*[{'c': dikt['c']}]))
d.addCallback(lambda r: self.multiengine.set_properties(dict(c=False), block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.get_properties(('c', 'd'), block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r, 4*[dict(c=False, d=None)]))
return d
def testClearProperties(self):
self.addEngine(4)
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
d= self.multiengine.set_properties(dikt)
d.addCallback(lambda r: self.multiengine.clear_properties())
d.addCallback(lambda r: self.multiengine.get_properties())
d.addCallback(lambda r: self.assertEquals(r, 4*[{}]))
#Non-blocking
d.addCallback(lambda r: self.multiengine.set_properties(dikt, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.clear_properties(block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.get_properties(block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r, 4*[{}]))
return d
def testDelHasProperties(self):
self.addEngine(4)
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
d= self.multiengine.set_properties(dikt)
d.addCallback(lambda r: self.multiengine.del_properties(('b','e')))
d.addCallback(lambda r: self.multiengine.has_properties(('a','b','c','d','e')))
d.addCallback(lambda r: self.assertEquals(r, 4*[[True, False, True, True, False]]))
#Non-blocking
d.addCallback(lambda r: self.multiengine.set_properties(dikt, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.del_properties(('b','e'), block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.has_properties(('a','b','c','d','e'), block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r, 4*[[True, False, True, True, False]]))
return d
def test_clear_pending_deferreds(self):
self.addEngine(4)
did_list = []
d= self.multiengine.execute('a=10',block=False)
d.addCallback(lambda did: did_list.append(did))
d.addCallback(lambda _: self.multiengine.push(dict(b=10),block=False))
d.addCallback(lambda did: did_list.append(did))
d.addCallback(lambda _: self.multiengine.pull(('a','b'),block=False))
d.addCallback(lambda did: did_list.append(did))
d.addCallback(lambda _: self.multiengine.clear_pending_deferreds())
d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[0],True))
d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException))
d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[1],True))
d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException))
d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[2],True))
d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException))
return d
#-------------------------------------------------------------------------------
# Coordinator test cases
#-------------------------------------------------------------------------------
class IMultiEngineCoordinatorTestCase(object):
def testScatterGather(self):
self.addEngine(4)
d= self.multiengine.scatter('a', range(16))
d.addCallback(lambda r: self.multiengine.gather('a'))
d.addCallback(lambda r: self.assertEquals(r, range(16)))
d.addCallback(lambda _: self.multiengine.gather('asdf'))
d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
return d
def testScatterGatherNumpy(self):
try:
import numpy
from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
except:
return
else:
self.addEngine(4)
a = numpy.arange(16)
d = self.multiengine.scatter('a', a)
d.addCallback(lambda r: self.multiengine.gather('a'))
d.addCallback(lambda r: assert_array_equal(r, a))
return d
def testMap(self):
self.addEngine(4)
def f(x):
return x**2
data = range(16)
d= self.multiengine.map(f, data)
d.addCallback(lambda r: self.assertEquals(r,[f(x) for x in data]))
return d
class ISynchronousMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase):
def testScatterGatherNonblocking(self):
self.addEngine(4)
d= self.multiengine.scatter('a', range(16), block=False)
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.gather('a', block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r, range(16)))
return d
def testScatterGatherNumpyNonblocking(self):
try:
import numpy
from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
except:
return
else:
self.addEngine(4)
a = numpy.arange(16)
d = self.multiengine.scatter('a', a, block=False)
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.gather('a', block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: assert_array_equal(r, a))
return d
def test_clear_pending_deferreds(self):
self.addEngine(4)
did_list = []
d= self.multiengine.scatter('a',range(16),block=False)
d.addCallback(lambda did: did_list.append(did))
d.addCallback(lambda _: self.multiengine.gather('a',block=False))
d.addCallback(lambda did: did_list.append(did))
d.addCallback(lambda _: self.multiengine.map(lambda x: x, range(16),block=False))
d.addCallback(lambda did: did_list.append(did))
d.addCallback(lambda _: self.multiengine.clear_pending_deferreds())
d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[0],True))
d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException))
d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[1],True))
d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException))
d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[2],True))
d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException))
return d
#-------------------------------------------------------------------------------
# Extras test cases
#-------------------------------------------------------------------------------
class IMultiEngineExtrasTestCase(object):
def testZipPull(self):
self.addEngine(4)
d= self.multiengine.push(dict(a=10,b=20))
d.addCallback(lambda r: self.multiengine.zip_pull(('a','b')))
d.addCallback(lambda r: self.assert_(r, [4*[10],4*[20]]))
return d
def testRun(self):
self.addEngine(4)
import tempfile
fname = tempfile.mktemp('foo.py')
f= open(fname, 'w')
f.write('a = 10\nb=30')
f.close()
d= self.multiengine.run(fname)
d.addCallback(lambda r: self.multiengine.pull(('a','b')))
d.addCallback(lambda r: self.assertEquals(r, 4*[[10,30]]))
return d
class ISynchronousMultiEngineExtrasTestCase(IMultiEngineExtrasTestCase):
def testZipPullNonblocking(self):
self.addEngine(4)
d= self.multiengine.push(dict(a=10,b=20))
d.addCallback(lambda r: self.multiengine.zip_pull(('a','b'), block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assert_(r, [4*[10],4*[20]]))
return d
def testRunNonblocking(self):
self.addEngine(4)
import tempfile
fname = tempfile.mktemp('foo.py')
f= open(fname, 'w')
f.write('a = 10\nb=30')
f.close()
d= self.multiengine.run(fname, block=False)
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.pull(('a','b')))
d.addCallback(lambda r: self.assertEquals(r, 4*[[10,30]]))
return d
#-------------------------------------------------------------------------------
# IFullSynchronousMultiEngineTestCase
#-------------------------------------------------------------------------------
class IFullSynchronousMultiEngineTestCase(ISynchronousMultiEngineTestCase,
ISynchronousMultiEngineCoordinatorTestCase,
ISynchronousMultiEngineExtrasTestCase):
pass