Initial refactor of task dependency system. We are thinking about refactoring the task dependency system. Currently it is based on engine properties and a function sent with each task. In this commit, I have added a TaskRejectError that tasks can raise to indicate that the engine doesn't have the required dependencies. For now I have not removed any of the old stuff, but I have added warnings that we might remove the older *_prop* methods in IMultiEngine. See this ticket for more info: https://bugs.launchpad.net/bugs/361419

File last commit:

r1952:d68cab7f
taskreject.py
53 lines | 1.5 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
# encoding: utf-8
"""
A new example showing how to use `TaskRejectError` to handle dependencies
in the IPython task system.

To run this example, do::

    $ ipcluster local -n 4

Then, in another terminal start up IPython and do::

    In [0]: %run taskreject.py

    In [1]: mec.execute('run=True', targets=[0,1])

After the first command, the scheduler will keep rescheduling the tasks, as
they will fail with `TaskRejectError`. But after the second command, there
are two engines that the tasks can run on. The tasks are quickly funneled
to these engines.

If you want to see how the controller is scheduling and retrying the tasks
do a `tail -f` on the controller's log file in ~/.ipython/log.
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
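from IPython.kernel import client
from IPython.kernel import TaskRejectError

mec = client.MultiEngineClient()
tc = client.TaskClient()

# Make TaskRejectError available on every engine and mark all engines as
# not ready. The `run` flag set here is what map_task checks on an engine.
mec.execute('from IPython.kernel import TaskRejectError')
mec.execute('run = False')

def map_task():
    # Reject the task so that the scheduler reschedules it.
    if not run:
        raise TaskRejectError('task dependency not met')
    return 3.0e8

task_ids = []

# Submit ten non-blocking tasks, each allowed up to 20 retries.
for i in range(10):
    task = client.MapTask(map_task, retries=20)
    task_ids.append(tc.run(task, block=False))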
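
The reject-and-retry behaviour described in the docstring can be tried out without a running cluster. The following is a minimal standalone sketch, not the IPython scheduler: `Engine`, `schedule`, the round-robin retry order, and the local `TaskRejectError` class are all hypothetical stand-ins introduced for illustration. Here `map_task` takes the engine namespace as an argument, whereas in the real example it reads `run` from the engine it executes on.

```python
class TaskRejectError(Exception):
    """Local stand-in for IPython.kernel.TaskRejectError (assumption)."""


class Engine:
    """Hypothetical engine: an id plus a namespace holding the `run` flag."""

    def __init__(self, engine_id, run=False):
        self.id = engine_id
        self.namespace = {"run": run}


def map_task(namespace):
    # Same check as in taskreject.py, but against an explicit namespace.
    if not namespace["run"]:
        raise TaskRejectError("task dependency not met")
    return 3.0e8


def schedule(task, engines, retries=20, start=0):
    """Offer the task to engines round-robin until one accepts it.

    Returns (engine_id, result, attempts). Raises RuntimeError when the
    first attempt and all retries are rejected.
    """
    for attempt in range(retries + 1):
        engine = engines[(start + attempt) % len(engines)]
        try:
            return engine.id, task(engine.namespace), attempt + 1
        except TaskRejectError:
            continue  # rejected: try the next engine
    raise RuntimeError("task rejected on every attempt")


engines = [Engine(i) for i in range(4)]

# Rough analogue of mec.execute('run=True', targets=[0,1]).
for engine in engines[:2]:
    engine.namespace["run"] = True

results = [schedule(map_task, engines, start=i) for i in range(10)]
for engine_id, value, attempts in results:
    print(engine_id, value, attempts)
```

In this sketch every task ends up on engine 0 or 1, which mirrors the docstring's description of tasks being funneled to the two engines where `run` is true.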