Show More
tasktest.py
187 lines
| 7.3 KiB
| text/x-python
|
PythonLexer
Brian E Granger
|
r1234 | #!/usr/bin/env python | ||
# encoding: utf-8 | ||||
__docformat__ = "restructuredtext en" | ||||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2008 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
import time | ||||
from IPython.kernel import task, engineservice as es | ||||
from IPython.kernel.util import printer | ||||
from IPython.kernel import error | ||||
#------------------------------------------------------------------------------- | ||||
# Tests | ||||
#------------------------------------------------------------------------------- | ||||
def _raise_it(f): | ||||
try: | ||||
f.raiseException() | ||||
except CompositeError, e: | ||||
e.raise_exception() | ||||
class TaskTestBase(object): | ||||
def addEngine(self, n=1): | ||||
for i in range(n): | ||||
e = es.EngineService() | ||||
e.startService() | ||||
regDict = self.controller.register_engine(es.QueuedEngine(e), None) | ||||
e.id = regDict['id'] | ||||
self.engines.append(e) | ||||
class ITaskControllerTestCase(TaskTestBase): | ||||
Brian E Granger
|
r1395 | def test_task_ids(self): | ||
Brian E Granger
|
r1234 | self.addEngine(1) | ||
Brian E Granger
|
r1395 | d = self.tc.run(task.StringTask('a=5')) | ||
Brian E Granger
|
r1234 | d.addCallback(lambda r: self.assertEquals(r, 0)) | ||
Brian E Granger
|
r1395 | d.addCallback(lambda r: self.tc.run(task.StringTask('a=5'))) | ||
Brian E Granger
|
r1234 | d.addCallback(lambda r: self.assertEquals(r, 1)) | ||
Brian E Granger
|
r1395 | d.addCallback(lambda r: self.tc.run(task.StringTask('a=5'))) | ||
Brian E Granger
|
r1234 | d.addCallback(lambda r: self.assertEquals(r, 2)) | ||
Brian E Granger
|
r1395 | d.addCallback(lambda r: self.tc.run(task.StringTask('a=5'))) | ||
Brian E Granger
|
r1234 | d.addCallback(lambda r: self.assertEquals(r, 3)) | ||
return d | ||||
Brian E Granger
|
r1395 | def test_abort(self): | ||
Brian E Granger
|
r1234 | """Cannot do a proper abort test, because blocking execution prevents | ||
abort from being called before task completes""" | ||||
self.addEngine(1) | ||||
Brian E Granger
|
r1395 | t = task.StringTask('a=5') | ||
Brian E Granger
|
r1234 | d = self.tc.abort(0) | ||
d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException)) | ||||
d.addCallback(lambda _:self.tc.run(t)) | ||||
d.addCallback(self.tc.abort) | ||||
d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException)) | ||||
return d | ||||
Brian E Granger
|
r1395 | def test_abort_type(self): | ||
Brian E Granger
|
r1234 | self.addEngine(1) | ||
d = self.tc.abort('asdfadsf') | ||||
d.addErrback(lambda f: self.assertRaises(TypeError, f.raiseException)) | ||||
return d | ||||
Brian E Granger
|
r1395 | def test_clear_before_and_after(self): | ||
Brian E Granger
|
r1234 | self.addEngine(1) | ||
Brian E Granger
|
r1395 | t = task.StringTask('a=1', clear_before=True, pull='b', clear_after=True) | ||
Brian E Granger
|
r1234 | d = self.multiengine.execute('b=1', targets=0) | ||
d.addCallback(lambda _: self.tc.run(t)) | ||||
d.addCallback(lambda tid: self.tc.get_task_result(tid,block=True)) | ||||
d.addCallback(lambda tr: tr.failure) | ||||
d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException)) | ||||
d.addCallback(lambda _:self.multiengine.pull('a', targets=0)) | ||||
d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f)) | ||||
return d | ||||
Brian E Granger
|
r1395 | def test_simple_retries(self): | ||
Brian E Granger
|
r1234 | self.addEngine(1) | ||
Brian E Granger
|
r1395 | t = task.StringTask("i += 1\nassert i == 16", pull='i',retries=10) | ||
t2 = task.StringTask("i += 1\nassert i == 16", pull='i',retries=10) | ||||
Brian E Granger
|
r1234 | d = self.multiengine.execute('i=0', targets=0) | ||
d.addCallback(lambda r: self.tc.run(t)) | ||||
d.addCallback(self.tc.get_task_result, block=True) | ||||
d.addCallback(lambda tr: tr.ns.i) | ||||
d.addErrback(lambda f: self.assertRaises(AssertionError, f.raiseException)) | ||||
d.addCallback(lambda r: self.tc.run(t2)) | ||||
d.addCallback(self.tc.get_task_result, block=True) | ||||
d.addCallback(lambda tr: tr.ns.i) | ||||
d.addCallback(lambda r: self.assertEquals(r, 16)) | ||||
return d | ||||
Brian E Granger
|
r1395 | def test_recovery_tasks(self): | ||
Brian E Granger
|
r1234 | self.addEngine(1) | ||
Brian E Granger
|
r1395 | t = task.StringTask("i=16", pull='i') | ||
t2 = task.StringTask("raise Exception", recovery_task=t, retries = 2) | ||||
Brian E Granger
|
r1234 | |||
d = self.tc.run(t2) | ||||
d.addCallback(self.tc.get_task_result, block=True) | ||||
d.addCallback(lambda tr: tr.ns.i) | ||||
d.addCallback(lambda r: self.assertEquals(r, 16)) | ||||
return d | ||||
Brian E Granger
|
r1395 | def test_setup_ns(self): | ||
Brian E Granger
|
r1234 | self.addEngine(1) | ||
d = self.multiengine.execute('a=0', targets=0) | ||||
ns = dict(a=1, b=0) | ||||
Brian E Granger
|
r1395 | t = task.StringTask("", push=ns, pull=['a','b']) | ||
Brian E Granger
|
r1234 | d.addCallback(lambda r: self.tc.run(t)) | ||
d.addCallback(self.tc.get_task_result, block=True) | ||||
d.addCallback(lambda tr: {'a':tr.ns.a, 'b':tr['b']}) | ||||
d.addCallback(lambda r: self.assertEquals(r, ns)) | ||||
return d | ||||
Brian E Granger
|
r1395 | def test_string_task_results(self): | ||
Brian E Granger
|
r1234 | self.addEngine(1) | ||
Brian E Granger
|
r1395 | t1 = task.StringTask('a=5', pull='a') | ||
Brian E Granger
|
r1234 | d = self.tc.run(t1) | ||
d.addCallback(self.tc.get_task_result, block=True) | ||||
Brian E Granger
|
r1395 | d.addCallback(lambda tr: (tr.ns.a,tr['a'],tr.failure, tr.raise_exception())) | ||
Brian E Granger
|
r1234 | d.addCallback(lambda r: self.assertEquals(r, (5,5,None,None))) | ||
Brian E Granger
|
r1395 | t2 = task.StringTask('7=5') | ||
Brian E Granger
|
r1234 | d.addCallback(lambda r: self.tc.run(t2)) | ||
d.addCallback(self.tc.get_task_result, block=True) | ||||
d.addCallback(lambda tr: tr.ns) | ||||
d.addErrback(lambda f: self.assertRaises(SyntaxError, f.raiseException)) | ||||
Brian E Granger
|
r1395 | t3 = task.StringTask('', pull='b') | ||
Brian E Granger
|
r1234 | d.addCallback(lambda r: self.tc.run(t3)) | ||
d.addCallback(self.tc.get_task_result, block=True) | ||||
d.addCallback(lambda tr: tr.ns) | ||||
d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException)) | ||||
return d | ||||
Brian E Granger
|
r1395 | |||
def test_map_task(self): | ||||
self.addEngine(1) | ||||
t1 = task.MapTask(lambda x: 2*x,(10,)) | ||||
d = self.tc.run(t1) | ||||
d.addCallback(self.tc.get_task_result, block=True) | ||||
d.addCallback(lambda r: self.assertEquals(r,20)) | ||||
t2 = task.MapTask(lambda : 20) | ||||
d.addCallback(lambda _: self.tc.run(t2)) | ||||
d.addCallback(self.tc.get_task_result, block=True) | ||||
d.addCallback(lambda r: self.assertEquals(r,20)) | ||||
t3 = task.MapTask(lambda x: x,(),{'x':20}) | ||||
d.addCallback(lambda _: self.tc.run(t3)) | ||||
d.addCallback(self.tc.get_task_result, block=True) | ||||
d.addCallback(lambda r: self.assertEquals(r,20)) | ||||
return d | ||||
def test_map_task_failure(self): | ||||
self.addEngine(1) | ||||
t1 = task.MapTask(lambda x: 1/0,(10,)) | ||||
d = self.tc.run(t1) | ||||
d.addCallback(self.tc.get_task_result, block=True) | ||||
d.addErrback(lambda f: self.assertRaises(ZeroDivisionError, f.raiseException)) | ||||
return d | ||||
def test_map_task_args(self): | ||||
self.assertRaises(TypeError, task.MapTask, 'asdfasdf') | ||||
self.assertRaises(TypeError, task.MapTask, lambda x: x, 10) | ||||
self.assertRaises(TypeError, task.MapTask, lambda x: x, (10,),30) | ||||
def test_clear(self): | ||||
self.addEngine(1) | ||||
t1 = task.MapTask(lambda x: 2*x,(10,)) | ||||
d = self.tc.run(t1) | ||||
d.addCallback(lambda _: self.tc.get_task_result(0, block=True)) | ||||
d.addCallback(lambda r: self.assertEquals(r,20)) | ||||
d.addCallback(lambda _: self.tc.clear()) | ||||
d.addCallback(lambda _: self.tc.get_task_result(0, block=True)) | ||||
d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException)) | ||||
return d | ||||