Show More
taskclient.py
198 lines
| 6.4 KiB
| text/x-python
|
PythonLexer
Brian E Granger
|
r1234 | # encoding: utf-8 | ||
# -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*- | ||||
Brian E Granger
|
r1395 | """ | ||
A blocking version of the task client. | ||||
Brian E Granger
|
r1234 | """ | ||
__docformat__ = "restructuredtext en" | ||||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2008 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
from zope.interface import Interface, implements | ||||
Brian Granger
|
r2498 | from twisted.python import components | ||
Brian Granger
|
r2526 | |||
try: | ||||
from foolscap.api import DeadReferenceError | ||||
except ImportError: | ||||
from foolscap import DeadReferenceError | ||||
Brian E Granger
|
r1234 | |||
from IPython.kernel.twistedutil import blockingCallFromThread | ||||
Brian Granger
|
r2517 | from IPython.kernel import task, error | ||
Brian E Granger
|
r1395 | from IPython.kernel.mapper import ( | ||
SynchronousTaskMapper, | ||||
ITaskMapperFactory, | ||||
IMapper | ||||
) | ||||
from IPython.kernel.parallelfunction import ( | ||||
ParallelFunction, | ||||
ITaskParallelDecorator | ||||
) | ||||
Brian E Granger
|
r1234 | |||
#------------------------------------------------------------------------------- | ||||
Brian E Granger
|
r1395 | # The task client | ||
Brian E Granger
|
r1234 | #------------------------------------------------------------------------------- | ||
class IBlockingTaskClient(Interface):
    """
    Marker interface for the blocking task client.

    It declares no methods or attributes of its own.
    """
Brian E Granger
|
class BlockingTaskClient(object):
    """
    Blocking task client wrapped around a task controller.

    Each public method forwards to the method of the same name on
    `self.task_controller`, invoking it through `blockingCallFromThread`
    (see `_bcft`).
    """

    implements(
        IBlockingTaskClient,
        ITaskMapperFactory,
        IMapper,
        ITaskParallelDecorator
    )

    def __init__(self, task_controller):
        """
        Store the controller this client talks to.

        :Parameters:
            task_controller
                The object whose `run`, `get_task_result`, `abort`,
                `barrier`, `spin`, `queue_status` and `clear` methods are
                called by this client.
        """
        # Set for callers; none of the methods defined in this class read it.
        self.block = True
        self.task_controller = task_controller

    def _bcft(self, *args, **kwargs):
        """
        Invoke `blockingCallFromThread` with the given arguments.

        A `DeadReferenceError` raised by the call is replaced by an
        `error.ConnectionError`; every other exception propagates untouched.

        :Returns: Whatever `blockingCallFromThread` returns.
        """
        try:
            return blockingCallFromThread(*args, **kwargs)
        except DeadReferenceError:
            # NOTE(review): the message keeps the line breaks of the literal;
            # confirm its exact internal whitespace against version control.
            raise error.ConnectionError(
                """A connection error has occurred in trying to connect to the
                controller. This is usually caused by the controller dying or
                being restarted. To resolve this issue try recreating the
                task client."""
            )

    def run(self, task, block=False):
        """Submit a task to the `TaskController`.

        The `MapTask` and `StringTask` class documentation explains how to
        build the different kinds of task.

        :Parameters:
            task : an `ITask` implementer
                The task to submit.
            block : boolean
                If true, fetch the result with
                `get_task_result(taskid, block=True)` and return that
                instead of the taskid.

        :Returns: The int taskid of the submitted task (pass it to
            `get_task_result` to obtain the `TaskResult` object), or the
            task result itself when `block` is true.
        """
        taskid = self._bcft(self.task_controller.run, task)
        if not block:
            return taskid
        return self.get_task_result(taskid, block=True)

    def get_task_result(self, taskid, block=False):
        """
        Look up the result of a task by its taskid.

        :Parameters:
            taskid : int
                The taskid of the task to be retrieved.
            block : boolean
                Should the call block until the task is done?

        :Returns: A `TaskResult` object that encapsulates the task result.
        """
        fetch = self.task_controller.get_task_result
        return self._bcft(fetch, taskid, block)

    def abort(self, taskid):
        """
        Abort a task by its taskid.

        :Parameters:
            taskid : int
                The taskid of the task to be aborted.
        """
        controller_abort = self.task_controller.abort
        return self._bcft(controller_abort, taskid)

    def barrier(self, taskids):
        """Block until a set of tasks has completed.

        :Parameters:
            taskids : list, tuple
                A sequence of taskids to block on.
        """
        controller_barrier = self.task_controller.barrier
        return self._bcft(controller_barrier, taskids)

    def spin(self):
        """
        Touch the scheduler so it resumes scheduling without a new task.

        This is only needed in unusual situations where the scheduler is
        idle for some reason.
        """
        controller_spin = self.task_controller.spin
        return self._bcft(controller_spin)

    def queue_status(self, verbose=False):
        """
        Get a dictionary describing the current state of the task queue.

        :Parameters:
            verbose : boolean
                If True, return a list of taskids.  If False, simply give
                the number of tasks with each status.

        :Returns:
            A dict with the queue status.
        """
        controller_status = self.task_controller.queue_status
        return self._bcft(controller_status, verbose)

    def clear(self):
        """
        Clear all previously run tasks from the task controller.

        This is needed because the task controller keeps all task results
        in memory, which can be a problem if there are many completed
        tasks.  Users should call this periodically to clean out these
        cached task results.
        """
        controller_clear = self.task_controller.clear
        return self._bcft(controller_clear)

    def map(self, func, *sequences):
        """
        Apply func to *sequences elementwise, like Python's builtin map.

        The work goes through a mapper created by `mapper()` with its
        default arguments; this version is load balanced.
        """
        default_mapper = self.mapper()
        return default_mapper.map(func, *sequences)

    def mapper(self, clear_before=False, clear_after=False, retries=0,
               recovery_task=None, depend=None, block=True):
        """
        Create an `IMapper` implementer with a given set of arguments.

        The `IMapper` created using a task controller is load balanced.
        The arguments are passed, by keyword, to `SynchronousTaskMapper`
        together with this client; see `IPython.kernel.task.BaseTask` for
        their documentation.
        """
        options = dict(
            clear_before=clear_before,
            clear_after=clear_after,
            retries=retries,
            recovery_task=recovery_task,
            depend=depend,
            block=block,
        )
        return SynchronousTaskMapper(self, **options)

    def parallel(self, clear_before=False, clear_after=False, retries=0,
                 recovery_task=None, depend=None, block=True):
        """
        Build a `ParallelFunction` around a mapper from `mapper`.

        All arguments are forwarded unchanged, in order, to `mapper`.

        :Returns: The `ParallelFunction` wrapping the new mapper.
        """
        task_mapper = self.mapper(clear_before, clear_after, retries,
                                  recovery_task, depend, block)
        return ParallelFunction(task_mapper)
Brian E Granger
|
r1234 | |||
# Register BlockingTaskClient as the adapter from task.ITaskController
# to IBlockingTaskClient.
components.registerAdapter(
    BlockingTaskClient,
    task.ITaskController,
    IBlockingTaskClient
)