##// END OF EJS Templates
Fix zmq request code for Python < 2.6.5
Fix zmq request code for Python < 2.6.5

File last commit:

r2526:ec6b47e5
r3404:a79b110e
Show More
taskclient.py
198 lines | 6.4 KiB | text/x-python | PythonLexer
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 # encoding: utf-8
# -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """
A blocking version of the task client.
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 """
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
from zope.interface import Interface, implements
Brian Granger
Work to address the review comments on Fernando's branch....
r2498 from twisted.python import components
Brian Granger
Fixed foolscap imports....
r2526
# DeadReferenceError is importable from ``foolscap.api`` in some foolscap
# releases and only from the top-level ``foolscap`` package in others, so
# try the former and fall back to the latter.
try:
    from foolscap.api import DeadReferenceError
except ImportError:
    from foolscap import DeadReferenceError
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
from IPython.kernel.twistedutil import blockingCallFromThread
Brian Granger
Fixed bugs in IPython.kernel....
r2517 from IPython.kernel import task, error
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 from IPython.kernel.mapper import (
SynchronousTaskMapper,
ITaskMapperFactory,
IMapper
)
from IPython.kernel.parallelfunction import (
ParallelFunction,
ITaskParallelDecorator
)
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
#-------------------------------------------------------------------------------
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 # The task client
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234 #-------------------------------------------------------------------------------
class IBlockingTaskClient(Interface):
    """
    A vague interface of the blocking task client.

    It declares no methods or attributes; it is used as a marker interface
    in this module.
    """
    pass
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
class BlockingTaskClient(object):
    """
    A blocking task client that adapts a non-blocking one.

    Calls made to the wrapped task controller are routed through `_bcft`,
    which runs them with `blockingCallFromThread` and converts a
    `DeadReferenceError` into an `error.ConnectionError`.
    """

    implements(
        IBlockingTaskClient,
        ITaskMapperFactory,
        IMapper,
        ITaskParallelDecorator
    )

    def __init__(self, task_controller):
        """
        Wrap a task controller.

        :Parameters:
            task_controller :
                The object whose `run`, `get_task_result`, `abort`,
                `barrier`, `spin`, `queue_status` and `clear` methods are
                called by this client.
        """
        self.task_controller = task_controller
        self.block = True

    def _bcft(self, *args, **kwargs):
        """
        Call `blockingCallFromThread` with the given arguments.

        :Returns: Whatever `blockingCallFromThread` returns.

        :Raises:
            error.ConnectionError :
                If the call raises `DeadReferenceError`.
        """
        try:
            return blockingCallFromThread(*args, **kwargs)
        except DeadReferenceError:
            raise error.ConnectionError(
                "A connection error has occurred in trying to connect to the\n"
                "controller. This is usually caused by the controller dying or\n"
                "being restarted. To resolve this issue try recreating the\n"
                "task client."
            )

    def run(self, task, block=False):
        """Run a task on the `TaskController`.

        See the documentation of the `MapTask` and `StringTask` classes for
        details on how to build a task of different types.

        :Parameters:
            task : an `ITask` implementer
            block : boolean
                If True, return the result of
                `get_task_result(taskid, block=True)` instead of the taskid.

        :Returns: The int taskid of the submitted task.  Pass this to
            `get_task_result` to get the `TaskResult` object.
        """
        tid = self._bcft(self.task_controller.run, task)
        if block:
            return self.get_task_result(tid, block=True)
        else:
            return tid

    def get_task_result(self, taskid, block=False):
        """
        Get a task result by taskid.

        :Parameters:
            taskid : int
                The taskid of the task to be retrieved.
            block : boolean
                Should I block until the task is done?

        :Returns: A `TaskResult` object that encapsulates the task result.
        """
        return self._bcft(self.task_controller.get_task_result,
                          taskid, block)

    def abort(self, taskid):
        """
        Abort a task by taskid.

        :Parameters:
            taskid : int
                The taskid of the task to be aborted.
        """
        return self._bcft(self.task_controller.abort, taskid)

    def barrier(self, taskids):
        """Block until a set of tasks are completed.

        :Parameters:
            taskids : list, tuple
                A sequence of taskids to block on.
        """
        return self._bcft(self.task_controller.barrier, taskids)

    def spin(self):
        """
        Touch the scheduler, to resume scheduling without submitting a task.

        This method only needs to be called in unusual situations where the
        scheduler is idle for some reason.
        """
        return self._bcft(self.task_controller.spin)

    def queue_status(self, verbose=False):
        """
        Get a dictionary with the current state of the task queue.

        :Parameters:
            verbose : boolean
                If True, return a list of taskids.  If False, simply give
                the number of tasks with each status.

        :Returns:
            A dict with the queue status.
        """
        return self._bcft(self.task_controller.queue_status, verbose)

    def clear(self):
        """
        Clear all previously run tasks from the task controller.

        This is needed because the task controller keeps all task results
        in memory.  This can be a problem if there are many completed
        tasks.  Users should call this periodically to clean out these
        cached task results.
        """
        return self._bcft(self.task_controller.clear)

    def map(self, func, *sequences):
        """
        Apply func to *sequences elementwise.  Like Python's builtin map.

        This version is load balanced.
        """
        return self.mapper().map(func, *sequences)

    def mapper(self, clear_before=False, clear_after=False, retries=0,
               recovery_task=None, depend=None, block=True):
        """
        Create an `IMapper` implementer with a given set of arguments.

        The `IMapper` created using a task controller is load balanced.

        See the documentation for `IPython.kernel.task.BaseTask` for
        documentation on the arguments to this method.
        """
        return SynchronousTaskMapper(
            self, clear_before=clear_before, clear_after=clear_after,
            retries=retries, recovery_task=recovery_task, depend=depend,
            block=block)

    def parallel(self, clear_before=False, clear_after=False, retries=0,
                 recovery_task=None, depend=None, block=True):
        """
        Create a `ParallelFunction` backed by a mapper from `mapper`.

        All arguments are forwarded unchanged to `mapper`; see that method
        for where they are documented.

        :Returns: A `ParallelFunction` constructed from the new mapper.
        """
        # Forward by keyword so this stays correct even if the parameter
        # order of `mapper` is ever changed.
        mapper = self.mapper(
            clear_before=clear_before, clear_after=clear_after,
            retries=retries, recovery_task=recovery_task, depend=depend,
            block=block)
        pf = ParallelFunction(mapper)
        return pf
Brian E Granger
This is a manual merge of certain things in the ipython1-dev branch, revision 46, into the main ...
r1234
# Register the adapter at import time.  The arguments are, in order, the
# adapter factory (BlockingTaskClient), the interface being adapted
# (task.ITaskController) and the interface the adapter provides
# (IBlockingTaskClient).
components.registerAdapter(
    BlockingTaskClient,
    task.ITaskController,
    IBlockingTaskClient
)