Show More
@@ -0,0 +1,53 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | # encoding: utf-8 | |
|
3 | ||
|
4 | """ | |
|
5 | A new example showing how to use `TaskRejectError` to handle dependencies | |
|
6 | in the IPython task system. | |
|
7 | ||
|
8 | To run this example, do:: | |
|
9 | ||
|
10 | $ ipcluster local -n 4 | |
|
11 | ||
|
12 | Then, in another terminal start up IPython and do:: | |
|
13 | ||
|
14 | In [0]: %run taskreject.py | |
|
15 | ||
|
16 | In [1]: mec.execute('run=True', targets=[0,1]) | |
|
17 | ||
|
18 | After the first command, the scheduler will keep rescheduling the tasks, as | |
|
19 | they will fail with `TaskRejectError`. But after the second command, there | |
|
20 | are two engines that the tasks can run on. The tasks are quickly funneled | |
|
21 | to these engines. | |
|
22 | ||
|
23 | If you want to see how the controller is scheduling and retrying the tasks | |
|
24 | do a `tail -f` on the controller's log file in ~/.ipython/log. | |
|
25 | """ | |
|
26 | ||
|
27 | #----------------------------------------------------------------------------- | |
|
28 | # Copyright (C) 2008-2009 The IPython Development Team | |
|
29 | # | |
|
30 | # Distributed under the terms of the BSD License. The full license is in | |
|
31 | # the file COPYING, distributed as part of this software. | |
|
32 | #----------------------------------------------------------------------------- | |
|
33 | ||
|
34 | from IPython.kernel import client | |
|
35 | from IPython.kernel import TaskRejectError | |
|
36 | ||
|
37 | mec = client.MultiEngineClient() | |
|
38 | tc = client.TaskClient() | |
|
39 | ||
|
40 | mec.execute('from IPython.kernel import TaskRejectError') | |
|
41 | mec.execute('run = False') | |
|
42 | ||
|
43 | def map_task(): | |
|
44 | if not run: | |
|
45 | raise TaskRejectError('task dependency not met') | |
|
46 | return 3.0e8 | |
|
47 | ||
|
48 | task_ids = [] | |
|
49 | ||
|
50 | for i in range(10): | |
|
51 | task = client.MapTask(map_task, retries=20) | |
|
52 | task_ids.append(tc.run(task, block=False)) | |
|
53 |
@@ -20,4 +20,6 b' __docformat__ = "restructuredtext en"' | |||
|
20 | 20 | # |
|
21 | 21 | # Distributed under the terms of the BSD License. The full license is in |
|
22 | 22 | # the file COPYING, distributed as part of this software. |
|
23 | #----------------------------------------------------------------------------- No newline at end of file | |
|
23 | #----------------------------------------------------------------------------- | |
|
24 | ||
|
25 | from IPython.kernel.error import TaskRejectError No newline at end of file |
@@ -107,6 +107,20 b' class SecurityError(KernelError):' | |||
|
107 | 107 | class FileTimeoutError(KernelError): |
|
108 | 108 | pass |
|
109 | 109 | |
|
110 | class TaskRejectError(KernelError): | |
|
111 | """Exception to raise when a task should be rejected by an engine. | |
|
112 | ||
|
113 | This exception can be used to allow a task running on an engine to test | |
|
114 | if the engine (or the user's namespace on the engine) has the needed | |
|
115 | task dependencies. If not, the task should raise this exception. For | |
|
116 | the task to be retried on another engine, the task should be created | |
|
117 | with the `retries` argument > 1. | |
|
118 | ||
|
119 | The advantage of this approach over our older properties system is that | |
|
120 | tasks have full access to the user's namespace on the engines and the | |
|
121 | properties don't have to be managed or tested by the controller. | |
|
122 | """ | |
|
123 | ||
|
110 | 124 | class CompositeError(KernelError): |
|
111 | 125 | def __init__(self, message, elist): |
|
112 | 126 | Exception.__init__(self, *(message, elist)) |
@@ -20,6 +20,7 b' import sys' | |||
|
20 | 20 | import cPickle as pickle |
|
21 | 21 | from types import FunctionType |
|
22 | 22 | import linecache |
|
23 | import warnings | |
|
23 | 24 | |
|
24 | 25 | from twisted.internet import reactor |
|
25 | 26 | from twisted.python import components, log |
@@ -389,6 +390,14 b' def strip_whitespace(source):' | |||
|
389 | 390 | #------------------------------------------------------------------------------- |
|
390 | 391 | |
|
391 | 392 | |
|
393 | _prop_warn = """\ | |
|
394 | ||
|
395 | We are currently refactoring the task dependency system. This might | |
|
396 | involve the removal of this method and other methods related to engine | |
|
397 | properties. Please see the docstrings for IPython.kernel.TaskRejectError | |
|
398 | for more information.""" | |
|
399 | ||
|
400 | ||
|
392 | 401 | class IFullBlockingMultiEngineClient(Interface): |
|
393 | 402 | pass |
|
394 | 403 | |
@@ -730,22 +739,27 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):' | |||
|
730 | 739 | return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block) |
|
731 | 740 | |
|
732 | 741 | def set_properties(self, properties, targets=None, block=None): |
|
742 | warnings.warn(_prop_warn) | |
|
733 | 743 | targets, block = self._findTargetsAndBlock(targets, block) |
|
734 | 744 | return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block) |
|
735 | 745 | |
|
736 | 746 | def get_properties(self, keys=None, targets=None, block=None): |
|
747 | warnings.warn(_prop_warn) | |
|
737 | 748 | targets, block = self._findTargetsAndBlock(targets, block) |
|
738 | 749 | return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block) |
|
739 | 750 | |
|
740 | 751 | def has_properties(self, keys, targets=None, block=None): |
|
752 | warnings.warn(_prop_warn) | |
|
741 | 753 | targets, block = self._findTargetsAndBlock(targets, block) |
|
742 | 754 | return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block) |
|
743 | 755 | |
|
744 | 756 | def del_properties(self, keys, targets=None, block=None): |
|
757 | warnings.warn(_prop_warn) | |
|
745 | 758 | targets, block = self._findTargetsAndBlock(targets, block) |
|
746 | 759 | return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block) |
|
747 | 760 | |
|
748 | 761 | def clear_properties(self, targets=None, block=None): |
|
762 | warnings.warn(_prop_warn) | |
|
749 | 763 | targets, block = self._findTargetsAndBlock(targets, block) |
|
750 | 764 | return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block) |
|
751 | 765 |
@@ -49,7 +49,7 b' class BlockingTaskClient(object):' | |||
|
49 | 49 | """ |
|
50 | 50 | |
|
51 | 51 | implements( |
|
52 |
IBlockingTaskClient, |
|
|
52 | IBlockingTaskClient, | |
|
53 | 53 | ITaskMapperFactory, |
|
54 | 54 | IMapper, |
|
55 | 55 | ITaskParallelDecorator |
@@ -62,7 +62,7 b' class BlockingTaskClient(object):' | |||
|
62 | 62 | def run(self, task, block=False): |
|
63 | 63 | """Run a task on the `TaskController`. |
|
64 | 64 | |
|
65 |
See the documentation of the `MapTask` and `StringTask` classes for |
|
|
65 | See the documentation of the `MapTask` and `StringTask` classes for | |
|
66 | 66 | details on how to build a task of different types. |
|
67 | 67 | |
|
68 | 68 | :Parameters: |
@@ -2,6 +2,10 b'' | |||
|
2 | 2 | IPython/Vision Beam Pattern Demo |
|
3 | 3 | ================================== |
|
4 | 4 | |
|
5 | .. note:: | |
|
6 | ||
|
7 | This page has not been updated to reflect the recent work on ipcluster. | |
|
8 | This work makes it much easier to use IPython on a cluster. | |
|
5 | 9 | |
|
6 | 10 | Installing and testing IPython at OSC systems |
|
7 | 11 | ============================================= |
General Comments 0
You need to be logged in to leave comments.
Login now