##// END OF EJS Templates
Initial refactor of task dependency system....
Brian Granger -
Show More
@@ -0,0 +1,53 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3
4 """
5 A new example showing how to use `TaskRejectError` to handle dependencies
6 in the IPython task system.
7
8 To run this example, do::
9
10 $ ipcluster local -n 4
11
12 Then, in another terminal start up IPython and do::
13
14 In [0]: %run taskreject.py
15
16 In [1]: mec.execute('run=True', targets=[0,1])
17
18 After the first command, the scheduler will keep rescheduling the tasks, as
19 they will fail with `TaskRejectError`. But after the second command, there
20 are two engines that the tasks can run on. The tasks are quickly funneled
21 to these engines.
22
23 If you want to see how the controller is scheduling and retrying the tasks
24 do a `tail -f` on the controller's log file in ~/.ipython/log.
25 """
26
27 #-----------------------------------------------------------------------------
28 # Copyright (C) 2008-2009 The IPython Development Team
29 #
30 # Distributed under the terms of the BSD License. The full license is in
31 # the file COPYING, distributed as part of this software.
32 #-----------------------------------------------------------------------------
33
34 from IPython.kernel import client
35 from IPython.kernel import TaskRejectError
36
37 mec = client.MultiEngineClient()
38 tc = client.TaskClient()
39
40 mec.execute('from IPython.kernel import TaskRejectError')
41 mec.execute('run = False')
42
43 def map_task():
44 if not run:
45 raise TaskRejectError('task dependency not met')
46 return 3.0e8
47
48 task_ids = []
49
50 for i in range(10):
51 task = client.MapTask(map_task, retries=20)
52 task_ids.append(tc.run(task, block=False))
53
@@ -1,23 +1,25 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """The IPython1 kernel.
2 """The IPython1 kernel.
3
3
4 The IPython kernel actually refers to three things:
4 The IPython kernel actually refers to three things:
5
5
6 * The IPython Engine
6 * The IPython Engine
7 * The IPython Controller
7 * The IPython Controller
8 * Clients to the IPython Controller
8 * Clients to the IPython Controller
9
9
10 The kernel module implements the engine, controller and client and all the
10 The kernel module implements the engine, controller and client and all the
11 network protocols needed for the various entities to talk to each other.
11 network protocols needed for the various entities to talk to each other.
12
12
13 An end user should probably begin by looking at the `client.py` module
13 An end user should probably begin by looking at the `client.py` module
14 if they need blocking clients or in `asyncclient.py` if they want asynchronous,
14 if they need blocking clients or in `asyncclient.py` if they want asynchronous,
15 deferred/Twisted using clients.
15 deferred/Twisted using clients.
16 """
16 """
17 __docformat__ = "restructuredtext en"
17 __docformat__ = "restructuredtext en"
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Copyright (C) 2008 The IPython Development Team
19 # Copyright (C) 2008 The IPython Development Team
20 #
20 #
21 # Distributed under the terms of the BSD License. The full license is in
21 # Distributed under the terms of the BSD License. The full license is in
22 # the file COPYING, distributed as part of this software.
22 # the file COPYING, distributed as part of this software.
23 #----------------------------------------------------------------------------- No newline at end of file
23 #-----------------------------------------------------------------------------
24
25 from IPython.kernel.error import TaskRejectError No newline at end of file
@@ -1,188 +1,202 b''
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """Classes and functions for kernel related errors and exceptions."""
3 """Classes and functions for kernel related errors and exceptions."""
4
4
5 __docformat__ = "restructuredtext en"
5 __docformat__ = "restructuredtext en"
6
6
7 #-------------------------------------------------------------------------------
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
8 # Copyright (C) 2008 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 #-------------------------------------------------------------------------------
14 #-------------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 from IPython.kernel.core import error
18 from IPython.kernel.core import error
19 from twisted.python import failure
19 from twisted.python import failure
20
20
21 #-------------------------------------------------------------------------------
21 #-------------------------------------------------------------------------------
22 # Error classes
22 # Error classes
23 #-------------------------------------------------------------------------------
23 #-------------------------------------------------------------------------------
24
24
25 class KernelError(error.IPythonError):
25 class KernelError(error.IPythonError):
26 pass
26 pass
27
27
28 class NotDefined(KernelError):
28 class NotDefined(KernelError):
29 def __init__(self, name):
29 def __init__(self, name):
30 self.name = name
30 self.name = name
31 self.args = (name,)
31 self.args = (name,)
32
32
33 def __repr__(self):
33 def __repr__(self):
34 return '<NotDefined: %s>' % self.name
34 return '<NotDefined: %s>' % self.name
35
35
36 __str__ = __repr__
36 __str__ = __repr__
37
37
38 class QueueCleared(KernelError):
38 class QueueCleared(KernelError):
39 pass
39 pass
40
40
41 class IdInUse(KernelError):
41 class IdInUse(KernelError):
42 pass
42 pass
43
43
44 class ProtocolError(KernelError):
44 class ProtocolError(KernelError):
45 pass
45 pass
46
46
47 class ConnectionError(KernelError):
47 class ConnectionError(KernelError):
48 pass
48 pass
49
49
50 class InvalidEngineID(KernelError):
50 class InvalidEngineID(KernelError):
51 pass
51 pass
52
52
53 class NoEnginesRegistered(KernelError):
53 class NoEnginesRegistered(KernelError):
54 pass
54 pass
55
55
56 class InvalidClientID(KernelError):
56 class InvalidClientID(KernelError):
57 pass
57 pass
58
58
59 class InvalidDeferredID(KernelError):
59 class InvalidDeferredID(KernelError):
60 pass
60 pass
61
61
62 class SerializationError(KernelError):
62 class SerializationError(KernelError):
63 pass
63 pass
64
64
65 class MessageSizeError(KernelError):
65 class MessageSizeError(KernelError):
66 pass
66 pass
67
67
68 class PBMessageSizeError(MessageSizeError):
68 class PBMessageSizeError(MessageSizeError):
69 pass
69 pass
70
70
71 class ResultNotCompleted(KernelError):
71 class ResultNotCompleted(KernelError):
72 pass
72 pass
73
73
74 class ResultAlreadyRetrieved(KernelError):
74 class ResultAlreadyRetrieved(KernelError):
75 pass
75 pass
76
76
77 class ClientError(KernelError):
77 class ClientError(KernelError):
78 pass
78 pass
79
79
80 class TaskAborted(KernelError):
80 class TaskAborted(KernelError):
81 pass
81 pass
82
82
83 class TaskTimeout(KernelError):
83 class TaskTimeout(KernelError):
84 pass
84 pass
85
85
86 class NotAPendingResult(KernelError):
86 class NotAPendingResult(KernelError):
87 pass
87 pass
88
88
89 class UnpickleableException(KernelError):
89 class UnpickleableException(KernelError):
90 pass
90 pass
91
91
92 class AbortedPendingDeferredError(KernelError):
92 class AbortedPendingDeferredError(KernelError):
93 pass
93 pass
94
94
95 class InvalidProperty(KernelError):
95 class InvalidProperty(KernelError):
96 pass
96 pass
97
97
98 class MissingBlockArgument(KernelError):
98 class MissingBlockArgument(KernelError):
99 pass
99 pass
100
100
101 class StopLocalExecution(KernelError):
101 class StopLocalExecution(KernelError):
102 pass
102 pass
103
103
104 class SecurityError(KernelError):
104 class SecurityError(KernelError):
105 pass
105 pass
106
106
107 class FileTimeoutError(KernelError):
107 class FileTimeoutError(KernelError):
108 pass
108 pass
109
109
110 class TaskRejectError(KernelError):
111 """Exception to raise when a task should be rejected by an engine.
112
113 This exception can be used to allow a task running on an engine to test
114 if the engine (or the user's namespace on the engine) has the needed
115 task dependencies. If not, the task should raise this exception. For
116 the task to be retried on another engine, the task should be created
117 with the `retries` argument > 1.
118
119 The advantage of this approach over our older properties system is that
120 tasks have full access to the user's namespace on the engines and the
121 properties don't have to be managed or tested by the controller.
122 """
123
110 class CompositeError(KernelError):
124 class CompositeError(KernelError):
111 def __init__(self, message, elist):
125 def __init__(self, message, elist):
112 Exception.__init__(self, *(message, elist))
126 Exception.__init__(self, *(message, elist))
113 self.message = message
127 self.message = message
114 self.elist = elist
128 self.elist = elist
115
129
116 def _get_engine_str(self, ev):
130 def _get_engine_str(self, ev):
117 try:
131 try:
118 ei = ev._ipython_engine_info
132 ei = ev._ipython_engine_info
119 except AttributeError:
133 except AttributeError:
120 return '[Engine Exception]'
134 return '[Engine Exception]'
121 else:
135 else:
122 return '[%i:%s]: ' % (ei['engineid'], ei['method'])
136 return '[%i:%s]: ' % (ei['engineid'], ei['method'])
123
137
124 def _get_traceback(self, ev):
138 def _get_traceback(self, ev):
125 try:
139 try:
126 tb = ev._ipython_traceback_text
140 tb = ev._ipython_traceback_text
127 except AttributeError:
141 except AttributeError:
128 return 'No traceback available'
142 return 'No traceback available'
129 else:
143 else:
130 return tb
144 return tb
131
145
132 def __str__(self):
146 def __str__(self):
133 s = str(self.message)
147 s = str(self.message)
134 for et, ev, etb in self.elist:
148 for et, ev, etb in self.elist:
135 engine_str = self._get_engine_str(ev)
149 engine_str = self._get_engine_str(ev)
136 s = s + '\n' + engine_str + str(et.__name__) + ': ' + str(ev)
150 s = s + '\n' + engine_str + str(et.__name__) + ': ' + str(ev)
137 return s
151 return s
138
152
139 def print_tracebacks(self, excid=None):
153 def print_tracebacks(self, excid=None):
140 if excid is None:
154 if excid is None:
141 for (et,ev,etb) in self.elist:
155 for (et,ev,etb) in self.elist:
142 print self._get_engine_str(ev)
156 print self._get_engine_str(ev)
143 print self._get_traceback(ev)
157 print self._get_traceback(ev)
144 print
158 print
145 else:
159 else:
146 try:
160 try:
147 et,ev,etb = self.elist[excid]
161 et,ev,etb = self.elist[excid]
148 except:
162 except:
149 raise IndexError("an exception with index %i does not exist"%excid)
163 raise IndexError("an exception with index %i does not exist"%excid)
150 else:
164 else:
151 print self._get_engine_str(ev)
165 print self._get_engine_str(ev)
152 print self._get_traceback(ev)
166 print self._get_traceback(ev)
153
167
154 def raise_exception(self, excid=0):
168 def raise_exception(self, excid=0):
155 try:
169 try:
156 et,ev,etb = self.elist[excid]
170 et,ev,etb = self.elist[excid]
157 except:
171 except:
158 raise IndexError("an exception with index %i does not exist"%excid)
172 raise IndexError("an exception with index %i does not exist"%excid)
159 else:
173 else:
160 raise et, ev, etb
174 raise et, ev, etb
161
175
162 def collect_exceptions(rlist, method):
176 def collect_exceptions(rlist, method):
163 elist = []
177 elist = []
164 for r in rlist:
178 for r in rlist:
165 if isinstance(r, failure.Failure):
179 if isinstance(r, failure.Failure):
166 r.cleanFailure()
180 r.cleanFailure()
167 et, ev, etb = r.type, r.value, r.tb
181 et, ev, etb = r.type, r.value, r.tb
168 # Sometimes we could have CompositeError in our list. Just take
182 # Sometimes we could have CompositeError in our list. Just take
169 # the errors out of them and put them in our new list. This
183 # the errors out of them and put them in our new list. This
170 # has the effect of flattening lists of CompositeErrors into one
184 # has the effect of flattening lists of CompositeErrors into one
171 # CompositeError
185 # CompositeError
172 if et==CompositeError:
186 if et==CompositeError:
173 for e in ev.elist:
187 for e in ev.elist:
174 elist.append(e)
188 elist.append(e)
175 else:
189 else:
176 elist.append((et, ev, etb))
190 elist.append((et, ev, etb))
177 if len(elist)==0:
191 if len(elist)==0:
178 return rlist
192 return rlist
179 else:
193 else:
180 msg = "one or more exceptions from call to method: %s" % (method)
194 msg = "one or more exceptions from call to method: %s" % (method)
181 # This silliness is needed so the debugger has access to the exception
195 # This silliness is needed so the debugger has access to the exception
182 # instance (e in this case)
196 # instance (e in this case)
183 try:
197 try:
184 raise CompositeError(msg, elist)
198 raise CompositeError(msg, elist)
185 except CompositeError, e:
199 except CompositeError, e:
186 raise e
200 raise e
187
201
188
202
@@ -1,951 +1,965 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
3
3
4 """General Classes for IMultiEngine clients."""
4 """General Classes for IMultiEngine clients."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import cPickle as pickle
20 import cPickle as pickle
21 from types import FunctionType
21 from types import FunctionType
22 import linecache
22 import linecache
23 import warnings
23
24
24 from twisted.internet import reactor
25 from twisted.internet import reactor
25 from twisted.python import components, log
26 from twisted.python import components, log
26 from twisted.python.failure import Failure
27 from twisted.python.failure import Failure
27 from zope.interface import Interface, implements, Attribute
28 from zope.interface import Interface, implements, Attribute
28
29
29 from IPython.ColorANSI import TermColors
30 from IPython.ColorANSI import TermColors
30
31
31 from IPython.kernel.twistedutil import blockingCallFromThread
32 from IPython.kernel.twistedutil import blockingCallFromThread
32 from IPython.kernel import error
33 from IPython.kernel import error
33 from IPython.kernel.parallelfunction import ParallelFunction
34 from IPython.kernel.parallelfunction import ParallelFunction
34 from IPython.kernel.mapper import (
35 from IPython.kernel.mapper import (
35 MultiEngineMapper,
36 MultiEngineMapper,
36 IMultiEngineMapperFactory,
37 IMultiEngineMapperFactory,
37 IMapper
38 IMapper
38 )
39 )
39 from IPython.kernel import map as Map
40 from IPython.kernel import map as Map
40 from IPython.kernel import multiengine as me
41 from IPython.kernel import multiengine as me
41 from IPython.kernel.multiengine import (IFullMultiEngine,
42 from IPython.kernel.multiengine import (IFullMultiEngine,
42 IFullSynchronousMultiEngine)
43 IFullSynchronousMultiEngine)
43
44
44
45
45 #-------------------------------------------------------------------------------
46 #-------------------------------------------------------------------------------
46 # Pending Result things
47 # Pending Result things
47 #-------------------------------------------------------------------------------
48 #-------------------------------------------------------------------------------
48
49
49 class IPendingResult(Interface):
50 class IPendingResult(Interface):
50 """A representation of a result that is pending.
51 """A representation of a result that is pending.
51
52
52 This class is similar to Twisted's `Deferred` object, but is designed to be
53 This class is similar to Twisted's `Deferred` object, but is designed to be
53 used in a synchronous context.
54 used in a synchronous context.
54 """
55 """
55
56
56 result_id=Attribute("ID of the deferred on the other side")
57 result_id=Attribute("ID of the deferred on the other side")
57 client=Attribute("A client that I came from")
58 client=Attribute("A client that I came from")
58 r=Attribute("An attribute that is a property that calls and returns get_result")
59 r=Attribute("An attribute that is a property that calls and returns get_result")
59
60
60 def get_result(default=None, block=True):
61 def get_result(default=None, block=True):
61 """
62 """
62 Get a result that is pending.
63 Get a result that is pending.
63
64
64 :Parameters:
65 :Parameters:
65 default
66 default
66 The value to return if the result is not ready.
67 The value to return if the result is not ready.
67 block : boolean
68 block : boolean
68 Should I block for the result.
69 Should I block for the result.
69
70
70 :Returns: The actual result or the default value.
71 :Returns: The actual result or the default value.
71 """
72 """
72
73
73 def add_callback(f, *args, **kwargs):
74 def add_callback(f, *args, **kwargs):
74 """
75 """
75 Add a callback that is called with the result.
76 Add a callback that is called with the result.
76
77
77 If the original result is foo, adding a callback will cause
78 If the original result is foo, adding a callback will cause
78 f(foo, *args, **kwargs) to be returned instead. If multiple
79 f(foo, *args, **kwargs) to be returned instead. If multiple
79 callbacks are registered, they are chained together: the result of
80 callbacks are registered, they are chained together: the result of
80 one is passed to the next and so on.
81 one is passed to the next and so on.
81
82
82 Unlike Twisted's Deferred object, there is no errback chain. Thus
83 Unlike Twisted's Deferred object, there is no errback chain. Thus
83 any exception raised will not be caught and handled. User must
84 any exception raised will not be caught and handled. User must
84 catch these by hand when calling `get_result`.
85 catch these by hand when calling `get_result`.
85 """
86 """
86
87
87
88
88 class PendingResult(object):
89 class PendingResult(object):
89 """A representation of a result that is not yet ready.
90 """A representation of a result that is not yet ready.
90
91
91 A user should not create a `PendingResult` instance by hand.
92 A user should not create a `PendingResult` instance by hand.
92
93
93 Methods
94 Methods
94 =======
95 =======
95
96
96 * `get_result`
97 * `get_result`
97 * `add_callback`
98 * `add_callback`
98
99
99 Properties
100 Properties
100 ==========
101 ==========
101 * `r`
102 * `r`
102 """
103 """
103
104
104 def __init__(self, client, result_id):
105 def __init__(self, client, result_id):
105 """Create a PendingResult with a result_id and a client instance.
106 """Create a PendingResult with a result_id and a client instance.
106
107
107 The client should implement `_getPendingResult(result_id, block)`.
108 The client should implement `_getPendingResult(result_id, block)`.
108 """
109 """
109 self.client = client
110 self.client = client
110 self.result_id = result_id
111 self.result_id = result_id
111 self.called = False
112 self.called = False
112 self.raised = False
113 self.raised = False
113 self.callbacks = []
114 self.callbacks = []
114
115
115 def get_result(self, default=None, block=True):
116 def get_result(self, default=None, block=True):
116 """Get a result that is pending.
117 """Get a result that is pending.
117
118
118 This method will connect to an IMultiEngine adapted controller
119 This method will connect to an IMultiEngine adapted controller
119 and see if the result is ready. If the action triggers an exception
120 and see if the result is ready. If the action triggers an exception
120 raise it and record it. This method records the result/exception once it is
121 raise it and record it. This method records the result/exception once it is
121 retrieved. Calling `get_result` again will get this cached result or will
122 retrieved. Calling `get_result` again will get this cached result or will
122 re-raise the exception. The .r attribute is a property that calls
123 re-raise the exception. The .r attribute is a property that calls
123 `get_result` with block=True.
124 `get_result` with block=True.
124
125
125 :Parameters:
126 :Parameters:
126 default
127 default
127 The value to return if the result is not ready.
128 The value to return if the result is not ready.
128 block : boolean
129 block : boolean
129 Should I block for the result.
130 Should I block for the result.
130
131
131 :Returns: The actual result or the default value.
132 :Returns: The actual result or the default value.
132 """
133 """
133
134
134 if self.called:
135 if self.called:
135 if self.raised:
136 if self.raised:
136 raise self.result[0], self.result[1], self.result[2]
137 raise self.result[0], self.result[1], self.result[2]
137 else:
138 else:
138 return self.result
139 return self.result
139 try:
140 try:
140 result = self.client.get_pending_deferred(self.result_id, block)
141 result = self.client.get_pending_deferred(self.result_id, block)
141 except error.ResultNotCompleted:
142 except error.ResultNotCompleted:
142 return default
143 return default
143 except:
144 except:
144 # Reraise other error, but first record them so they can be reraised
145 # Reraise other error, but first record them so they can be reraised
145 # later if .r or get_result is called again.
146 # later if .r or get_result is called again.
146 self.result = sys.exc_info()
147 self.result = sys.exc_info()
147 self.called = True
148 self.called = True
148 self.raised = True
149 self.raised = True
149 raise
150 raise
150 else:
151 else:
151 for cb in self.callbacks:
152 for cb in self.callbacks:
152 result = cb[0](result, *cb[1], **cb[2])
153 result = cb[0](result, *cb[1], **cb[2])
153 self.result = result
154 self.result = result
154 self.called = True
155 self.called = True
155 return result
156 return result
156
157
157 def add_callback(self, f, *args, **kwargs):
158 def add_callback(self, f, *args, **kwargs):
158 """Add a callback that is called with the result.
159 """Add a callback that is called with the result.
159
160
160 If the original result is result, adding a callback will cause
161 If the original result is result, adding a callback will cause
161 f(result, *args, **kwargs) to be returned instead. If multiple
162 f(result, *args, **kwargs) to be returned instead. If multiple
162 callbacks are registered, they are chained together: the result of
163 callbacks are registered, they are chained together: the result of
163 one is passed to the next and so on.
164 one is passed to the next and so on.
164
165
165 Unlike Twisted's Deferred object, there is no errback chain. Thus
166 Unlike Twisted's Deferred object, there is no errback chain. Thus
166 any exception raised will not be caught and handled. User must
167 any exception raised will not be caught and handled. User must
167 catch these by hand when calling `get_result`.
168 catch these by hand when calling `get_result`.
168 """
169 """
169 assert callable(f)
170 assert callable(f)
170 self.callbacks.append((f, args, kwargs))
171 self.callbacks.append((f, args, kwargs))
171
172
172 def __cmp__(self, other):
173 def __cmp__(self, other):
173 if self.result_id < other.result_id:
174 if self.result_id < other.result_id:
174 return -1
175 return -1
175 else:
176 else:
176 return 1
177 return 1
177
178
178 def _get_r(self):
179 def _get_r(self):
179 return self.get_result(block=True)
180 return self.get_result(block=True)
180
181
181 r = property(_get_r)
182 r = property(_get_r)
182 """This property is a shortcut to a `get_result(block=True)`."""
183 """This property is a shortcut to a `get_result(block=True)`."""
183
184
184
185
185 #-------------------------------------------------------------------------------
186 #-------------------------------------------------------------------------------
186 # Pretty printing wrappers for certain lists
187 # Pretty printing wrappers for certain lists
187 #-------------------------------------------------------------------------------
188 #-------------------------------------------------------------------------------
188
189
189 class ResultList(list):
190 class ResultList(list):
190 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
191 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
191
192
192 def __repr__(self):
193 def __repr__(self):
193 output = []
194 output = []
194 # These colored prompts were not working on Windows
195 # These colored prompts were not working on Windows
195 if sys.platform == 'win32':
196 if sys.platform == 'win32':
196 blue = normal = red = green = ''
197 blue = normal = red = green = ''
197 else:
198 else:
198 blue = TermColors.Blue
199 blue = TermColors.Blue
199 normal = TermColors.Normal
200 normal = TermColors.Normal
200 red = TermColors.Red
201 red = TermColors.Red
201 green = TermColors.Green
202 green = TermColors.Green
202 output.append("<Results List>\n")
203 output.append("<Results List>\n")
203 for cmd in self:
204 for cmd in self:
204 if isinstance(cmd, Failure):
205 if isinstance(cmd, Failure):
205 output.append(cmd)
206 output.append(cmd)
206 else:
207 else:
207 target = cmd.get('id',None)
208 target = cmd.get('id',None)
208 cmd_num = cmd.get('number',None)
209 cmd_num = cmd.get('number',None)
209 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
210 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
210 cmd_stdout = cmd.get('stdout', None)
211 cmd_stdout = cmd.get('stdout', None)
211 cmd_stderr = cmd.get('stderr', None)
212 cmd_stderr = cmd.get('stderr', None)
212 output.append("%s[%i]%s In [%i]:%s %s\n" % \
213 output.append("%s[%i]%s In [%i]:%s %s\n" % \
213 (green, target,
214 (green, target,
214 blue, cmd_num, normal, cmd_stdin))
215 blue, cmd_num, normal, cmd_stdin))
215 if cmd_stdout:
216 if cmd_stdout:
216 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
217 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
217 (green, target,
218 (green, target,
218 red, cmd_num, normal, cmd_stdout))
219 red, cmd_num, normal, cmd_stdout))
219 if cmd_stderr:
220 if cmd_stderr:
220 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
221 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
221 (green, target,
222 (green, target,
222 red, cmd_num, normal, cmd_stderr))
223 red, cmd_num, normal, cmd_stderr))
223 return ''.join(output)
224 return ''.join(output)
224
225
225
226
226 def wrapResultList(result):
227 def wrapResultList(result):
227 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
228 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
228 if len(result) == 0:
229 if len(result) == 0:
229 result = [result]
230 result = [result]
230 return ResultList(result)
231 return ResultList(result)
231
232
232
233
233 class QueueStatusList(list):
234 class QueueStatusList(list):
234 """A subclass of list that pretty prints the output of `queue_status`."""
235 """A subclass of list that pretty prints the output of `queue_status`."""
235
236
236 def __repr__(self):
237 def __repr__(self):
237 output = []
238 output = []
238 output.append("<Queue Status List>\n")
239 output.append("<Queue Status List>\n")
239 for e in self:
240 for e in self:
240 output.append("Engine: %s\n" % repr(e[0]))
241 output.append("Engine: %s\n" % repr(e[0]))
241 output.append(" Pending: %s\n" % repr(e[1]['pending']))
242 output.append(" Pending: %s\n" % repr(e[1]['pending']))
242 for q in e[1]['queue']:
243 for q in e[1]['queue']:
243 output.append(" Command: %s\n" % repr(q))
244 output.append(" Command: %s\n" % repr(q))
244 return ''.join(output)
245 return ''.join(output)
245
246
246
247
247 #-------------------------------------------------------------------------------
248 #-------------------------------------------------------------------------------
248 # InteractiveMultiEngineClient
249 # InteractiveMultiEngineClient
249 #-------------------------------------------------------------------------------
250 #-------------------------------------------------------------------------------
250
251
251 class InteractiveMultiEngineClient(object):
252 class InteractiveMultiEngineClient(object):
252 """A mixin class that add a few methods to a multiengine client.
253 """A mixin class that add a few methods to a multiengine client.
253
254
254 The methods in this mixin class are designed for interactive usage.
255 The methods in this mixin class are designed for interactive usage.
255 """
256 """
256
257
257 def activate(self):
258 def activate(self):
258 """Make this `MultiEngineClient` active for parallel magic commands.
259 """Make this `MultiEngineClient` active for parallel magic commands.
259
260
260 IPython has a magic command syntax to work with `MultiEngineClient` objects.
261 IPython has a magic command syntax to work with `MultiEngineClient` objects.
261 In a given IPython session there is a single active one. While
262 In a given IPython session there is a single active one. While
262 there can be many `MultiEngineClient` created and used by the user,
263 there can be many `MultiEngineClient` created and used by the user,
263 there is only one active one. The active `MultiEngineClient` is used whenever
264 there is only one active one. The active `MultiEngineClient` is used whenever
264 the magic commands %px and %autopx are used.
265 the magic commands %px and %autopx are used.
265
266
266 The activate() method is called on a given `MultiEngineClient` to make it
267 The activate() method is called on a given `MultiEngineClient` to make it
267 active. Once this has been done, the magic commands can be used.
268 active. Once this has been done, the magic commands can be used.
268 """
269 """
269
270
270 try:
271 try:
271 __IPYTHON__.activeController = self
272 __IPYTHON__.activeController = self
272 except NameError:
273 except NameError:
273 print "The IPython Controller magics only work within IPython."
274 print "The IPython Controller magics only work within IPython."
274
275
275 def __setitem__(self, key, value):
276 def __setitem__(self, key, value):
276 """Add a dictionary interface for pushing/pulling.
277 """Add a dictionary interface for pushing/pulling.
277
278
278 This functions as a shorthand for `push`.
279 This functions as a shorthand for `push`.
279
280
280 :Parameters:
281 :Parameters:
281 key : str
282 key : str
282 What to call the remote object.
283 What to call the remote object.
283 value : object
284 value : object
284 The local Python object to push.
285 The local Python object to push.
285 """
286 """
286 targets, block = self._findTargetsAndBlock()
287 targets, block = self._findTargetsAndBlock()
287 return self.push({key:value}, targets=targets, block=block)
288 return self.push({key:value}, targets=targets, block=block)
288
289
289 def __getitem__(self, key):
290 def __getitem__(self, key):
290 """Add a dictionary interface for pushing/pulling.
291 """Add a dictionary interface for pushing/pulling.
291
292
292 This functions as a shorthand to `pull`.
293 This functions as a shorthand to `pull`.
293
294
294 :Parameters:
295 :Parameters:
295 - `key`: A string representing the key.
296 - `key`: A string representing the key.
296 """
297 """
297 if isinstance(key, str):
298 if isinstance(key, str):
298 targets, block = self._findTargetsAndBlock()
299 targets, block = self._findTargetsAndBlock()
299 return self.pull(key, targets=targets, block=block)
300 return self.pull(key, targets=targets, block=block)
300 else:
301 else:
301 raise TypeError("__getitem__ only takes strs")
302 raise TypeError("__getitem__ only takes strs")
302
303
303 def __len__(self):
304 def __len__(self):
304 """Return the number of available engines."""
305 """Return the number of available engines."""
305 return len(self.get_ids())
306 return len(self.get_ids())
306
307
307 #---------------------------------------------------------------------------
308 #---------------------------------------------------------------------------
308 # Make this a context manager for with
309 # Make this a context manager for with
309 #---------------------------------------------------------------------------
310 #---------------------------------------------------------------------------
310
311
311 def findsource_file(self,f):
312 def findsource_file(self,f):
312 linecache.checkcache()
313 linecache.checkcache()
313 s = findsource(f.f_code)
314 s = findsource(f.f_code)
314 lnum = f.f_lineno
315 lnum = f.f_lineno
315 wsource = s[0][f.f_lineno:]
316 wsource = s[0][f.f_lineno:]
316 return strip_whitespace(wsource)
317 return strip_whitespace(wsource)
317
318
318 def findsource_ipython(self,f):
319 def findsource_ipython(self,f):
319 from IPython import ipapi
320 from IPython import ipapi
320 self.ip = ipapi.get()
321 self.ip = ipapi.get()
321 wsource = [l+'\n' for l in
322 wsource = [l+'\n' for l in
322 self.ip.IP.input_hist_raw[-1].splitlines()[1:]]
323 self.ip.IP.input_hist_raw[-1].splitlines()[1:]]
323 return strip_whitespace(wsource)
324 return strip_whitespace(wsource)
324
325
325 def __enter__(self):
326 def __enter__(self):
326 f = sys._getframe(1)
327 f = sys._getframe(1)
327 local_ns = f.f_locals
328 local_ns = f.f_locals
328 global_ns = f.f_globals
329 global_ns = f.f_globals
329 if f.f_code.co_filename == '<ipython console>':
330 if f.f_code.co_filename == '<ipython console>':
330 s = self.findsource_ipython(f)
331 s = self.findsource_ipython(f)
331 else:
332 else:
332 s = self.findsource_file(f)
333 s = self.findsource_file(f)
333
334
334 self._with_context_result = self.execute(s)
335 self._with_context_result = self.execute(s)
335
336
336 def __exit__ (self, etype, value, tb):
337 def __exit__ (self, etype, value, tb):
337 if issubclass(etype,error.StopLocalExecution):
338 if issubclass(etype,error.StopLocalExecution):
338 return True
339 return True
339
340
340
341
341 def remote():
342 def remote():
342 m = 'Special exception to stop local execution of parallel code.'
343 m = 'Special exception to stop local execution of parallel code.'
343 raise error.StopLocalExecution(m)
344 raise error.StopLocalExecution(m)
344
345
345 def strip_whitespace(source):
346 def strip_whitespace(source):
346 # Expand tabs to avoid any confusion.
347 # Expand tabs to avoid any confusion.
347 wsource = [l.expandtabs(4) for l in source]
348 wsource = [l.expandtabs(4) for l in source]
348 # Detect the indentation level
349 # Detect the indentation level
349 done = False
350 done = False
350 for line in wsource:
351 for line in wsource:
351 if line.isspace():
352 if line.isspace():
352 continue
353 continue
353 for col,char in enumerate(line):
354 for col,char in enumerate(line):
354 if char != ' ':
355 if char != ' ':
355 done = True
356 done = True
356 break
357 break
357 if done:
358 if done:
358 break
359 break
359 # Now we know how much leading space there is in the code. Next, we
360 # Now we know how much leading space there is in the code. Next, we
360 # extract up to the first line that has less indentation.
361 # extract up to the first line that has less indentation.
361 # WARNINGS: we skip comments that may be misindented, but we do NOT yet
362 # WARNINGS: we skip comments that may be misindented, but we do NOT yet
362 # detect triple quoted strings that may have flush left text.
363 # detect triple quoted strings that may have flush left text.
363 for lno,line in enumerate(wsource):
364 for lno,line in enumerate(wsource):
364 lead = line[:col]
365 lead = line[:col]
365 if lead.isspace():
366 if lead.isspace():
366 continue
367 continue
367 else:
368 else:
368 if not lead.lstrip().startswith('#'):
369 if not lead.lstrip().startswith('#'):
369 break
370 break
370 # The real 'with' source is up to lno
371 # The real 'with' source is up to lno
371 src_lines = [l[col:] for l in wsource[:lno+1]]
372 src_lines = [l[col:] for l in wsource[:lno+1]]
372
373
373 # Finally, check that the source's first non-comment line begins with the
374 # Finally, check that the source's first non-comment line begins with the
374 # special call 'remote()'
375 # special call 'remote()'
375 for nline,line in enumerate(src_lines):
376 for nline,line in enumerate(src_lines):
376 if line.isspace() or line.startswith('#'):
377 if line.isspace() or line.startswith('#'):
377 continue
378 continue
378 if 'remote()' in line:
379 if 'remote()' in line:
379 break
380 break
380 else:
381 else:
381 raise ValueError('remote() call missing at the start of code')
382 raise ValueError('remote() call missing at the start of code')
382 src = ''.join(src_lines[nline+1:])
383 src = ''.join(src_lines[nline+1:])
383 #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
384 #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
384 return src
385 return src
385
386
386
387
387 #-------------------------------------------------------------------------------
388 #-------------------------------------------------------------------------------
388 # The top-level MultiEngine client adaptor
389 # The top-level MultiEngine client adaptor
389 #-------------------------------------------------------------------------------
390 #-------------------------------------------------------------------------------
390
391
391
392
393 _prop_warn = """\
394
395 We are currently refactoring the task dependency system. This might
396 involve the removal of this method and other methods related to engine
397 properties. Please see the docstrings for IPython.kernel.TaskRejectError
398 for more information."""
399
400
392 class IFullBlockingMultiEngineClient(Interface):
401 class IFullBlockingMultiEngineClient(Interface):
393 pass
402 pass
394
403
395
404
396 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
405 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
397 """
406 """
398 A blocking client to the `IMultiEngine` controller interface.
407 A blocking client to the `IMultiEngine` controller interface.
399
408
400 This class allows users to use a set of engines for a parallel
409 This class allows users to use a set of engines for a parallel
401 computation through the `IMultiEngine` interface. In this interface,
410 computation through the `IMultiEngine` interface. In this interface,
402 each engine has a specific id (an int) that is used to refer to the
411 each engine has a specific id (an int) that is used to refer to the
403 engine, run code on it, etc.
412 engine, run code on it, etc.
404 """
413 """
405
414
406 implements(
415 implements(
407 IFullBlockingMultiEngineClient,
416 IFullBlockingMultiEngineClient,
408 IMultiEngineMapperFactory,
417 IMultiEngineMapperFactory,
409 IMapper
418 IMapper
410 )
419 )
411
420
412 def __init__(self, smultiengine):
421 def __init__(self, smultiengine):
413 self.smultiengine = smultiengine
422 self.smultiengine = smultiengine
414 self.block = True
423 self.block = True
415 self.targets = 'all'
424 self.targets = 'all'
416
425
417 def _findBlock(self, block=None):
426 def _findBlock(self, block=None):
418 if block is None:
427 if block is None:
419 return self.block
428 return self.block
420 else:
429 else:
421 if block in (True, False):
430 if block in (True, False):
422 return block
431 return block
423 else:
432 else:
424 raise ValueError("block must be True or False")
433 raise ValueError("block must be True or False")
425
434
426 def _findTargets(self, targets=None):
435 def _findTargets(self, targets=None):
427 if targets is None:
436 if targets is None:
428 return self.targets
437 return self.targets
429 else:
438 else:
430 if not isinstance(targets, (str,list,tuple,int)):
439 if not isinstance(targets, (str,list,tuple,int)):
431 raise ValueError("targets must be a str, list, tuple or int")
440 raise ValueError("targets must be a str, list, tuple or int")
432 return targets
441 return targets
433
442
434 def _findTargetsAndBlock(self, targets=None, block=None):
443 def _findTargetsAndBlock(self, targets=None, block=None):
435 return self._findTargets(targets), self._findBlock(block)
444 return self._findTargets(targets), self._findBlock(block)
436
445
437 def _blockFromThread(self, function, *args, **kwargs):
446 def _blockFromThread(self, function, *args, **kwargs):
438 block = kwargs.get('block', None)
447 block = kwargs.get('block', None)
439 if block is None:
448 if block is None:
440 raise error.MissingBlockArgument("'block' keyword argument is missing")
449 raise error.MissingBlockArgument("'block' keyword argument is missing")
441 result = blockingCallFromThread(function, *args, **kwargs)
450 result = blockingCallFromThread(function, *args, **kwargs)
442 if not block:
451 if not block:
443 result = PendingResult(self, result)
452 result = PendingResult(self, result)
444 return result
453 return result
445
454
446 def get_pending_deferred(self, deferredID, block):
455 def get_pending_deferred(self, deferredID, block):
447 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
456 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
448
457
449 def barrier(self, pendingResults):
458 def barrier(self, pendingResults):
450 """Synchronize a set of `PendingResults`.
459 """Synchronize a set of `PendingResults`.
451
460
452 This method is a synchronization primitive that waits for a set of
461 This method is a synchronization primitive that waits for a set of
453 `PendingResult` objects to complete. More specifically, barier does
462 `PendingResult` objects to complete. More specifically, barier does
454 the following.
463 the following.
455
464
456 * The `PendingResult`s are sorted by result_id.
465 * The `PendingResult`s are sorted by result_id.
457 * The `get_result` method is called for each `PendingResult` sequentially
466 * The `get_result` method is called for each `PendingResult` sequentially
458 with block=True.
467 with block=True.
459 * If a `PendingResult` gets a result that is an exception, it is
468 * If a `PendingResult` gets a result that is an exception, it is
460 trapped and can be re-raised later by calling `get_result` again.
469 trapped and can be re-raised later by calling `get_result` again.
461 * The `PendingResult`s are flushed from the controller.
470 * The `PendingResult`s are flushed from the controller.
462
471
463 After barrier has been called on a `PendingResult`, its results can
472 After barrier has been called on a `PendingResult`, its results can
464 be retrieved by calling `get_result` again or accesing the `r` attribute
473 be retrieved by calling `get_result` again or accesing the `r` attribute
465 of the instance.
474 of the instance.
466 """
475 """
467
476
468 # Convert to list for sorting and check class type
477 # Convert to list for sorting and check class type
469 prList = list(pendingResults)
478 prList = list(pendingResults)
470 for pr in prList:
479 for pr in prList:
471 if not isinstance(pr, PendingResult):
480 if not isinstance(pr, PendingResult):
472 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
481 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
473
482
474 # Sort the PendingResults so they are in order
483 # Sort the PendingResults so they are in order
475 prList.sort()
484 prList.sort()
476 # Block on each PendingResult object
485 # Block on each PendingResult object
477 for pr in prList:
486 for pr in prList:
478 try:
487 try:
479 result = pr.get_result(block=True)
488 result = pr.get_result(block=True)
480 except Exception:
489 except Exception:
481 pass
490 pass
482
491
483 def flush(self):
492 def flush(self):
484 """
493 """
485 Clear all pending deferreds/results from the controller.
494 Clear all pending deferreds/results from the controller.
486
495
487 For each `PendingResult` that is created by this client, the controller
496 For each `PendingResult` that is created by this client, the controller
488 holds on to the result for that `PendingResult`. This can be a problem
497 holds on to the result for that `PendingResult`. This can be a problem
489 if there are a large number of `PendingResult` objects that are created.
498 if there are a large number of `PendingResult` objects that are created.
490
499
491 Once the result of the `PendingResult` has been retrieved, the result
500 Once the result of the `PendingResult` has been retrieved, the result
492 is removed from the controller, but if a user doesn't get a result (
501 is removed from the controller, but if a user doesn't get a result (
493 they just ignore the `PendingResult`) the result is kept forever on the
502 they just ignore the `PendingResult`) the result is kept forever on the
494 controller. This method allows the user to clear out all un-retrieved
503 controller. This method allows the user to clear out all un-retrieved
495 results on the controller.
504 results on the controller.
496 """
505 """
497 r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
506 r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
498 return r
507 return r
499
508
500 clear_pending_results = flush
509 clear_pending_results = flush
501
510
502 #---------------------------------------------------------------------------
511 #---------------------------------------------------------------------------
503 # IEngineMultiplexer related methods
512 # IEngineMultiplexer related methods
504 #---------------------------------------------------------------------------
513 #---------------------------------------------------------------------------
505
514
506 def execute(self, lines, targets=None, block=None):
515 def execute(self, lines, targets=None, block=None):
507 """
516 """
508 Execute code on a set of engines.
517 Execute code on a set of engines.
509
518
510 :Parameters:
519 :Parameters:
511 lines : str
520 lines : str
512 The Python code to execute as a string
521 The Python code to execute as a string
513 targets : id or list of ids
522 targets : id or list of ids
514 The engine to use for the execution
523 The engine to use for the execution
515 block : boolean
524 block : boolean
516 If False, this method will return the actual result. If False,
525 If False, this method will return the actual result. If False,
517 a `PendingResult` is returned which can be used to get the result
526 a `PendingResult` is returned which can be used to get the result
518 at a later time.
527 at a later time.
519 """
528 """
520 targets, block = self._findTargetsAndBlock(targets, block)
529 targets, block = self._findTargetsAndBlock(targets, block)
521 result = blockingCallFromThread(self.smultiengine.execute, lines,
530 result = blockingCallFromThread(self.smultiengine.execute, lines,
522 targets=targets, block=block)
531 targets=targets, block=block)
523 if block:
532 if block:
524 result = ResultList(result)
533 result = ResultList(result)
525 else:
534 else:
526 result = PendingResult(self, result)
535 result = PendingResult(self, result)
527 result.add_callback(wrapResultList)
536 result.add_callback(wrapResultList)
528 return result
537 return result
529
538
530 def push(self, namespace, targets=None, block=None):
539 def push(self, namespace, targets=None, block=None):
531 """
540 """
532 Push a dictionary of keys and values to engines namespace.
541 Push a dictionary of keys and values to engines namespace.
533
542
534 Each engine has a persistent namespace. This method is used to push
543 Each engine has a persistent namespace. This method is used to push
535 Python objects into that namespace.
544 Python objects into that namespace.
536
545
537 The objects in the namespace must be pickleable.
546 The objects in the namespace must be pickleable.
538
547
539 :Parameters:
548 :Parameters:
540 namespace : dict
549 namespace : dict
541 A dict that contains Python objects to be injected into
550 A dict that contains Python objects to be injected into
542 the engine persistent namespace.
551 the engine persistent namespace.
543 targets : id or list of ids
552 targets : id or list of ids
544 The engine to use for the execution
553 The engine to use for the execution
545 block : boolean
554 block : boolean
546 If False, this method will return the actual result. If False,
555 If False, this method will return the actual result. If False,
547 a `PendingResult` is returned which can be used to get the result
556 a `PendingResult` is returned which can be used to get the result
548 at a later time.
557 at a later time.
549 """
558 """
550 targets, block = self._findTargetsAndBlock(targets, block)
559 targets, block = self._findTargetsAndBlock(targets, block)
551 return self._blockFromThread(self.smultiengine.push, namespace,
560 return self._blockFromThread(self.smultiengine.push, namespace,
552 targets=targets, block=block)
561 targets=targets, block=block)
553
562
554 def pull(self, keys, targets=None, block=None):
563 def pull(self, keys, targets=None, block=None):
555 """
564 """
556 Pull Python objects by key out of engines namespaces.
565 Pull Python objects by key out of engines namespaces.
557
566
558 :Parameters:
567 :Parameters:
559 keys : str or list of str
568 keys : str or list of str
560 The names of the variables to be pulled
569 The names of the variables to be pulled
561 targets : id or list of ids
570 targets : id or list of ids
562 The engine to use for the execution
571 The engine to use for the execution
563 block : boolean
572 block : boolean
564 If False, this method will return the actual result. If False,
573 If False, this method will return the actual result. If False,
565 a `PendingResult` is returned which can be used to get the result
574 a `PendingResult` is returned which can be used to get the result
566 at a later time.
575 at a later time.
567 """
576 """
568 targets, block = self._findTargetsAndBlock(targets, block)
577 targets, block = self._findTargetsAndBlock(targets, block)
569 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
578 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
570
579
571 def push_function(self, namespace, targets=None, block=None):
580 def push_function(self, namespace, targets=None, block=None):
572 """
581 """
573 Push a Python function to an engine.
582 Push a Python function to an engine.
574
583
575 This method is used to push a Python function to an engine. This
584 This method is used to push a Python function to an engine. This
576 method can then be used in code on the engines. Closures are not supported.
585 method can then be used in code on the engines. Closures are not supported.
577
586
578 :Parameters:
587 :Parameters:
579 namespace : dict
588 namespace : dict
580 A dict whose values are the functions to be pushed. The keys give
589 A dict whose values are the functions to be pushed. The keys give
581 that names that the function will appear as in the engines
590 that names that the function will appear as in the engines
582 namespace.
591 namespace.
583 targets : id or list of ids
592 targets : id or list of ids
584 The engine to use for the execution
593 The engine to use for the execution
585 block : boolean
594 block : boolean
586 If False, this method will return the actual result. If False,
595 If False, this method will return the actual result. If False,
587 a `PendingResult` is returned which can be used to get the result
596 a `PendingResult` is returned which can be used to get the result
588 at a later time.
597 at a later time.
589 """
598 """
590 targets, block = self._findTargetsAndBlock(targets, block)
599 targets, block = self._findTargetsAndBlock(targets, block)
591 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
600 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
592
601
593 def pull_function(self, keys, targets=None, block=None):
602 def pull_function(self, keys, targets=None, block=None):
594 """
603 """
595 Pull a Python function from an engine.
604 Pull a Python function from an engine.
596
605
597 This method is used to pull a Python function from an engine.
606 This method is used to pull a Python function from an engine.
598 Closures are not supported.
607 Closures are not supported.
599
608
600 :Parameters:
609 :Parameters:
601 keys : str or list of str
610 keys : str or list of str
602 The names of the functions to be pulled
611 The names of the functions to be pulled
603 targets : id or list of ids
612 targets : id or list of ids
604 The engine to use for the execution
613 The engine to use for the execution
605 block : boolean
614 block : boolean
606 If False, this method will return the actual result. If False,
615 If False, this method will return the actual result. If False,
607 a `PendingResult` is returned which can be used to get the result
616 a `PendingResult` is returned which can be used to get the result
608 at a later time.
617 at a later time.
609 """
618 """
610 targets, block = self._findTargetsAndBlock(targets, block)
619 targets, block = self._findTargetsAndBlock(targets, block)
611 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
620 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
612
621
613 def push_serialized(self, namespace, targets=None, block=None):
622 def push_serialized(self, namespace, targets=None, block=None):
614 targets, block = self._findTargetsAndBlock(targets, block)
623 targets, block = self._findTargetsAndBlock(targets, block)
615 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
624 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
616
625
617 def pull_serialized(self, keys, targets=None, block=None):
626 def pull_serialized(self, keys, targets=None, block=None):
618 targets, block = self._findTargetsAndBlock(targets, block)
627 targets, block = self._findTargetsAndBlock(targets, block)
619 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
628 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
620
629
621 def get_result(self, i=None, targets=None, block=None):
630 def get_result(self, i=None, targets=None, block=None):
622 """
631 """
623 Get a previous result.
632 Get a previous result.
624
633
625 When code is executed in an engine, a dict is created and returned. This
634 When code is executed in an engine, a dict is created and returned. This
626 method retrieves that dict for previous commands.
635 method retrieves that dict for previous commands.
627
636
628 :Parameters:
637 :Parameters:
629 i : int
638 i : int
630 The number of the result to get
639 The number of the result to get
631 targets : id or list of ids
640 targets : id or list of ids
632 The engine to use for the execution
641 The engine to use for the execution
633 block : boolean
642 block : boolean
634 If False, this method will return the actual result. If False,
643 If False, this method will return the actual result. If False,
635 a `PendingResult` is returned which can be used to get the result
644 a `PendingResult` is returned which can be used to get the result
636 at a later time.
645 at a later time.
637 """
646 """
638 targets, block = self._findTargetsAndBlock(targets, block)
647 targets, block = self._findTargetsAndBlock(targets, block)
639 result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
648 result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
640 if block:
649 if block:
641 result = ResultList(result)
650 result = ResultList(result)
642 else:
651 else:
643 result = PendingResult(self, result)
652 result = PendingResult(self, result)
644 result.add_callback(wrapResultList)
653 result.add_callback(wrapResultList)
645 return result
654 return result
646
655
647 def reset(self, targets=None, block=None):
656 def reset(self, targets=None, block=None):
648 """
657 """
649 Reset an engine.
658 Reset an engine.
650
659
651 This method clears out the namespace of an engine.
660 This method clears out the namespace of an engine.
652
661
653 :Parameters:
662 :Parameters:
654 targets : id or list of ids
663 targets : id or list of ids
655 The engine to use for the execution
664 The engine to use for the execution
656 block : boolean
665 block : boolean
657 If False, this method will return the actual result. If False,
666 If False, this method will return the actual result. If False,
658 a `PendingResult` is returned which can be used to get the result
667 a `PendingResult` is returned which can be used to get the result
659 at a later time.
668 at a later time.
660 """
669 """
661 targets, block = self._findTargetsAndBlock(targets, block)
670 targets, block = self._findTargetsAndBlock(targets, block)
662 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
671 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
663
672
664 def keys(self, targets=None, block=None):
673 def keys(self, targets=None, block=None):
665 """
674 """
666 Get a list of all the variables in an engine's namespace.
675 Get a list of all the variables in an engine's namespace.
667
676
668 :Parameters:
677 :Parameters:
669 targets : id or list of ids
678 targets : id or list of ids
670 The engine to use for the execution
679 The engine to use for the execution
671 block : boolean
680 block : boolean
672 If False, this method will return the actual result. If False,
681 If False, this method will return the actual result. If False,
673 a `PendingResult` is returned which can be used to get the result
682 a `PendingResult` is returned which can be used to get the result
674 at a later time.
683 at a later time.
675 """
684 """
676 targets, block = self._findTargetsAndBlock(targets, block)
685 targets, block = self._findTargetsAndBlock(targets, block)
677 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
686 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
678
687
679 def kill(self, controller=False, targets=None, block=None):
688 def kill(self, controller=False, targets=None, block=None):
680 """
689 """
681 Kill the engines and controller.
690 Kill the engines and controller.
682
691
683 This method is used to stop the engine and controller by calling
692 This method is used to stop the engine and controller by calling
684 `reactor.stop`.
693 `reactor.stop`.
685
694
686 :Parameters:
695 :Parameters:
687 controller : boolean
696 controller : boolean
688 If True, kill the engines and controller. If False, just the
697 If True, kill the engines and controller. If False, just the
689 engines
698 engines
690 targets : id or list of ids
699 targets : id or list of ids
691 The engine to use for the execution
700 The engine to use for the execution
692 block : boolean
701 block : boolean
693 If False, this method will return the actual result. If False,
702 If False, this method will return the actual result. If False,
694 a `PendingResult` is returned which can be used to get the result
703 a `PendingResult` is returned which can be used to get the result
695 at a later time.
704 at a later time.
696 """
705 """
697 targets, block = self._findTargetsAndBlock(targets, block)
706 targets, block = self._findTargetsAndBlock(targets, block)
698 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
707 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
699
708
700 def clear_queue(self, targets=None, block=None):
709 def clear_queue(self, targets=None, block=None):
701 """
710 """
702 Clear out the controller's queue for an engine.
711 Clear out the controller's queue for an engine.
703
712
704 The controller maintains a queue for each engine. This clear it out.
713 The controller maintains a queue for each engine. This clear it out.
705
714
706 :Parameters:
715 :Parameters:
707 targets : id or list of ids
716 targets : id or list of ids
708 The engine to use for the execution
717 The engine to use for the execution
709 block : boolean
718 block : boolean
710 If False, this method will return the actual result. If False,
719 If False, this method will return the actual result. If False,
711 a `PendingResult` is returned which can be used to get the result
720 a `PendingResult` is returned which can be used to get the result
712 at a later time.
721 at a later time.
713 """
722 """
714 targets, block = self._findTargetsAndBlock(targets, block)
723 targets, block = self._findTargetsAndBlock(targets, block)
715 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
724 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
716
725
717 def queue_status(self, targets=None, block=None):
726 def queue_status(self, targets=None, block=None):
718 """
727 """
719 Get the status of an engines queue.
728 Get the status of an engines queue.
720
729
721 :Parameters:
730 :Parameters:
722 targets : id or list of ids
731 targets : id or list of ids
723 The engine to use for the execution
732 The engine to use for the execution
724 block : boolean
733 block : boolean
725 If False, this method will return the actual result. If False,
734 If False, this method will return the actual result. If False,
726 a `PendingResult` is returned which can be used to get the result
735 a `PendingResult` is returned which can be used to get the result
727 at a later time.
736 at a later time.
728 """
737 """
729 targets, block = self._findTargetsAndBlock(targets, block)
738 targets, block = self._findTargetsAndBlock(targets, block)
730 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
739 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
731
740
732 def set_properties(self, properties, targets=None, block=None):
741 def set_properties(self, properties, targets=None, block=None):
742 warnings.warn(_prop_warn)
733 targets, block = self._findTargetsAndBlock(targets, block)
743 targets, block = self._findTargetsAndBlock(targets, block)
734 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
744 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
735
745
736 def get_properties(self, keys=None, targets=None, block=None):
746 def get_properties(self, keys=None, targets=None, block=None):
747 warnings.warn(_prop_warn)
737 targets, block = self._findTargetsAndBlock(targets, block)
748 targets, block = self._findTargetsAndBlock(targets, block)
738 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
749 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
739
750
740 def has_properties(self, keys, targets=None, block=None):
751 def has_properties(self, keys, targets=None, block=None):
752 warnings.warn(_prop_warn)
741 targets, block = self._findTargetsAndBlock(targets, block)
753 targets, block = self._findTargetsAndBlock(targets, block)
742 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
754 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
743
755
744 def del_properties(self, keys, targets=None, block=None):
756 def del_properties(self, keys, targets=None, block=None):
757 warnings.warn(_prop_warn)
745 targets, block = self._findTargetsAndBlock(targets, block)
758 targets, block = self._findTargetsAndBlock(targets, block)
746 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
759 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
747
760
748 def clear_properties(self, targets=None, block=None):
761 def clear_properties(self, targets=None, block=None):
762 warnings.warn(_prop_warn)
749 targets, block = self._findTargetsAndBlock(targets, block)
763 targets, block = self._findTargetsAndBlock(targets, block)
750 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
764 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
751
765
752 #---------------------------------------------------------------------------
766 #---------------------------------------------------------------------------
753 # IMultiEngine related methods
767 # IMultiEngine related methods
754 #---------------------------------------------------------------------------
768 #---------------------------------------------------------------------------
755
769
756 def get_ids(self):
770 def get_ids(self):
757 """
771 """
758 Returns the ids of currently registered engines.
772 Returns the ids of currently registered engines.
759 """
773 """
760 result = blockingCallFromThread(self.smultiengine.get_ids)
774 result = blockingCallFromThread(self.smultiengine.get_ids)
761 return result
775 return result
762
776
763 #---------------------------------------------------------------------------
777 #---------------------------------------------------------------------------
764 # IMultiEngineCoordinator
778 # IMultiEngineCoordinator
765 #---------------------------------------------------------------------------
779 #---------------------------------------------------------------------------
766
780
767 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
781 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
768 """
782 """
769 Partition a Python sequence and send the partitions to a set of engines.
783 Partition a Python sequence and send the partitions to a set of engines.
770 """
784 """
771 targets, block = self._findTargetsAndBlock(targets, block)
785 targets, block = self._findTargetsAndBlock(targets, block)
772 return self._blockFromThread(self.smultiengine.scatter, key, seq,
786 return self._blockFromThread(self.smultiengine.scatter, key, seq,
773 dist, flatten, targets=targets, block=block)
787 dist, flatten, targets=targets, block=block)
774
788
775 def gather(self, key, dist='b', targets=None, block=None):
789 def gather(self, key, dist='b', targets=None, block=None):
776 """
790 """
777 Gather a partitioned sequence on a set of engines as a single local seq.
791 Gather a partitioned sequence on a set of engines as a single local seq.
778 """
792 """
779 targets, block = self._findTargetsAndBlock(targets, block)
793 targets, block = self._findTargetsAndBlock(targets, block)
780 return self._blockFromThread(self.smultiengine.gather, key, dist,
794 return self._blockFromThread(self.smultiengine.gather, key, dist,
781 targets=targets, block=block)
795 targets=targets, block=block)
782
796
783 def raw_map(self, func, seq, dist='b', targets=None, block=None):
797 def raw_map(self, func, seq, dist='b', targets=None, block=None):
784 """
798 """
785 A parallelized version of Python's builtin map.
799 A parallelized version of Python's builtin map.
786
800
787 This has a slightly different syntax than the builtin `map`.
801 This has a slightly different syntax than the builtin `map`.
788 This is needed because we need to have keyword arguments and thus
802 This is needed because we need to have keyword arguments and thus
789 can't use *args to capture all the sequences. Instead, they must
803 can't use *args to capture all the sequences. Instead, they must
790 be passed in a list or tuple.
804 be passed in a list or tuple.
791
805
792 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
806 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
793
807
794 Most users will want to use parallel functions or the `mapper`
808 Most users will want to use parallel functions or the `mapper`
795 and `map` methods for an API that follows that of the builtin
809 and `map` methods for an API that follows that of the builtin
796 `map`.
810 `map`.
797 """
811 """
798 targets, block = self._findTargetsAndBlock(targets, block)
812 targets, block = self._findTargetsAndBlock(targets, block)
799 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
813 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
800 dist, targets=targets, block=block)
814 dist, targets=targets, block=block)
801
815
802 def map(self, func, *sequences):
816 def map(self, func, *sequences):
803 """
817 """
804 A parallel version of Python's builtin `map` function.
818 A parallel version of Python's builtin `map` function.
805
819
806 This method applies a function to sequences of arguments. It
820 This method applies a function to sequences of arguments. It
807 follows the same syntax as the builtin `map`.
821 follows the same syntax as the builtin `map`.
808
822
809 This method creates a mapper objects by calling `self.mapper` with
823 This method creates a mapper objects by calling `self.mapper` with
810 no arguments and then uses that mapper to do the mapping. See
824 no arguments and then uses that mapper to do the mapping. See
811 the documentation of `mapper` for more details.
825 the documentation of `mapper` for more details.
812 """
826 """
813 return self.mapper().map(func, *sequences)
827 return self.mapper().map(func, *sequences)
814
828
815 def mapper(self, dist='b', targets='all', block=None):
829 def mapper(self, dist='b', targets='all', block=None):
816 """
830 """
817 Create a mapper object that has a `map` method.
831 Create a mapper object that has a `map` method.
818
832
819 This method returns an object that implements the `IMapper`
833 This method returns an object that implements the `IMapper`
820 interface. This method is a factory that is used to control how
834 interface. This method is a factory that is used to control how
821 the map happens.
835 the map happens.
822
836
823 :Parameters:
837 :Parameters:
824 dist : str
838 dist : str
825 What decomposition to use, 'b' is the only one supported
839 What decomposition to use, 'b' is the only one supported
826 currently
840 currently
827 targets : str, int, sequence of ints
841 targets : str, int, sequence of ints
828 Which engines to use for the map
842 Which engines to use for the map
829 block : boolean
843 block : boolean
830 Should calls to `map` block or not
844 Should calls to `map` block or not
831 """
845 """
832 return MultiEngineMapper(self, dist, targets, block)
846 return MultiEngineMapper(self, dist, targets, block)
833
847
834 def parallel(self, dist='b', targets=None, block=None):
848 def parallel(self, dist='b', targets=None, block=None):
835 """
849 """
836 A decorator that turns a function into a parallel function.
850 A decorator that turns a function into a parallel function.
837
851
838 This can be used as:
852 This can be used as:
839
853
840 @parallel()
854 @parallel()
841 def f(x, y)
855 def f(x, y)
842 ...
856 ...
843
857
844 f(range(10), range(10))
858 f(range(10), range(10))
845
859
846 This causes f(0,0), f(1,1), ... to be called in parallel.
860 This causes f(0,0), f(1,1), ... to be called in parallel.
847
861
848 :Parameters:
862 :Parameters:
849 dist : str
863 dist : str
850 What decomposition to use, 'b' is the only one supported
864 What decomposition to use, 'b' is the only one supported
851 currently
865 currently
852 targets : str, int, sequence of ints
866 targets : str, int, sequence of ints
853 Which engines to use for the map
867 Which engines to use for the map
854 block : boolean
868 block : boolean
855 Should calls to `map` block or not
869 Should calls to `map` block or not
856 """
870 """
857 targets, block = self._findTargetsAndBlock(targets, block)
871 targets, block = self._findTargetsAndBlock(targets, block)
858 mapper = self.mapper(dist, targets, block)
872 mapper = self.mapper(dist, targets, block)
859 pf = ParallelFunction(mapper)
873 pf = ParallelFunction(mapper)
860 return pf
874 return pf
861
875
862 #---------------------------------------------------------------------------
876 #---------------------------------------------------------------------------
863 # IMultiEngineExtras
877 # IMultiEngineExtras
864 #---------------------------------------------------------------------------
878 #---------------------------------------------------------------------------
865
879
866 def zip_pull(self, keys, targets=None, block=None):
880 def zip_pull(self, keys, targets=None, block=None):
867 targets, block = self._findTargetsAndBlock(targets, block)
881 targets, block = self._findTargetsAndBlock(targets, block)
868 return self._blockFromThread(self.smultiengine.zip_pull, keys,
882 return self._blockFromThread(self.smultiengine.zip_pull, keys,
869 targets=targets, block=block)
883 targets=targets, block=block)
870
884
871 def run(self, filename, targets=None, block=None):
885 def run(self, filename, targets=None, block=None):
872 """
886 """
873 Run a Python code in a file on the engines.
887 Run a Python code in a file on the engines.
874
888
875 :Parameters:
889 :Parameters:
876 filename : str
890 filename : str
877 The name of the local file to run
891 The name of the local file to run
878 targets : id or list of ids
892 targets : id or list of ids
879 The engine to use for the execution
893 The engine to use for the execution
880 block : boolean
894 block : boolean
881 If False, this method will return the actual result. If False,
895 If False, this method will return the actual result. If False,
882 a `PendingResult` is returned which can be used to get the result
896 a `PendingResult` is returned which can be used to get the result
883 at a later time.
897 at a later time.
884 """
898 """
885 targets, block = self._findTargetsAndBlock(targets, block)
899 targets, block = self._findTargetsAndBlock(targets, block)
886 return self._blockFromThread(self.smultiengine.run, filename,
900 return self._blockFromThread(self.smultiengine.run, filename,
887 targets=targets, block=block)
901 targets=targets, block=block)
888
902
889 def benchmark(self, push_size=10000):
903 def benchmark(self, push_size=10000):
890 """
904 """
891 Run performance benchmarks for the current IPython cluster.
905 Run performance benchmarks for the current IPython cluster.
892
906
893 This method tests both the latency of sending command and data to the
907 This method tests both the latency of sending command and data to the
894 engines as well as the throughput of sending large objects to the
908 engines as well as the throughput of sending large objects to the
895 engines using push. The latency is measured by having one or more
909 engines using push. The latency is measured by having one or more
896 engines execute the command 'pass'. The throughput is measure by
910 engines execute the command 'pass'. The throughput is measure by
897 sending an NumPy array of size `push_size` to one or more engines.
911 sending an NumPy array of size `push_size` to one or more engines.
898
912
899 These benchmarks will vary widely on different hardware and networks
913 These benchmarks will vary widely on different hardware and networks
900 and thus can be used to get an idea of the performance characteristics
914 and thus can be used to get an idea of the performance characteristics
901 of a particular configuration of an IPython controller and engines.
915 of a particular configuration of an IPython controller and engines.
902
916
903 This function is not testable within our current testing framework.
917 This function is not testable within our current testing framework.
904 """
918 """
905 import timeit, __builtin__
919 import timeit, __builtin__
906 __builtin__._mec_self = self
920 __builtin__._mec_self = self
907 benchmarks = {}
921 benchmarks = {}
908 repeat = 3
922 repeat = 3
909 count = 10
923 count = 10
910
924
911 timer = timeit.Timer('_mec_self.execute("pass",0)')
925 timer = timeit.Timer('_mec_self.execute("pass",0)')
912 result = 1000*min(timer.repeat(repeat,count))/count
926 result = 1000*min(timer.repeat(repeat,count))/count
913 benchmarks['single_engine_latency'] = (result,'msec')
927 benchmarks['single_engine_latency'] = (result,'msec')
914
928
915 timer = timeit.Timer('_mec_self.execute("pass")')
929 timer = timeit.Timer('_mec_self.execute("pass")')
916 result = 1000*min(timer.repeat(repeat,count))/count
930 result = 1000*min(timer.repeat(repeat,count))/count
917 benchmarks['all_engine_latency'] = (result,'msec')
931 benchmarks['all_engine_latency'] = (result,'msec')
918
932
919 try:
933 try:
920 import numpy as np
934 import numpy as np
921 except:
935 except:
922 pass
936 pass
923 else:
937 else:
924 timer = timeit.Timer(
938 timer = timeit.Timer(
925 "_mec_self.push(d)",
939 "_mec_self.push(d)",
926 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
940 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
927 )
941 )
928 result = min(timer.repeat(repeat,count))/count
942 result = min(timer.repeat(repeat,count))/count
929 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
943 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
930
944
931 try:
945 try:
932 import numpy as np
946 import numpy as np
933 except:
947 except:
934 pass
948 pass
935 else:
949 else:
936 timer = timeit.Timer(
950 timer = timeit.Timer(
937 "_mec_self.push(d,0)",
951 "_mec_self.push(d,0)",
938 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
952 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
939 )
953 )
940 result = min(timer.repeat(repeat,count))/count
954 result = min(timer.repeat(repeat,count))/count
941 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
955 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
942
956
943 return benchmarks
957 return benchmarks
944
958
945
959
946 components.registerAdapter(FullBlockingMultiEngineClient,
960 components.registerAdapter(FullBlockingMultiEngineClient,
947 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
961 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
948
962
949
963
950
964
951
965
@@ -1,180 +1,180 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
3
3
4 """
4 """
5 A blocking version of the task client.
5 A blocking version of the task client.
6 """
6 """
7
7
8 __docformat__ = "restructuredtext en"
8 __docformat__ = "restructuredtext en"
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Copyright (C) 2008 The IPython Development Team
11 # Copyright (C) 2008 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16
16
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18 # Imports
18 # Imports
19 #-------------------------------------------------------------------------------
19 #-------------------------------------------------------------------------------
20
20
21 from zope.interface import Interface, implements
21 from zope.interface import Interface, implements
22 from twisted.python import components, log
22 from twisted.python import components, log
23
23
24 from IPython.kernel.twistedutil import blockingCallFromThread
24 from IPython.kernel.twistedutil import blockingCallFromThread
25 from IPython.kernel import task, error
25 from IPython.kernel import task, error
26 from IPython.kernel.mapper import (
26 from IPython.kernel.mapper import (
27 SynchronousTaskMapper,
27 SynchronousTaskMapper,
28 ITaskMapperFactory,
28 ITaskMapperFactory,
29 IMapper
29 IMapper
30 )
30 )
31 from IPython.kernel.parallelfunction import (
31 from IPython.kernel.parallelfunction import (
32 ParallelFunction,
32 ParallelFunction,
33 ITaskParallelDecorator
33 ITaskParallelDecorator
34 )
34 )
35
35
36 #-------------------------------------------------------------------------------
36 #-------------------------------------------------------------------------------
37 # The task client
37 # The task client
38 #-------------------------------------------------------------------------------
38 #-------------------------------------------------------------------------------
39
39
40 class IBlockingTaskClient(Interface):
40 class IBlockingTaskClient(Interface):
41 """
41 """
42 A vague interface of the blocking task client
42 A vague interface of the blocking task client
43 """
43 """
44 pass
44 pass
45
45
46 class BlockingTaskClient(object):
46 class BlockingTaskClient(object):
47 """
47 """
48 A blocking task client that adapts a non-blocking one.
48 A blocking task client that adapts a non-blocking one.
49 """
49 """
50
50
51 implements(
51 implements(
52 IBlockingTaskClient,
52 IBlockingTaskClient,
53 ITaskMapperFactory,
53 ITaskMapperFactory,
54 IMapper,
54 IMapper,
55 ITaskParallelDecorator
55 ITaskParallelDecorator
56 )
56 )
57
57
58 def __init__(self, task_controller):
58 def __init__(self, task_controller):
59 self.task_controller = task_controller
59 self.task_controller = task_controller
60 self.block = True
60 self.block = True
61
61
62 def run(self, task, block=False):
62 def run(self, task, block=False):
63 """Run a task on the `TaskController`.
63 """Run a task on the `TaskController`.
64
64
65 See the documentation of the `MapTask` and `StringTask` classes for
65 See the documentation of the `MapTask` and `StringTask` classes for
66 details on how to build a task of different types.
66 details on how to build a task of different types.
67
67
68 :Parameters:
68 :Parameters:
69 task : an `ITask` implementer
69 task : an `ITask` implementer
70
70
71 :Returns: The int taskid of the submitted task. Pass this to
71 :Returns: The int taskid of the submitted task. Pass this to
72 `get_task_result` to get the `TaskResult` object.
72 `get_task_result` to get the `TaskResult` object.
73 """
73 """
74 tid = blockingCallFromThread(self.task_controller.run, task)
74 tid = blockingCallFromThread(self.task_controller.run, task)
75 if block:
75 if block:
76 return self.get_task_result(tid, block=True)
76 return self.get_task_result(tid, block=True)
77 else:
77 else:
78 return tid
78 return tid
79
79
80 def get_task_result(self, taskid, block=False):
80 def get_task_result(self, taskid, block=False):
81 """
81 """
82 Get a task result by taskid.
82 Get a task result by taskid.
83
83
84 :Parameters:
84 :Parameters:
85 taskid : int
85 taskid : int
86 The taskid of the task to be retrieved.
86 The taskid of the task to be retrieved.
87 block : boolean
87 block : boolean
88 Should I block until the task is done?
88 Should I block until the task is done?
89
89
90 :Returns: A `TaskResult` object that encapsulates the task result.
90 :Returns: A `TaskResult` object that encapsulates the task result.
91 """
91 """
92 return blockingCallFromThread(self.task_controller.get_task_result,
92 return blockingCallFromThread(self.task_controller.get_task_result,
93 taskid, block)
93 taskid, block)
94
94
95 def abort(self, taskid):
95 def abort(self, taskid):
96 """
96 """
97 Abort a task by taskid.
97 Abort a task by taskid.
98
98
99 :Parameters:
99 :Parameters:
100 taskid : int
100 taskid : int
101 The taskid of the task to be aborted.
101 The taskid of the task to be aborted.
102 """
102 """
103 return blockingCallFromThread(self.task_controller.abort, taskid)
103 return blockingCallFromThread(self.task_controller.abort, taskid)
104
104
105 def barrier(self, taskids):
105 def barrier(self, taskids):
106 """Block until a set of tasks are completed.
106 """Block until a set of tasks are completed.
107
107
108 :Parameters:
108 :Parameters:
109 taskids : list, tuple
109 taskids : list, tuple
110 A sequence of taskids to block on.
110 A sequence of taskids to block on.
111 """
111 """
112 return blockingCallFromThread(self.task_controller.barrier, taskids)
112 return blockingCallFromThread(self.task_controller.barrier, taskids)
113
113
114 def spin(self):
114 def spin(self):
115 """
115 """
116 Touch the scheduler, to resume scheduling without submitting a task.
116 Touch the scheduler, to resume scheduling without submitting a task.
117
117
118 This method only needs to be called in unusual situations where the
118 This method only needs to be called in unusual situations where the
119 scheduler is idle for some reason.
119 scheduler is idle for some reason.
120 """
120 """
121 return blockingCallFromThread(self.task_controller.spin)
121 return blockingCallFromThread(self.task_controller.spin)
122
122
123 def queue_status(self, verbose=False):
123 def queue_status(self, verbose=False):
124 """
124 """
125 Get a dictionary with the current state of the task queue.
125 Get a dictionary with the current state of the task queue.
126
126
127 :Parameters:
127 :Parameters:
128 verbose : boolean
128 verbose : boolean
129 If True, return a list of taskids. If False, simply give
129 If True, return a list of taskids. If False, simply give
130 the number of tasks with each status.
130 the number of tasks with each status.
131
131
132 :Returns:
132 :Returns:
133 A dict with the queue status.
133 A dict with the queue status.
134 """
134 """
135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
136
136
137 def clear(self):
137 def clear(self):
138 """
138 """
139 Clear all previously run tasks from the task controller.
139 Clear all previously run tasks from the task controller.
140
140
141 This is needed because the task controller keep all task results
141 This is needed because the task controller keep all task results
142 in memory. This can be a problem is there are many completed
142 in memory. This can be a problem is there are many completed
143 tasks. Users should call this periodically to clean out these
143 tasks. Users should call this periodically to clean out these
144 cached task results.
144 cached task results.
145 """
145 """
146 return blockingCallFromThread(self.task_controller.clear)
146 return blockingCallFromThread(self.task_controller.clear)
147
147
148 def map(self, func, *sequences):
148 def map(self, func, *sequences):
149 """
149 """
150 Apply func to *sequences elementwise. Like Python's builtin map.
150 Apply func to *sequences elementwise. Like Python's builtin map.
151
151
152 This version is load balanced.
152 This version is load balanced.
153 """
153 """
154 return self.mapper().map(func, *sequences)
154 return self.mapper().map(func, *sequences)
155
155
156 def mapper(self, clear_before=False, clear_after=False, retries=0,
156 def mapper(self, clear_before=False, clear_after=False, retries=0,
157 recovery_task=None, depend=None, block=True):
157 recovery_task=None, depend=None, block=True):
158 """
158 """
159 Create an `IMapper` implementer with a given set of arguments.
159 Create an `IMapper` implementer with a given set of arguments.
160
160
161 The `IMapper` created using a task controller is load balanced.
161 The `IMapper` created using a task controller is load balanced.
162
162
163 See the documentation for `IPython.kernel.task.BaseTask` for
163 See the documentation for `IPython.kernel.task.BaseTask` for
164 documentation on the arguments to this method.
164 documentation on the arguments to this method.
165 """
165 """
166 return SynchronousTaskMapper(self, clear_before=clear_before,
166 return SynchronousTaskMapper(self, clear_before=clear_before,
167 clear_after=clear_after, retries=retries,
167 clear_after=clear_after, retries=retries,
168 recovery_task=recovery_task, depend=depend, block=block)
168 recovery_task=recovery_task, depend=depend, block=block)
169
169
170 def parallel(self, clear_before=False, clear_after=False, retries=0,
170 def parallel(self, clear_before=False, clear_after=False, retries=0,
171 recovery_task=None, depend=None, block=True):
171 recovery_task=None, depend=None, block=True):
172 mapper = self.mapper(clear_before, clear_after, retries,
172 mapper = self.mapper(clear_before, clear_after, retries,
173 recovery_task, depend, block)
173 recovery_task, depend, block)
174 pf = ParallelFunction(mapper)
174 pf = ParallelFunction(mapper)
175 return pf
175 return pf
176
176
177 components.registerAdapter(BlockingTaskClient,
177 components.registerAdapter(BlockingTaskClient,
178 task.ITaskController, IBlockingTaskClient)
178 task.ITaskController, IBlockingTaskClient)
179
179
180
180
@@ -1,246 +1,250 b''
1 ==================================
1 ==================================
2 IPython/Vision Beam Pattern Demo
2 IPython/Vision Beam Pattern Demo
3 ==================================
3 ==================================
4
4
5 .. note::
6
7 This page has not been updated to reflect the recent work on ipcluster.
8 This work makes it much easier to use IPython on a cluster.
5
9
6 Installing and testing IPython at OSC systems
10 Installing and testing IPython at OSC systems
7 =============================================
11 =============================================
8
12
9 All components were installed from source and I have my environment set up to
13 All components were installed from source and I have my environment set up to
10 include ~/usr/local in my various necessary paths ($PATH, $PYTHONPATH, etc).
14 include ~/usr/local in my various necessary paths ($PATH, $PYTHONPATH, etc).
11 Other than a slow filesystem for unpacking tarballs, the install went without a
15 Other than a slow filesystem for unpacking tarballs, the install went without a
12 hitch. For each needed component, I just downloaded the source tarball,
16 hitch. For each needed component, I just downloaded the source tarball,
13 unpacked it via::
17 unpacked it via::
14
18
15 tar xzf (or xjf if it's bz2) filename.tar.{gz,bz2}
19 tar xzf (or xjf if it's bz2) filename.tar.{gz,bz2}
16
20
17 and then installed them (including IPython itself) with::
21 and then installed them (including IPython itself) with::
18
22
19 cd dirname/ # path to unpacked tarball
23 cd dirname/ # path to unpacked tarball
20 python setup.py install --prefix=~/usr/local/
24 python setup.py install --prefix=~/usr/local/
21
25
22 The components I installed are listed below. For each one I give the main
26 The components I installed are listed below. For each one I give the main
23 project link as well as a direct one to the file I actually dowloaded and used.
27 project link as well as a direct one to the file I actually dowloaded and used.
24
28
25 - nose, used for testing:
29 - nose, used for testing:
26 http://somethingaboutorange.com/mrl/projects/nose/
30 http://somethingaboutorange.com/mrl/projects/nose/
27 http://somethingaboutorange.com/mrl/projects/nose/nose-0.10.3.tar.gz
31 http://somethingaboutorange.com/mrl/projects/nose/nose-0.10.3.tar.gz
28
32
29 - Zope interface, used to declare interfaces in twisted and ipython. Note:
33 - Zope interface, used to declare interfaces in twisted and ipython. Note:
30 you must get this from the page linked below and not fro the defaul
34 you must get this from the page linked below and not fro the defaul
31 one(http://www.zope.org/Products/ZopeInterface) because the latter has an
35 one(http://www.zope.org/Products/ZopeInterface) because the latter has an
32 older version, it hasn't been updated in a long time. This pypi link has
36 older version, it hasn't been updated in a long time. This pypi link has
33 the current release (3.4.1 as of this writing):
37 the current release (3.4.1 as of this writing):
34 http://pypi.python.org/pypi/zope.interface
38 http://pypi.python.org/pypi/zope.interface
35 http://pypi.python.org/packages/source/z/zope.interface/zope.interface-3.4.1.tar.gz
39 http://pypi.python.org/packages/source/z/zope.interface/zope.interface-3.4.1.tar.gz
36
40
37 - pyopenssl, security layer used by foolscap. Note: version 0.7 *must* be
41 - pyopenssl, security layer used by foolscap. Note: version 0.7 *must* be
38 used:
42 used:
39 http://sourceforge.net/projects/pyopenssl/
43 http://sourceforge.net/projects/pyopenssl/
40 http://downloads.sourceforge.net/pyopenssl/pyOpenSSL-0.6.tar.gz?modtime=1212595285&big_mirror=0
44 http://downloads.sourceforge.net/pyopenssl/pyOpenSSL-0.6.tar.gz?modtime=1212595285&big_mirror=0
41
45
42
46
43 - Twisted, used for all networking:
47 - Twisted, used for all networking:
44 http://twistedmatrix.com/trac/wiki/Downloads
48 http://twistedmatrix.com/trac/wiki/Downloads
45 http://tmrc.mit.edu/mirror/twisted/Twisted/8.1/Twisted-8.1.0.tar.bz2
49 http://tmrc.mit.edu/mirror/twisted/Twisted/8.1/Twisted-8.1.0.tar.bz2
46
50
47 - Foolscap, used for managing connections securely:
51 - Foolscap, used for managing connections securely:
48 http://foolscap.lothar.com/trac
52 http://foolscap.lothar.com/trac
49 http://foolscap.lothar.com/releases/foolscap-0.3.1.tar.gz
53 http://foolscap.lothar.com/releases/foolscap-0.3.1.tar.gz
50
54
51
55
52 - IPython itself:
56 - IPython itself:
53 http://ipython.scipy.org/
57 http://ipython.scipy.org/
54 http://ipython.scipy.org/dist/ipython-0.9.1.tar.gz
58 http://ipython.scipy.org/dist/ipython-0.9.1.tar.gz
55
59
56
60
57 I then ran the ipython test suite via::
61 I then ran the ipython test suite via::
58
62
59 iptest -vv
63 iptest -vv
60
64
61 and it passed with only::
65 and it passed with only::
62
66
63 ======================================================================
67 ======================================================================
64 ERROR: testGetResult_2
68 ERROR: testGetResult_2
65 ----------------------------------------------------------------------
69 ----------------------------------------------------------------------
66 DirtyReactorAggregateError: Reactor was unclean.
70 DirtyReactorAggregateError: Reactor was unclean.
67 Selectables:
71 Selectables:
68 <Negotiation #0 on 10105>
72 <Negotiation #0 on 10105>
69
73
70 ----------------------------------------------------------------------
74 ----------------------------------------------------------------------
71 Ran 419 tests in 33.971s
75 Ran 419 tests in 33.971s
72
76
73 FAILED (SKIP=4, errors=1)
77 FAILED (SKIP=4, errors=1)
74
78
75 In three more runs of the test suite I was able to reproduce this error
79 In three more runs of the test suite I was able to reproduce this error
76 sometimes but not always; for now I think we can move on but we need to
80 sometimes but not always; for now I think we can move on but we need to
77 investigate further. Especially if we start seeing problems in real use (the
81 investigate further. Especially if we start seeing problems in real use (the
78 test suite stresses the networking layer in particular ways that aren't
82 test suite stresses the networking layer in particular ways that aren't
79 necessarily typical of normal use).
83 necessarily typical of normal use).
80
84
81 Next, I started an 8-engine cluster via::
85 Next, I started an 8-engine cluster via::
82
86
83 perez@opt-login01[~]> ipcluster -n 8
87 perez@opt-login01[~]> ipcluster -n 8
84 Starting controller: Controller PID: 30845
88 Starting controller: Controller PID: 30845
85 ^X Starting engines: Engines PIDs: [30846, 30847, 30848, 30849,
89 ^X Starting engines: Engines PIDs: [30846, 30847, 30848, 30849,
86 30850, 30851, 30852, 30853]
90 30850, 30851, 30852, 30853]
87 Log files: /home/perez/.ipython/log/ipcluster-30845-*
91 Log files: /home/perez/.ipython/log/ipcluster-30845-*
88
92
89 Your cluster is up and running.
93 Your cluster is up and running.
90
94
91 [... etc]
95 [... etc]
92
96
93 and in a separate ipython session checked that the cluster is running and I can
97 and in a separate ipython session checked that the cluster is running and I can
94 access all the engines::
98 access all the engines::
95
99
96 In [1]: from IPython.kernel import client
100 In [1]: from IPython.kernel import client
97
101
98 In [2]: mec = client.MultiEngineClient()
102 In [2]: mec = client.MultiEngineClient()
99
103
100 In [3]: mec.get_ids()
104 In [3]: mec.get_ids()
101 Out[3]: [0, 1, 2, 3, 4, 5, 6, 7]
105 Out[3]: [0, 1, 2, 3, 4, 5, 6, 7]
102
106
103 and run trivial code in them (after importing the ``random`` module in all
107 and run trivial code in them (after importing the ``random`` module in all
104 engines)::
108 engines)::
105
109
106 In [11]: mec.execute("x=random.randint(0,10)")
110 In [11]: mec.execute("x=random.randint(0,10)")
107 Out[11]:
111 Out[11]:
108 <Results List>
112 <Results List>
109 [0] In [3]: x=random.randint(0,10)
113 [0] In [3]: x=random.randint(0,10)
110 [1] In [3]: x=random.randint(0,10)
114 [1] In [3]: x=random.randint(0,10)
111 [2] In [3]: x=random.randint(0,10)
115 [2] In [3]: x=random.randint(0,10)
112 [3] In [3]: x=random.randint(0,10)
116 [3] In [3]: x=random.randint(0,10)
113 [4] In [3]: x=random.randint(0,10)
117 [4] In [3]: x=random.randint(0,10)
114 [5] In [3]: x=random.randint(0,10)
118 [5] In [3]: x=random.randint(0,10)
115 [6] In [3]: x=random.randint(0,10)
119 [6] In [3]: x=random.randint(0,10)
116 [7] In [3]: x=random.randint(0,10)
120 [7] In [3]: x=random.randint(0,10)
117
121
118 In [12]: mec.pull('x')
122 In [12]: mec.pull('x')
119 Out[12]: [10, 0, 8, 10, 2, 9, 10, 7]
123 Out[12]: [10, 0, 8, 10, 2, 9, 10, 7]
120
124
121
125
122 We'll continue conducting more complex tests later, including instaling Vision
126 We'll continue conducting more complex tests later, including instaling Vision
123 locally and running the beam demo.
127 locally and running the beam demo.
124
128
125
129
126 Michel's original instructions
130 Michel's original instructions
127 ==============================
131 ==============================
128
132
129 I got a Vision network that reproduces the beam pattern demo working:
133 I got a Vision network that reproduces the beam pattern demo working:
130
134
131 .. image:: vision_beam_pattern.png
135 .. image:: vision_beam_pattern.png
132 :width: 400
136 :width: 400
133 :target: vision_beam_pattern.png
137 :target: vision_beam_pattern.png
134 :align: center
138 :align: center
135
139
136
140
137 I created a package called beamPattern that provides the function run() in its
141 I created a package called beamPattern that provides the function run() in its
138 __init__.py file.
142 __init__.py file.
139
143
140 A subpackage beamPattern/VisionInterface provides Vision nodes for:
144 A subpackage beamPattern/VisionInterface provides Vision nodes for:
141
145
142 - computing Elevation and Azimuth from a 3D vector
146 - computing Elevation and Azimuth from a 3D vector
143
147
144 - Reading .mat files
148 - Reading .mat files
145
149
146 - taking the results gathered from the engines and creating the output that a
150 - taking the results gathered from the engines and creating the output that a
147 single engine would have had produced
151 single engine would have had produced
148
152
149 The Mec node connect to a controller. In my network it was local but an furl
153 The Mec node connect to a controller. In my network it was local but an furl
150 can be specified to connect to a remote controller.
154 can be specified to connect to a remote controller.
151
155
152 The PRun Func node is from the IPython library of nodes. the import statement
156 The PRun Func node is from the IPython library of nodes. the import statement
153 is used to get the run function from the beamPattern package and bu puting
157 is used to get the run function from the beamPattern package and bu puting
154 "run" in the function entry of this node we push this function to the engines.
158 "run" in the function entry of this node we push this function to the engines.
155 In addition to the node will create input ports for all arguments of the
159 In addition to the node will create input ports for all arguments of the
156 function being pushed (i.e. the run function)
160 function being pushed (i.e. the run function)
157
161
158 The second input port on PRun Fun take an integer specifying the rank of the
162 The second input port on PRun Fun take an integer specifying the rank of the
159 argument we want to scatter. All other arguments will be pushed to the engines.
163 argument we want to scatter. All other arguments will be pushed to the engines.
160
164
161 The ElevAzim node has a 3D vector widget and computes the El And Az values
165 The ElevAzim node has a 3D vector widget and computes the El And Az values
162 which are passed into the PRun Fun node through the ports created
166 which are passed into the PRun Fun node through the ports created
163 automatically. The Mat node allows to select the .mat file, reads it and passed
167 automatically. The Mat node allows to select the .mat file, reads it and passed
164 the data to the locdata port created automatically on PRun Func
168 the data to the locdata port created automatically on PRun Func
165
169
166 The calculation is executed in parallel, and the results are gathered and
170 The calculation is executed in parallel, and the results are gathered and
167 output. Instead of having a list of 3 vectors we nd up with a list of n*3
171 output. Instead of having a list of 3 vectors we nd up with a list of n*3
168 vectors where n is the number of engines. unpackDectorResults will turn it into
172 vectors where n is the number of engines. unpackDectorResults will turn it into
169 a list of 3. We then plot x, y, and 10*log10(z)
173 a list of 3. We then plot x, y, and 10*log10(z)
170
174
171
175
172 Installation
176 Installation
173 ------------
177 ------------
174
178
175 - inflate beamPattern into the site-packages directory for the MGL tools.
179 - inflate beamPattern into the site-packages directory for the MGL tools.
176
180
177 - place the appended IPythonNodes.py and StandardNodes.py into the Vision
181 - place the appended IPythonNodes.py and StandardNodes.py into the Vision
178 package of the MGL tools.
182 package of the MGL tools.
179
183
180 - place the appended items.py in the NetworkEditor package of the MGL tools
184 - place the appended items.py in the NetworkEditor package of the MGL tools
181
185
182 - run vision for the network beamPat5_net.py::
186 - run vision for the network beamPat5_net.py::
183
187
184 vision beamPat5_net.py
188 vision beamPat5_net.py
185
189
186 Once the network is running, you can:
190 Once the network is running, you can:
187
191
188 - double click on the MEC node and either use an emptty string for the furl to
192 - double click on the MEC node and either use an emptty string for the furl to
189 connect to a local engine or cut and paste the furl to the engine you want to
193 connect to a local engine or cut and paste the furl to the engine you want to
190 use
194 use
191
195
192 - click on the yellow lighting bold to run the network.
196 - click on the yellow lighting bold to run the network.
193
197
194 - Try modifying the MAT file or change the Vector used top compute elevation
198 - Try modifying the MAT file or change the Vector used top compute elevation
195 and Azimut.
199 and Azimut.
196
200
197
201
198 Fernando's notes
202 Fernando's notes
199 ================
203 ================
200
204
201 - I had to install IPython and all its dependencies for the python used by the
205 - I had to install IPython and all its dependencies for the python used by the
202 MGL tools.
206 MGL tools.
203
207
204 - Then I had to install scipy 0.6.0 for it, since the nodes needed Scipy. To
208 - Then I had to install scipy 0.6.0 for it, since the nodes needed Scipy. To
205 do this I sourced the mglenv.sh script and then ran::
209 do this I sourced the mglenv.sh script and then ran::
206
210
207 python setup.py install --prefix=~/usr/opt/mgl
211 python setup.py install --prefix=~/usr/opt/mgl
208
212
209
213
210 Using PBS
214 Using PBS
211 =========
215 =========
212
216
213 The following PBS script can be used to start the engines::
217 The following PBS script can be used to start the engines::
214
218
215 #PBS -N bgranger-ipython
219 #PBS -N bgranger-ipython
216 #PBS -j oe
220 #PBS -j oe
217 #PBS -l walltime=00:10:00
221 #PBS -l walltime=00:10:00
218 #PBS -l nodes=4:ppn=4
222 #PBS -l nodes=4:ppn=4
219
223
220 cd $PBS_O_WORKDIR
224 cd $PBS_O_WORKDIR
221 export PATH=$HOME/usr/local/bin
225 export PATH=$HOME/usr/local/bin
222 export PYTHONPATH=$HOME/usr/local/lib/python2.4/site-packages
226 export PYTHONPATH=$HOME/usr/local/lib/python2.4/site-packages
223 /usr/local/bin/mpiexec -n 16 ipengine
227 /usr/local/bin/mpiexec -n 16 ipengine
224
228
225
229
226 If this file is called ``ipython_pbs.sh``, then the in one login windows
230 If this file is called ``ipython_pbs.sh``, then the in one login windows
227 (i.e. on the head-node -- ``opt-login01.osc.edu``), run ``ipcontroller``. In
231 (i.e. on the head-node -- ``opt-login01.osc.edu``), run ``ipcontroller``. In
228 another login window on the same node, run the above script::
232 another login window on the same node, run the above script::
229
233
230 qsub ipython_pbs.sh
234 qsub ipython_pbs.sh
231
235
232 If you look at the first window, you will see some diagnostic output
236 If you look at the first window, you will see some diagnostic output
233 from ipcontroller. You can then get the furl from your own
237 from ipcontroller. You can then get the furl from your own
234 ``~/.ipython/security`` directory and then connect to it remotely.
238 ``~/.ipython/security`` directory and then connect to it remotely.
235
239
236 You might need to set up an SSH tunnel, however; if this doesn't work as
240 You might need to set up an SSH tunnel, however; if this doesn't work as
237 advertised::
241 advertised::
238
242
239 ssh -L 10115:localhost:10105 bic
243 ssh -L 10115:localhost:10105 bic
240
244
241
245
242 Links to other resources
246 Links to other resources
243 ========================
247 ========================
244
248
245 - http://www.osc.edu/~unpingco/glenn_NewLynx2_Demo.avi
249 - http://www.osc.edu/~unpingco/glenn_NewLynx2_Demo.avi
246
250
General Comments 0
You need to be logged in to leave comments. Login now