|
|
# encoding: utf-8
|
|
|
|
|
|
"""Reusable test-case mixins for objects implementing the multiengine interfaces."""
|
|
|
|
|
|
__docformat__ = "restructuredtext en"
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Copyright (C) 2008 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Imports
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
from twisted.internet import defer
|
|
|
|
|
|
from IPython.kernel import engineservice as es
|
|
|
from IPython.kernel import multiengine as me
|
|
|
from IPython.kernel import newserialized
|
|
|
from IPython.kernel.error import NotDefined
|
|
|
from IPython.testing import util
|
|
|
from IPython.testing.parametric import parametric, Parametric
|
|
|
from IPython.kernel import newserialized
|
|
|
from IPython.kernel.util import printer
|
|
|
from IPython.kernel.error import (InvalidEngineID,
|
|
|
NoEnginesRegistered,
|
|
|
CompositeError,
|
|
|
InvalidDeferredID)
|
|
|
from IPython.kernel.tests.engineservicetest import validCommands, invalidCommands
|
|
|
from IPython.kernel.core.interpreter import Interpreter
|
|
|
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Base classes and utilities
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
class IMultiEngineBaseTestCase(object):
    """Shared helpers for the multiengine test cases.

    A concrete subclass is expected to provide:

    * self.multiengine -- the object under test
    * self.engines -- a list collecting started engines for clean up
    * self.controller -- used by addEngine() to register new engines
    """

    def createShell(self):
        """Return a fresh local Interpreter to compute reference results."""
        return Interpreter()

    def addEngine(self, n=1):
        """Start `n` engine services and register each with the controller.

        Every engine is wrapped in a QueuedEngine for registration, gets the
        id handed back by the controller, and is appended to self.engines.
        """
        for _ in range(n):
            engine = es.EngineService()
            engine.startService()
            queued = es.QueuedEngine(engine)
            registration = self.controller.register_engine(queued, None)
            engine.id = registration['id']
            self.engines.append(engine)
|
|
|
|
|
|
|
|
|
def testf(x):
|
|
|
return 2.0*x
|
|
|
|
|
|
|
|
|
globala = 99
|
|
|
|
|
|
|
|
|
def testg(x):
|
|
|
return globala*x
|
|
|
|
|
|
|
|
|
def isdid(did):
    """Return True if `did` looks like a deferred id: a str of length 40."""
    return isinstance(did, str) and len(did) == 40
|
|
|
|
|
|
|
|
|
def _raise_it(f):
    """Raise the exception carried by the failure `f`.

    `f.raiseException()` is called first.  If that raises a CompositeError,
    the CompositeError's own `raise_exception()` is called instead
    (presumably re-raising one of the wrapped engine exceptions -- confirm
    against IPython.kernel.error).  Any other exception propagates unchanged.
    """
    try:
        f.raiseException()
    # `except X as e` (rather than `except X, e`) is valid on Python 2.6+
    # and is the only spelling accepted by Python 3.
    except CompositeError as e:
        e.raise_exception()
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# IMultiEngineTestCase
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
class IMultiEngineTestCase(IMultiEngineBaseTestCase):
    """A test for any object that implements IEngineMultiplexer.

    self.multiengine must be defined and implement IEngineMultiplexer.

    Every test returns the Deferred it builds.  Note that the many
    ``addErrback(... assertRaises ...)`` links only run when the preceding
    call failed; a call that unexpectedly succeeds is not flagged by them.
    """

    def testIMultiEngineInterface(self):
        """Does self.multiengine claim to provide the multiengine interfaces?"""
        self.assert_(me.IEngineMultiplexer.providedBy(self.multiengine))
        self.assert_(me.IMultiEngine.providedBy(self.multiengine))

    def testIEngineMultiplexerInterfaceMethods(self):
        """Does self.multiengine have every name listed by IEngineMultiplexer?"""
        for m in list(me.IEngineMultiplexer):
            self.assert_(hasattr(self.multiengine, m))

    def testIEngineMultiplexerDeferreds(self):
        """Chain the multiplexer calls on engine 0 and return the Deferred."""
        self.addEngine(1)
        mec = self.multiengine
        d = mec.execute('a=5', targets=0)
        d.addCallback(lambda _: mec.push(dict(a=5), targets=0))
        d.addCallback(lambda _: mec.push(
            dict(a=5, b='asdf', c=[1, 2, 3]), targets=0))
        d.addCallback(lambda _: mec.pull(('a', 'b', 'c'), targets=0))
        d.addCallback(lambda _: mec.get_result(targets=0))
        d.addCallback(lambda _: mec.reset(targets=0))
        d.addCallback(lambda _: mec.keys(targets=0))
        d.addCallback(lambda _: mec.push_serialized(
            dict(a=newserialized.serialize(10)), targets=0))
        d.addCallback(lambda _: mec.pull_serialized('a', targets=0))
        d.addCallback(lambda _: mec.clear_queue(targets=0))
        d.addCallback(lambda _: mec.queue_status(targets=0))
        return d

    def testInvalidEngineID(self):
        """Calls targeting an unregistered engine id fail with InvalidEngineID."""
        self.addEngine(1)
        badID = 100
        mec = self.multiengine

        def invalid(f):
            return self.assertRaises(InvalidEngineID, f.raiseException)

        d = mec.execute('a=5', targets=badID)
        d.addErrback(invalid)
        d.addCallback(lambda _: mec.push(dict(a=5), targets=badID))
        d.addErrback(invalid)
        d.addCallback(lambda _: mec.pull('a', targets=badID))
        d.addErrback(invalid)
        d.addCallback(lambda _: mec.reset(targets=badID))
        d.addErrback(invalid)
        d.addCallback(lambda _: mec.keys(targets=badID))
        d.addErrback(invalid)
        d.addCallback(lambda _: mec.push_serialized(
            dict(a=newserialized.serialize(10)), targets=badID))
        d.addErrback(invalid)
        d.addCallback(lambda _: mec.pull_serialized('a', targets=badID))
        d.addErrback(invalid)
        d.addCallback(lambda _: mec.queue_status(targets=badID))
        d.addErrback(invalid)
        return d

    def testNoEnginesRegistered(self):
        """With no engines added, targets='all' fails with NoEnginesRegistered."""
        badID = 'all'
        mec = self.multiengine

        def none_registered(f):
            return self.assertRaises(NoEnginesRegistered, f.raiseException)

        d = mec.execute('a=5', targets=badID)
        d.addErrback(none_registered)
        d.addCallback(lambda _: mec.push(dict(a=5), targets=badID))
        d.addErrback(none_registered)
        d.addCallback(lambda _: mec.pull('a', targets=badID))
        d.addErrback(none_registered)
        d.addCallback(lambda _: mec.get_result(targets=badID))
        d.addErrback(none_registered)
        d.addCallback(lambda _: mec.reset(targets=badID))
        d.addErrback(none_registered)
        d.addCallback(lambda _: mec.keys(targets=badID))
        d.addErrback(none_registered)
        d.addCallback(lambda _: mec.push_serialized(
            dict(a=newserialized.serialize(10)), targets=badID))
        d.addErrback(none_registered)
        d.addCallback(lambda _: mec.pull_serialized('a', targets=badID))
        d.addErrback(none_registered)
        d.addCallback(lambda _: mec.queue_status(targets=badID))
        d.addErrback(none_registered)
        return d

    def runExecuteAll(self, d, cmd, shell):
        """Append to `d` an execute of `cmd` plus a comparison with `shell`.

        `cmd` is run in the local `shell` immediately to obtain the reference
        result; the multiengine execute happens when `d` reaches the callback.
        """
        actual = shell.execute(cmd)
        d.addCallback(lambda _: self.multiengine.execute(cmd))

        def compare(result):
            for r in result:
                # Copy the engine's id into the reference so that only the
                # remaining keys decide the comparison.
                actual['id'] = r['id']
                self.assertEquals(r, actual)
        d.addCallback(compare)

    def testExecuteAll(self):
        """Every valid command gives the same result on 4 engines as locally."""
        self.addEngine(4)
        d = defer.Deferred()
        shell = Interpreter()
        for cmd in validCommands:
            self.runExecuteAll(d, cmd, shell)
        d.callback(None)
        return d

    # The following two methods show how to do parametrized
    # tests.  This is really slick!  Same is used above.
    def runExecuteFailures(self, cmd, exc):
        """Execute `cmd` on 4 engines; on failure the error must be `exc`."""
        self.addEngine(4)
        d = self.multiengine.execute(cmd)
        d.addErrback(lambda f: self.assertRaises(exc, _raise_it, f))
        return d

    @parametric
    def testExecuteFailures(cls):
        """Return one (runner, cmd, exc) tuple per entry of invalidCommands."""
        return [(cls.runExecuteFailures, cmd, exc) for
                cmd, exc in invalidCommands]

    def testPushPull(self):
        """Objects pushed to engine 0 are pulled back unchanged."""
        self.addEngine(1)
        objs = [10, "hi there", 1.2342354, {"p": (1, 2)}]
        mec = self.multiengine
        d = mec.push(dict(key=objs[0]), targets=0)
        d.addCallback(lambda _: mec.pull('key', targets=0))
        d.addCallback(lambda r: self.assertEquals(r, [objs[0]]))
        d.addCallback(lambda _: mec.push(dict(key=objs[1]), targets=0))
        d.addCallback(lambda _: mec.pull('key', targets=0))
        d.addCallback(lambda r: self.assertEquals(r, [objs[1]]))
        d.addCallback(lambda _: mec.push(dict(key=objs[2]), targets=0))
        d.addCallback(lambda _: mec.pull('key', targets=0))
        d.addCallback(lambda r: self.assertEquals(r, [objs[2]]))
        d.addCallback(lambda _: mec.push(dict(key=objs[3]), targets=0))
        d.addCallback(lambda _: mec.pull('key', targets=0))
        d.addCallback(lambda r: self.assertEquals(r, [objs[3]]))
        # After a reset, pulling a name is expected to fail with NameError.
        d.addCallback(lambda _: mec.reset(targets=0))
        d.addCallback(lambda _: mec.pull('a', targets=0))
        d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
        d.addCallback(lambda _: mec.push(dict(a=10, b=20)))
        d.addCallback(lambda _: mec.pull(('a', 'b')))
        d.addCallback(lambda r: self.assertEquals(r, [[10, 20]]))
        return d

    def testPushPullAll(self):
        """Push/pull without targets reaches all 4 engines."""
        self.addEngine(4)
        mec = self.multiengine
        d = mec.push(dict(a=10))
        d.addCallback(lambda _: mec.pull('a'))
        d.addCallback(lambda r: self.assert_(r == [10, 10, 10, 10]))
        d.addCallback(lambda _: mec.push(dict(a=10, b=20)))
        d.addCallback(lambda _: mec.pull(('a', 'b')))
        d.addCallback(lambda r: self.assert_(r == 4*[[10, 20]]))
        d.addCallback(lambda _: mec.push(dict(a=10, b=20), targets=0))
        d.addCallback(lambda _: mec.pull(('a', 'b'), targets=0))
        d.addCallback(lambda r: self.assert_(r == [[10, 20]]))
        d.addCallback(lambda _: mec.push(dict(a=None, b=None), targets=0))
        d.addCallback(lambda _: mec.pull(('a', 'b'), targets=0))
        d.addCallback(lambda r: self.assert_(r == [[None, None]]))
        return d

    def testPushPullSerialized(self):
        """Serialized objects pushed to engine 0 round-trip unchanged."""
        self.addEngine(1)
        objs = [10, "hi there", 1.2342354, {"p": (1, 2)}]
        mec = self.multiengine

        def push_obj(i):
            return mec.push_serialized(
                dict(key=newserialized.serialize(objs[i])), targets=0)

        def unserialize_first(serial):
            return newserialized.IUnSerialized(serial[0]).getObject()

        d = push_obj(0)
        d.addCallback(lambda _: mec.pull_serialized('key', targets=0))
        d.addCallback(unserialize_first)
        d.addCallback(lambda r: self.assertEquals(r, objs[0]))
        d.addCallback(lambda _: push_obj(1))
        d.addCallback(lambda _: mec.pull_serialized('key', targets=0))
        d.addCallback(unserialize_first)
        d.addCallback(lambda r: self.assertEquals(r, objs[1]))
        d.addCallback(lambda _: push_obj(2))
        d.addCallback(lambda _: mec.pull_serialized('key', targets=0))
        d.addCallback(unserialize_first)
        d.addCallback(lambda r: self.assertEquals(r, objs[2]))
        d.addCallback(lambda _: push_obj(3))
        d.addCallback(lambda _: mec.pull_serialized('key', targets=0))
        d.addCallback(unserialize_first)
        d.addCallback(lambda r: self.assertEquals(r, objs[3]))
        d.addCallback(lambda _: mec.push(dict(a=10, b=range(5)), targets=0))
        d.addCallback(lambda _: mec.pull_serialized(('a', 'b'), targets=0))
        d.addCallback(lambda serial: [
            newserialized.IUnSerialized(s).getObject() for s in serial[0]])
        d.addCallback(lambda r: self.assertEquals(r, [10, range(5)]))
        d.addCallback(lambda _: mec.reset(targets=0))
        d.addCallback(lambda _: mec.pull_serialized('a', targets=0))
        d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
        return d

    def runGetResultAll(self, d, cmd, shell):
        """Append to `d` an execute + get_result of `cmd`, compared to `shell`.

        `cmd` is run in the local `shell` immediately to obtain the reference
        result.
        """
        actual = shell.execute(cmd)
        d.addCallback(lambda _: self.multiengine.execute(cmd))
        d.addCallback(lambda _: self.multiengine.get_result())

        def compare(result):
            for r in result:
                # Copy the engine's id into the reference so that only the
                # remaining keys decide the comparison.
                actual['id'] = r['id']
                self.assertEquals(r, actual)
        d.addCallback(compare)

    def testGetResultAll(self):
        """get_result() on 4 engines matches a local shell for valid commands."""
        self.addEngine(4)
        d = defer.Deferred()
        shell = Interpreter()
        for cmd in validCommands:
            self.runGetResultAll(d, cmd, shell)
        d.callback(None)
        return d

    def testGetResultDefault(self):
        """get_result() with no index matches the local result, minus 'id'."""
        self.addEngine(1)
        target = 0
        cmd = 'a=5'
        shell = self.createShell()
        shellResult = shell.execute(cmd)

        def popit(dikt, key):
            dikt.pop(key)
            return dikt

        d = self.multiengine.execute(cmd, targets=target)
        d.addCallback(lambda _: self.multiengine.get_result(targets=target))
        d.addCallback(
            lambda r: self.assertEquals(shellResult, popit(r[0], 'id')))
        return d

    def testGetResultFailure(self):
        """get_result() failures on a fresh engine must be IndexError."""
        self.addEngine(1)
        d = self.multiengine.get_result(None, targets=0)
        d.addErrback(lambda f: self.assertRaises(IndexError, _raise_it, f))
        d.addCallback(lambda _: self.multiengine.get_result(10, targets=0))
        d.addErrback(lambda f: self.assertRaises(IndexError, _raise_it, f))
        return d

    def testPushFunction(self):
        """Functions pushed to engine 0 can be called there."""
        self.addEngine(1)
        mec = self.multiengine
        d = mec.push_function(dict(f=testf), targets=0)
        d.addCallback(lambda _: mec.execute('result = f(10)', targets=0))
        d.addCallback(lambda _: mec.pull('result', targets=0))
        d.addCallback(lambda r: self.assertEquals(r[0], testf(10)))
        # testg reads the global `globala`, so push that value first.
        d.addCallback(lambda _: mec.push(dict(globala=globala), targets=0))
        d.addCallback(lambda _: mec.push_function(dict(g=testg), targets=0))
        d.addCallback(lambda _: mec.execute('result = g(10)', targets=0))
        d.addCallback(lambda _: mec.pull('result', targets=0))
        d.addCallback(lambda r: self.assertEquals(r[0], testg(10)))
        return d

    def testPullFunction(self):
        """Functions can be pulled back from engine 0 and called locally."""
        self.addEngine(1)
        mec = self.multiengine
        d = mec.push(dict(a=globala), targets=0)
        d.addCallback(lambda _: mec.push_function(dict(f=testf), targets=0))
        d.addCallback(lambda _: mec.pull_function('f', targets=0))
        d.addCallback(lambda r: self.assertEquals(r[0](10), testf(10)))
        d.addCallback(
            lambda _: mec.execute("def g(x): return x*x", targets=0))
        d.addCallback(lambda _: mec.pull_function(('f', 'g'), targets=0))
        d.addCallback(lambda r: self.assertEquals(
            (r[0][0](10), r[0][1](10)), (testf(10), 100)))
        return d

    def testPushFunctionAll(self):
        """Functions pushed without targets are callable on all 4 engines."""
        self.addEngine(4)
        mec = self.multiengine
        d = mec.push_function(dict(f=testf))
        d.addCallback(lambda _: mec.execute('result = f(10)'))
        d.addCallback(lambda _: mec.pull('result'))
        d.addCallback(lambda r: self.assertEquals(r, 4*[testf(10)]))
        # testg reads the global `globala`, so push that value first.
        d.addCallback(lambda _: mec.push(dict(globala=globala)))
        d.addCallback(lambda _: mec.push_function(dict(testg=testg)))
        d.addCallback(lambda _: mec.execute('result = testg(10)'))
        d.addCallback(lambda _: mec.pull('result'))
        d.addCallback(lambda r: self.assertEquals(r, 4*[testg(10)]))
        return d

    def testPullFunctionAll(self):
        """pull_function without targets returns one function per engine."""
        self.addEngine(4)
        d = self.multiengine.push_function(dict(f=testf))
        d.addCallback(lambda _: self.multiengine.pull_function('f'))
        d.addCallback(lambda r: self.assertEquals(
            [func(10) for func in r], 4*[testf(10)]))
        return d

    def testGetIDs(self):
        """get_ids() lists the ids of the engines added so far."""
        self.addEngine(1)
        d = self.multiengine.get_ids()
        d.addCallback(lambda r: self.assertEquals(r, [0]))
        d.addCallback(lambda _: self.addEngine(3))
        d.addCallback(lambda _: self.multiengine.get_ids())
        d.addCallback(lambda r: self.assertEquals(r, [0, 1, 2, 3]))
        return d

    def testClearQueue(self):
        """clear_queue() yields one None per engine."""
        self.addEngine(4)
        d = self.multiengine.clear_queue()
        d.addCallback(lambda r: self.assertEquals(r, 4*[None]))
        return d

    def testQueueStatus(self):
        """queue_status() for engine 0 yields a tuple as its first item."""
        self.addEngine(4)
        d = self.multiengine.queue_status(targets=0)
        d.addCallback(lambda r: self.assert_(isinstance(r[0], tuple)))
        return d

    def testGetSetProperties(self):
        """Properties set on all engines can be read back, whole or by key."""
        self.addEngine(4)
        dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
        mec = self.multiengine
        d = mec.set_properties(dikt)
        d.addCallback(lambda r: mec.get_properties())
        d.addCallback(lambda r: self.assertEquals(r, 4*[dikt]))
        d.addCallback(lambda r: mec.get_properties(('c',)))
        d.addCallback(lambda r: self.assertEquals(r, 4*[{'c': dikt['c']}]))
        d.addCallback(lambda r: mec.set_properties(dict(c=False)))
        d.addCallback(lambda r: mec.get_properties(('c', 'd')))
        d.addCallback(
            lambda r: self.assertEquals(r, 4*[dict(c=False, d=None)]))
        return d

    def testClearProperties(self):
        """clear_properties() leaves every engine with empty properties."""
        self.addEngine(4)
        dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
        d = self.multiengine.set_properties(dikt)
        d.addCallback(lambda r: self.multiengine.clear_properties())
        d.addCallback(lambda r: self.multiengine.get_properties())
        d.addCallback(lambda r: self.assertEquals(r, 4*[{}]))
        return d

    def testDelHasProperties(self):
        """has_properties() reflects the keys removed by del_properties()."""
        self.addEngine(4)
        dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
        d = self.multiengine.set_properties(dikt)
        d.addCallback(
            lambda r: self.multiengine.del_properties(('b', 'e')))
        d.addCallback(lambda r: self.multiengine.has_properties(
            ('a', 'b', 'c', 'd', 'e')))
        d.addCallback(lambda r: self.assertEquals(
            r, 4*[[True, False, True, True, False]]))
        return d
|
|
|
|
|
|
Parametric(IMultiEngineTestCase)
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# ISynchronousMultiEngineTestCase
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
class ISynchronousMultiEngineTestCase(IMultiEngineBaseTestCase):
|
|
|
|
|
|
def testISynchronousMultiEngineInterface(self):
|
|
|
"""Does self.engine claim to implement IEngineCore?"""
|
|
|
self.assert_(me.ISynchronousEngineMultiplexer.providedBy(self.multiengine))
|
|
|
self.assert_(me.ISynchronousMultiEngine.providedBy(self.multiengine))
|
|
|
|
|
|
def testExecute(self):
|
|
|
self.addEngine(4)
|
|
|
execute = self.multiengine.execute
|
|
|
d= execute('a=5', targets=0, block=True)
|
|
|
d.addCallback(lambda r: self.assert_(len(r)==1))
|
|
|
d.addCallback(lambda _: execute('b=10'))
|
|
|
d.addCallback(lambda r: self.assert_(len(r)==4))
|
|
|
d.addCallback(lambda _: execute('c=30', block=False))
|
|
|
d.addCallback(lambda did: self.assert_(isdid(did)))
|
|
|
d.addCallback(lambda _: execute('d=[0,1,2]', block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.assert_(len(r)==4))
|
|
|
return d
|
|
|
|
|
|
def testPushPull(self):
|
|
|
data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
|
|
|
self.addEngine(4)
|
|
|
push = self.multiengine.push
|
|
|
pull = self.multiengine.pull
|
|
|
d= push({'data':data}, targets=0)
|
|
|
d.addCallback(lambda r: pull('data', targets=0))
|
|
|
d.addCallback(lambda r: self.assertEqual(r,[data]))
|
|
|
d.addCallback(lambda _: push({'data':data}))
|
|
|
d.addCallback(lambda r: pull('data'))
|
|
|
d.addCallback(lambda r: self.assertEqual(r,4*[data]))
|
|
|
d.addCallback(lambda _: push({'data':data}, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda _: pull('data', block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.assertEqual(r,4*[data]))
|
|
|
d.addCallback(lambda _: push(dict(a=10,b=20)))
|
|
|
d.addCallback(lambda _: pull(('a','b')))
|
|
|
d.addCallback(lambda r: self.assertEquals(r, 4*[[10,20]]))
|
|
|
return d
|
|
|
|
|
|
def testPushPullFunction(self):
|
|
|
self.addEngine(4)
|
|
|
pushf = self.multiengine.push_function
|
|
|
pullf = self.multiengine.pull_function
|
|
|
push = self.multiengine.push
|
|
|
pull = self.multiengine.pull
|
|
|
execute = self.multiengine.execute
|
|
|
d= pushf({'testf':testf}, targets=0)
|
|
|
d.addCallback(lambda r: pullf('testf', targets=0))
|
|
|
d.addCallback(lambda r: self.assertEqual(r[0](1.0), testf(1.0)))
|
|
|
d.addCallback(lambda _: execute('r = testf(10)', targets=0))
|
|
|
d.addCallback(lambda _: pull('r', targets=0))
|
|
|
d.addCallback(lambda r: self.assertEquals(r[0], testf(10)))
|
|
|
d.addCallback(lambda _: pushf({'testf':testf}, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda _: pullf('testf', block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.assertEqual(r[0](1.0), testf(1.0)))
|
|
|
d.addCallback(lambda _: execute("def g(x): return x*x", targets=0))
|
|
|
d.addCallback(lambda _: pullf(('testf','g'),targets=0))
|
|
|
d.addCallback(lambda r: self.assertEquals((r[0][0](10),r[0][1](10)), (testf(10), 100)))
|
|
|
return d
|
|
|
|
|
|
def testGetResult(self):
|
|
|
shell = Interpreter()
|
|
|
result1 = shell.execute('a=10')
|
|
|
result1['id'] = 0
|
|
|
result2 = shell.execute('b=20')
|
|
|
result2['id'] = 0
|
|
|
execute= self.multiengine.execute
|
|
|
get_result = self.multiengine.get_result
|
|
|
self.addEngine(1)
|
|
|
d= execute('a=10')
|
|
|
d.addCallback(lambda _: get_result())
|
|
|
d.addCallback(lambda r: self.assertEquals(r[0], result1))
|
|
|
d.addCallback(lambda _: execute('b=20'))
|
|
|
d.addCallback(lambda _: get_result(1))
|
|
|
d.addCallback(lambda r: self.assertEquals(r[0], result1))
|
|
|
d.addCallback(lambda _: get_result(2, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.assertEquals(r[0], result2))
|
|
|
return d
|
|
|
|
|
|
def testResetAndKeys(self):
|
|
|
self.addEngine(1)
|
|
|
|
|
|
#Blocking mode
|
|
|
d= self.multiengine.push(dict(a=10, b=20, c=range(10)), targets=0)
|
|
|
d.addCallback(lambda _: self.multiengine.keys(targets=0))
|
|
|
def keys_found(keys):
|
|
|
self.assert_('a' in keys[0])
|
|
|
self.assert_('b' in keys[0])
|
|
|
self.assert_('b' in keys[0])
|
|
|
d.addCallback(keys_found)
|
|
|
d.addCallback(lambda _: self.multiengine.reset(targets=0))
|
|
|
d.addCallback(lambda _: self.multiengine.keys(targets=0))
|
|
|
def keys_not_found(keys):
|
|
|
self.assert_('a' not in keys[0])
|
|
|
self.assert_('b' not in keys[0])
|
|
|
self.assert_('b' not in keys[0])
|
|
|
d.addCallback(keys_not_found)
|
|
|
|
|
|
#Non-blocking mode
|
|
|
d.addCallback(lambda _: self.multiengine.push(dict(a=10, b=20, c=range(10)), targets=0))
|
|
|
d.addCallback(lambda _: self.multiengine.keys(targets=0, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
def keys_found(keys):
|
|
|
self.assert_('a' in keys[0])
|
|
|
self.assert_('b' in keys[0])
|
|
|
self.assert_('b' in keys[0])
|
|
|
d.addCallback(keys_found)
|
|
|
d.addCallback(lambda _: self.multiengine.reset(targets=0, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda _: self.multiengine.keys(targets=0, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
def keys_not_found(keys):
|
|
|
self.assert_('a' not in keys[0])
|
|
|
self.assert_('b' not in keys[0])
|
|
|
self.assert_('b' not in keys[0])
|
|
|
d.addCallback(keys_not_found)
|
|
|
|
|
|
return d
|
|
|
|
|
|
def testPushPullSerialized(self):
|
|
|
self.addEngine(1)
|
|
|
dikt = dict(a=10,b='hi there',c=1.2345,d={'p':(1,2)})
|
|
|
sdikt = {}
|
|
|
for k,v in dikt.iteritems():
|
|
|
sdikt[k] = newserialized.serialize(v)
|
|
|
d= self.multiengine.push_serialized(dict(a=sdikt['a']), targets=0)
|
|
|
d.addCallback(lambda _: self.multiengine.pull('a',targets=0))
|
|
|
d.addCallback(lambda r: self.assertEquals(r[0], dikt['a']))
|
|
|
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0))
|
|
|
d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
|
|
|
d.addCallback(lambda r: self.assertEquals(r, dikt['a']))
|
|
|
d.addCallback(lambda _: self.multiengine.push_serialized(sdikt, targets=0))
|
|
|
d.addCallback(lambda _: self.multiengine.pull_serialized(sdikt.keys(), targets=0))
|
|
|
d.addCallback(lambda serial: [newserialized.IUnSerialized(s).getObject() for s in serial[0]])
|
|
|
d.addCallback(lambda r: self.assertEquals(r, dikt.values()))
|
|
|
d.addCallback(lambda _: self.multiengine.reset(targets=0))
|
|
|
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0))
|
|
|
d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
|
|
|
|
|
|
#Non-blocking mode
|
|
|
d.addCallback(lambda r: self.multiengine.push_serialized(dict(a=sdikt['a']), targets=0, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda _: self.multiengine.pull('a',targets=0))
|
|
|
d.addCallback(lambda r: self.assertEquals(r[0], dikt['a']))
|
|
|
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda serial: newserialized.IUnSerialized(serial[0]).getObject())
|
|
|
d.addCallback(lambda r: self.assertEquals(r, dikt['a']))
|
|
|
d.addCallback(lambda _: self.multiengine.push_serialized(sdikt, targets=0, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda _: self.multiengine.pull_serialized(sdikt.keys(), targets=0, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda serial: [newserialized.IUnSerialized(s).getObject() for s in serial[0]])
|
|
|
d.addCallback(lambda r: self.assertEquals(r, dikt.values()))
|
|
|
d.addCallback(lambda _: self.multiengine.reset(targets=0))
|
|
|
d.addCallback(lambda _: self.multiengine.pull_serialized('a', targets=0, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
|
|
|
return d
|
|
|
|
|
|
def testClearQueue(self):
|
|
|
self.addEngine(4)
|
|
|
d= self.multiengine.clear_queue()
|
|
|
d.addCallback(lambda r: self.multiengine.clear_queue(block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.assertEquals(r,4*[None]))
|
|
|
return d
|
|
|
|
|
|
def testQueueStatus(self):
|
|
|
self.addEngine(4)
|
|
|
d= self.multiengine.queue_status(targets=0)
|
|
|
d.addCallback(lambda r: self.assert_(isinstance(r[0],tuple)))
|
|
|
d.addCallback(lambda r: self.multiengine.queue_status(targets=0, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.assert_(isinstance(r[0],tuple)))
|
|
|
return d
|
|
|
|
|
|
def testGetIDs(self):
|
|
|
self.addEngine(1)
|
|
|
d= self.multiengine.get_ids()
|
|
|
d.addCallback(lambda r: self.assertEquals(r, [0]))
|
|
|
d.addCallback(lambda _: self.addEngine(3))
|
|
|
d.addCallback(lambda _: self.multiengine.get_ids())
|
|
|
d.addCallback(lambda r: self.assertEquals(r, [0,1,2,3]))
|
|
|
return d
|
|
|
|
|
|
def testGetSetProperties(self):
|
|
|
self.addEngine(4)
|
|
|
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
|
|
|
d= self.multiengine.set_properties(dikt)
|
|
|
d.addCallback(lambda r: self.multiengine.get_properties())
|
|
|
d.addCallback(lambda r: self.assertEquals(r, 4*[dikt]))
|
|
|
d.addCallback(lambda r: self.multiengine.get_properties(('c',)))
|
|
|
d.addCallback(lambda r: self.assertEquals(r, 4*[{'c': dikt['c']}]))
|
|
|
d.addCallback(lambda r: self.multiengine.set_properties(dict(c=False)))
|
|
|
d.addCallback(lambda r: self.multiengine.get_properties(('c', 'd')))
|
|
|
d.addCallback(lambda r: self.assertEquals(r, 4*[dict(c=False, d=None)]))
|
|
|
|
|
|
#Non-blocking
|
|
|
d.addCallback(lambda r: self.multiengine.set_properties(dikt, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.multiengine.get_properties(block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.assertEquals(r, 4*[dikt]))
|
|
|
d.addCallback(lambda r: self.multiengine.get_properties(('c',), block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.assertEquals(r, 4*[{'c': dikt['c']}]))
|
|
|
d.addCallback(lambda r: self.multiengine.set_properties(dict(c=False), block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.multiengine.get_properties(('c', 'd'), block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.assertEquals(r, 4*[dict(c=False, d=None)]))
|
|
|
return d
|
|
|
|
|
|
def testClearProperties(self):
|
|
|
self.addEngine(4)
|
|
|
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
|
|
|
d= self.multiengine.set_properties(dikt)
|
|
|
d.addCallback(lambda r: self.multiengine.clear_properties())
|
|
|
d.addCallback(lambda r: self.multiengine.get_properties())
|
|
|
d.addCallback(lambda r: self.assertEquals(r, 4*[{}]))
|
|
|
|
|
|
#Non-blocking
|
|
|
d.addCallback(lambda r: self.multiengine.set_properties(dikt, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.multiengine.clear_properties(block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.multiengine.get_properties(block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.assertEquals(r, 4*[{}]))
|
|
|
return d
|
|
|
|
|
|
def testDelHasProperties(self):
|
|
|
self.addEngine(4)
|
|
|
dikt = dict(a=5, b='asdf', c=True, d=None, e=range(5))
|
|
|
d= self.multiengine.set_properties(dikt)
|
|
|
d.addCallback(lambda r: self.multiengine.del_properties(('b','e')))
|
|
|
d.addCallback(lambda r: self.multiengine.has_properties(('a','b','c','d','e')))
|
|
|
d.addCallback(lambda r: self.assertEquals(r, 4*[[True, False, True, True, False]]))
|
|
|
|
|
|
#Non-blocking
|
|
|
d.addCallback(lambda r: self.multiengine.set_properties(dikt, block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.multiengine.del_properties(('b','e'), block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.multiengine.has_properties(('a','b','c','d','e'), block=False))
|
|
|
d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
|
|
|
d.addCallback(lambda r: self.assertEquals(r, 4*[[True, False, True, True, False]]))
|
|
|
return d
|
|
|
|
|
|
def test_clear_pending_deferreds(self):
    # Issue three non-blocking calls (execute, push, pull), remember the
    # value each one hands back, call clear_pending_deferreds(), and then
    # require that looking up each remembered value fails with
    # InvalidDeferredID.
    self.addEngine(4)
    did_list = []

    def unexpected_success(result):
        # An errback-only check would be skipped entirely if the lookup
        # succeeded, letting the test pass without verifying anything.
        self.fail("get_pending_deferred succeeded after "
                  "clear_pending_deferreds: %r" % (result,))

    def check_invalid(f):
        self.assertRaises(InvalidDeferredID, f.raiseException)

    d = self.multiengine.execute('a=10', block=False)
    d.addCallback(lambda did: did_list.append(did))
    d.addCallback(lambda _: self.multiengine.push(dict(b=10), block=False))
    d.addCallback(lambda did: did_list.append(did))
    d.addCallback(lambda _: self.multiengine.pull(('a', 'b'), block=False))
    d.addCallback(lambda did: did_list.append(did))
    d.addCallback(lambda _: self.multiengine.clear_pending_deferreds())
    for index in range(3):
        # Bind the index as a default so each lookup uses its own value.
        d.addCallback(
            lambda _, i=index:
                self.multiengine.get_pending_deferred(did_list[i], True))
        # Callback and errback are attached as a pair: a success is turned
        # into a test failure, a failure must be InvalidDeferredID.
        d.addCallbacks(unexpected_success, check_invalid)
    return d
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Coordinator test cases
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
class IMultiEngineCoordinatorTestCase(object):
    """Tests for the scatter, gather and map operations of a multiengine.

    This is a mixin: its methods rely on ``self.multiengine``,
    ``self.addEngine`` and the ``assert*`` helpers being supplied by the
    class it is combined with.
    """

    def testScatterGather(self):
        # Scatter a sequence under the name 'a' and gather it back; the
        # round trip must reproduce the sequence.
        self.addEngine(4)
        d = self.multiengine.scatter('a', range(16))
        d.addCallback(lambda r: self.multiengine.gather('a'))
        d.addCallback(lambda r: self.assertEquals(r, range(16)))
        # 'asdf' was never scattered.  The errback checks that re-raising
        # the resulting failure through _raise_it gives a NameError.
        d.addCallback(lambda _: self.multiengine.gather('asdf'))
        d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
        return d

    def testScatterGatherNumpy(self):
        # Same round trip as testScatterGather, but with a numpy array.
        try:
            import numpy
            # Import from the public numpy.testing namespace; the
            # numpy.testing.utils path is not available in current numpy.
            from numpy.testing import assert_array_equal
        except ImportError:
            # numpy is optional: without it there is nothing to check here.
            return
        self.addEngine(4)
        a = numpy.arange(16)
        d = self.multiengine.scatter('a', a)
        d.addCallback(lambda r: self.multiengine.gather('a'))
        d.addCallback(lambda r: assert_array_equal(r, a))
        return d

    def testMap(self):
        # map(f, data) must give the same values as applying f locally.
        self.addEngine(4)

        def f(x):
            return x**2

        data = range(16)
        d = self.multiengine.map(f, data)
        d.addCallback(lambda r: self.assertEquals(r, [f(x) for x in data]))
        return d
|
|
|
|
|
|
|
|
|
class ISynchronousMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase):
    """Non-blocking variants of the scatter, gather and map tests.

    Each call is made with ``block=False`` and the value it hands back is
    passed through ``self.multiengine.get_pending_deferred(..., True)``
    before the result is checked.
    """

    def testScatterGatherNonblocking(self):
        # Scatter a sequence under the name 'a' and gather it back.
        self.addEngine(4)
        d = self.multiengine.scatter('a', range(16), block=False)
        d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
        d.addCallback(lambda r: self.multiengine.gather('a', block=False))
        d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
        d.addCallback(lambda r: self.assertEquals(r, range(16)))
        return d

    def testScatterGatherNumpyNonblocking(self):
        # Same round trip as above, but with a numpy array.
        try:
            import numpy
            # Import from the public numpy.testing namespace; the
            # numpy.testing.utils path is not available in current numpy.
            from numpy.testing import assert_array_equal
        except ImportError:
            # numpy is optional: without it there is nothing to check here.
            return
        self.addEngine(4)
        a = numpy.arange(16)
        d = self.multiengine.scatter('a', a, block=False)
        d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
        d.addCallback(lambda r: self.multiengine.gather('a', block=False))
        d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
        d.addCallback(lambda r: assert_array_equal(r, a))
        return d

    def testMapNonblocking(self):
        # map(f, data) must give the same values as applying f locally.
        self.addEngine(4)

        def f(x):
            return x**2

        data = range(16)
        d = self.multiengine.map(f, data, block=False)
        d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
        d.addCallback(lambda r: self.assertEquals(r, [f(x) for x in data]))
        return d

    def test_clear_pending_deferreds(self):
        # Issue three non-blocking calls (scatter, gather, map), remember the
        # value each one hands back, call clear_pending_deferreds(), and then
        # require that looking up each remembered value fails with
        # InvalidDeferredID.
        #
        # NOTE(review): a method with this same name appears earlier in the
        # file; if both end up in one class through multiple inheritance,
        # only the first one in the MRO will run -- confirm this is intended.
        self.addEngine(4)
        did_list = []

        def unexpected_success(result):
            # An errback-only check would be skipped entirely if the lookup
            # succeeded, letting the test pass without verifying anything.
            self.fail("get_pending_deferred succeeded after "
                      "clear_pending_deferreds: %r" % (result,))

        def check_invalid(f):
            self.assertRaises(InvalidDeferredID, f.raiseException)

        d = self.multiengine.scatter('a', range(16), block=False)
        d.addCallback(lambda did: did_list.append(did))
        d.addCallback(lambda _: self.multiengine.gather('a', block=False))
        d.addCallback(lambda did: did_list.append(did))
        d.addCallback(lambda _: self.multiengine.map(lambda x: x, range(16), block=False))
        d.addCallback(lambda did: did_list.append(did))
        d.addCallback(lambda _: self.multiengine.clear_pending_deferreds())
        for index in range(3):
            # Bind the index as a default so each lookup uses its own value.
            d.addCallback(
                lambda _, i=index:
                    self.multiengine.get_pending_deferred(did_list[i], True))
            # Callback and errback are attached as a pair: a success is
            # turned into a test failure, a failure must be InvalidDeferredID.
            d.addCallbacks(unexpected_success, check_invalid)
        return d
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Extras test cases
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
class IMultiEngineExtrasTestCase(object):
    """Tests for the zip_pull and run operations of a multiengine.

    This is a mixin: its methods rely on ``self.multiengine``,
    ``self.addEngine`` and the ``assert*`` helpers being supplied by the
    class it is combined with.
    """

    def testZipPull(self):
        # Push two names and pull them back grouped per name.
        self.addEngine(4)
        d = self.multiengine.push(dict(a=10, b=20))
        d.addCallback(lambda r: self.multiengine.zip_pull(('a', 'b')))
        # assert_(r, expected) treated `expected` as the failure message and
        # only checked that r was truthy; compare the contents instead.
        # NOTE(review): each group is converted with list() on the
        # assumption that zip_pull may hand back tuples -- confirm.
        d.addCallback(
            lambda r: self.assertEquals([list(group) for group in r],
                                        [4*[10], 4*[20]]))
        return d

    def testRun(self):
        # Write a small script to a temporary file, run it through the
        # multiengine and check the names it defines.
        import os
        import tempfile

        self.addEngine(4)
        # mkstemp creates the file itself, avoiding the name race of mktemp.
        fd, fname = tempfile.mkstemp('foo.py')
        script = os.fdopen(fd, 'w')
        try:
            script.write('a = 10\nb=30')
        finally:
            script.close()

        def remove_script(passthrough):
            # Best-effort cleanup; hands the result or failure on unchanged
            # so the outcome of the test is not affected.
            try:
                os.remove(fname)
            except OSError:
                pass
            return passthrough

        try:
            d = self.multiengine.run(fname)
        except Exception:
            remove_script(None)
            raise
        d.addCallback(lambda r: self.multiengine.pull(('a', 'b')))
        d.addCallback(lambda r: self.assertEquals(r, 4*[[10, 30]]))
        d.addBoth(remove_script)
        return d
|
|
|
|
|
|
|
|
|
class ISynchronousMultiEngineExtrasTestCase(IMultiEngineExtrasTestCase):
    """Non-blocking variants of the zip_pull and run tests.

    The call under test is made with ``block=False`` and the value it hands
    back is passed through ``self.multiengine.get_pending_deferred(..., True)``
    before the result is checked.
    """

    def testZipPullNonblocking(self):
        # Push two names and pull them back grouped per name.
        self.addEngine(4)
        d = self.multiengine.push(dict(a=10, b=20))
        d.addCallback(lambda r: self.multiengine.zip_pull(('a', 'b'), block=False))
        d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
        # assert_(r, expected) treated `expected` as the failure message and
        # only checked that r was truthy; compare the contents instead.
        # NOTE(review): each group is converted with list() on the
        # assumption that zip_pull may hand back tuples -- confirm.
        d.addCallback(
            lambda r: self.assertEquals([list(group) for group in r],
                                        [4*[10], 4*[20]]))
        return d

    def testRunNonblocking(self):
        # Write a small script to a temporary file, run it through the
        # multiengine and check the names it defines.
        import os
        import tempfile

        self.addEngine(4)
        # mkstemp creates the file itself, avoiding the name race of mktemp.
        fd, fname = tempfile.mkstemp('foo.py')
        script = os.fdopen(fd, 'w')
        try:
            script.write('a = 10\nb=30')
        finally:
            script.close()

        def remove_script(passthrough):
            # Best-effort cleanup; hands the result or failure on unchanged
            # so the outcome of the test is not affected.
            try:
                os.remove(fname)
            except OSError:
                pass
            return passthrough

        try:
            d = self.multiengine.run(fname, block=False)
        except Exception:
            remove_script(None)
            raise
        d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
        d.addCallback(lambda r: self.multiengine.pull(('a', 'b')))
        d.addCallback(lambda r: self.assertEquals(r, 4*[[10, 30]]))
        d.addBoth(remove_script)
        return d
|
|
|
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# IFullSynchronousMultiEngineTestCase
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
class IFullSynchronousMultiEngineTestCase(
        ISynchronousMultiEngineTestCase,
        ISynchronousMultiEngineCoordinatorTestCase,
        ISynchronousMultiEngineExtrasTestCase):
    """Combination of the three synchronous multiengine test mixins.

    Defines nothing of its own; every test is inherited from the bases.
    """
|
|
|
|