##// END OF EJS Templates
Merging upstream.
Merging upstream.

File last commit:

r1952:d68cab7f
r2340:ef5de34e merge
Show More
taskclient.py
180 lines | 5.8 KiB | text/x-python | PythonLexer
# encoding: utf-8
# -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
"""
A blocking version of the task client.
"""
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
from zope.interface import Interface, implements
from twisted.python import components, log
from IPython.kernel.twistedutil import blockingCallFromThread
from IPython.kernel import task, error
from IPython.kernel.mapper import (
SynchronousTaskMapper,
ITaskMapperFactory,
IMapper
)
from IPython.kernel.parallelfunction import (
ParallelFunction,
ITaskParallelDecorator
)
#-------------------------------------------------------------------------------
# The task client
#-------------------------------------------------------------------------------
class IBlockingTaskClient(Interface):
"""
A vague interface of the blocking task client
"""
pass
class BlockingTaskClient(object):
"""
A blocking task client that adapts a non-blocking one.
"""
implements(
IBlockingTaskClient,
ITaskMapperFactory,
IMapper,
ITaskParallelDecorator
)
def __init__(self, task_controller):
self.task_controller = task_controller
self.block = True
def run(self, task, block=False):
"""Run a task on the `TaskController`.
See the documentation of the `MapTask` and `StringTask` classes for
details on how to build a task of different types.
:Parameters:
task : an `ITask` implementer
:Returns: The int taskid of the submitted task. Pass this to
`get_task_result` to get the `TaskResult` object.
"""
tid = blockingCallFromThread(self.task_controller.run, task)
if block:
return self.get_task_result(tid, block=True)
else:
return tid
def get_task_result(self, taskid, block=False):
"""
Get a task result by taskid.
:Parameters:
taskid : int
The taskid of the task to be retrieved.
block : boolean
Should I block until the task is done?
:Returns: A `TaskResult` object that encapsulates the task result.
"""
return blockingCallFromThread(self.task_controller.get_task_result,
taskid, block)
def abort(self, taskid):
"""
Abort a task by taskid.
:Parameters:
taskid : int
The taskid of the task to be aborted.
"""
return blockingCallFromThread(self.task_controller.abort, taskid)
def barrier(self, taskids):
"""Block until a set of tasks are completed.
:Parameters:
taskids : list, tuple
A sequence of taskids to block on.
"""
return blockingCallFromThread(self.task_controller.barrier, taskids)
def spin(self):
"""
Touch the scheduler, to resume scheduling without submitting a task.
This method only needs to be called in unusual situations where the
scheduler is idle for some reason.
"""
return blockingCallFromThread(self.task_controller.spin)
def queue_status(self, verbose=False):
"""
Get a dictionary with the current state of the task queue.
:Parameters:
verbose : boolean
If True, return a list of taskids. If False, simply give
the number of tasks with each status.
:Returns:
A dict with the queue status.
"""
return blockingCallFromThread(self.task_controller.queue_status, verbose)
def clear(self):
"""
Clear all previously run tasks from the task controller.
This is needed because the task controller keep all task results
in memory. This can be a problem is there are many completed
tasks. Users should call this periodically to clean out these
cached task results.
"""
return blockingCallFromThread(self.task_controller.clear)
def map(self, func, *sequences):
"""
Apply func to *sequences elementwise. Like Python's builtin map.
This version is load balanced.
"""
return self.mapper().map(func, *sequences)
def mapper(self, clear_before=False, clear_after=False, retries=0,
recovery_task=None, depend=None, block=True):
"""
Create an `IMapper` implementer with a given set of arguments.
The `IMapper` created using a task controller is load balanced.
See the documentation for `IPython.kernel.task.BaseTask` for
documentation on the arguments to this method.
"""
return SynchronousTaskMapper(self, clear_before=clear_before,
clear_after=clear_after, retries=retries,
recovery_task=recovery_task, depend=depend, block=block)
def parallel(self, clear_before=False, clear_after=False, retries=0,
recovery_task=None, depend=None, block=True):
mapper = self.mapper(clear_before, clear_after, retries,
recovery_task, depend, block)
pf = ParallelFunction(mapper)
return pf
components.registerAdapter(BlockingTaskClient,
task.ITaskController, IBlockingTaskClient)