# encoding: utf-8 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*- """The Generic Task Client object. This must be subclassed based on your connection method. """ __docformat__ = "restructuredtext en" #------------------------------------------------------------------------------- # Copyright (C) 2008 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- # Imports #------------------------------------------------------------------------------- from zope.interface import Interface, implements from twisted.python import components, log from IPython.kernel.twistedutil import blockingCallFromThread from IPython.kernel import task, error #------------------------------------------------------------------------------- # Connecting Task Client #------------------------------------------------------------------------------- class InteractiveTaskClient(object): def irun(self, *args, **kwargs): """Run a task on the `TaskController`. This method is a shorthand for run(task) and its arguments are simply passed onto a `Task` object: irun(*args, **kwargs) -> run(Task(*args, **kwargs)) :Parameters: expression : str A str that is valid python code that is the task. pull : str or list of str The names of objects to be pulled as results. push : dict A dict of objects to be pushed into the engines namespace before execution of the expression. clear_before : boolean Should the engine's namespace be cleared before the task is run. Default=False. clear_after : boolean Should the engine's namespace be cleared after the task is run. Default=False. retries : int The number of times to resumbit the task if it fails. Default=0. options : dict Any other keyword options for more elaborate uses of tasks :Returns: A `TaskResult` object. """ block = kwargs.pop('block', False) if len(args) == 1 and isinstance(args[0], task.Task): t = args[0] else: t = task.Task(*args, **kwargs) taskid = self.run(t) print "TaskID = %i"%taskid if block: return self.get_task_result(taskid, block) else: return taskid class IBlockingTaskClient(Interface): """ An interface for blocking task clients. """ pass class BlockingTaskClient(InteractiveTaskClient): """ This class provides a blocking task client. """ implements(IBlockingTaskClient) def __init__(self, task_controller): self.task_controller = task_controller self.block = True def run(self, task): """ Run a task and return a task id that can be used to get the task result. :Parameters: task : `Task` The `Task` object to run """ return blockingCallFromThread(self.task_controller.run, task) def get_task_result(self, taskid, block=False): """ Get or poll for a task result. :Parameters: taskid : int The id of the task whose result to get block : boolean If True, wait until the task is done and then result the `TaskResult` object. If False, just poll for the result and return None if the task is not done. """ return blockingCallFromThread(self.task_controller.get_task_result, taskid, block) def abort(self, taskid): """ Abort a task by task id if it has not been started. """ return blockingCallFromThread(self.task_controller.abort, taskid) def barrier(self, taskids): """ Wait for a set of tasks to finish. :Parameters: taskids : list of ints A list of task ids to wait for. """ return blockingCallFromThread(self.task_controller.barrier, taskids) def spin(self): """ Cause the scheduler to schedule tasks. This method only needs to be called in unusual situations where the scheduler is idle for some reason. """ return blockingCallFromThread(self.task_controller.spin) def queue_status(self, verbose=False): """ Get a dictionary with the current state of the task queue. :Parameters: verbose : boolean If True, return a list of taskids. If False, simply give the number of tasks with each status. :Returns: A dict with the queue status. """ return blockingCallFromThread(self.task_controller.queue_status, verbose) components.registerAdapter(BlockingTaskClient, task.ITaskController, IBlockingTaskClient)