##// END OF EJS Templates
Initial work on updating the frontend APIs. Also fixed a few bugs in ...
Brian E Granger -
Show More
@@ -605,15 +605,21 b' class FCFullSynchronousMultiEngineClient(object):'
605 return d
605 return d
606
606
607 def map(self, func, seq, style='basic', targets='all', block=True):
607 def map(self, func, seq, style='basic', targets='all', block=True):
608 """
609 Call a callable on elements of a sequence.
610
611 map(f, range(10)) -> [f(0), f(1), f(2), ...]
612 map(f, zip(range))
613 """
608 d_list = []
614 d_list = []
609 if isinstance(func, FunctionType):
615 if isinstance(func, FunctionType):
610 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
616 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
611 d.addCallback(lambda did: self.get_pending_deferred(did, True))
617 d.addCallback(lambda did: self.get_pending_deferred(did, True))
612 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, _ipython_map_seq)'
618 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
613 elif isinstance(func, str):
619 elif isinstance(func, str):
614 d = defer.succeed(None)
620 d = defer.succeed(None)
615 sourceToRun = \
621 sourceToRun = \
616 '_ipython_map_seq_result = map(%s, _ipython_map_seq)' % func
622 '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
617 else:
623 else:
618 raise TypeError("func must be a function or str")
624 raise TypeError("func must be a function or str")
619
625
@@ -17,7 +17,7 b' __docformat__ = "restructuredtext en"'
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import copy, time
19 import copy, time
20 from types import FunctionType as function
20 from types import FunctionType
21
21
22 import zope.interface as zi, string
22 import zope.interface as zi, string
23 from twisted.internet import defer, reactor
23 from twisted.internet import defer, reactor
@@ -31,18 +31,20 b' from IPython.kernel.twistedutil import gatherBoth, DeferredList'
31
31
32 from IPython.kernel.pickleutil import can,uncan, CannedFunction
32 from IPython.kernel.pickleutil import can,uncan, CannedFunction
33
33
34 def canTask(task):
34 def can_task(task):
35 t = copy.copy(task)
35 t = copy.copy(task)
36 t.depend = can(t.depend)
36 t.depend = can(t.depend)
37 t.expression = can(t.expression)
37 if t.recovery_task:
38 if t.recovery_task:
38 t.recovery_task = canTask(t.recovery_task)
39 t.recovery_task = can_task(t.recovery_task)
39 return t
40 return t
40
41
41 def uncanTask(task):
42 def uncan_task(task):
42 t = copy.copy(task)
43 t = copy.copy(task)
43 t.depend = uncan(t.depend)
44 t.depend = uncan(t.depend)
45 t.expression = uncan(t.expression)
44 if t.recovery_task and t.recovery_task is not task:
46 if t.recovery_task and t.recovery_task is not task:
45 t.recovery_task = uncanTask(t.recovery_task)
47 t.recovery_task = uncan_task(t.recovery_task)
46 return t
48 return t
47
49
48 time_format = '%Y/%m/%d %H:%M:%S'
50 time_format = '%Y/%m/%d %H:%M:%S'
@@ -96,10 +98,18 b' class Task(object):'
96 >>> t = Task('mpi.send(blah,blah)', depend = hasMPI)
98 >>> t = Task('mpi.send(blah,blah)', depend = hasMPI)
97 """
99 """
98
100
99 def __init__(self, expression, pull=None, push=None,
101 def __init__(self, expression, args=None, kwargs=None, pull=None, push=None,
100 clear_before=False, clear_after=False, retries=0,
102 clear_before=False, clear_after=False, retries=0,
101 recovery_task=None, depend=None, **options):
103 recovery_task=None, depend=None, **options):
102 self.expression = expression
104 self.expression = expression
105 if args is None:
106 self.args = ()
107 else:
108 self.args = args
109 if kwargs is None:
110 self.kwargs = {}
111 else:
112 self.kwargs = kwargs
103 if isinstance(pull, str):
113 if isinstance(pull, str):
104 self.pull = [pull]
114 self.pull = [pull]
105 else:
115 else:
@@ -266,16 +276,30 b' class WorkerFromQueuedEngine(object):'
266 d = self.queuedEngine.reset()
276 d = self.queuedEngine.reset()
267 else:
277 else:
268 d = defer.succeed(None)
278 d = defer.succeed(None)
269
270 if task.push is not None:
271 d.addCallback(lambda r: self.queuedEngine.push(task.push))
272
273 d.addCallback(lambda r: self.queuedEngine.execute(task.expression))
274
279
275 if task.pull is not None:
280 if isinstance(task.expression, FunctionType):
276 d.addCallback(lambda r: self.queuedEngine.pull(task.pull))
281 d.addCallback(lambda r: self.queuedEngine.push_function(
282 dict(_ipython_task_function=task.expression))
283 )
284 d.addCallback(lambda r: self.queuedEngine.push(
285 dict(_ipython_task_args=task.args,_ipython_task_kwargs=task.kwargs))
286 )
287 d.addCallback(lambda r: self.queuedEngine.execute(
288 '_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)')
289 )
290 d.addCallback(lambda r: self.queuedEngine.pull('_ipython_task_result'))
291 elif isinstance(task.expression, str):
292 if task.push is not None:
293 d.addCallback(lambda r: self.queuedEngine.push(task.push))
294
295 d.addCallback(lambda r: self.queuedEngine.execute(task.expression))
296
297 if task.pull is not None:
298 d.addCallback(lambda r: self.queuedEngine.pull(task.pull))
299 else:
300 d.addCallback(lambda r: None)
277 else:
301 else:
278 d.addCallback(lambda r: None)
302 raise TypeError("task expression must be a str or function")
279
303
280 def reseter(result):
304 def reseter(result):
281 self.queuedEngine.reset()
305 self.queuedEngine.reset()
@@ -284,7 +308,10 b' class WorkerFromQueuedEngine(object):'
284 if task.clear_after:
308 if task.clear_after:
285 d.addBoth(reseter)
309 d.addBoth(reseter)
286
310
287 return d.addBoth(self._zipResults, task.pull, time.time(), time.localtime())
311 if isinstance(task.expression, FunctionType):
312 return d.addBoth(self._zipResults, None, time.time(), time.localtime())
313 else:
314 return d.addBoth(self._zipResults, task.pull, time.time(), time.localtime())
288
315
289 def _zipResults(self, result, names, start, start_struct):
316 def _zipResults(self, result, names, start, start_struct):
290 """Callback for construting the TaskResult object."""
317 """Callback for construting the TaskResult object."""
@@ -292,12 +319,16 b' class WorkerFromQueuedEngine(object):'
292 tr = TaskResult(result, self.queuedEngine.id)
319 tr = TaskResult(result, self.queuedEngine.id)
293 else:
320 else:
294 if names is None:
321 if names is None:
295 resultDict = {}
322 resultDict = {}
296 elif len(names) == 1:
323 elif len(names) == 1:
297 resultDict = {names[0]:result}
324 resultDict = {names[0]:result}
298 else:
325 else:
299 resultDict = dict(zip(names, result))
326 resultDict = dict(zip(names, result))
300 tr = TaskResult(resultDict, self.queuedEngine.id)
327 tr = TaskResult(resultDict, self.queuedEngine.id)
328 if names is None:
329 tr.result = result
330 else:
331 tr.result = None
301 # the time info
332 # the time info
302 tr.submitted = time.strftime(time_format, start_struct)
333 tr.submitted = time.strftime(time_format, start_struct)
303 tr.completed = time.strftime(time_format)
334 tr.completed = time.strftime(time_format)
@@ -93,7 +93,7 b' class FCTaskControllerFromTaskController(Referenceable):'
93 def remote_run(self, ptask):
93 def remote_run(self, ptask):
94 try:
94 try:
95 ctask = pickle.loads(ptask)
95 ctask = pickle.loads(ptask)
96 task = taskmodule.uncanTask(ctask)
96 task = taskmodule.uncan_task(ctask)
97 except:
97 except:
98 d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
98 d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
99 else:
99 else:
@@ -201,7 +201,7 b' class FCTaskClient(object):'
201 `get_task_result` to get the `TaskResult` object.
201 `get_task_result` to get the `TaskResult` object.
202 """
202 """
203 assert isinstance(task, taskmodule.Task), "task must be a Task object!"
203 assert isinstance(task, taskmodule.Task), "task must be a Task object!"
204 ctask = taskmodule.canTask(task) # handles arbitrary function in .depend
204 ctask = taskmodule.can_task(task) # handles arbitrary function in .depend
205 # as well as arbitrary recovery_task chains
205 # as well as arbitrary recovery_task chains
206 ptask = pickle.dumps(ctask, 2)
206 ptask = pickle.dumps(ctask, 2)
207 d = self.remote_reference.callRemote('run', ptask)
207 d = self.remote_reference.callRemote('run', ptask)
@@ -38,7 +38,7 b' try:'
38 IEngineQueuedTestCase
38 IEngineQueuedTestCase
39 except ImportError:
39 except ImportError:
40 print "we got an error!!!"
40 print "we got an error!!!"
41 pass
41 raise
42 else:
42 else:
43 class EngineFCTest(DeferredTestCase,
43 class EngineFCTest(DeferredTestCase,
44 IEngineCoreTestCase,
44 IEngineCoreTestCase,
@@ -20,8 +20,6 b' try:'
20 from twisted.internet import defer
20 from twisted.internet import defer
21 from twisted.python import failure
21 from twisted.python import failure
22
22
23 from IPython.testing import tcommon
24 from IPython.testing.tcommon import *
25 from IPython.testing.util import DeferredTestCase
23 from IPython.testing.util import DeferredTestCase
26 import IPython.kernel.pendingdeferred as pd
24 import IPython.kernel.pendingdeferred as pd
27 from IPython.kernel import error
25 from IPython.kernel import error
@@ -29,26 +27,7 b' try:'
29 except ImportError:
27 except ImportError:
30 pass
28 pass
31 else:
29 else:
32
30
33 #-------------------------------------------------------------------------------
34 # Setup for inline and standalone doctests
35 #-------------------------------------------------------------------------------
36
37
38 # If you have standalone doctests in a separate file, set their names in the
39 # dt_files variable (as a single string or a list thereof):
40 dt_files = []
41
42 # If you have any modules whose docstrings should be scanned for embedded tests
43 # as examples accorging to standard doctest practice, set them here (as a
44 # single string or a list thereof):
45 dt_modules = []
46
47 #-------------------------------------------------------------------------------
48 # Regular Unittests
49 #-------------------------------------------------------------------------------
50
51
52 class Foo(object):
31 class Foo(object):
53
32
54 def bar(self, bahz):
33 def bar(self, bahz):
@@ -205,14 +184,3 b' else:'
205 d3 = self.pdm.get_pending_deferred(did,False)
184 d3 = self.pdm.get_pending_deferred(did,False)
206 d3.addCallback(lambda r: self.assertEquals(r,'bar'))
185 d3.addCallback(lambda r: self.assertEquals(r,'bar'))
207
186
208 #-------------------------------------------------------------------------------
209 # Regular Unittests
210 #-------------------------------------------------------------------------------
211
212 # This ensures that the code will run either standalone as a script, or that it
213 # can be picked up by Twisted's `trial` test wrapper to run all the tests.
214 if tcommon.pexpect is not None:
215 if __name__ == '__main__':
216 unittest.main(testLoader=IPDocTestLoader(dt_files,dt_modules))
217 else:
218 testSuite = lambda : makeTestSuite(__name__,dt_files,dt_modules)
1 NO CONTENT: file renamed from IPython/testing/attic/parametric.py to IPython/testing/parametric.py
NO CONTENT: file renamed from IPython/testing/attic/parametric.py to IPython/testing/parametric.py
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now