From d68cab7f27b55eb558b04362dbfee44b3e529b4b 2009-04-21 03:06:51 From: Brian Granger Date: 2009-04-21 03:06:51 Subject: [PATCH] Initial refactor of task dependency system. We are thinking about refactoring the task dependency system. Currently it is based on engine properties and a function sent with each task. In this commit, I have added a TaskRejectError that tasks can raise to indicate that the engine doesn't have the required dependencies. For now I have not removed any of the old stuff, but I have added warnings that we might remove the older *_prop* methods in IMultiEngine. See this ticket for more info: https://bugs.launchpad.net/bugs/361419 --- diff --git a/IPython/kernel/__init__.py b/IPython/kernel/__init__.py index 50747db..cd8d856 100755 --- a/IPython/kernel/__init__.py +++ b/IPython/kernel/__init__.py @@ -20,4 +20,6 @@ __docformat__ = "restructuredtext en" # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. -#----------------------------------------------------------------------------- \ No newline at end of file +#----------------------------------------------------------------------------- + +from IPython.kernel.error import TaskRejectError \ No newline at end of file diff --git a/IPython/kernel/error.py b/IPython/kernel/error.py index 4e86f5b..d91f9e0 100644 --- a/IPython/kernel/error.py +++ b/IPython/kernel/error.py @@ -107,6 +107,20 @@ class SecurityError(KernelError): class FileTimeoutError(KernelError): pass +class TaskRejectError(KernelError): + """Exception to raise when a task should be rejected by an engine. + + This exception can be used to allow a task running on an engine to test + if the engine (or the user's namespace on the engine) has the needed + task dependencies. If not, the task should raise this exception. For + the task to be retried on another engine, the task should be created + with the `retries` argument > 1. 
+ + The advantage of this approach over our older properties system is that + tasks have full access to the user's namespace on the engines and the + properties don't have to be managed or tested by the controller. + """ + class CompositeError(KernelError): def __init__(self, message, elist): Exception.__init__(self, *(message, elist)) diff --git a/IPython/kernel/multiengineclient.py b/IPython/kernel/multiengineclient.py index 2f7ad16..4281df8 100644 --- a/IPython/kernel/multiengineclient.py +++ b/IPython/kernel/multiengineclient.py @@ -20,6 +20,7 @@ import sys import cPickle as pickle from types import FunctionType import linecache +import warnings from twisted.internet import reactor from twisted.python import components, log @@ -389,6 +390,14 @@ def strip_whitespace(source): #------------------------------------------------------------------------------- +_prop_warn = """\ + +We are currently refactoring the task dependency system. This might +involve the removal of this method and other methods related to engine +properties. 
Please see the docstrings for IPython.kernel.TaskRejectError +for more information.""" + + class IFullBlockingMultiEngineClient(Interface): pass @@ -730,22 +739,27 @@ class FullBlockingMultiEngineClient(InteractiveMultiEngineClient): return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block) def set_properties(self, properties, targets=None, block=None): + warnings.warn(_prop_warn) targets, block = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block) def get_properties(self, keys=None, targets=None, block=None): + warnings.warn(_prop_warn) targets, block = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block) def has_properties(self, keys, targets=None, block=None): + warnings.warn(_prop_warn) targets, block = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block) def del_properties(self, keys, targets=None, block=None): + warnings.warn(_prop_warn) targets, block = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block) def clear_properties(self, targets=None, block=None): + warnings.warn(_prop_warn) targets, block = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block) diff --git a/IPython/kernel/taskclient.py b/IPython/kernel/taskclient.py index dc95418..69225fb 100644 --- a/IPython/kernel/taskclient.py +++ b/IPython/kernel/taskclient.py @@ -49,7 +49,7 @@ class BlockingTaskClient(object): """ implements( - IBlockingTaskClient, + IBlockingTaskClient, ITaskMapperFactory, IMapper, ITaskParallelDecorator @@ -62,7 +62,7 @@ class BlockingTaskClient(object): def run(self, task, block=False): """Run a task on the 
`TaskController`. - See the documentation of the `MapTask` and `StringTask` classes for + See the documentation of the `MapTask` and `StringTask` classes for details on how to build a task of different types. :Parameters: diff --git a/docs/examples/kernel/taskreject.py b/docs/examples/kernel/taskreject.py new file mode 100644 index 0000000..deabb02 --- /dev/null +++ b/docs/examples/kernel/taskreject.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# encoding: utf-8 + +""" +A new example showing how to use `TaskRejectError` to handle dependencies +in the IPython task system. + +To run this example, do:: + + $ ipcluster local -n 4 + +Then, in another terminal start up IPython and do:: + + In [0]: %run taskreject.py + + In [1]: mec.execute('run=True', targets=[0,1]) + +After the first command, the scheduler will keep rescheduling the tasks, as +they will fail with `TaskRejectError`. But after the second command, there +are two engines that the tasks can run on. The tasks are quickly funneled +to these engines. + +If you want to see how the controller is scheduling and retrying the tasks +do a `tail -f` on the controller's log file in ~/.ipython/log. +""" + +#----------------------------------------------------------------------------- +# Copyright (C) 2008-2009 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. 
+#----------------------------------------------------------------------------- + +from IPython.kernel import client +from IPython.kernel import TaskRejectError + +mec = client.MultiEngineClient() +tc = client.TaskClient() + +mec.execute('from IPython.kernel import TaskRejectError') +mec.execute('run = False') + +def map_task(): + if not run: + raise TaskRejectError('task dependency not met') + return 3.0e8 + +task_ids = [] + +for i in range(10): + task = client.MapTask(map_task, retries=20) + task_ids.append(tc.run(task, block=False)) + diff --git a/docs/source/parallel/visionhpc.txt b/docs/source/parallel/visionhpc.txt index f0c4e04..7a6b15f 100644 --- a/docs/source/parallel/visionhpc.txt +++ b/docs/source/parallel/visionhpc.txt @@ -2,6 +2,10 @@ IPython/Vision Beam Pattern Demo ================================== +.. note:: + + This page has not been updated to reflect the recent work on ipcluster. + This work makes it much easier to use IPython on a cluster. Installing and testing IPython at OSC systems =============================================