|
|
# encoding: utf-8
|
|
|
# -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
|
|
|
|
|
|
"""
|
|
|
A blocking version of the task client.
|
|
|
"""
|
|
|
|
|
|
__docformat__ = "restructuredtext en"
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Copyright (C) 2008 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Imports
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
from zope.interface import Interface, implements
|
|
|
from twisted.python import components
|
|
|
|
|
|
try:
|
|
|
from foolscap.api import DeadReferenceError
|
|
|
except ImportError:
|
|
|
from foolscap import DeadReferenceError
|
|
|
|
|
|
from IPython.kernel.twistedutil import blockingCallFromThread
|
|
|
from IPython.kernel import task, error
|
|
|
from IPython.kernel.mapper import (
|
|
|
SynchronousTaskMapper,
|
|
|
ITaskMapperFactory,
|
|
|
IMapper
|
|
|
)
|
|
|
from IPython.kernel.parallelfunction import (
|
|
|
ParallelFunction,
|
|
|
ITaskParallelDecorator
|
|
|
)
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# The task client
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
class IBlockingTaskClient(Interface):
|
|
|
"""
|
|
|
A vague interface of the blocking task client
|
|
|
"""
|
|
|
pass
|
|
|
|
|
|
class BlockingTaskClient(object):
|
|
|
"""
|
|
|
A blocking task client that adapts a non-blocking one.
|
|
|
"""
|
|
|
|
|
|
implements(
|
|
|
IBlockingTaskClient,
|
|
|
ITaskMapperFactory,
|
|
|
IMapper,
|
|
|
ITaskParallelDecorator
|
|
|
)
|
|
|
|
|
|
def __init__(self, task_controller):
|
|
|
self.task_controller = task_controller
|
|
|
self.block = True
|
|
|
|
|
|
def _bcft(self, *args, **kwargs):
|
|
|
try:
|
|
|
result = blockingCallFromThread(*args, **kwargs)
|
|
|
except DeadReferenceError:
|
|
|
raise error.ConnectionError(
|
|
|
"""A connection error has occurred in trying to connect to the
|
|
|
controller. This is usually caused by the controller dying or
|
|
|
being restarted. To resolve this issue try recreating the
|
|
|
task client."""
|
|
|
)
|
|
|
else:
|
|
|
return result
|
|
|
|
|
|
def run(self, task, block=False):
|
|
|
"""Run a task on the `TaskController`.
|
|
|
|
|
|
See the documentation of the `MapTask` and `StringTask` classes for
|
|
|
details on how to build a task of different types.
|
|
|
|
|
|
:Parameters:
|
|
|
task : an `ITask` implementer
|
|
|
|
|
|
:Returns: The int taskid of the submitted task. Pass this to
|
|
|
`get_task_result` to get the `TaskResult` object.
|
|
|
"""
|
|
|
tid = self._bcft(self.task_controller.run, task)
|
|
|
if block:
|
|
|
return self.get_task_result(tid, block=True)
|
|
|
else:
|
|
|
return tid
|
|
|
|
|
|
def get_task_result(self, taskid, block=False):
|
|
|
"""
|
|
|
Get a task result by taskid.
|
|
|
|
|
|
:Parameters:
|
|
|
taskid : int
|
|
|
The taskid of the task to be retrieved.
|
|
|
block : boolean
|
|
|
Should I block until the task is done?
|
|
|
|
|
|
:Returns: A `TaskResult` object that encapsulates the task result.
|
|
|
"""
|
|
|
return self._bcft(self.task_controller.get_task_result,
|
|
|
taskid, block)
|
|
|
|
|
|
def abort(self, taskid):
|
|
|
"""
|
|
|
Abort a task by taskid.
|
|
|
|
|
|
:Parameters:
|
|
|
taskid : int
|
|
|
The taskid of the task to be aborted.
|
|
|
"""
|
|
|
return self._bcft(self.task_controller.abort, taskid)
|
|
|
|
|
|
def barrier(self, taskids):
|
|
|
"""Block until a set of tasks are completed.
|
|
|
|
|
|
:Parameters:
|
|
|
taskids : list, tuple
|
|
|
A sequence of taskids to block on.
|
|
|
"""
|
|
|
return self._bcft(self.task_controller.barrier, taskids)
|
|
|
|
|
|
def spin(self):
|
|
|
"""
|
|
|
Touch the scheduler, to resume scheduling without submitting a task.
|
|
|
|
|
|
This method only needs to be called in unusual situations where the
|
|
|
scheduler is idle for some reason.
|
|
|
"""
|
|
|
return self._bcft(self.task_controller.spin)
|
|
|
|
|
|
def queue_status(self, verbose=False):
|
|
|
"""
|
|
|
Get a dictionary with the current state of the task queue.
|
|
|
|
|
|
:Parameters:
|
|
|
verbose : boolean
|
|
|
If True, return a list of taskids. If False, simply give
|
|
|
the number of tasks with each status.
|
|
|
|
|
|
:Returns:
|
|
|
A dict with the queue status.
|
|
|
"""
|
|
|
return self._bcft(self.task_controller.queue_status, verbose)
|
|
|
|
|
|
def clear(self):
|
|
|
"""
|
|
|
Clear all previously run tasks from the task controller.
|
|
|
|
|
|
This is needed because the task controller keep all task results
|
|
|
in memory. This can be a problem is there are many completed
|
|
|
tasks. Users should call this periodically to clean out these
|
|
|
cached task results.
|
|
|
"""
|
|
|
return self._bcft(self.task_controller.clear)
|
|
|
|
|
|
def map(self, func, *sequences):
|
|
|
"""
|
|
|
Apply func to *sequences elementwise. Like Python's builtin map.
|
|
|
|
|
|
This version is load balanced.
|
|
|
"""
|
|
|
return self.mapper().map(func, *sequences)
|
|
|
|
|
|
def mapper(self, clear_before=False, clear_after=False, retries=0,
|
|
|
recovery_task=None, depend=None, block=True):
|
|
|
"""
|
|
|
Create an `IMapper` implementer with a given set of arguments.
|
|
|
|
|
|
The `IMapper` created using a task controller is load balanced.
|
|
|
|
|
|
See the documentation for `IPython.kernel.task.BaseTask` for
|
|
|
documentation on the arguments to this method.
|
|
|
"""
|
|
|
return SynchronousTaskMapper(self, clear_before=clear_before,
|
|
|
clear_after=clear_after, retries=retries,
|
|
|
recovery_task=recovery_task, depend=depend, block=block)
|
|
|
|
|
|
def parallel(self, clear_before=False, clear_after=False, retries=0,
|
|
|
recovery_task=None, depend=None, block=True):
|
|
|
mapper = self.mapper(clear_before, clear_after, retries,
|
|
|
recovery_task, depend, block)
|
|
|
pf = ParallelFunction(mapper)
|
|
|
return pf
|
|
|
|
|
|
components.registerAdapter(BlockingTaskClient,
|
|
|
task.ITaskController, IBlockingTaskClient)
|
|
|
|
|
|
|
|
|
|