diff --git a/IPython/kernel/multienginefc.py b/IPython/kernel/multienginefc.py index df53f22..150f660 100644 --- a/IPython/kernel/multienginefc.py +++ b/IPython/kernel/multienginefc.py @@ -605,15 +605,21 @@ class FCFullSynchronousMultiEngineClient(object): return d def map(self, func, seq, style='basic', targets='all', block=True): + """ + Call a callable on elements of a sequence. + + map(f, range(10)) -> [f(0), f(1), f(2), ...] + map(f, zip(range)) + """ d_list = [] if isinstance(func, FunctionType): d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False) d.addCallback(lambda did: self.get_pending_deferred(did, True)) - sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, _ipython_map_seq)' + sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))' elif isinstance(func, str): d = defer.succeed(None) sourceToRun = \ - '_ipython_map_seq_result = map(%s, _ipython_map_seq)' % func + '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func else: raise TypeError("func must be a function or str") diff --git a/IPython/kernel/task.py b/IPython/kernel/task.py index 18ecc6d..31e5c3f 100644 --- a/IPython/kernel/task.py +++ b/IPython/kernel/task.py @@ -17,7 +17,7 @@ __docformat__ = "restructuredtext en" #------------------------------------------------------------------------------- import copy, time -from types import FunctionType as function +from types import FunctionType import zope.interface as zi, string from twisted.internet import defer, reactor @@ -31,18 +31,20 @@ from IPython.kernel.twistedutil import gatherBoth, DeferredList from IPython.kernel.pickleutil import can,uncan, CannedFunction -def canTask(task): +def can_task(task): t = copy.copy(task) t.depend = can(t.depend) + t.expression = can(t.expression) if t.recovery_task: - t.recovery_task = canTask(t.recovery_task) + t.recovery_task = can_task(t.recovery_task) return t -def uncanTask(task): +def uncan_task(task): t = copy.copy(task) t.depend = uncan(t.depend) + t.expression = uncan(t.expression) if t.recovery_task and t.recovery_task is not task: - t.recovery_task = uncanTask(t.recovery_task) + t.recovery_task = uncan_task(t.recovery_task) return t time_format = '%Y/%m/%d %H:%M:%S' @@ -96,10 +98,18 @@ class Task(object): >>> t = Task('mpi.send(blah,blah)', depend = hasMPI) """ - def __init__(self, expression, pull=None, push=None, + def __init__(self, expression, args=None, kwargs=None, pull=None, push=None, clear_before=False, clear_after=False, retries=0, recovery_task=None, depend=None, **options): self.expression = expression + if args is None: + self.args = () + else: + self.args = args + if kwargs is None: + self.kwargs = {} + else: + self.kwargs = kwargs if isinstance(pull, str): self.pull = [pull] else: @@ -266,16 +276,30 @@ class WorkerFromQueuedEngine(object): d = self.queuedEngine.reset() else: d = defer.succeed(None) - - if task.push is not None: - d.addCallback(lambda r: self.queuedEngine.push(task.push)) - - d.addCallback(lambda r: self.queuedEngine.execute(task.expression)) - if task.pull is not None: - d.addCallback(lambda r: self.queuedEngine.pull(task.pull)) + if isinstance(task.expression, FunctionType): + d.addCallback(lambda r: self.queuedEngine.push_function( + dict(_ipython_task_function=task.expression)) + ) + d.addCallback(lambda r: self.queuedEngine.push( + dict(_ipython_task_args=task.args,_ipython_task_kwargs=task.kwargs)) + ) + d.addCallback(lambda r: self.queuedEngine.execute( + '_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)') + ) + d.addCallback(lambda r: self.queuedEngine.pull('_ipython_task_result')) + elif isinstance(task.expression, str): + if task.push is not None: + d.addCallback(lambda r: self.queuedEngine.push(task.push)) + + d.addCallback(lambda r: self.queuedEngine.execute(task.expression)) + + if task.pull is not None: + d.addCallback(lambda r: self.queuedEngine.pull(task.pull)) + else: + d.addCallback(lambda r: None) else: - d.addCallback(lambda r: None) + raise TypeError("task expression must be a str or function") def reseter(result): self.queuedEngine.reset() @@ -284,7 +308,10 @@ class WorkerFromQueuedEngine(object): if task.clear_after: d.addBoth(reseter) - return d.addBoth(self._zipResults, task.pull, time.time(), time.localtime()) + if isinstance(task.expression, FunctionType): + return d.addBoth(self._zipResults, None, time.time(), time.localtime()) + else: + return d.addBoth(self._zipResults, task.pull, time.time(), time.localtime()) def _zipResults(self, result, names, start, start_struct): """Callback for construting the TaskResult object.""" @@ -292,12 +319,16 @@ class WorkerFromQueuedEngine(object): tr = TaskResult(result, self.queuedEngine.id) else: if names is None: - resultDict = {} + resultDict = {} elif len(names) == 1: resultDict = {names[0]:result} else: resultDict = dict(zip(names, result)) tr = TaskResult(resultDict, self.queuedEngine.id) + if names is None: + tr.result = result + else: + tr.result = None # the time info tr.submitted = time.strftime(time_format, start_struct) tr.completed = time.strftime(time_format) diff --git a/IPython/kernel/taskfc.py b/IPython/kernel/taskfc.py index b4096e7..9d21723 100644 --- a/IPython/kernel/taskfc.py +++ b/IPython/kernel/taskfc.py @@ -93,7 +93,7 @@ class FCTaskControllerFromTaskController(Referenceable): def remote_run(self, ptask): try: ctask = pickle.loads(ptask) - task = taskmodule.uncanTask(ctask) + task = taskmodule.uncan_task(ctask) except: d = defer.fail(pickle.UnpickleableError("Could not unmarshal task")) else: @@ -201,7 +201,7 @@ class FCTaskClient(object): `get_task_result` to get the `TaskResult` object. """ assert isinstance(task, taskmodule.Task), "task must be a Task object!" - ctask = taskmodule.canTask(task) # handles arbitrary function in .depend + ctask = taskmodule.can_task(task) # handles arbitrary function in .depend # as well as arbitrary recovery_task chains ptask = pickle.dumps(ctask, 2) d = self.remote_reference.callRemote('run', ptask) diff --git a/IPython/kernel/tests/test_enginefc.py b/IPython/kernel/tests/test_enginefc.py index be99c4c..7f482cf 100644 --- a/IPython/kernel/tests/test_enginefc.py +++ b/IPython/kernel/tests/test_enginefc.py @@ -38,7 +38,7 @@ try: IEngineQueuedTestCase except ImportError: print "we got an error!!!" - pass + raise else: class EngineFCTest(DeferredTestCase, IEngineCoreTestCase, diff --git a/IPython/kernel/tests/test_pendingdeferred.py b/IPython/kernel/tests/test_pendingdeferred.py index 52e0bc0..2ac9bda 100644 --- a/IPython/kernel/tests/test_pendingdeferred.py +++ b/IPython/kernel/tests/test_pendingdeferred.py @@ -20,8 +20,6 @@ try: from twisted.internet import defer from twisted.python import failure - from IPython.testing import tcommon - from IPython.testing.tcommon import * from IPython.testing.util import DeferredTestCase import IPython.kernel.pendingdeferred as pd from IPython.kernel import error @@ -29,26 +27,7 @@ try: except ImportError: pass else: - - #------------------------------------------------------------------------------- - # Setup for inline and standalone doctests - #------------------------------------------------------------------------------- - - - # If you have standalone doctests in a separate file, set their names in the - # dt_files variable (as a single string or a list thereof): - dt_files = [] - - # If you have any modules whose docstrings should be scanned for embedded tests - # as examples accorging to standard doctest practice, set them here (as a - # single string or a list thereof): - dt_modules = [] - - #------------------------------------------------------------------------------- - # Regular Unittests - #------------------------------------------------------------------------------- - - + class Foo(object): def bar(self, bahz): @@ -205,14 +184,3 @@ else: d3 = self.pdm.get_pending_deferred(did,False) d3.addCallback(lambda r: self.assertEquals(r,'bar')) -#------------------------------------------------------------------------------- -# Regular Unittests -#------------------------------------------------------------------------------- - -# This ensures that the code will run either standalone as a script, or that it -# can be picked up by Twisted's `trial` test wrapper to run all the tests. -if tcommon.pexpect is not None: - if __name__ == '__main__': - unittest.main(testLoader=IPDocTestLoader(dt_files,dt_modules)) - else: - testSuite = lambda : makeTestSuite(__name__,dt_files,dt_modules) diff --git a/IPython/testing/attic/parametric.py b/IPython/testing/parametric.py similarity index 100% rename from IPython/testing/attic/parametric.py rename to IPython/testing/parametric.py diff --git a/IPython/testing/tests/test_testutils.py b/IPython/testing/tests/test_testutils.py deleted file mode 100755 index 683661b..0000000 --- a/IPython/testing/tests/test_testutils.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -"""Simple template for unit tests. - -This file should be renamed to - -test_FEATURE.py - -so that it is recognized by the overall test driver (Twisted's 'trial'), which -looks for all test_*.py files in the current directory to extract tests from -them. -""" -__docformat__ = "restructuredtext en" - -#------------------------------------------------------------------------------- -# Copyright (C) 2005 Fernando Perez -# Brian E Granger -# Benjamin Ragan-Kelley -# -# Distributed under the terms of the BSD License. The full license is in -# the file COPYING, distributed as part of this software. -#------------------------------------------------------------------------------- - -#------------------------------------------------------------------------------- -# Imports -#------------------------------------------------------------------------------- - -from IPython.testing import tcommon -from IPython.testing.tcommon import * - -#------------------------------------------------------------------------------- -# Setup for inline and standalone doctests -#------------------------------------------------------------------------------- - - -# If you have standalone doctests in a separate file, set their names in the -# dt_files variable (as a single string or a list thereof). The mkPath call -# forms an absolute path based on the current file, it is not needed if you -# provide the full pahts. -dt_files = fullPath(__file__,[]) - - -# If you have any modules whose docstrings should be scanned for embedded tests -# as examples accorging to standard doctest practice, set them here (as a -# single string or a list thereof): -dt_modules = ['IPython.testing.tutils'] - -#------------------------------------------------------------------------------- -# Regular Unittests -#------------------------------------------------------------------------------- - -## class FooTestCase(unittest.TestCase): -## def test_foo(self): -## pass - -#------------------------------------------------------------------------------- -# Regular Unittests -#------------------------------------------------------------------------------- - -# This ensures that the code will run either standalone as a script, or that it -# can be picked up by Twisted's `trial` test wrapper to run all the tests. -if tcommon.pexpect is not None: - if __name__ == '__main__': - unittest.main(testLoader=IPDocTestLoader(dt_files,dt_modules)) - else: - testSuite = lambda : makeTestSuite(__name__,dt_files,dt_modules)