Show More
@@ -0,0 +1,53 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | # encoding: utf-8 | |||
|
3 | ||||
|
4 | """ | |||
|
5 | A new example showing how to use `TaskRejectError` to handle dependencies | |||
|
6 | in the IPython task system. | |||
|
7 | ||||
|
8 | To run this example, do:: | |||
|
9 | ||||
|
10 | $ ipcluster local -n 4 | |||
|
11 | ||||
|
12 | Then, in another terminal start up IPython and do:: | |||
|
13 | ||||
|
14 | In [0]: %run taskreject.py | |||
|
15 | ||||
|
16 | In [1]: mec.execute('run=True', targets=[0,1]) | |||
|
17 | ||||
|
18 | After the first command, the scheduler will keep rescheduling the tasks, as | |||
|
19 | they will fail with `TaskRejectError`. But after the second command, there | |||
|
20 | are two engines that the tasks can run on. The tasks are quickly funneled | |||
|
21 | to these engines. | |||
|
22 | ||||
|
23 | If you want to see how the controller is scheduling and retrying the tasks | |||
|
24 | do a `tail -f` on the controller's log file in ~/.ipython/log. | |||
|
25 | """ | |||
|
26 | ||||
|
27 | #----------------------------------------------------------------------------- | |||
|
28 | # Copyright (C) 2008-2009 The IPython Development Team | |||
|
29 | # | |||
|
30 | # Distributed under the terms of the BSD License. The full license is in | |||
|
31 | # the file COPYING, distributed as part of this software. | |||
|
32 | #----------------------------------------------------------------------------- | |||
|
33 | ||||
|
34 | from IPython.kernel import client | |||
|
35 | from IPython.kernel import TaskRejectError | |||
|
36 | ||||
|
37 | mec = client.MultiEngineClient() | |||
|
38 | tc = client.TaskClient() | |||
|
39 | ||||
|
40 | mec.execute('from IPython.kernel import TaskRejectError') | |||
|
41 | mec.execute('run = False') | |||
|
42 | ||||
|
43 | def map_task(): | |||
|
44 | if not run: | |||
|
45 | raise TaskRejectError('task dependency not met') | |||
|
46 | return 3.0e8 | |||
|
47 | ||||
|
48 | task_ids = [] | |||
|
49 | ||||
|
50 | for i in range(10): | |||
|
51 | task = client.MapTask(map_task, retries=20) | |||
|
52 | task_ids.append(tc.run(task, block=False)) | |||
|
53 |
@@ -21,3 +21,5 b' __docformat__ = "restructuredtext en"' | |||||
21 | # Distributed under the terms of the BSD License. The full license is in |
|
21 | # Distributed under the terms of the BSD License. The full license is in | |
22 | # the file COPYING, distributed as part of this software. |
|
22 | # the file COPYING, distributed as part of this software. | |
23 | #----------------------------------------------------------------------------- |
|
23 | #----------------------------------------------------------------------------- | |
|
24 | ||||
|
25 | from IPython.kernel.error import TaskRejectError No newline at end of file |
@@ -107,6 +107,20 b' class SecurityError(KernelError):' | |||||
107 | class FileTimeoutError(KernelError): |
|
107 | class FileTimeoutError(KernelError): | |
108 | pass |
|
108 | pass | |
109 |
|
109 | |||
|
110 | class TaskRejectError(KernelError): | |||
|
111 | """Exception to raise when a task should be rejected by an engine. | |||
|
112 | ||||
|
113 | This exception can be used to allow a task running on an engine to test | |||
|
114 | if the engine (or the user's namespace on the engine) has the needed | |||
|
115 | task dependencies. If not, the task should raise this exception. For | |||
|
116 | the task to be retried on another engine, the task should be created | |||
|
117 | with the `retries` argument > 1. | |||
|
118 | ||||
|
119 | The advantage of this approach over our older properties system is that | |||
|
120 | tasks have full access to the user's namespace on the engines and the | |||
|
121 | properties don't have to be managed or tested by the controller. | |||
|
122 | """ | |||
|
123 | ||||
110 | class CompositeError(KernelError): |
|
124 | class CompositeError(KernelError): | |
111 | def __init__(self, message, elist): |
|
125 | def __init__(self, message, elist): | |
112 | Exception.__init__(self, *(message, elist)) |
|
126 | Exception.__init__(self, *(message, elist)) |
@@ -20,6 +20,7 b' import sys' | |||||
20 | import cPickle as pickle |
|
20 | import cPickle as pickle | |
21 | from types import FunctionType |
|
21 | from types import FunctionType | |
22 | import linecache |
|
22 | import linecache | |
|
23 | import warnings | |||
23 |
|
24 | |||
24 | from twisted.internet import reactor |
|
25 | from twisted.internet import reactor | |
25 | from twisted.python import components, log |
|
26 | from twisted.python import components, log | |
@@ -389,6 +390,14 b' def strip_whitespace(source):' | |||||
389 | #------------------------------------------------------------------------------- |
|
390 | #------------------------------------------------------------------------------- | |
390 |
|
391 | |||
391 |
|
392 | |||
|
393 | _prop_warn = """\ | |||
|
394 | ||||
|
395 | We are currently refactoring the task dependency system. This might | |||
|
396 | involve the removal of this method and other methods related to engine | |||
|
397 | properties. Please see the docstrings for IPython.kernel.TaskRejectError | |||
|
398 | for more information.""" | |||
|
399 | ||||
|
400 | ||||
392 | class IFullBlockingMultiEngineClient(Interface): |
|
401 | class IFullBlockingMultiEngineClient(Interface): | |
393 | pass |
|
402 | pass | |
394 |
|
403 | |||
@@ -730,22 +739,27 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):' | |||||
730 | return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block) |
|
739 | return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block) | |
731 |
|
740 | |||
732 | def set_properties(self, properties, targets=None, block=None): |
|
741 | def set_properties(self, properties, targets=None, block=None): | |
|
742 | warnings.warn(_prop_warn) | |||
733 | targets, block = self._findTargetsAndBlock(targets, block) |
|
743 | targets, block = self._findTargetsAndBlock(targets, block) | |
734 | return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block) |
|
744 | return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block) | |
735 |
|
745 | |||
736 | def get_properties(self, keys=None, targets=None, block=None): |
|
746 | def get_properties(self, keys=None, targets=None, block=None): | |
|
747 | warnings.warn(_prop_warn) | |||
737 | targets, block = self._findTargetsAndBlock(targets, block) |
|
748 | targets, block = self._findTargetsAndBlock(targets, block) | |
738 | return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block) |
|
749 | return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block) | |
739 |
|
750 | |||
740 | def has_properties(self, keys, targets=None, block=None): |
|
751 | def has_properties(self, keys, targets=None, block=None): | |
|
752 | warnings.warn(_prop_warn) | |||
741 | targets, block = self._findTargetsAndBlock(targets, block) |
|
753 | targets, block = self._findTargetsAndBlock(targets, block) | |
742 | return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block) |
|
754 | return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block) | |
743 |
|
755 | |||
744 | def del_properties(self, keys, targets=None, block=None): |
|
756 | def del_properties(self, keys, targets=None, block=None): | |
|
757 | warnings.warn(_prop_warn) | |||
745 | targets, block = self._findTargetsAndBlock(targets, block) |
|
758 | targets, block = self._findTargetsAndBlock(targets, block) | |
746 | return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block) |
|
759 | return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block) | |
747 |
|
760 | |||
748 | def clear_properties(self, targets=None, block=None): |
|
761 | def clear_properties(self, targets=None, block=None): | |
|
762 | warnings.warn(_prop_warn) | |||
749 | targets, block = self._findTargetsAndBlock(targets, block) |
|
763 | targets, block = self._findTargetsAndBlock(targets, block) | |
750 | return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block) |
|
764 | return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block) | |
751 |
|
765 |
1 | NO CONTENT: modified file |
|
NO CONTENT: modified file |
@@ -2,6 +2,10 b'' | |||||
2 | IPython/Vision Beam Pattern Demo |
|
2 | IPython/Vision Beam Pattern Demo | |
3 | ================================== |
|
3 | ================================== | |
4 |
|
4 | |||
|
5 | .. note:: | |||
|
6 | ||||
|
7 | This page has not been updated to reflect the recent work on ipcluster. | |||
|
8 | This work makes it much easier to use IPython on a cluster. | |||
5 |
|
9 | |||
6 | Installing and testing IPython at OSC systems |
|
10 | Installing and testing IPython at OSC systems | |
7 | ============================================= |
|
11 | ============================================= |
General Comments 0
You need to be logged in to leave comments.
Login now