##// END OF EJS Templates
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ipython branch. This is not a true merge in the formal sense because all history is not coming over with the files. For a detailed history of the added files, please see the ipython1-dev branch or the svn repository on scipy.org that ipython1-dev came from. More specifically, here is what I have done in this commit: 1) Moved the following by hand ipython1.config -> IPython.config ipython1.kernel -> IPython.kernel ipython1.external -> IPython.external ipython1.core -> IPython.kernel.core ipython1.testutils -> IPython.testing ipython1.tools -> IPython.tools 2) Moved IPython.tools.guid -> IPython1.external.guid 3) Renamed: ipython1 -> IPython IPython.core -> IPython.kernel.core IPython.testutils -> IPython.testing 4) Then did a "bzr add" for all the new stuff. That is all folks!

File last commit:

r1234:52b55407
r1234:52b55407
Show More
multienginetest.py
838 lines | 39.6 KiB | text/x-python | PythonLexer
# encoding: utf-8
""""""
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
from twisted.internet import defer
from IPython.kernel import engineservice as es
from IPython.kernel import multiengine as me
from IPython.kernel import newserialized
from IPython.kernel.error import NotDefined
from IPython.testutils import util
from IPython.testutils.parametric import parametric, Parametric
from IPython.kernel import newserialized
from IPython.kernel.util import printer
from IPython.kernel.error import (InvalidEngineID,
NoEnginesRegistered,
CompositeError,
InvalidDeferredID)
from IPython.kernel.tests.engineservicetest import validCommands, invalidCommands
from IPython.kernel.core.interpreter import Interpreter
#-------------------------------------------------------------------------------
# Base classes and utilities
#-------------------------------------------------------------------------------
class IMultiEngineBaseTestCase(object):
"""Basic utilities for working with multiengine tests.
Some subclass should define:
* self.multiengine
* self.engines to keep track of engines for clean up"""
def createShell(self):
return Interpreter()
def addEngine(self, n=1):
for i in range(n):
e = es.EngineService()
e.startService()
regDict = self.controller.register_engine(es.QueuedEngine(e), None)
e.id = regDict['id']
self.engines.append(e)
def testf(x):
return 2.0*x
globala = 99
def testg(x):
return globala*x
def isdid(did):
if not isinstance(did, str):
return False
if not len(did)==40:
return False
return True
def _raise_it(f):
try:
f.raiseException()
except CompositeError, e:
e.raise_exception()
#-------------------------------------------------------------------------------
# IMultiEngineTestCase
#-------------------------------------------------------------------------------
class IMultiEngineTestCase(IMultiEngineBaseTestCase):
"""A test for any object that implements IEngineMultiplexer.
self.multiengine must be defined and implement IEngineMultiplexer.
"""
def testIMultiEngineInterface(self):
"""Does self.engine claim to implement IEngineCore?"""
self.assert_(me.IEngineMultiplexer.providedBy(self.multiengine))
self.assert_(me.IMultiEngine.providedBy(self.multiengine))
def testIEngineMultiplexerInterfaceMethods(self):
"""Does self.engine have the methods and attributes in IEngineCore."""
for m in list(me.IEngineMultiplexer):
self.assert_(hasattr(self.multiengine, m))
def testIEngineMultiplexerDeferreds(self):
self.addEngine(1)
d= self.multiengine.execute('a=5', targets=0)
d.addCallback(lambda _: self.multiengine.push(dict(a=5),targets=0))
d.addCallback(lambda _: self.multiengine.push(dict(a=5, b='asdf', c=[1,2,3]),targets=0))
d.addCallback(lambda _: self.multiengine.pull(('a','b','c'),targets=0))
d.addCallback(lambda _: self.multiengine.get_result(targets=0))
d.addCallback(lambda _: self.multiengine.reset(targets=0))
d.addCallback(lambda _: self.multiengine.keys(targets=0))
d.addCallback(lambda _: self.multiengine.push_serialized(dict(a=newserialized.serialize(10)),targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized('a',targets=0))
d.addCallback(lambda _: self.multiengine.clear_queue(targets=0))
d.addCallback(lambda _: self.multiengine.queue_status(targets=0))
return d
def testInvalidEngineID(self):
self.addEngine(1)
badID = 100
d = self.multiengine.execute('a=5', targets=badID)
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
d.addCallback(lambda _: self.multiengine.push(dict(a=5), targets=badID))
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
d.addCallback(lambda _: self.multiengine.pull('a', targets=badID))
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
d.addCallback(lambda _: self.multiengine.reset(targets=badID))
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
d.addCallback(lambda _: self.multiengine.keys(targets=badID))
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
d.addCallback(lambda _: self.multiengine.push_serialized(dict(a=newserialized.serialize(10)), targets=badID))
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=badID))
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
d.addCallback(lambda _: self.multiengine.queue_status(targets=badID))
d.addErrback(lambda f: self.assertRaises(InvalidEngineID, f.raiseException))
return d
def testNoEnginesRegistered(self):
badID = 'all'
d= self.multiengine.execute('a=5', targets=badID)
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.push(dict(a=5), targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.pull('a', targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.get_result(targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.reset(targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.keys(targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.push_serialized(dict(a=newserialized.serialize(10)), targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
d.addCallback(lambda _: self.multiengine.queue_status(targets=badID))
d.addErrback(lambda f: self.assertRaises(NoEnginesRegistered, f.raiseException))
return d
def runExecuteAll(self, d, cmd, shell):
actual = shell.execute(cmd)
d.addCallback(lambda _: self.multiengine.execute(cmd))
def compare(result):
for r in result:
actual['id'] = r['id']
self.assertEquals(r, actual)
d.addCallback(compare)
def testExecuteAll(self):
self.addEngine(4)
d= defer.Deferred()
shell = Interpreter()
for cmd in validCommands:
self.runExecuteAll(d, cmd, shell)
d.callback(None)
return d
# The following two methods show how to do parametrized
# tests. This is really slick! Same is used above.
def runExecuteFailures(self, cmd, exc):
self.addEngine(4)
d= self.multiengine.execute(cmd)
d.addErrback(lambda f: self.assertRaises(exc, _raise_it, f))
return d
@parametric
def testExecuteFailures(cls):
return [(cls.runExecuteFailures,cmd,exc) for
cmd,exc in invalidCommands]
def testPushPull(self):
self.addEngine(1)
objs = [10,"hi there",1.2342354,{"p":(1,2)}]
d= self.multiengine.push(dict(key=objs[0]), targets=0)
d.addCallback(lambda _: self.multiengine.pull('key', targets=0))
d.addCallback(lambda r: self.assertEquals(r, [objs[0]]))
d.addCallback(lambda _: self.multiengine.push(dict(key=objs[1]), targets=0))
d.addCallback(lambda _: self.multiengine.pull('key', targets=0))
d.addCallback(lambda r: self.assertEquals(r, [objs[1]]))
d.addCallback(lambda _: self.multiengine.push(dict(key=objs[2]), targets=0))
d.addCallback(lambda _: self.multiengine.pull('key', targets=0))
d.addCallback(lambda r: self.assertEquals(r, [objs[2]]))
d.addCallback(lambda _: self.multiengine.push(dict(key=objs[3]), targets=0))
d.addCallback(lambda _: self.multiengine.pull('key', targets=0))
d.addCallback(lambda r: self.assertEquals(r, [objs[3]]))
d.addCallback(lambda _: self.multiengine.reset(targets=0))
d.addCallback(lambda _: self.multiengine.pull('a', targets=0))
d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
d.addCallback(lambda _: self.multiengine.push(dict(a=10,b=20)))
d.addCallback(lambda _: self.multiengine.pull(('a','b')))
d.addCallback(lambda r: self.assertEquals(r, [[10,20]]))
return d
def testPushPullAll(self):
self.addEngine(4)
d= self.multiengine.push(dict(a=10))
d.addCallback(lambda _: self.multiengine.pull('a'))
d.addCallback(lambda r: self.assert_(r==[10,10,10,10]))
d.addCallback(lambda _: self.multiengine.push(dict(a=10, b=20)))
d.addCallback(lambda _: self.multiengine.pull(('a','b')))
d.addCallback(lambda r: self.assert_(r==4*[[10,20]]))
d.addCallback(lambda _: self.multiengine.push(dict(a=10, b=20), targets=0))
d.addCallback(lambda _: self.multiengine.pull(('a','b'), targets=0))
d.addCallback(lambda r: self.assert_(r==[[10,20]]))
d.addCallback(lambda _: self.multiengine.push(dict(a=None, b=None), targets=0))
d.addCallback(lambda _: self.multiengine.pull(('a','b'), targets=0))
d.addCallback(lambda r: self.assert_(r==[[None,None]]))
return d
def testPushPullSerialized(self):
self.addEngine(1)
objs = [10,"hi there",1.2342354,{"p":(1,2)}]
d= self.multiengine.push_serialized(dict(key=newserialized.serialize(objs[0])), targets=0)
d.addCallback(lambda _: self.multiengine.pull_serialized('key', targets=0))
d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
d.addCallback(lambda r: self.assertEquals(r, objs[0]))
d.addCallback(lambda _: self.multiengine.push_serialized(dict(key=newserialized.serialize(objs[1])), targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized('key', targets=0))
d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
d.addCallback(lambda r: self.assertEquals(r, objs[1]))
d.addCallback(lambda _: self.multiengine.push_serialized(dict(key=newserialized.serialize(objs[2])), targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized('key', targets=0))
d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
d.addCallback(lambda r: self.assertEquals(r, objs[2]))
d.addCallback(lambda _: self.multiengine.push_serialized(dict(key=newserialized.serialize(objs[3])), targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized('key', targets=0))
d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
d.addCallback(lambda r: self.assertEquals(r, objs[3]))
d.addCallback(lambda _: self.multiengine.push(dict(a=10,b=range(5)), targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized(('a','b'), targets=0))
d.addCallback(lambda serial: [newserialized.IUnSerialized(s).getObject() for s in serial[0]])
d.addCallback(lambda r: self.assertEquals(r, [10, range(5)]))
d.addCallback(lambda _: self.multiengine.reset(targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0))
d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
return d
objs = [10,"hi there",1.2342354,{"p":(1,2)}]
d= defer.succeed(None)
for o in objs:
self.multiengine.push_serialized(0, key=newserialized.serialize(o))
value = self.multiengine.pull_serialized(0, 'key')
value.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
d = self.assertDeferredEquals(value,o,d)
return d
def runGetResultAll(self, d, cmd, shell):
actual = shell.execute(cmd)
d.addCallback(lambda _: self.multiengine.execute(cmd))
d.addCallback(lambda _: self.multiengine.get_result())
def compare(result):
for r in result:
actual['id'] = r['id']
self.assertEquals(r, actual)
d.addCallback(compare)
def testGetResultAll(self):
self.addEngine(4)
d= defer.Deferred()
shell = Interpreter()
for cmd in validCommands:
self.runGetResultAll(d, cmd, shell)
d.callback(None)
return d
def testGetResultDefault(self):
self.addEngine(1)
target = 0
cmd = 'a=5'
shell = self.createShell()
shellResult = shell.execute(cmd)
def popit(dikt, key):
dikt.pop(key)
return dikt
d= self.multiengine.execute(cmd, targets=target)
d.addCallback(lambda _: self.multiengine.get_result(targets=target))
d.addCallback(lambda r: self.assertEquals(shellResult, popit(r[0],'id')))
return d
def testGetResultFailure(self):
self.addEngine(1)
d= self.multiengine.get_result(None, targets=0)
d.addErrback(lambda f: self.assertRaises(IndexError, _raise_it, f))
d.addCallback(lambda _: self.multiengine.get_result(10, targets=0))
d.addErrback(lambda f: self.assertRaises(IndexError, _raise_it, f))
return d
def testPushFunction(self):
self.addEngine(1)
d= self.multiengine.push_function(dict(f=testf), targets=0)
d.addCallback(lambda _: self.multiengine.execute('result = f(10)', targets=0))
d.addCallback(lambda _: self.multiengine.pull('result', targets=0))
d.addCallback(lambda r: self.assertEquals(r[0], testf(10)))
d.addCallback(lambda _: self.multiengine.push(dict(globala=globala), targets=0))
d.addCallback(lambda _: self.multiengine.push_function(dict(g=testg), targets=0))
d.addCallback(lambda _: self.multiengine.execute('result = g(10)', targets=0))
d.addCallback(lambda _: self.multiengine.pull('result', targets=0))
d.addCallback(lambda r: self.assertEquals(r[0], testg(10)))
return d
def testPullFunction(self):
self.addEngine(1)
d= self.multiengine.push(dict(a=globala), targets=0)
d.addCallback(lambda _: self.multiengine.push_function(dict(f=testf), targets=0))
d.addCallback(lambda _: self.multiengine.pull_function('f', targets=0))
d.addCallback(lambda r: self.assertEquals(r[0](10), testf(10)))
d.addCallback(lambda _: self.multiengine.execute("def g(x): return x*x", targets=0))
d.addCallback(lambda _: self.multiengine.pull_function(('f','g'),targets=0))
d.addCallback(lambda r: self.assertEquals((r[0][0](10),r[0][1](10)), (testf(10), 100)))
return d
def testPushFunctionAll(self):
self.addEngine(4)
d= self.multiengine.push_function(dict(f=testf))
d.addCallback(lambda _: self.multiengine.execute('result = f(10)'))
d.addCallback(lambda _: self.multiengine.pull('result'))
d.addCallback(lambda r: self.assertEquals(r, 4*[testf(10)]))
d.addCallback(lambda _: self.multiengine.push(dict(globala=globala)))
d.addCallback(lambda _: self.multiengine.push_function(dict(testg=testg)))
d.addCallback(lambda _: self.multiengine.execute('result = testg(10)'))
d.addCallback(lambda _: self.multiengine.pull('result'))
d.addCallback(lambda r: self.assertEquals(r, 4*[testg(10)]))
return d
def testPullFunctionAll(self):
self.addEngine(4)
d= self.multiengine.push_function(dict(f=testf))
d.addCallback(lambda _: self.multiengine.pull_function('f'))
d.addCallback(lambda r: self.assertEquals([func(10) for func in r], 4*[testf(10)]))
return d
def testGetIDs(self):
self.addEngine(1)
d= self.multiengine.get_ids()
d.addCallback(lambda r: self.assertEquals(r, [0]))
d.addCallback(lambda _: self.addEngine(3))
d.addCallback(lambda _: self.multiengine.get_ids())
d.addCallback(lambda r: self.assertEquals(r, [0,1,2,3]))
return d
def testClearQueue(self):
self.addEngine(4)
d= self.multiengine.clear_queue()
d.addCallback(lambda r: self.assertEquals(r,4*[None]))
return d
def testQueueStatus(self):
self.addEngine(4)
d= self.multiengine.queue_status(targets=0)
d.addCallback(lambda r: self.assert_(isinstance(r[0],tuple)))
return d
def testGetSetProperties(self):
self.addEngine(4)
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
d= self.multiengine.set_properties(dikt)
d.addCallback(lambda r: self.multiengine.get_properties())
d.addCallback(lambda r: self.assertEquals(r, 4*[dikt]))
d.addCallback(lambda r: self.multiengine.get_properties(('c',)))
d.addCallback(lambda r: self.assertEquals(r, 4*[{'c': dikt['c']}]))
d.addCallback(lambda r: self.multiengine.set_properties(dict(c=False)))
d.addCallback(lambda r: self.multiengine.get_properties(('c', 'd')))
d.addCallback(lambda r: self.assertEquals(r, 4*[dict(c=False, d=None)]))
return d
def testClearProperties(self):
self.addEngine(4)
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
d= self.multiengine.set_properties(dikt)
d.addCallback(lambda r: self.multiengine.clear_properties())
d.addCallback(lambda r: self.multiengine.get_properties())
d.addCallback(lambda r: self.assertEquals(r, 4*[{}]))
return d
def testDelHasProperties(self):
self.addEngine(4)
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
d= self.multiengine.set_properties(dikt)
d.addCallback(lambda r: self.multiengine.del_properties(('b','e')))
d.addCallback(lambda r: self.multiengine.has_properties(('a','b','c','d','e')))
d.addCallback(lambda r: self.assertEquals(r, 4*[[True, False, True, True, False]]))
return d
Parametric(IMultiEngineTestCase)
#-------------------------------------------------------------------------------
# ISynchronousMultiEngineTestCase
#-------------------------------------------------------------------------------
class ISynchronousMultiEngineTestCase(IMultiEngineBaseTestCase):
def testISynchronousMultiEngineInterface(self):
"""Does self.engine claim to implement IEngineCore?"""
self.assert_(me.ISynchronousEngineMultiplexer.providedBy(self.multiengine))
self.assert_(me.ISynchronousMultiEngine.providedBy(self.multiengine))
def testExecute(self):
self.addEngine(4)
execute = self.multiengine.execute
d= execute('a=5', targets=0, block=True)
d.addCallback(lambda r: self.assert_(len(r)==1))
d.addCallback(lambda _: execute('b=10'))
d.addCallback(lambda r: self.assert_(len(r)==4))
d.addCallback(lambda _: execute('c=30', block=False))
d.addCallback(lambda did: self.assert_(isdid(did)))
d.addCallback(lambda _: execute('d=[0,1,2]', block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assert_(len(r)==4))
return d
def testPushPull(self):
data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
self.addEngine(4)
push = self.multiengine.push
pull = self.multiengine.pull
d= push({'data':data}, targets=0)
d.addCallback(lambda r: pull('data', targets=0))
d.addCallback(lambda r: self.assertEqual(r,[data]))
d.addCallback(lambda _: push({'data':data}))
d.addCallback(lambda r: pull('data'))
d.addCallback(lambda r: self.assertEqual(r,4*[data]))
d.addCallback(lambda _: push({'data':data}, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda _: pull('data', block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEqual(r,4*[data]))
d.addCallback(lambda _: push(dict(a=10,b=20)))
d.addCallback(lambda _: pull(('a','b')))
d.addCallback(lambda r: self.assertEquals(r, 4*[[10,20]]))
return d
def testPushPullFunction(self):
self.addEngine(4)
pushf = self.multiengine.push_function
pullf = self.multiengine.pull_function
push = self.multiengine.push
pull = self.multiengine.pull
execute = self.multiengine.execute
d= pushf({'testf':testf}, targets=0)
d.addCallback(lambda r: pullf('testf', targets=0))
d.addCallback(lambda r: self.assertEqual(r[0](1.0), testf(1.0)))
d.addCallback(lambda _: execute('r = testf(10)', targets=0))
d.addCallback(lambda _: pull('r', targets=0))
d.addCallback(lambda r: self.assertEquals(r[0], testf(10)))
d.addCallback(lambda _: pushf({'testf':testf}, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda _: pullf('testf', block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEqual(r[0](1.0), testf(1.0)))
d.addCallback(lambda _: execute("def g(x): return x*x", targets=0))
d.addCallback(lambda _: pullf(('testf','g'),targets=0))
d.addCallback(lambda r: self.assertEquals((r[0][0](10),r[0][1](10)), (testf(10), 100)))
return d
def testGetResult(self):
shell = Interpreter()
result1 = shell.execute('a=10')
result1['id'] = 0
result2 = shell.execute('b=20')
result2['id'] = 0
execute= self.multiengine.execute
get_result = self.multiengine.get_result
self.addEngine(1)
d= execute('a=10')
d.addCallback(lambda _: get_result())
d.addCallback(lambda r: self.assertEquals(r[0], result1))
d.addCallback(lambda _: execute('b=20'))
d.addCallback(lambda _: get_result(1))
d.addCallback(lambda r: self.assertEquals(r[0], result1))
d.addCallback(lambda _: get_result(2, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r[0], result2))
return d
def testResetAndKeys(self):
self.addEngine(1)
#Blocking mode
d= self.multiengine.push(dict(a=10, b=20, c=range(10)), targets=0)
d.addCallback(lambda _: self.multiengine.keys(targets=0))
def keys_found(keys):
self.assert_('a' in keys[0])
self.assert_('b' in keys[0])
self.assert_('b' in keys[0])
d.addCallback(keys_found)
d.addCallback(lambda _: self.multiengine.reset(targets=0))
d.addCallback(lambda _: self.multiengine.keys(targets=0))
def keys_not_found(keys):
self.assert_('a' not in keys[0])
self.assert_('b' not in keys[0])
self.assert_('b' not in keys[0])
d.addCallback(keys_not_found)
#Non-blocking mode
d.addCallback(lambda _: self.multiengine.push(dict(a=10, b=20, c=range(10)), targets=0))
d.addCallback(lambda _: self.multiengine.keys(targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
def keys_found(keys):
self.assert_('a' in keys[0])
self.assert_('b' in keys[0])
self.assert_('b' in keys[0])
d.addCallback(keys_found)
d.addCallback(lambda _: self.multiengine.reset(targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda _: self.multiengine.keys(targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
def keys_not_found(keys):
self.assert_('a' not in keys[0])
self.assert_('b' not in keys[0])
self.assert_('b' not in keys[0])
d.addCallback(keys_not_found)
return d
def testPushPullSerialized(self):
self.addEngine(1)
dikt = dict(a=10,b='hi there',c=1.2345,d={'p':(1,2)})
sdikt = {}
for k,v in dikt.iteritems():
sdikt[k] = newserialized.serialize(v)
d= self.multiengine.push_serialized(dict(a=sdikt['a']), targets=0)
d.addCallback(lambda _: self.multiengine.pull('a',targets=0))
d.addCallback(lambda r: self.assertEquals(r[0], dikt['a']))
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0))
d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
d.addCallback(lambda r: self.assertEquals(r, dikt['a']))
d.addCallback(lambda _: self.multiengine.push_serialized(sdikt, targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized(sdikt.keys(), targets=0))
d.addCallback(lambda serial: [newserialized.IUnSerialized(s).getObject() for s in serial[0]])
d.addCallback(lambda r: self.assertEquals(r, dikt.values()))
d.addCallback(lambda _: self.multiengine.reset(targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0))
d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
#Non-blocking mode
d.addCallback(lambda r: self.multiengine.push_serialized(dict(a=sdikt['a']), targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda _: self.multiengine.pull('a',targets=0))
d.addCallback(lambda r: self.assertEquals(r[0], dikt['a']))
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
d.addCallback(lambda r: self.assertEquals(r, dikt['a']))
d.addCallback(lambda _: self.multiengine.push_serialized(sdikt, targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda _: self.multiengine.pull_serialized(sdikt.keys(), targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda serial: [newserialized.IUnSerialized(s).getObject() for s in serial[0]])
d.addCallback(lambda r: self.assertEquals(r, dikt.values()))
d.addCallback(lambda _: self.multiengine.reset(targets=0))
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
return d
def testClearQueue(self):
self.addEngine(4)
d= self.multiengine.clear_queue()
d.addCallback(lambda r: self.multiengine.clear_queue(block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r,4*[None]))
return d
def testQueueStatus(self):
self.addEngine(4)
d= self.multiengine.queue_status(targets=0)
d.addCallback(lambda r: self.assert_(isinstance(r[0],tuple)))
d.addCallback(lambda r: self.multiengine.queue_status(targets=0, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assert_(isinstance(r[0],tuple)))
return d
def testGetIDs(self):
self.addEngine(1)
d= self.multiengine.get_ids()
d.addCallback(lambda r: self.assertEquals(r, [0]))
d.addCallback(lambda _: self.addEngine(3))
d.addCallback(lambda _: self.multiengine.get_ids())
d.addCallback(lambda r: self.assertEquals(r, [0,1,2,3]))
return d
def testGetSetProperties(self):
self.addEngine(4)
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
d= self.multiengine.set_properties(dikt)
d.addCallback(lambda r: self.multiengine.get_properties())
d.addCallback(lambda r: self.assertEquals(r, 4*[dikt]))
d.addCallback(lambda r: self.multiengine.get_properties(('c',)))
d.addCallback(lambda r: self.assertEquals(r, 4*[{'c': dikt['c']}]))
d.addCallback(lambda r: self.multiengine.set_properties(dict(c=False)))
d.addCallback(lambda r: self.multiengine.get_properties(('c', 'd')))
d.addCallback(lambda r: self.assertEquals(r, 4*[dict(c=False, d=None)]))
#Non-blocking
d.addCallback(lambda r: self.multiengine.set_properties(dikt, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.get_properties(block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r, 4*[dikt]))
d.addCallback(lambda r: self.multiengine.get_properties(('c',), block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r, 4*[{'c': dikt['c']}]))
d.addCallback(lambda r: self.multiengine.set_properties(dict(c=False), block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.get_properties(('c', 'd'), block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r, 4*[dict(c=False, d=None)]))
return d
def testClearProperties(self):
self.addEngine(4)
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
d= self.multiengine.set_properties(dikt)
d.addCallback(lambda r: self.multiengine.clear_properties())
d.addCallback(lambda r: self.multiengine.get_properties())
d.addCallback(lambda r: self.assertEquals(r, 4*[{}]))
#Non-blocking
d.addCallback(lambda r: self.multiengine.set_properties(dikt, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.clear_properties(block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.get_properties(block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r, 4*[{}]))
return d
def testDelHasProperties(self):
self.addEngine(4)
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
d= self.multiengine.set_properties(dikt)
d.addCallback(lambda r: self.multiengine.del_properties(('b','e')))
d.addCallback(lambda r: self.multiengine.has_properties(('a','b','c','d','e')))
d.addCallback(lambda r: self.assertEquals(r, 4*[[True, False, True, True, False]]))
#Non-blocking
d.addCallback(lambda r: self.multiengine.set_properties(dikt, block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.del_properties(('b','e'), block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.has_properties(('a','b','c','d','e'), block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r, 4*[[True, False, True, True, False]]))
return d
def test_clear_pending_deferreds(self):
self.addEngine(4)
did_list = []
d= self.multiengine.execute('a=10',block=False)
d.addCallback(lambda did: did_list.append(did))
d.addCallback(lambda _: self.multiengine.push(dict(b=10),block=False))
d.addCallback(lambda did: did_list.append(did))
d.addCallback(lambda _: self.multiengine.pull(('a','b'),block=False))
d.addCallback(lambda did: did_list.append(did))
d.addCallback(lambda _: self.multiengine.clear_pending_deferreds())
d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[0],True))
d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException))
d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[1],True))
d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException))
d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[2],True))
d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException))
return d
#-------------------------------------------------------------------------------
# Coordinator test cases
#-------------------------------------------------------------------------------
class IMultiEngineCoordinatorTestCase(object):
def testScatterGather(self):
self.addEngine(4)
d= self.multiengine.scatter('a', range(16))
d.addCallback(lambda r: self.multiengine.gather('a'))
d.addCallback(lambda r: self.assertEquals(r, range(16)))
d.addCallback(lambda _: self.multiengine.gather('asdf'))
d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
return d
def testScatterGatherNumpy(self):
try:
import numpy
from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
except:
return
else:
self.addEngine(4)
a = numpy.arange(16)
d = self.multiengine.scatter('a', a)
d.addCallback(lambda r: self.multiengine.gather('a'))
d.addCallback(lambda r: assert_array_equal(r, a))
return d
def testMap(self):
self.addEngine(4)
def f(x):
return x**2
data = range(16)
d= self.multiengine.map(f, data)
d.addCallback(lambda r: self.assertEquals(r,[f(x) for x in data]))
return d
class ISynchronousMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase):
def testScatterGatherNonblocking(self):
self.addEngine(4)
d= self.multiengine.scatter('a', range(16), block=False)
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.gather('a', block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r, range(16)))
return d
def testScatterGatherNumpyNonblocking(self):
try:
import numpy
from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
except:
return
else:
self.addEngine(4)
a = numpy.arange(16)
d = self.multiengine.scatter('a', a, block=False)
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.gather('a', block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: assert_array_equal(r, a))
return d
def testMapNonblocking(self):
self.addEngine(4)
def f(x):
return x**2
data = range(16)
d= self.multiengine.map(f, data, block=False)
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assertEquals(r,[f(x) for x in data]))
return d
def test_clear_pending_deferreds(self):
self.addEngine(4)
did_list = []
d= self.multiengine.scatter('a',range(16),block=False)
d.addCallback(lambda did: did_list.append(did))
d.addCallback(lambda _: self.multiengine.gather('a',block=False))
d.addCallback(lambda did: did_list.append(did))
d.addCallback(lambda _: self.multiengine.map(lambda x: x, range(16),block=False))
d.addCallback(lambda did: did_list.append(did))
d.addCallback(lambda _: self.multiengine.clear_pending_deferreds())
d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[0],True))
d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException))
d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[1],True))
d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException))
d.addCallback(lambda _: self.multiengine.get_pending_deferred(did_list[2],True))
d.addErrback(lambda f: self.assertRaises(InvalidDeferredID, f.raiseException))
return d
#-------------------------------------------------------------------------------
# Extras test cases
#-------------------------------------------------------------------------------
class IMultiEngineExtrasTestCase(object):
def testZipPull(self):
self.addEngine(4)
d= self.multiengine.push(dict(a=10,b=20))
d.addCallback(lambda r: self.multiengine.zip_pull(('a','b')))
d.addCallback(lambda r: self.assert_(r, [4*[10],4*[20]]))
return d
def testRun(self):
self.addEngine(4)
import tempfile
fname = tempfile.mktemp('foo.py')
f= open(fname, 'w')
f.write('a = 10\nb=30')
f.close()
d= self.multiengine.run(fname)
d.addCallback(lambda r: self.multiengine.pull(('a','b')))
d.addCallback(lambda r: self.assertEquals(r, 4*[[10,30]]))
return d
class ISynchronousMultiEngineExtrasTestCase(IMultiEngineExtrasTestCase):
def testZipPullNonblocking(self):
self.addEngine(4)
d= self.multiengine.push(dict(a=10,b=20))
d.addCallback(lambda r: self.multiengine.zip_pull(('a','b'), block=False))
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.assert_(r, [4*[10],4*[20]]))
return d
def testRunNonblocking(self):
self.addEngine(4)
import tempfile
fname = tempfile.mktemp('foo.py')
f= open(fname, 'w')
f.write('a = 10\nb=30')
f.close()
d= self.multiengine.run(fname, block=False)
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
d.addCallback(lambda r: self.multiengine.pull(('a','b')))
d.addCallback(lambda r: self.assertEquals(r, 4*[[10,30]]))
return d
#-------------------------------------------------------------------------------
# IFullSynchronousMultiEngineTestCase
#-------------------------------------------------------------------------------
class IFullSynchronousMultiEngineTestCase(ISynchronousMultiEngineTestCase,
ISynchronousMultiEngineCoordinatorTestCase,
ISynchronousMultiEngineExtrasTestCase):
pass