##// END OF EJS Templates
Merge branch '0.10.1-sge' of http://github.com/satra/ipython into satra-0.10.1-sge...
Merge branch '0.10.1-sge' of http://github.com/satra/ipython into satra-0.10.1-sge This branch adds Sun Grid Engine support to the ipcluster startup system. Credits: Justin Riley and Satra Ghosh @ MIT, Matthew Brucher. Closes gh-152 (pull request)

File last commit:

r2610:927b9732
r3066:f780f2e9 merge
Show More
taskfc.py
345 lines | 11.2 KiB | text/x-python | PythonLexer
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 # encoding: utf-8
# -*- test-case-name: IPython.kernel.tests.test_taskxmlrpc -*-
"""A Foolscap interface to a TaskController.
This class lets Foolscap clients talk to a TaskController.
"""
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import cPickle as pickle
import xmlrpclib, copy
from zope.interface import Interface, implements
from twisted.internet import defer
from twisted.python import components, failure
Fernando Perez
Suppress foolscap deprecation warnings, bug reported by Satra....
r2610 try:
# This is preferred in foolscap v > 0.4.3
from foolscap.api import Referenceable
except ImportError:
# Fallback for older versions
from foolscap import Referenceable
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
from IPython.kernel.twistedutil import blockingCallFromThread
from IPython.kernel import error, task as taskmodule, taskclient
from IPython.kernel.pickleutil import can, uncan
from IPython.kernel.clientinterfaces import (
IFCClientInterfaceProvider,
IBlockingClientAdaptor
)
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 from IPython.kernel.mapper import (
TaskMapper,
ITaskMapperFactory,
IMapper
)
from IPython.kernel.parallelfunction import (
ParallelFunction,
ITaskParallelDecorator
)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
#-------------------------------------------------------------------------------
# The Controller side of things
#-------------------------------------------------------------------------------
class IFCTaskController(Interface):
    """Foolscap interface to task controller.

    See the documentation of `ITaskController` for more information.
    """

    def remote_run(binTask):
        """Run a task that is supplied in pickled form."""

    def remote_abort(taskid):
        """Abort the task identified by `taskid`."""

    def remote_get_task_result(taskid, block=False):
        """Get the result of the task identified by `taskid`."""

    def remote_barrier(taskids):
        """Block until the tasks identified by `taskids` are completed."""

    def remote_spin():
        """Touch the scheduler, to resume scheduling without a new task."""

    def remote_queue_status(verbose):
        """Get the current state of the task queue."""

    def remote_clear(taskids=None):
        """Clear previously run tasks; `None` means clear all results."""
class FCTaskControllerFromTaskController(Referenceable):
    """
    Adapt a `TaskController` to an `IFCTaskController`

    This class is used to expose a `TaskController` over the wire using
    the Foolscap network protocol.

    Each ``remote_*`` method forwards to the method of the same name on the
    wrapped controller and returns a deferred whose result is a pickled
    (protocol 2) string: either the pickled return value or, on error, the
    pickled (cleaned) failure.
    """

    implements(IFCTaskController, IFCClientInterfaceProvider)

    def __init__(self, taskController):
        """Store the `taskController` that the remote calls are forwarded to."""
        self.taskController = taskController

    #---------------------------------------------------------------------------
    # Non interface methods
    #---------------------------------------------------------------------------

    def packageFailure(self, f):
        """Clean the failure `f` and return it in pickled form."""
        # cleanFailure() is called before pickling so the failure can be
        # serialized and sent to the client.
        f.cleanFailure()
        return self.packageSuccess(f)

    def packageSuccess(self, obj):
        """Return `obj` pickled with protocol 2."""
        serial = pickle.dumps(obj, 2)
        return serial

    def _package(self, d):
        """Attach the result/failure pickling callbacks to `d` and return it."""
        # The errback is added after the callback on purpose: it also
        # handles an error raised while pickling a successful result.
        d.addCallback(self.packageSuccess)
        d.addErrback(self.packageFailure)
        return d

    #---------------------------------------------------------------------------
    # ITaskController related methods
    #---------------------------------------------------------------------------

    def remote_run(self, ptask):
        """Unpickle `ptask`, uncan it and run it on the task controller.

        If the task cannot be unpickled or uncanned, the returned deferred
        carries a pickled failure wrapping `pickle.UnpickleableError`.
        """
        # NOTE(security): `ptask` arrives over the network and unpickling
        # can execute arbitrary code; only trusted clients should be able
        # to reach this method.
        try:
            task = pickle.loads(ptask)
            task.uncan_task()
        except Exception:
            # `Exception` rather than a bare except, so KeyboardInterrupt
            # and SystemExit are not swallowed here.
            d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
        else:
            d = self.taskController.run(task)
        return self._package(d)

    def remote_abort(self, taskid):
        """Forward to `taskController.abort(taskid)`."""
        return self._package(self.taskController.abort(taskid))

    def remote_get_task_result(self, taskid, block=False):
        """Forward to `taskController.get_task_result(taskid, block)`."""
        return self._package(self.taskController.get_task_result(taskid, block))

    def remote_barrier(self, taskids):
        """Forward to `taskController.barrier(taskids)`."""
        return self._package(self.taskController.barrier(taskids))

    def remote_spin(self):
        """Forward to `taskController.spin()`."""
        return self._package(self.taskController.spin())

    def remote_queue_status(self, verbose):
        """Forward to `taskController.queue_status(verbose)`."""
        return self._package(self.taskController.queue_status(verbose))

    def remote_clear(self, taskids=None):
        """Forward to `taskController.clear(taskids)`."""
        return self._package(self.taskController.clear(taskids))

    def remote_get_client_name(self):
        """Return the dotted name of the client class for this interface."""
        return 'IPython.kernel.taskfc.FCTaskClient'
# Register FCTaskControllerFromTaskController as the adapter from
# taskmodule.ITaskController to IFCTaskController.
components.registerAdapter(FCTaskControllerFromTaskController,
taskmodule.ITaskController, IFCTaskController)
#-------------------------------------------------------------------------------
# The Client side of things
#-------------------------------------------------------------------------------
class FCTaskClient(object):
    """
    Client class for Foolscap exposed `TaskController`.

    This class is an adapter that makes a `RemoteReference` to a
    `TaskController` look like an actual `ITaskController` on the client side.

    This class also implements `IBlockingClientAdaptor` so that clients can
    automatically get a blocking version of this class.
    """

    implements(
        taskmodule.ITaskController,
        IBlockingClientAdaptor,
        ITaskMapperFactory,
        IMapper,
        ITaskParallelDecorator
    )

    def __init__(self, remote_reference):
        """Store the `remote_reference` used for all remote calls."""
        self.remote_reference = remote_reference

    #---------------------------------------------------------------------------
    # Non interface methods
    #---------------------------------------------------------------------------

    def unpackage(self, r):
        """Unpickle the reply `r` received from the controller."""
        # NOTE(security): `r` comes from the remote side; unpickling is only
        # safe when the controller is trusted.
        return pickle.loads(r)

    def _call_remote(self, name, *args):
        """Call the remote method `name` and unpickle its reply.

        Returns the deferred from `callRemote` with `unpackage` attached.
        """
        d = self.remote_reference.callRemote(name, *args)
        d.addCallback(self.unpackage)
        return d

    #---------------------------------------------------------------------------
    # ITaskController related methods
    #---------------------------------------------------------------------------

    def run(self, task):
        """Run a task on the `TaskController`.

        See the documentation of the `MapTask` and `StringTask` classes for
        details on how to build a task of different types.

        :Parameters:
            task : an `ITask` implementer

        :Returns: The int taskid of the submitted task. Pass this to
            `get_task_result` to get the `TaskResult` object.
        """
        # Kept as an assert so callers still see AssertionError; note that
        # this check is skipped when Python runs with -O.
        assert isinstance(task, taskmodule.BaseTask), "task must be a Task object!"
        task.can_task()
        try:
            ptask = pickle.dumps(task, 2)
        finally:
            # Always undo can_task(), so the caller's task object is not
            # left in its canned state when pickling raises.
            task.uncan_task()
        return self._call_remote('run', ptask)

    def get_task_result(self, taskid, block=False):
        """
        Get a task result by taskid.

        :Parameters:
            taskid : int
                The taskid of the task to be retrieved.
            block : boolean
                Should I block until the task is done?

        :Returns: A `TaskResult` object that encapsulates the task result.
        """
        return self._call_remote('get_task_result', taskid, block)

    def abort(self, taskid):
        """
        Abort a task by taskid.

        :Parameters:
            taskid : int
                The taskid of the task to be aborted.
        """
        return self._call_remote('abort', taskid)

    def barrier(self, taskids):
        """Block until a set of tasks are completed.

        :Parameters:
            taskids : list, tuple
                A sequence of taskids to block on.
        """
        return self._call_remote('barrier', taskids)

    def spin(self):
        """
        Touch the scheduler, to resume scheduling without submitting a task.

        This method only needs to be called in unusual situations where the
        scheduler is idle for some reason.
        """
        return self._call_remote('spin')

    def queue_status(self, verbose=False):
        """
        Get a dictionary with the current state of the task queue.

        :Parameters:
            verbose : boolean
                If True, return a list of taskids.  If False, simply give
                the number of tasks with each status.

        :Returns:
            A dict with the queue status.
        """
        return self._call_remote('queue_status', verbose)

    def clear(self, taskids=None):
        """
        Clear previously run tasks from the task controller.

        :Parameters:
            taskids : list, tuple, None
                A sequence of taskids whose results we should drop.
                if None: clear all results

        :Returns:
            An int, the number of tasks cleared

        This is needed because the task controller keeps all task results
        in memory.  This can be a problem if there are many completed
        tasks.  Users should call this periodically to clean out these
        cached task results.
        """
        return self._call_remote('clear', taskids)

    def adapt_to_blocking_client(self):
        """
        Wrap self in a blocking version that implements `IBlockingTaskClient`.
        """
        from IPython.kernel.taskclient import IBlockingTaskClient
        return IBlockingTaskClient(self)

    def map(self, func, *sequences):
        """
        Apply func to *sequences elementwise.  Like Python's builtin map.

        This version is load balanced.
        """
        return self.mapper().map(func, *sequences)

    def mapper(self, clear_before=False, clear_after=False, retries=0,
               recovery_task=None, depend=None, block=True):
        """
        Create an `IMapper` implementer with a given set of arguments.

        The `IMapper` created using a task controller is load balanced.

        See the documentation for `IPython.kernel.task.BaseTask` for
        documentation on the arguments to this method.
        """
        return TaskMapper(self, clear_before=clear_before,
                          clear_after=clear_after, retries=retries,
                          recovery_task=recovery_task, depend=depend,
                          block=block)

    def parallel(self, clear_before=False, clear_after=False, retries=0,
                 recovery_task=None, depend=None, block=True):
        """
        Create a `ParallelFunction` backed by a mapper from `self.mapper`.

        The arguments are passed through to `mapper` unchanged.
        """
        mapper = self.mapper(clear_before, clear_after, retries,
                             recovery_task, depend, block)
        pf = ParallelFunction(mapper)
        return pf
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234