taskclient.py
161 lines
| 5.2 KiB
| text/x-python
|
PythonLexer
Brian E Granger
|
r1234 | # encoding: utf-8 | ||
# -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*- | ||||
"""The Generic Task Client object. | ||||
This must be subclassed based on your connection method. | ||||
""" | ||||
__docformat__ = "restructuredtext en" | ||||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2008 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
from zope.interface import Interface, implements | ||||
from twisted.python import components, log | ||||
from IPython.kernel.twistedutil import blockingCallFromThread | ||||
from IPython.kernel import task, error | ||||
#------------------------------------------------------------------------------- | ||||
# Connecting Task Client | ||||
#------------------------------------------------------------------------------- | ||||
class InteractiveTaskClient(object): | ||||
def irun(self, *args, **kwargs): | ||||
"""Run a task on the `TaskController`. | ||||
This method is a shorthand for run(task) and its arguments are simply | ||||
passed onto a `Task` object: | ||||
irun(*args, **kwargs) -> run(Task(*args, **kwargs)) | ||||
:Parameters: | ||||
expression : str | ||||
A str that is valid python code that is the task. | ||||
pull : str or list of str | ||||
The names of objects to be pulled as results. | ||||
push : dict | ||||
A dict of objects to be pushed into the engines namespace before | ||||
execution of the expression. | ||||
clear_before : boolean | ||||
Should the engine's namespace be cleared before the task is run. | ||||
Default=False. | ||||
clear_after : boolean | ||||
Should the engine's namespace be cleared after the task is run. | ||||
Default=False. | ||||
retries : int | ||||
The number of times to resumbit the task if it fails. Default=0. | ||||
options : dict | ||||
Any other keyword options for more elaborate uses of tasks | ||||
:Returns: A `TaskResult` object. | ||||
""" | ||||
block = kwargs.pop('block', False) | ||||
if len(args) == 1 and isinstance(args[0], task.Task): | ||||
t = args[0] | ||||
else: | ||||
t = task.Task(*args, **kwargs) | ||||
taskid = self.run(t) | ||||
print "TaskID = %i"%taskid | ||||
if block: | ||||
return self.get_task_result(taskid, block) | ||||
else: | ||||
return taskid | ||||
class IBlockingTaskClient(Interface): | ||||
""" | ||||
An interface for blocking task clients. | ||||
""" | ||||
pass | ||||
class BlockingTaskClient(InteractiveTaskClient): | ||||
""" | ||||
This class provides a blocking task client. | ||||
""" | ||||
implements(IBlockingTaskClient) | ||||
def __init__(self, task_controller): | ||||
self.task_controller = task_controller | ||||
self.block = True | ||||
def run(self, task): | ||||
""" | ||||
Run a task and return a task id that can be used to get the task result. | ||||
:Parameters: | ||||
task : `Task` | ||||
The `Task` object to run | ||||
""" | ||||
return blockingCallFromThread(self.task_controller.run, task) | ||||
def get_task_result(self, taskid, block=False): | ||||
""" | ||||
Get or poll for a task result. | ||||
:Parameters: | ||||
taskid : int | ||||
The id of the task whose result to get | ||||
block : boolean | ||||
If True, wait until the task is done and then result the | ||||
`TaskResult` object. If False, just poll for the result and | ||||
return None if the task is not done. | ||||
""" | ||||
return blockingCallFromThread(self.task_controller.get_task_result, | ||||
taskid, block) | ||||
def abort(self, taskid): | ||||
""" | ||||
Abort a task by task id if it has not been started. | ||||
""" | ||||
return blockingCallFromThread(self.task_controller.abort, taskid) | ||||
def barrier(self, taskids): | ||||
""" | ||||
Wait for a set of tasks to finish. | ||||
:Parameters: | ||||
taskids : list of ints | ||||
A list of task ids to wait for. | ||||
""" | ||||
return blockingCallFromThread(self.task_controller.barrier, taskids) | ||||
def spin(self): | ||||
""" | ||||
Cause the scheduler to schedule tasks. | ||||
This method only needs to be called in unusual situations where the | ||||
scheduler is idle for some reason. | ||||
""" | ||||
return blockingCallFromThread(self.task_controller.spin) | ||||
def queue_status(self, verbose=False): | ||||
""" | ||||
Get a dictionary with the current state of the task queue. | ||||
:Parameters: | ||||
verbose : boolean | ||||
If True, return a list of taskids. If False, simply give | ||||
the number of tasks with each status. | ||||
:Returns: | ||||
A dict with the queue status. | ||||
""" | ||||
return blockingCallFromThread(self.task_controller.queue_status, verbose) | ||||
components.registerAdapter(BlockingTaskClient, | ||||
task.ITaskController, IBlockingTaskClient) | ||||