##// END OF EJS Templates
Updated ipstruct to fix doctest failures.
Updated ipstruct to fix doctest failures.

File last commit:

r1234:52b55407
r1280:e436240a
Show More
taskclient.py
161 lines | 5.2 KiB | text/x-python | PythonLexer
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 # encoding: utf-8
# -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
"""The Generic Task Client object.
This must be subclassed based on your connection method.
"""
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
from zope.interface import Interface, implements
from twisted.python import components, log
from IPython.kernel.twistedutil import blockingCallFromThread
from IPython.kernel import task, error
#-------------------------------------------------------------------------------
# Connecting Task Client
#-------------------------------------------------------------------------------
class InteractiveTaskClient(object):
def irun(self, *args, **kwargs):
"""Run a task on the `TaskController`.
This method is a shorthand for run(task) and its arguments are simply
passed onto a `Task` object:
irun(*args, **kwargs) -> run(Task(*args, **kwargs))
:Parameters:
expression : str
A str that is valid python code that is the task.
pull : str or list of str
The names of objects to be pulled as results.
push : dict
A dict of objects to be pushed into the engines namespace before
execution of the expression.
clear_before : boolean
Should the engine's namespace be cleared before the task is run.
Default=False.
clear_after : boolean
Should the engine's namespace be cleared after the task is run.
Default=False.
retries : int
The number of times to resumbit the task if it fails. Default=0.
options : dict
Any other keyword options for more elaborate uses of tasks
:Returns: A `TaskResult` object.
"""
block = kwargs.pop('block', False)
if len(args) == 1 and isinstance(args[0], task.Task):
t = args[0]
else:
t = task.Task(*args, **kwargs)
taskid = self.run(t)
print "TaskID = %i"%taskid
if block:
return self.get_task_result(taskid, block)
else:
return taskid
class IBlockingTaskClient(Interface):
"""
An interface for blocking task clients.
"""
pass
class BlockingTaskClient(InteractiveTaskClient):
"""
This class provides a blocking task client.
"""
implements(IBlockingTaskClient)
def __init__(self, task_controller):
self.task_controller = task_controller
self.block = True
def run(self, task):
"""
Run a task and return a task id that can be used to get the task result.
:Parameters:
task : `Task`
The `Task` object to run
"""
return blockingCallFromThread(self.task_controller.run, task)
def get_task_result(self, taskid, block=False):
"""
Get or poll for a task result.
:Parameters:
taskid : int
The id of the task whose result to get
block : boolean
If True, wait until the task is done and then result the
`TaskResult` object. If False, just poll for the result and
return None if the task is not done.
"""
return blockingCallFromThread(self.task_controller.get_task_result,
taskid, block)
def abort(self, taskid):
"""
Abort a task by task id if it has not been started.
"""
return blockingCallFromThread(self.task_controller.abort, taskid)
def barrier(self, taskids):
"""
Wait for a set of tasks to finish.
:Parameters:
taskids : list of ints
A list of task ids to wait for.
"""
return blockingCallFromThread(self.task_controller.barrier, taskids)
def spin(self):
"""
Cause the scheduler to schedule tasks.
This method only needs to be called in unusual situations where the
scheduler is idle for some reason.
"""
return blockingCallFromThread(self.task_controller.spin)
def queue_status(self, verbose=False):
"""
Get a dictionary with the current state of the task queue.
:Parameters:
verbose : boolean
If True, return a list of taskids. If False, simply give
the number of tasks with each status.
:Returns:
A dict with the queue status.
"""
return blockingCallFromThread(self.task_controller.queue_status, verbose)
components.registerAdapter(BlockingTaskClient,
task.ITaskController, IBlockingTaskClient)