# encoding: utf-8
# -*- test-case-name: IPython.kernel.tests.test_taskxmlrpc -*-

"""A Foolscap interface to a TaskController.

This class lets Foolscap clients talk to a TaskController.
"""

__docformat__ = "restructuredtext en"

#-------------------------------------------------------------------------------
#  Copyright (C) 2008  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------

import cPickle as pickle
import xmlrpclib, copy

from zope.interface import Interface, implements
from twisted.internet import defer
from twisted.python import components, failure

from foolscap import Referenceable

from IPython.kernel.twistedutil import blockingCallFromThread
from IPython.kernel import error, task as taskmodule, taskclient
from IPython.kernel.pickleutil import can, uncan
from IPython.kernel.clientinterfaces import (
    IFCClientInterfaceProvider,
    IBlockingClientAdaptor
)

#-------------------------------------------------------------------------------
# The Controller side of things
#-------------------------------------------------------------------------------


class IFCTaskController(Interface):
    """Foolscap interface to task controller.

    See the documentation of ITaskController for documentation about the
    methods.
    """

    # NOTE(review): every signature below carries a leading ``request``
    # argument, but the implementation in this module
    # (FCTaskControllerFromTaskController) takes no such argument.  The
    # signatures are left untouched here; confirm before relying on
    # zope.interface verification against this interface.

    def remote_run(request, binTask):
        """Run a pickled task; counterpart of ITaskController.run."""

    def remote_abort(request, taskid):
        """Abort a task by id; counterpart of ITaskController.abort."""

    def remote_get_task_result(request, taskid, block=False):
        """Fetch a task result; counterpart of
        ITaskController.get_task_result."""

    def remote_barrier(request, taskids):
        """Wait on a sequence of task ids; counterpart of
        ITaskController.barrier."""

    def remote_spin(request):
        """Touch the scheduler; counterpart of ITaskController.spin."""

    def remote_queue_status(request, verbose):
        """Report the queue status; counterpart of
        ITaskController.queue_status."""


class FCTaskControllerFromTaskController(Referenceable):
    """Foolscap attachment for a task controller.

    Wraps a task controller and exposes its methods as ``remote_*`` methods.
    Every result (or failure) produced by the wrapped controller is pickled
    with protocol 2 before it is handed back to the caller.

    See IFCTaskController and ITaskController (and its children) for
    documentation.
    """
    implements(IFCTaskController, IFCClientInterfaceProvider)

    def __init__(self, taskController):
        """Store the controller that all ``remote_*`` calls delegate to."""
        self.taskController = taskController

    #---------------------------------------------------------------------------
    # Non interface methods
    #---------------------------------------------------------------------------

    def packageFailure(self, f):
        """Clean the failure `f` and return it pickled.

        ``cleanFailure`` is called first so that the failure object can be
        serialized like any other result.
        """
        f.cleanFailure()
        return self.packageSuccess(f)

    def packageSuccess(self, obj):
        """Return `obj` serialized with pickle protocol 2."""
        serial = pickle.dumps(obj, 2)
        return serial

    def _packageDeferred(self, d):
        """Attach the standard packaging callbacks to `d` and return it.

        The callback is added before the errback on purpose: an error raised
        while pickling a successful result is then itself packaged by
        `packageFailure`.
        """
        d.addCallback(self.packageSuccess)
        d.addErrback(self.packageFailure)
        return d

    #---------------------------------------------------------------------------
    # ITaskController related methods
    #---------------------------------------------------------------------------

    def remote_run(self, ptask):
        """Unpickle and uncan `ptask`, then run it on the controller.

        :Parameters:
            ptask : str
                The pickled, canned task as sent by the client.

        :Returns: A deferred that fires with the pickled result of
            ``taskController.run`` or with a pickled failure.
        """
        # NOTE(security): `ptask` comes from the remote peer and unpickling
        # can execute arbitrary code; this is only safe with trusted clients.
        try:
            ctask = pickle.loads(ptask)
            task = taskmodule.uncanTask(ctask)
        except Exception:
            # Only real errors are reported as an unmarshalling failure;
            # KeyboardInterrupt/SystemExit are no longer swallowed here.
            # NOTE(review): UnpickleableError is kept so the error type seen
            # by clients does not change; UnpicklingError may have been the
            # intended type -- confirm.
            d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
        else:
            d = self.taskController.run(task)
        return self._packageDeferred(d)

    def remote_abort(self, taskid):
        """Delegate to ``taskController.abort`` and package the outcome."""
        return self._packageDeferred(self.taskController.abort(taskid))

    def remote_get_task_result(self, taskid, block=False):
        """Delegate to ``taskController.get_task_result`` and package the
        outcome."""
        return self._packageDeferred(
            self.taskController.get_task_result(taskid, block))

    def remote_barrier(self, taskids):
        """Delegate to ``taskController.barrier`` and package the outcome."""
        return self._packageDeferred(self.taskController.barrier(taskids))

    def remote_spin(self):
        """Delegate to ``taskController.spin`` and package the outcome."""
        return self._packageDeferred(self.taskController.spin())

    def remote_queue_status(self, verbose):
        """Delegate to ``taskController.queue_status`` and package the
        outcome."""
        return self._packageDeferred(self.taskController.queue_status(verbose))

    def remote_get_client_name(self):
        """Return the dotted name of the client class for this interface."""
        return 'IPython.kernel.taskfc.FCTaskClient'


components.registerAdapter(FCTaskControllerFromTaskController,
            taskmodule.ITaskController,
            IFCTaskController)


#-------------------------------------------------------------------------------
# The Client side of things
#-------------------------------------------------------------------------------


class FCTaskClient(object):
    """Foolscap based TaskController client that implements ITaskController.

    :Parameters:
        remote_reference : object providing ``callRemote``
            The reference through which the remote ``remote_*`` methods of
            the controller are invoked (presumably a Foolscap
            RemoteReference -- confirm against the caller).
    """
    implements(taskmodule.ITaskController, IBlockingClientAdaptor)

    def __init__(self, remote_reference):
        """Store the remote reference used for all remote calls."""
        self.remote_reference = remote_reference

    #---------------------------------------------------------------------------
    # Non interface methods
    #---------------------------------------------------------------------------

    def unpackage(self, r):
        """Unpickle the serialized reply `r` received from the controller."""
        return pickle.loads(r)

    def _callRemote(self, name, *args):
        """Invoke remote method `name` with `args` and unpickle the reply.

        :Returns: The deferred from ``callRemote`` with `unpackage` attached
            as a callback.
        """
        d = self.remote_reference.callRemote(name, *args)
        d.addCallback(self.unpackage)
        return d

    #---------------------------------------------------------------------------
    # ITaskController related methods
    #---------------------------------------------------------------------------

    def run(self, task):
        """Run a task on the `TaskController`.

        :Parameters:
            task : a `Task` object

        The Task object is created using the following signature:

        Task(expression, pull=None, push={}, clear_before=False,
            clear_after=False, retries=0, **options)

        The meaning of the arguments is as follows:

        :Task Parameters:
            expression : str
                A str that is valid python code that is the task.
            pull : str or list of str
                The names of objects to be pulled as results.
            push : dict
                A dict of objects to be pushed into the engines namespace
                before execution of the expression.
            clear_before : boolean
                Should the engine's namespace be cleared before the task is
                run.  Default=False.
            clear_after : boolean
                Should the engine's namespace be cleared after the task is
                run.  Default=False.
            retries : int
                The number of times to resubmit the task if it fails.
                Default=0.
            options : dict
                Any other keyword options for more elaborate uses of tasks

        :Returns: The int taskid of the submitted task.  Pass this to
            `get_task_result` to get the `TaskResult` object.
        """
        assert isinstance(task, taskmodule.Task), "task must be a Task object!"
        # canTask handles arbitrary functions in .depend as well as
        # arbitrary recovery_task chains.
        ctask = taskmodule.canTask(task)
        ptask = pickle.dumps(ctask, 2)
        return self._callRemote('run', ptask)

    def get_task_result(self, taskid, block=False):
        """The task result by taskid.

        :Parameters:
            taskid : int
                The taskid of the task to be retrieved.
            block : boolean
                Should I block until the task is done?

        :Returns: A `TaskResult` object that encapsulates the task result.
        """
        return self._callRemote('get_task_result', taskid, block)

    def abort(self, taskid):
        """Abort a task by taskid.

        :Parameters:
            taskid : int
                The taskid of the task to be aborted.
        """
        return self._callRemote('abort', taskid)

    def barrier(self, taskids):
        """Block until all tasks are completed.

        :Parameters:
            taskids : list, tuple
                A sequence of taskids to block on.
        """
        return self._callRemote('barrier', taskids)

    def spin(self):
        """Touch the scheduler, to resume scheduling without submitting
        a task.
        """
        return self._callRemote('spin')

    def queue_status(self, verbose=False):
        """Return a dict with the status of the task queue."""
        return self._callRemote('queue_status', verbose)

    def adapt_to_blocking_client(self):
        """Adapt this client to `IBlockingTaskClient` and return the result."""
        # Imported here rather than at module level, as in the original.
        from IPython.kernel.taskclient import IBlockingTaskClient
        return IBlockingTaskClient(self)