##// END OF EJS Templates
Improve test suite robustness by cleaning up stale processes when possible.
Improve test suite robustness by cleaning up stale processes when possible.

File last commit:

r1952:d68cab7f
r2399:7e73e5f7
Show More
taskclient.py
180 lines | 5.8 KiB | text/x-python | PythonLexer
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 # encoding: utf-8
# -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """
A blocking version of the task client.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
from zope.interface import Interface, implements
from twisted.python import components, log
from IPython.kernel.twistedutil import blockingCallFromThread
from IPython.kernel import task, error
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 from IPython.kernel.mapper import (
SynchronousTaskMapper,
ITaskMapperFactory,
IMapper
)
from IPython.kernel.parallelfunction import (
ParallelFunction,
ITaskParallelDecorator
)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
#-------------------------------------------------------------------------------
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 # The task client
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 #-------------------------------------------------------------------------------
class IBlockingTaskClient(Interface):
"""
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 A vague interface of the blocking task client
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """
pass
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 class BlockingTaskClient(object):
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 A blocking task client that adapts a non-blocking one.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 implements(
Brian Granger
Initial refactor of task dependency system....
r1952 IBlockingTaskClient,
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 ITaskMapperFactory,
IMapper,
ITaskParallelDecorator
)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
def __init__(self, task_controller):
self.task_controller = task_controller
self.block = True
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 def run(self, task, block=False):
"""Run a task on the `TaskController`.
Brian Granger
Initial refactor of task dependency system....
r1952 See the documentation of the `MapTask` and `StringTask` classes for
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 details on how to build a task of different types.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
:Parameters:
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 task : an `ITask` implementer
:Returns: The int taskid of the submitted task. Pass this to
`get_task_result` to get the `TaskResult` object.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 tid = blockingCallFromThread(self.task_controller.run, task)
if block:
return self.get_task_result(tid, block=True)
else:
return tid
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
def get_task_result(self, taskid, block=False):
"""
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 Get a task result by taskid.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
:Parameters:
taskid : int
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 The taskid of the task to be retrieved.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 block : boolean
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 Should I block until the task is done?
:Returns: A `TaskResult` object that encapsulates the task result.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """
return blockingCallFromThread(self.task_controller.get_task_result,
taskid, block)
def abort(self, taskid):
"""
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 Abort a task by taskid.
:Parameters:
taskid : int
The taskid of the task to be aborted.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """
return blockingCallFromThread(self.task_controller.abort, taskid)
def barrier(self, taskids):
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """Block until a set of tasks are completed.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
:Parameters:
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 taskids : list, tuple
A sequence of taskids to block on.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """
return blockingCallFromThread(self.task_controller.barrier, taskids)
def spin(self):
"""
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 Touch the scheduler, to resume scheduling without submitting a task.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
This method only needs to be called in unusual situations where the
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 scheduler is idle for some reason.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """
return blockingCallFromThread(self.task_controller.spin)
def queue_status(self, verbose=False):
"""
Get a dictionary with the current state of the task queue.
:Parameters:
verbose : boolean
If True, return a list of taskids. If False, simply give
the number of tasks with each status.
:Returns:
A dict with the queue status.
"""
return blockingCallFromThread(self.task_controller.queue_status, verbose)
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395
def clear(self):
"""
Clear all previously run tasks from the task controller.
This is needed because the task controller keep all task results
in memory. This can be a problem is there are many completed
tasks. Users should call this periodically to clean out these
cached task results.
"""
return blockingCallFromThread(self.task_controller.clear)
def map(self, func, *sequences):
"""
Apply func to *sequences elementwise. Like Python's builtin map.
This version is load balanced.
"""
return self.mapper().map(func, *sequences)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 def mapper(self, clear_before=False, clear_after=False, retries=0,
recovery_task=None, depend=None, block=True):
"""
Create an `IMapper` implementer with a given set of arguments.
The `IMapper` created using a task controller is load balanced.
See the documentation for `IPython.kernel.task.BaseTask` for
documentation on the arguments to this method.
"""
return SynchronousTaskMapper(self, clear_before=clear_before,
clear_after=clear_after, retries=retries,
recovery_task=recovery_task, depend=depend, block=block)
def parallel(self, clear_before=False, clear_after=False, retries=0,
recovery_task=None, depend=None, block=True):
mapper = self.mapper(clear_before, clear_after, retries,
recovery_task, depend, block)
pf = ParallelFunction(mapper)
return pf
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
components.registerAdapter(BlockingTaskClient,
task.ITaskController, IBlockingTaskClient)