taskfc.py
267 lines
| 9.2 KiB
| text/x-python
|
PythonLexer
Brian E Granger
|
r1234 | # encoding: utf-8 | ||
# -*- test-case-name: IPython.kernel.tests.test_taskxmlrpc -*- | ||||
"""A Foolscap interface to a TaskController. | ||||
This class lets Foolscap clients talk to a TaskController. | ||||
""" | ||||
__docformat__ = "restructuredtext en" | ||||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2008 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
import cPickle as pickle | ||||
import xmlrpclib, copy | ||||
from zope.interface import Interface, implements | ||||
from twisted.internet import defer | ||||
from twisted.python import components, failure | ||||
from foolscap import Referenceable | ||||
from IPython.kernel.twistedutil import blockingCallFromThread | ||||
from IPython.kernel import error, task as taskmodule, taskclient | ||||
from IPython.kernel.pickleutil import can, uncan | ||||
from IPython.kernel.clientinterfaces import ( | ||||
IFCClientInterfaceProvider, | ||||
IBlockingClientAdaptor | ||||
) | ||||
#------------------------------------------------------------------------------- | ||||
# The Controller side of things | ||||
#------------------------------------------------------------------------------- | ||||
class IFCTaskController(Interface): | ||||
"""Foolscap interface to task controller. | ||||
See the documentation of ITaskController for documentation about the methods. | ||||
""" | ||||
def remote_run(request, binTask): | ||||
"""""" | ||||
def remote_abort(request, taskid): | ||||
"""""" | ||||
def remote_get_task_result(request, taskid, block=False): | ||||
"""""" | ||||
def remote_barrier(request, taskids): | ||||
"""""" | ||||
def remote_spin(request): | ||||
"""""" | ||||
def remote_queue_status(request, verbose): | ||||
"""""" | ||||
class FCTaskControllerFromTaskController(Referenceable): | ||||
"""XML-RPC attachmeot for controller. | ||||
See IXMLRPCTaskController and ITaskController (and its children) for documentation. | ||||
""" | ||||
implements(IFCTaskController, IFCClientInterfaceProvider) | ||||
def __init__(self, taskController): | ||||
self.taskController = taskController | ||||
#--------------------------------------------------------------------------- | ||||
# Non interface methods | ||||
#--------------------------------------------------------------------------- | ||||
def packageFailure(self, f): | ||||
f.cleanFailure() | ||||
return self.packageSuccess(f) | ||||
def packageSuccess(self, obj): | ||||
serial = pickle.dumps(obj, 2) | ||||
return serial | ||||
#--------------------------------------------------------------------------- | ||||
# ITaskController related methods | ||||
#--------------------------------------------------------------------------- | ||||
def remote_run(self, ptask): | ||||
try: | ||||
ctask = pickle.loads(ptask) | ||||
task = taskmodule.uncanTask(ctask) | ||||
except: | ||||
d = defer.fail(pickle.UnpickleableError("Could not unmarshal task")) | ||||
else: | ||||
d = self.taskController.run(task) | ||||
d.addCallback(self.packageSuccess) | ||||
d.addErrback(self.packageFailure) | ||||
return d | ||||
def remote_abort(self, taskid): | ||||
d = self.taskController.abort(taskid) | ||||
d.addCallback(self.packageSuccess) | ||||
d.addErrback(self.packageFailure) | ||||
return d | ||||
def remote_get_task_result(self, taskid, block=False): | ||||
d = self.taskController.get_task_result(taskid, block) | ||||
d.addCallback(self.packageSuccess) | ||||
d.addErrback(self.packageFailure) | ||||
return d | ||||
def remote_barrier(self, taskids): | ||||
d = self.taskController.barrier(taskids) | ||||
d.addCallback(self.packageSuccess) | ||||
d.addErrback(self.packageFailure) | ||||
return d | ||||
def remote_spin(self): | ||||
d = self.taskController.spin() | ||||
d.addCallback(self.packageSuccess) | ||||
d.addErrback(self.packageFailure) | ||||
return d | ||||
def remote_queue_status(self, verbose): | ||||
d = self.taskController.queue_status(verbose) | ||||
d.addCallback(self.packageSuccess) | ||||
d.addErrback(self.packageFailure) | ||||
return d | ||||
def remote_get_client_name(self): | ||||
return 'IPython.kernel.taskfc.FCTaskClient' | ||||
components.registerAdapter(FCTaskControllerFromTaskController, | ||||
taskmodule.ITaskController, IFCTaskController) | ||||
#------------------------------------------------------------------------------- | ||||
# The Client side of things | ||||
#------------------------------------------------------------------------------- | ||||
class FCTaskClient(object): | ||||
"""XML-RPC based TaskController client that implements ITaskController. | ||||
:Parameters: | ||||
addr : (ip, port) | ||||
The ip (str) and port (int) tuple of the `TaskController`. | ||||
""" | ||||
implements(taskmodule.ITaskController, IBlockingClientAdaptor) | ||||
def __init__(self, remote_reference): | ||||
self.remote_reference = remote_reference | ||||
#--------------------------------------------------------------------------- | ||||
# Non interface methods | ||||
#--------------------------------------------------------------------------- | ||||
def unpackage(self, r): | ||||
return pickle.loads(r) | ||||
#--------------------------------------------------------------------------- | ||||
# ITaskController related methods | ||||
#--------------------------------------------------------------------------- | ||||
def run(self, task): | ||||
"""Run a task on the `TaskController`. | ||||
:Parameters: | ||||
task : a `Task` object | ||||
The Task object is created using the following signature: | ||||
Task(expression, pull=None, push={}, clear_before=False, | ||||
clear_after=False, retries=0, **options):) | ||||
The meaning of the arguments is as follows: | ||||
:Task Parameters: | ||||
expression : str | ||||
A str that is valid python code that is the task. | ||||
pull : str or list of str | ||||
The names of objects to be pulled as results. | ||||
push : dict | ||||
A dict of objects to be pushed into the engines namespace before | ||||
execution of the expression. | ||||
clear_before : boolean | ||||
Should the engine's namespace be cleared before the task is run. | ||||
Default=False. | ||||
clear_after : boolean | ||||
Should the engine's namespace be cleared after the task is run. | ||||
Default=False. | ||||
retries : int | ||||
The number of times to resumbit the task if it fails. Default=0. | ||||
options : dict | ||||
Any other keyword options for more elaborate uses of tasks | ||||
:Returns: The int taskid of the submitted task. Pass this to | ||||
`get_task_result` to get the `TaskResult` object. | ||||
""" | ||||
assert isinstance(task, taskmodule.Task), "task must be a Task object!" | ||||
ctask = taskmodule.canTask(task) # handles arbitrary function in .depend | ||||
# as well as arbitrary recovery_task chains | ||||
ptask = pickle.dumps(ctask, 2) | ||||
d = self.remote_reference.callRemote('run', ptask) | ||||
d.addCallback(self.unpackage) | ||||
return d | ||||
def get_task_result(self, taskid, block=False): | ||||
"""The task result by taskid. | ||||
:Parameters: | ||||
taskid : int | ||||
The taskid of the task to be retrieved. | ||||
block : boolean | ||||
Should I block until the task is done? | ||||
:Returns: A `TaskResult` object that encapsulates the task result. | ||||
""" | ||||
d = self.remote_reference.callRemote('get_task_result', taskid, block) | ||||
d.addCallback(self.unpackage) | ||||
return d | ||||
def abort(self, taskid): | ||||
"""Abort a task by taskid. | ||||
:Parameters: | ||||
taskid : int | ||||
The taskid of the task to be aborted. | ||||
block : boolean | ||||
Should I block until the task is aborted. | ||||
""" | ||||
d = self.remote_reference.callRemote('abort', taskid) | ||||
d.addCallback(self.unpackage) | ||||
return d | ||||
def barrier(self, taskids): | ||||
"""Block until all tasks are completed. | ||||
:Parameters: | ||||
taskids : list, tuple | ||||
A sequence of taskids to block on. | ||||
""" | ||||
d = self.remote_reference.callRemote('barrier', taskids) | ||||
d.addCallback(self.unpackage) | ||||
return d | ||||
def spin(self): | ||||
"""touch the scheduler, to resume scheduling without submitting | ||||
a task. | ||||
""" | ||||
d = self.remote_reference.callRemote('spin') | ||||
d.addCallback(self.unpackage) | ||||
return d | ||||
def queue_status(self, verbose=False): | ||||
"""Return a dict with the status of the task queue.""" | ||||
d = self.remote_reference.callRemote('queue_status', verbose) | ||||
d.addCallback(self.unpackage) | ||||
return d | ||||
def adapt_to_blocking_client(self): | ||||
from IPython.kernel.taskclient import IBlockingTaskClient | ||||
return IBlockingTaskClient(self) | ||||