##// END OF EJS Templates
Merging in vvatsa's ssh mode for ipcluster with some changes....
Merging in vvatsa's ssh mode for ipcluster with some changes. We now have a fully working ssh mode for the new ipcluster. It should work well on Unix, Linux and OS X.

File last commit:

r1395:1feaf0a3
r1832:34a51241
Show More
taskfc.py
329 lines | 10.6 KiB | text/x-python | PythonLexer
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 # encoding: utf-8
# -*- test-case-name: IPython.kernel.tests.test_taskxmlrpc -*-
"""A Foolscap interface to a TaskController.
This class lets Foolscap clients talk to a TaskController.
"""
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import cPickle as pickle
import xmlrpclib, copy
from zope.interface import Interface, implements
from twisted.internet import defer
from twisted.python import components, failure
from foolscap import Referenceable
from IPython.kernel.twistedutil import blockingCallFromThread
from IPython.kernel import error, task as taskmodule, taskclient
from IPython.kernel.pickleutil import can, uncan
from IPython.kernel.clientinterfaces import (
IFCClientInterfaceProvider,
IBlockingClientAdaptor
)
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 from IPython.kernel.mapper import (
TaskMapper,
ITaskMapperFactory,
IMapper
)
from IPython.kernel.parallelfunction import (
ParallelFunction,
ITaskParallelDecorator
)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
#-------------------------------------------------------------------------------
# The Controller side of things
#-------------------------------------------------------------------------------
class IFCTaskController(Interface):
"""Foolscap interface to task controller.
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 See the documentation of `ITaskController` for more information.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 def remote_run(binTask):
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """"""
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 def remote_abort(taskid):
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """"""
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 def remote_get_task_result(taskid, block=False):
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """"""
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 def remote_barrier(taskids):
""""""
def remote_spin():
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """"""
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 def remote_queue_status(verbose):
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """"""
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 def remote_clear():
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """"""
class FCTaskControllerFromTaskController(Referenceable):
"""
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 Adapt a `TaskController` to an `IFCTaskController`
This class is used to expose a `TaskController` over the wire using
the Foolscap network protocol.
"""
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 implements(IFCTaskController, IFCClientInterfaceProvider)
def __init__(self, taskController):
self.taskController = taskController
#---------------------------------------------------------------------------
# Non interface methods
#---------------------------------------------------------------------------
def packageFailure(self, f):
f.cleanFailure()
return self.packageSuccess(f)
def packageSuccess(self, obj):
serial = pickle.dumps(obj, 2)
return serial
#---------------------------------------------------------------------------
# ITaskController related methods
#---------------------------------------------------------------------------
def remote_run(self, ptask):
try:
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 task = pickle.loads(ptask)
task.uncan_task()
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 except:
d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
else:
d = self.taskController.run(task)
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
def remote_abort(self, taskid):
d = self.taskController.abort(taskid)
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
def remote_get_task_result(self, taskid, block=False):
d = self.taskController.get_task_result(taskid, block)
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
def remote_barrier(self, taskids):
d = self.taskController.barrier(taskids)
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
def remote_spin(self):
d = self.taskController.spin()
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
def remote_queue_status(self, verbose):
d = self.taskController.queue_status(verbose)
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 def remote_clear(self):
return self.taskController.clear()
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 def remote_get_client_name(self):
return 'IPython.kernel.taskfc.FCTaskClient'
components.registerAdapter(FCTaskControllerFromTaskController,
taskmodule.ITaskController, IFCTaskController)
#-------------------------------------------------------------------------------
# The Client side of things
#-------------------------------------------------------------------------------
class FCTaskClient(object):
"""
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 Client class for Foolscap exposed `TaskController`.
This class is an adapter that makes a `RemoteReference` to a
`TaskController` look like an actual `ITaskController` on the client side.
This class also implements `IBlockingClientAdaptor` so that clients can
automatically get a blocking version of this class.
"""
implements(
taskmodule.ITaskController,
IBlockingClientAdaptor,
ITaskMapperFactory,
IMapper,
ITaskParallelDecorator
)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
def __init__(self, remote_reference):
self.remote_reference = remote_reference
#---------------------------------------------------------------------------
# Non interface methods
#---------------------------------------------------------------------------
def unpackage(self, r):
return pickle.loads(r)
#---------------------------------------------------------------------------
# ITaskController related methods
#---------------------------------------------------------------------------
def run(self, task):
"""Run a task on the `TaskController`.
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 See the documentation of the `MapTask` and `StringTask` classes for
details on how to build a task of different types.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 :Parameters:
task : an `ITask` implementer
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
:Returns: The int taskid of the submitted task. Pass this to
`get_task_result` to get the `TaskResult` object.
"""
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 assert isinstance(task, taskmodule.BaseTask), "task must be a Task object!"
task.can_task()
ptask = pickle.dumps(task, 2)
task.uncan_task()
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 d = self.remote_reference.callRemote('run', ptask)
d.addCallback(self.unpackage)
return d
def get_task_result(self, taskid, block=False):
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """
Get a task result by taskid.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
:Parameters:
taskid : int
The taskid of the task to be retrieved.
block : boolean
Should I block until the task is done?
:Returns: A `TaskResult` object that encapsulates the task result.
"""
d = self.remote_reference.callRemote('get_task_result', taskid, block)
d.addCallback(self.unpackage)
return d
def abort(self, taskid):
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """
Abort a task by taskid.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
:Parameters:
taskid : int
The taskid of the task to be aborted.
"""
d = self.remote_reference.callRemote('abort', taskid)
d.addCallback(self.unpackage)
return d
def barrier(self, taskids):
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """Block until a set of tasks are completed.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
:Parameters:
taskids : list, tuple
A sequence of taskids to block on.
"""
d = self.remote_reference.callRemote('barrier', taskids)
d.addCallback(self.unpackage)
return d
def spin(self):
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """
Touch the scheduler, to resume scheduling without submitting a task.
This method only needs to be called in unusual situations where the
scheduler is idle for some reason.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """
d = self.remote_reference.callRemote('spin')
d.addCallback(self.unpackage)
return d
def queue_status(self, verbose=False):
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """
Get a dictionary with the current state of the task queue.
:Parameters:
verbose : boolean
If True, return a list of taskids. If False, simply give
the number of tasks with each status.
:Returns:
A dict with the queue status.
"""
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 d = self.remote_reference.callRemote('queue_status', verbose)
d.addCallback(self.unpackage)
return d
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 def clear(self):
"""
Clear all previously run tasks from the task controller.
This is needed because the task controller keep all task results
in memory. This can be a problem is there are many completed
tasks. Users should call this periodically to clean out these
cached task results.
"""
d = self.remote_reference.callRemote('clear')
return d
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 def adapt_to_blocking_client(self):
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """
Wrap self in a blocking version that implements `IBlockingTaskClient.
"""
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 from IPython.kernel.taskclient import IBlockingTaskClient
return IBlockingTaskClient(self)
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395
def map(self, func, *sequences):
"""
Apply func to *sequences elementwise. Like Python's builtin map.
This version is load balanced.
"""
return self.mapper().map(func, *sequences)
def mapper(self, clear_before=False, clear_after=False, retries=0,
recovery_task=None, depend=None, block=True):
"""
Create an `IMapper` implementer with a given set of arguments.
The `IMapper` created using a task controller is load balanced.
See the documentation for `IPython.kernel.task.BaseTask` for
documentation on the arguments to this method.
"""
return TaskMapper(self, clear_before=clear_before,
clear_after=clear_after, retries=retries,
recovery_task=recovery_task, depend=depend, block=block)
def parallel(self, clear_before=False, clear_after=False, retries=0,
recovery_task=None, depend=None, block=True):
mapper = self.mapper(clear_before, clear_after, retries,
recovery_task, depend, block)
pf = ParallelFunction(mapper)
return pf
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234