##// END OF EJS Templates
Initial refactor of task dependency system....
Brian Granger -
Show More
@@ -0,0 +1,53 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3
4 """
5 A new example showing how to use `TaskRejectError` to handle dependencies
6 in the IPython task system.
7
8 To run this example, do::
9
10 $ ipcluster local -n 4
11
12 Then, in another terminal start up IPython and do::
13
14 In [0]: %run taskreject.py
15
16 In [1]: mec.execute('run=True', targets=[0,1])
17
18 After the first command, the scheduler will keep rescheduling the tasks, as
19 they will fail with `TaskRejectError`. But after the second command, there
20 are two engines that the tasks can run on. The tasks are quickly funneled
21 to these engines.
22
23 If you want to see how the controller is scheduling and retrying the tasks
24 do a `tail -f` on the controller's log file in ~/.ipython/log.
25 """
26
27 #-----------------------------------------------------------------------------
28 # Copyright (C) 2008-2009 The IPython Development Team
29 #
30 # Distributed under the terms of the BSD License. The full license is in
31 # the file COPYING, distributed as part of this software.
32 #-----------------------------------------------------------------------------
33
34 from IPython.kernel import client
35 from IPython.kernel import TaskRejectError
36
37 mec = client.MultiEngineClient()
38 tc = client.TaskClient()
39
40 mec.execute('from IPython.kernel import TaskRejectError')
41 mec.execute('run = False')
42
43 def map_task():
44 if not run:
45 raise TaskRejectError('task dependency not met')
46 return 3.0e8
47
48 task_ids = []
49
50 for i in range(10):
51 task = client.MapTask(map_task, retries=20)
52 task_ids.append(tc.run(task, block=False))
53
@@ -1,23 +1,25 b''
1 1 # encoding: utf-8
2 2 """The IPython1 kernel.
3 3
4 4 The IPython kernel actually refers to three things:
5 5
6 6 * The IPython Engine
7 7 * The IPython Controller
8 8 * Clients to the IPython Controller
9 9
10 10 The kernel module implements the engine, controller and client and all the
11 11 network protocols needed for the various entities to talk to each other.
12 12
13 13 An end user should probably begin by looking at the `client.py` module
14 14 if they need blocking clients or in `asyncclient.py` if they want asynchronous,
15 15 deferred/Twisted using clients.
16 16 """
17 17 __docformat__ = "restructuredtext en"
18 18 #-----------------------------------------------------------------------------
19 19 # Copyright (C) 2008 The IPython Development Team
20 20 #
21 21 # Distributed under the terms of the BSD License. The full license is in
22 22 # the file COPYING, distributed as part of this software.
23 #----------------------------------------------------------------------------- No newline at end of file
23 #-----------------------------------------------------------------------------
24
25 from IPython.kernel.error import TaskRejectError No newline at end of file
@@ -1,188 +1,202 b''
1 1 # encoding: utf-8
2 2
3 3 """Classes and functions for kernel related errors and exceptions."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-------------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-------------------------------------------------------------------------------
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 18 from IPython.kernel.core import error
19 19 from twisted.python import failure
20 20
21 21 #-------------------------------------------------------------------------------
22 22 # Error classes
23 23 #-------------------------------------------------------------------------------
24 24
25 25 class KernelError(error.IPythonError):
26 26 pass
27 27
28 28 class NotDefined(KernelError):
29 29 def __init__(self, name):
30 30 self.name = name
31 31 self.args = (name,)
32 32
33 33 def __repr__(self):
34 34 return '<NotDefined: %s>' % self.name
35 35
36 36 __str__ = __repr__
37 37
38 38 class QueueCleared(KernelError):
39 39 pass
40 40
41 41 class IdInUse(KernelError):
42 42 pass
43 43
44 44 class ProtocolError(KernelError):
45 45 pass
46 46
47 47 class ConnectionError(KernelError):
48 48 pass
49 49
50 50 class InvalidEngineID(KernelError):
51 51 pass
52 52
53 53 class NoEnginesRegistered(KernelError):
54 54 pass
55 55
56 56 class InvalidClientID(KernelError):
57 57 pass
58 58
59 59 class InvalidDeferredID(KernelError):
60 60 pass
61 61
62 62 class SerializationError(KernelError):
63 63 pass
64 64
65 65 class MessageSizeError(KernelError):
66 66 pass
67 67
68 68 class PBMessageSizeError(MessageSizeError):
69 69 pass
70 70
71 71 class ResultNotCompleted(KernelError):
72 72 pass
73 73
74 74 class ResultAlreadyRetrieved(KernelError):
75 75 pass
76 76
77 77 class ClientError(KernelError):
78 78 pass
79 79
80 80 class TaskAborted(KernelError):
81 81 pass
82 82
83 83 class TaskTimeout(KernelError):
84 84 pass
85 85
86 86 class NotAPendingResult(KernelError):
87 87 pass
88 88
89 89 class UnpickleableException(KernelError):
90 90 pass
91 91
92 92 class AbortedPendingDeferredError(KernelError):
93 93 pass
94 94
95 95 class InvalidProperty(KernelError):
96 96 pass
97 97
98 98 class MissingBlockArgument(KernelError):
99 99 pass
100 100
101 101 class StopLocalExecution(KernelError):
102 102 pass
103 103
104 104 class SecurityError(KernelError):
105 105 pass
106 106
107 107 class FileTimeoutError(KernelError):
108 108 pass
109 109
110 class TaskRejectError(KernelError):
111 """Exception to raise when a task should be rejected by an engine.
112
113 This exception can be used to allow a task running on an engine to test
114 if the engine (or the user's namespace on the engine) has the needed
115 task dependencies. If not, the task should raise this exception. For
116 the task to be retried on another engine, the task should be created
117 with the `retries` argument > 1.
118
119 The advantage of this approach over our older properties system is that
120 tasks have full access to the user's namespace on the engines and the
121 properties don't have to be managed or tested by the controller.
122 """
123
110 124 class CompositeError(KernelError):
111 125 def __init__(self, message, elist):
112 126 Exception.__init__(self, *(message, elist))
113 127 self.message = message
114 128 self.elist = elist
115 129
116 130 def _get_engine_str(self, ev):
117 131 try:
118 132 ei = ev._ipython_engine_info
119 133 except AttributeError:
120 134 return '[Engine Exception]'
121 135 else:
122 136 return '[%i:%s]: ' % (ei['engineid'], ei['method'])
123 137
124 138 def _get_traceback(self, ev):
125 139 try:
126 140 tb = ev._ipython_traceback_text
127 141 except AttributeError:
128 142 return 'No traceback available'
129 143 else:
130 144 return tb
131 145
132 146 def __str__(self):
133 147 s = str(self.message)
134 148 for et, ev, etb in self.elist:
135 149 engine_str = self._get_engine_str(ev)
136 150 s = s + '\n' + engine_str + str(et.__name__) + ': ' + str(ev)
137 151 return s
138 152
139 153 def print_tracebacks(self, excid=None):
140 154 if excid is None:
141 155 for (et,ev,etb) in self.elist:
142 156 print self._get_engine_str(ev)
143 157 print self._get_traceback(ev)
144 158 print
145 159 else:
146 160 try:
147 161 et,ev,etb = self.elist[excid]
148 162 except:
149 163 raise IndexError("an exception with index %i does not exist"%excid)
150 164 else:
151 165 print self._get_engine_str(ev)
152 166 print self._get_traceback(ev)
153 167
154 168 def raise_exception(self, excid=0):
155 169 try:
156 170 et,ev,etb = self.elist[excid]
157 171 except:
158 172 raise IndexError("an exception with index %i does not exist"%excid)
159 173 else:
160 174 raise et, ev, etb
161 175
162 176 def collect_exceptions(rlist, method):
163 177 elist = []
164 178 for r in rlist:
165 179 if isinstance(r, failure.Failure):
166 180 r.cleanFailure()
167 181 et, ev, etb = r.type, r.value, r.tb
168 182 # Sometimes we could have CompositeError in our list. Just take
169 183 # the errors out of them and put them in our new list. This
170 184 # has the effect of flattening lists of CompositeErrors into one
171 185 # CompositeError
172 186 if et==CompositeError:
173 187 for e in ev.elist:
174 188 elist.append(e)
175 189 else:
176 190 elist.append((et, ev, etb))
177 191 if len(elist)==0:
178 192 return rlist
179 193 else:
180 194 msg = "one or more exceptions from call to method: %s" % (method)
181 195 # This silliness is needed so the debugger has access to the exception
182 196 # instance (e in this case)
183 197 try:
184 198 raise CompositeError(msg, elist)
185 199 except CompositeError, e:
186 200 raise e
187 201
188 202
@@ -1,951 +1,965 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
3 3
4 4 """General Classes for IMultiEngine clients."""
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import cPickle as pickle
21 21 from types import FunctionType
22 22 import linecache
23 import warnings
23 24
24 25 from twisted.internet import reactor
25 26 from twisted.python import components, log
26 27 from twisted.python.failure import Failure
27 28 from zope.interface import Interface, implements, Attribute
28 29
29 30 from IPython.ColorANSI import TermColors
30 31
31 32 from IPython.kernel.twistedutil import blockingCallFromThread
32 33 from IPython.kernel import error
33 34 from IPython.kernel.parallelfunction import ParallelFunction
34 35 from IPython.kernel.mapper import (
35 36 MultiEngineMapper,
36 37 IMultiEngineMapperFactory,
37 38 IMapper
38 39 )
39 40 from IPython.kernel import map as Map
40 41 from IPython.kernel import multiengine as me
41 42 from IPython.kernel.multiengine import (IFullMultiEngine,
42 43 IFullSynchronousMultiEngine)
43 44
44 45
45 46 #-------------------------------------------------------------------------------
46 47 # Pending Result things
47 48 #-------------------------------------------------------------------------------
48 49
49 50 class IPendingResult(Interface):
50 51 """A representation of a result that is pending.
51 52
52 53 This class is similar to Twisted's `Deferred` object, but is designed to be
53 54 used in a synchronous context.
54 55 """
55 56
56 57 result_id=Attribute("ID of the deferred on the other side")
57 58 client=Attribute("A client that I came from")
58 59 r=Attribute("An attribute that is a property that calls and returns get_result")
59 60
60 61 def get_result(default=None, block=True):
61 62 """
62 63 Get a result that is pending.
63 64
64 65 :Parameters:
65 66 default
66 67 The value to return if the result is not ready.
67 68 block : boolean
68 69 Should I block for the result.
69 70
70 71 :Returns: The actual result or the default value.
71 72 """
72 73
73 74 def add_callback(f, *args, **kwargs):
74 75 """
75 76 Add a callback that is called with the result.
76 77
77 78 If the original result is foo, adding a callback will cause
78 79 f(foo, *args, **kwargs) to be returned instead. If multiple
79 80 callbacks are registered, they are chained together: the result of
80 81 one is passed to the next and so on.
81 82
82 83 Unlike Twisted's Deferred object, there is no errback chain. Thus
83 84 any exception raised will not be caught and handled. User must
84 85 catch these by hand when calling `get_result`.
85 86 """
86 87
87 88
88 89 class PendingResult(object):
89 90 """A representation of a result that is not yet ready.
90 91
91 92 A user should not create a `PendingResult` instance by hand.
92 93
93 94 Methods
94 95 =======
95 96
96 97 * `get_result`
97 98 * `add_callback`
98 99
99 100 Properties
100 101 ==========
101 102 * `r`
102 103 """
103 104
104 105 def __init__(self, client, result_id):
105 106 """Create a PendingResult with a result_id and a client instance.
106 107
107 108 The client should implement `_getPendingResult(result_id, block)`.
108 109 """
109 110 self.client = client
110 111 self.result_id = result_id
111 112 self.called = False
112 113 self.raised = False
113 114 self.callbacks = []
114 115
115 116 def get_result(self, default=None, block=True):
116 117 """Get a result that is pending.
117 118
118 119 This method will connect to an IMultiEngine adapted controller
119 120 and see if the result is ready. If the action triggers an exception
120 121 raise it and record it. This method records the result/exception once it is
121 122 retrieved. Calling `get_result` again will get this cached result or will
122 123 re-raise the exception. The .r attribute is a property that calls
123 124 `get_result` with block=True.
124 125
125 126 :Parameters:
126 127 default
127 128 The value to return if the result is not ready.
128 129 block : boolean
129 130 Should I block for the result.
130 131
131 132 :Returns: The actual result or the default value.
132 133 """
133 134
134 135 if self.called:
135 136 if self.raised:
136 137 raise self.result[0], self.result[1], self.result[2]
137 138 else:
138 139 return self.result
139 140 try:
140 141 result = self.client.get_pending_deferred(self.result_id, block)
141 142 except error.ResultNotCompleted:
142 143 return default
143 144 except:
144 145 # Reraise other error, but first record them so they can be reraised
145 146 # later if .r or get_result is called again.
146 147 self.result = sys.exc_info()
147 148 self.called = True
148 149 self.raised = True
149 150 raise
150 151 else:
151 152 for cb in self.callbacks:
152 153 result = cb[0](result, *cb[1], **cb[2])
153 154 self.result = result
154 155 self.called = True
155 156 return result
156 157
157 158 def add_callback(self, f, *args, **kwargs):
158 159 """Add a callback that is called with the result.
159 160
160 161 If the original result is result, adding a callback will cause
161 162 f(result, *args, **kwargs) to be returned instead. If multiple
162 163 callbacks are registered, they are chained together: the result of
163 164 one is passed to the next and so on.
164 165
165 166 Unlike Twisted's Deferred object, there is no errback chain. Thus
166 167 any exception raised will not be caught and handled. User must
167 168 catch these by hand when calling `get_result`.
168 169 """
169 170 assert callable(f)
170 171 self.callbacks.append((f, args, kwargs))
171 172
172 173 def __cmp__(self, other):
173 174 if self.result_id < other.result_id:
174 175 return -1
175 176 else:
176 177 return 1
177 178
178 179 def _get_r(self):
179 180 return self.get_result(block=True)
180 181
181 182 r = property(_get_r)
182 183 """This property is a shortcut to a `get_result(block=True)`."""
183 184
184 185
185 186 #-------------------------------------------------------------------------------
186 187 # Pretty printing wrappers for certain lists
187 188 #-------------------------------------------------------------------------------
188 189
189 190 class ResultList(list):
190 191 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
191 192
192 193 def __repr__(self):
193 194 output = []
194 195 # These colored prompts were not working on Windows
195 196 if sys.platform == 'win32':
196 197 blue = normal = red = green = ''
197 198 else:
198 199 blue = TermColors.Blue
199 200 normal = TermColors.Normal
200 201 red = TermColors.Red
201 202 green = TermColors.Green
202 203 output.append("<Results List>\n")
203 204 for cmd in self:
204 205 if isinstance(cmd, Failure):
205 206 output.append(cmd)
206 207 else:
207 208 target = cmd.get('id',None)
208 209 cmd_num = cmd.get('number',None)
209 210 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
210 211 cmd_stdout = cmd.get('stdout', None)
211 212 cmd_stderr = cmd.get('stderr', None)
212 213 output.append("%s[%i]%s In [%i]:%s %s\n" % \
213 214 (green, target,
214 215 blue, cmd_num, normal, cmd_stdin))
215 216 if cmd_stdout:
216 217 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
217 218 (green, target,
218 219 red, cmd_num, normal, cmd_stdout))
219 220 if cmd_stderr:
220 221 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
221 222 (green, target,
222 223 red, cmd_num, normal, cmd_stderr))
223 224 return ''.join(output)
224 225
225 226
226 227 def wrapResultList(result):
227 228 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
228 229 if len(result) == 0:
229 230 result = [result]
230 231 return ResultList(result)
231 232
232 233
233 234 class QueueStatusList(list):
234 235 """A subclass of list that pretty prints the output of `queue_status`."""
235 236
236 237 def __repr__(self):
237 238 output = []
238 239 output.append("<Queue Status List>\n")
239 240 for e in self:
240 241 output.append("Engine: %s\n" % repr(e[0]))
241 242 output.append(" Pending: %s\n" % repr(e[1]['pending']))
242 243 for q in e[1]['queue']:
243 244 output.append(" Command: %s\n" % repr(q))
244 245 return ''.join(output)
245 246
246 247
247 248 #-------------------------------------------------------------------------------
248 249 # InteractiveMultiEngineClient
249 250 #-------------------------------------------------------------------------------
250 251
251 252 class InteractiveMultiEngineClient(object):
252 253 """A mixin class that add a few methods to a multiengine client.
253 254
254 255 The methods in this mixin class are designed for interactive usage.
255 256 """
256 257
257 258 def activate(self):
258 259 """Make this `MultiEngineClient` active for parallel magic commands.
259 260
260 261 IPython has a magic command syntax to work with `MultiEngineClient` objects.
261 262 In a given IPython session there is a single active one. While
262 263 there can be many `MultiEngineClient` created and used by the user,
263 264 there is only one active one. The active `MultiEngineClient` is used whenever
264 265 the magic commands %px and %autopx are used.
265 266
266 267 The activate() method is called on a given `MultiEngineClient` to make it
267 268 active. Once this has been done, the magic commands can be used.
268 269 """
269 270
270 271 try:
271 272 __IPYTHON__.activeController = self
272 273 except NameError:
273 274 print "The IPython Controller magics only work within IPython."
274 275
275 276 def __setitem__(self, key, value):
276 277 """Add a dictionary interface for pushing/pulling.
277 278
278 279 This functions as a shorthand for `push`.
279 280
280 281 :Parameters:
281 282 key : str
282 283 What to call the remote object.
283 284 value : object
284 285 The local Python object to push.
285 286 """
286 287 targets, block = self._findTargetsAndBlock()
287 288 return self.push({key:value}, targets=targets, block=block)
288 289
289 290 def __getitem__(self, key):
290 291 """Add a dictionary interface for pushing/pulling.
291 292
292 293 This functions as a shorthand to `pull`.
293 294
294 295 :Parameters:
295 296 - `key`: A string representing the key.
296 297 """
297 298 if isinstance(key, str):
298 299 targets, block = self._findTargetsAndBlock()
299 300 return self.pull(key, targets=targets, block=block)
300 301 else:
301 302 raise TypeError("__getitem__ only takes strs")
302 303
303 304 def __len__(self):
304 305 """Return the number of available engines."""
305 306 return len(self.get_ids())
306 307
307 308 #---------------------------------------------------------------------------
308 309 # Make this a context manager for with
309 310 #---------------------------------------------------------------------------
310 311
311 312 def findsource_file(self,f):
312 313 linecache.checkcache()
313 314 s = findsource(f.f_code)
314 315 lnum = f.f_lineno
315 316 wsource = s[0][f.f_lineno:]
316 317 return strip_whitespace(wsource)
317 318
318 319 def findsource_ipython(self,f):
319 320 from IPython import ipapi
320 321 self.ip = ipapi.get()
321 322 wsource = [l+'\n' for l in
322 323 self.ip.IP.input_hist_raw[-1].splitlines()[1:]]
323 324 return strip_whitespace(wsource)
324 325
325 326 def __enter__(self):
326 327 f = sys._getframe(1)
327 328 local_ns = f.f_locals
328 329 global_ns = f.f_globals
329 330 if f.f_code.co_filename == '<ipython console>':
330 331 s = self.findsource_ipython(f)
331 332 else:
332 333 s = self.findsource_file(f)
333 334
334 335 self._with_context_result = self.execute(s)
335 336
336 337 def __exit__ (self, etype, value, tb):
337 338 if issubclass(etype,error.StopLocalExecution):
338 339 return True
339 340
340 341
341 342 def remote():
342 343 m = 'Special exception to stop local execution of parallel code.'
343 344 raise error.StopLocalExecution(m)
344 345
345 346 def strip_whitespace(source):
346 347 # Expand tabs to avoid any confusion.
347 348 wsource = [l.expandtabs(4) for l in source]
348 349 # Detect the indentation level
349 350 done = False
350 351 for line in wsource:
351 352 if line.isspace():
352 353 continue
353 354 for col,char in enumerate(line):
354 355 if char != ' ':
355 356 done = True
356 357 break
357 358 if done:
358 359 break
359 360 # Now we know how much leading space there is in the code. Next, we
360 361 # extract up to the first line that has less indentation.
361 362 # WARNINGS: we skip comments that may be misindented, but we do NOT yet
362 363 # detect triple quoted strings that may have flush left text.
363 364 for lno,line in enumerate(wsource):
364 365 lead = line[:col]
365 366 if lead.isspace():
366 367 continue
367 368 else:
368 369 if not lead.lstrip().startswith('#'):
369 370 break
370 371 # The real 'with' source is up to lno
371 372 src_lines = [l[col:] for l in wsource[:lno+1]]
372 373
373 374 # Finally, check that the source's first non-comment line begins with the
374 375 # special call 'remote()'
375 376 for nline,line in enumerate(src_lines):
376 377 if line.isspace() or line.startswith('#'):
377 378 continue
378 379 if 'remote()' in line:
379 380 break
380 381 else:
381 382 raise ValueError('remote() call missing at the start of code')
382 383 src = ''.join(src_lines[nline+1:])
383 384 #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
384 385 return src
385 386
386 387
387 388 #-------------------------------------------------------------------------------
388 389 # The top-level MultiEngine client adaptor
389 390 #-------------------------------------------------------------------------------
390 391
391 392
393 _prop_warn = """\
394
395 We are currently refactoring the task dependency system. This might
396 involve the removal of this method and other methods related to engine
397 properties. Please see the docstrings for IPython.kernel.TaskRejectError
398 for more information."""
399
400
392 401 class IFullBlockingMultiEngineClient(Interface):
393 402 pass
394 403
395 404
396 405 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
397 406 """
398 407 A blocking client to the `IMultiEngine` controller interface.
399 408
400 409 This class allows users to use a set of engines for a parallel
401 410 computation through the `IMultiEngine` interface. In this interface,
402 411 each engine has a specific id (an int) that is used to refer to the
403 412 engine, run code on it, etc.
404 413 """
405 414
406 415 implements(
407 416 IFullBlockingMultiEngineClient,
408 417 IMultiEngineMapperFactory,
409 418 IMapper
410 419 )
411 420
412 421 def __init__(self, smultiengine):
413 422 self.smultiengine = smultiengine
414 423 self.block = True
415 424 self.targets = 'all'
416 425
417 426 def _findBlock(self, block=None):
418 427 if block is None:
419 428 return self.block
420 429 else:
421 430 if block in (True, False):
422 431 return block
423 432 else:
424 433 raise ValueError("block must be True or False")
425 434
426 435 def _findTargets(self, targets=None):
427 436 if targets is None:
428 437 return self.targets
429 438 else:
430 439 if not isinstance(targets, (str,list,tuple,int)):
431 440 raise ValueError("targets must be a str, list, tuple or int")
432 441 return targets
433 442
434 443 def _findTargetsAndBlock(self, targets=None, block=None):
435 444 return self._findTargets(targets), self._findBlock(block)
436 445
437 446 def _blockFromThread(self, function, *args, **kwargs):
438 447 block = kwargs.get('block', None)
439 448 if block is None:
440 449 raise error.MissingBlockArgument("'block' keyword argument is missing")
441 450 result = blockingCallFromThread(function, *args, **kwargs)
442 451 if not block:
443 452 result = PendingResult(self, result)
444 453 return result
445 454
446 455 def get_pending_deferred(self, deferredID, block):
447 456 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
448 457
449 458 def barrier(self, pendingResults):
450 459 """Synchronize a set of `PendingResults`.
451 460
452 461 This method is a synchronization primitive that waits for a set of
453 462 `PendingResult` objects to complete. More specifically, barier does
454 463 the following.
455 464
456 465 * The `PendingResult`s are sorted by result_id.
457 466 * The `get_result` method is called for each `PendingResult` sequentially
458 467 with block=True.
459 468 * If a `PendingResult` gets a result that is an exception, it is
460 469 trapped and can be re-raised later by calling `get_result` again.
461 470 * The `PendingResult`s are flushed from the controller.
462 471
463 472 After barrier has been called on a `PendingResult`, its results can
464 473 be retrieved by calling `get_result` again or accesing the `r` attribute
465 474 of the instance.
466 475 """
467 476
468 477 # Convert to list for sorting and check class type
469 478 prList = list(pendingResults)
470 479 for pr in prList:
471 480 if not isinstance(pr, PendingResult):
472 481 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
473 482
474 483 # Sort the PendingResults so they are in order
475 484 prList.sort()
476 485 # Block on each PendingResult object
477 486 for pr in prList:
478 487 try:
479 488 result = pr.get_result(block=True)
480 489 except Exception:
481 490 pass
482 491
483 492 def flush(self):
484 493 """
485 494 Clear all pending deferreds/results from the controller.
486 495
487 496 For each `PendingResult` that is created by this client, the controller
488 497 holds on to the result for that `PendingResult`. This can be a problem
489 498 if there are a large number of `PendingResult` objects that are created.
490 499
491 500 Once the result of the `PendingResult` has been retrieved, the result
492 501 is removed from the controller, but if a user doesn't get a result (
493 502 they just ignore the `PendingResult`) the result is kept forever on the
494 503 controller. This method allows the user to clear out all un-retrieved
495 504 results on the controller.
496 505 """
497 506 r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
498 507 return r
499 508
500 509 clear_pending_results = flush
501 510
502 511 #---------------------------------------------------------------------------
503 512 # IEngineMultiplexer related methods
504 513 #---------------------------------------------------------------------------
505 514
506 515 def execute(self, lines, targets=None, block=None):
507 516 """
508 517 Execute code on a set of engines.
509 518
510 519 :Parameters:
511 520 lines : str
512 521 The Python code to execute as a string
513 522 targets : id or list of ids
514 523 The engine to use for the execution
515 524 block : boolean
516 525 If False, this method will return the actual result. If False,
517 526 a `PendingResult` is returned which can be used to get the result
518 527 at a later time.
519 528 """
520 529 targets, block = self._findTargetsAndBlock(targets, block)
521 530 result = blockingCallFromThread(self.smultiengine.execute, lines,
522 531 targets=targets, block=block)
523 532 if block:
524 533 result = ResultList(result)
525 534 else:
526 535 result = PendingResult(self, result)
527 536 result.add_callback(wrapResultList)
528 537 return result
529 538
530 539 def push(self, namespace, targets=None, block=None):
531 540 """
532 541 Push a dictionary of keys and values to engines namespace.
533 542
534 543 Each engine has a persistent namespace. This method is used to push
535 544 Python objects into that namespace.
536 545
537 546 The objects in the namespace must be pickleable.
538 547
539 548 :Parameters:
540 549 namespace : dict
541 550 A dict that contains Python objects to be injected into
542 551 the engine persistent namespace.
543 552 targets : id or list of ids
544 553 The engine to use for the execution
545 554 block : boolean
546 555 If False, this method will return the actual result. If False,
547 556 a `PendingResult` is returned which can be used to get the result
548 557 at a later time.
549 558 """
550 559 targets, block = self._findTargetsAndBlock(targets, block)
551 560 return self._blockFromThread(self.smultiengine.push, namespace,
552 561 targets=targets, block=block)
553 562
554 563 def pull(self, keys, targets=None, block=None):
555 564 """
556 565 Pull Python objects by key out of engines namespaces.
557 566
558 567 :Parameters:
559 568 keys : str or list of str
560 569 The names of the variables to be pulled
561 570 targets : id or list of ids
562 571 The engine to use for the execution
563 572 block : boolean
564 573 If False, this method will return the actual result. If False,
565 574 a `PendingResult` is returned which can be used to get the result
566 575 at a later time.
567 576 """
568 577 targets, block = self._findTargetsAndBlock(targets, block)
569 578 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
570 579
571 580 def push_function(self, namespace, targets=None, block=None):
572 581 """
573 582 Push a Python function to an engine.
574 583
575 584 This method is used to push a Python function to an engine. This
576 585 method can then be used in code on the engines. Closures are not supported.
577 586
578 587 :Parameters:
579 588 namespace : dict
580 589 A dict whose values are the functions to be pushed. The keys give
581 590 that names that the function will appear as in the engines
582 591 namespace.
583 592 targets : id or list of ids
584 593 The engine to use for the execution
585 594 block : boolean
586 595 If False, this method will return the actual result. If False,
587 596 a `PendingResult` is returned which can be used to get the result
588 597 at a later time.
589 598 """
590 599 targets, block = self._findTargetsAndBlock(targets, block)
591 600 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
592 601
593 602 def pull_function(self, keys, targets=None, block=None):
594 603 """
595 604 Pull a Python function from an engine.
596 605
597 606 This method is used to pull a Python function from an engine.
598 607 Closures are not supported.
599 608
600 609 :Parameters:
601 610 keys : str or list of str
602 611 The names of the functions to be pulled
603 612 targets : id or list of ids
604 613 The engine to use for the execution
605 614 block : boolean
606 615 If False, this method will return the actual result. If False,
607 616 a `PendingResult` is returned which can be used to get the result
608 617 at a later time.
609 618 """
610 619 targets, block = self._findTargetsAndBlock(targets, block)
611 620 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
612 621
613 622 def push_serialized(self, namespace, targets=None, block=None):
614 623 targets, block = self._findTargetsAndBlock(targets, block)
615 624 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
616 625
617 626 def pull_serialized(self, keys, targets=None, block=None):
618 627 targets, block = self._findTargetsAndBlock(targets, block)
619 628 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
620 629
621 630 def get_result(self, i=None, targets=None, block=None):
622 631 """
623 632 Get a previous result.
624 633
625 634 When code is executed in an engine, a dict is created and returned. This
626 635 method retrieves that dict for previous commands.
627 636
628 637 :Parameters:
629 638 i : int
630 639 The number of the result to get
631 640 targets : id or list of ids
632 641 The engine to use for the execution
633 642 block : boolean
634 643 If False, this method will return the actual result. If False,
635 644 a `PendingResult` is returned which can be used to get the result
636 645 at a later time.
637 646 """
638 647 targets, block = self._findTargetsAndBlock(targets, block)
639 648 result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
640 649 if block:
641 650 result = ResultList(result)
642 651 else:
643 652 result = PendingResult(self, result)
644 653 result.add_callback(wrapResultList)
645 654 return result
646 655
647 656 def reset(self, targets=None, block=None):
648 657 """
649 658 Reset an engine.
650 659
651 660 This method clears out the namespace of an engine.
652 661
653 662 :Parameters:
654 663 targets : id or list of ids
655 664 The engine to use for the execution
656 665 block : boolean
657 666 If False, this method will return the actual result. If False,
658 667 a `PendingResult` is returned which can be used to get the result
659 668 at a later time.
660 669 """
661 670 targets, block = self._findTargetsAndBlock(targets, block)
662 671 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
663 672
664 673 def keys(self, targets=None, block=None):
665 674 """
666 675 Get a list of all the variables in an engine's namespace.
667 676
668 677 :Parameters:
669 678 targets : id or list of ids
670 679 The engine to use for the execution
671 680 block : boolean
672 681 If False, this method will return the actual result. If False,
673 682 a `PendingResult` is returned which can be used to get the result
674 683 at a later time.
675 684 """
676 685 targets, block = self._findTargetsAndBlock(targets, block)
677 686 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
678 687
679 688 def kill(self, controller=False, targets=None, block=None):
680 689 """
681 690 Kill the engines and controller.
682 691
683 692 This method is used to stop the engine and controller by calling
684 693 `reactor.stop`.
685 694
686 695 :Parameters:
687 696 controller : boolean
688 697 If True, kill the engines and controller. If False, just the
689 698 engines
690 699 targets : id or list of ids
691 700 The engine to use for the execution
692 701 block : boolean
693 702 If False, this method will return the actual result. If False,
694 703 a `PendingResult` is returned which can be used to get the result
695 704 at a later time.
696 705 """
697 706 targets, block = self._findTargetsAndBlock(targets, block)
698 707 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
699 708
700 709 def clear_queue(self, targets=None, block=None):
701 710 """
702 711 Clear out the controller's queue for an engine.
703 712
704 713 The controller maintains a queue for each engine. This clear it out.
705 714
706 715 :Parameters:
707 716 targets : id or list of ids
708 717 The engine to use for the execution
709 718 block : boolean
710 719 If False, this method will return the actual result. If False,
711 720 a `PendingResult` is returned which can be used to get the result
712 721 at a later time.
713 722 """
714 723 targets, block = self._findTargetsAndBlock(targets, block)
715 724 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
716 725
717 726 def queue_status(self, targets=None, block=None):
718 727 """
719 728 Get the status of an engines queue.
720 729
721 730 :Parameters:
722 731 targets : id or list of ids
723 732 The engine to use for the execution
724 733 block : boolean
725 734 If False, this method will return the actual result. If False,
726 735 a `PendingResult` is returned which can be used to get the result
727 736 at a later time.
728 737 """
729 738 targets, block = self._findTargetsAndBlock(targets, block)
730 739 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
731 740
732 741 def set_properties(self, properties, targets=None, block=None):
742 warnings.warn(_prop_warn)
733 743 targets, block = self._findTargetsAndBlock(targets, block)
734 744 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
735 745
736 746 def get_properties(self, keys=None, targets=None, block=None):
747 warnings.warn(_prop_warn)
737 748 targets, block = self._findTargetsAndBlock(targets, block)
738 749 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
739 750
740 751 def has_properties(self, keys, targets=None, block=None):
752 warnings.warn(_prop_warn)
741 753 targets, block = self._findTargetsAndBlock(targets, block)
742 754 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
743 755
744 756 def del_properties(self, keys, targets=None, block=None):
757 warnings.warn(_prop_warn)
745 758 targets, block = self._findTargetsAndBlock(targets, block)
746 759 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
747 760
748 761 def clear_properties(self, targets=None, block=None):
762 warnings.warn(_prop_warn)
749 763 targets, block = self._findTargetsAndBlock(targets, block)
750 764 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
751 765
752 766 #---------------------------------------------------------------------------
753 767 # IMultiEngine related methods
754 768 #---------------------------------------------------------------------------
755 769
756 770 def get_ids(self):
757 771 """
758 772 Returns the ids of currently registered engines.
759 773 """
760 774 result = blockingCallFromThread(self.smultiengine.get_ids)
761 775 return result
762 776
763 777 #---------------------------------------------------------------------------
764 778 # IMultiEngineCoordinator
765 779 #---------------------------------------------------------------------------
766 780
767 781 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
768 782 """
769 783 Partition a Python sequence and send the partitions to a set of engines.
770 784 """
771 785 targets, block = self._findTargetsAndBlock(targets, block)
772 786 return self._blockFromThread(self.smultiengine.scatter, key, seq,
773 787 dist, flatten, targets=targets, block=block)
774 788
775 789 def gather(self, key, dist='b', targets=None, block=None):
776 790 """
777 791 Gather a partitioned sequence on a set of engines as a single local seq.
778 792 """
779 793 targets, block = self._findTargetsAndBlock(targets, block)
780 794 return self._blockFromThread(self.smultiengine.gather, key, dist,
781 795 targets=targets, block=block)
782 796
783 797 def raw_map(self, func, seq, dist='b', targets=None, block=None):
784 798 """
785 799 A parallelized version of Python's builtin map.
786 800
787 801 This has a slightly different syntax than the builtin `map`.
788 802 This is needed because we need to have keyword arguments and thus
789 803 can't use *args to capture all the sequences. Instead, they must
790 804 be passed in a list or tuple.
791 805
792 806 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
793 807
794 808 Most users will want to use parallel functions or the `mapper`
795 809 and `map` methods for an API that follows that of the builtin
796 810 `map`.
797 811 """
798 812 targets, block = self._findTargetsAndBlock(targets, block)
799 813 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
800 814 dist, targets=targets, block=block)
801 815
802 816 def map(self, func, *sequences):
803 817 """
804 818 A parallel version of Python's builtin `map` function.
805 819
806 820 This method applies a function to sequences of arguments. It
807 821 follows the same syntax as the builtin `map`.
808 822
809 823 This method creates a mapper objects by calling `self.mapper` with
810 824 no arguments and then uses that mapper to do the mapping. See
811 825 the documentation of `mapper` for more details.
812 826 """
813 827 return self.mapper().map(func, *sequences)
814 828
815 829 def mapper(self, dist='b', targets='all', block=None):
816 830 """
817 831 Create a mapper object that has a `map` method.
818 832
819 833 This method returns an object that implements the `IMapper`
820 834 interface. This method is a factory that is used to control how
821 835 the map happens.
822 836
823 837 :Parameters:
824 838 dist : str
825 839 What decomposition to use, 'b' is the only one supported
826 840 currently
827 841 targets : str, int, sequence of ints
828 842 Which engines to use for the map
829 843 block : boolean
830 844 Should calls to `map` block or not
831 845 """
832 846 return MultiEngineMapper(self, dist, targets, block)
833 847
834 848 def parallel(self, dist='b', targets=None, block=None):
835 849 """
836 850 A decorator that turns a function into a parallel function.
837 851
838 852 This can be used as:
839 853
840 854 @parallel()
841 855 def f(x, y)
842 856 ...
843 857
844 858 f(range(10), range(10))
845 859
846 860 This causes f(0,0), f(1,1), ... to be called in parallel.
847 861
848 862 :Parameters:
849 863 dist : str
850 864 What decomposition to use, 'b' is the only one supported
851 865 currently
852 866 targets : str, int, sequence of ints
853 867 Which engines to use for the map
854 868 block : boolean
855 869 Should calls to `map` block or not
856 870 """
857 871 targets, block = self._findTargetsAndBlock(targets, block)
858 872 mapper = self.mapper(dist, targets, block)
859 873 pf = ParallelFunction(mapper)
860 874 return pf
861 875
862 876 #---------------------------------------------------------------------------
863 877 # IMultiEngineExtras
864 878 #---------------------------------------------------------------------------
865 879
866 880 def zip_pull(self, keys, targets=None, block=None):
867 881 targets, block = self._findTargetsAndBlock(targets, block)
868 882 return self._blockFromThread(self.smultiengine.zip_pull, keys,
869 883 targets=targets, block=block)
870 884
871 885 def run(self, filename, targets=None, block=None):
872 886 """
873 887 Run a Python code in a file on the engines.
874 888
875 889 :Parameters:
876 890 filename : str
877 891 The name of the local file to run
878 892 targets : id or list of ids
879 893 The engine to use for the execution
880 894 block : boolean
881 895 If False, this method will return the actual result. If False,
882 896 a `PendingResult` is returned which can be used to get the result
883 897 at a later time.
884 898 """
885 899 targets, block = self._findTargetsAndBlock(targets, block)
886 900 return self._blockFromThread(self.smultiengine.run, filename,
887 901 targets=targets, block=block)
888 902
889 903 def benchmark(self, push_size=10000):
890 904 """
891 905 Run performance benchmarks for the current IPython cluster.
892 906
893 907 This method tests both the latency of sending command and data to the
894 908 engines as well as the throughput of sending large objects to the
895 909 engines using push. The latency is measured by having one or more
896 910 engines execute the command 'pass'. The throughput is measure by
897 911 sending an NumPy array of size `push_size` to one or more engines.
898 912
899 913 These benchmarks will vary widely on different hardware and networks
900 914 and thus can be used to get an idea of the performance characteristics
901 915 of a particular configuration of an IPython controller and engines.
902 916
903 917 This function is not testable within our current testing framework.
904 918 """
905 919 import timeit, __builtin__
906 920 __builtin__._mec_self = self
907 921 benchmarks = {}
908 922 repeat = 3
909 923 count = 10
910 924
911 925 timer = timeit.Timer('_mec_self.execute("pass",0)')
912 926 result = 1000*min(timer.repeat(repeat,count))/count
913 927 benchmarks['single_engine_latency'] = (result,'msec')
914 928
915 929 timer = timeit.Timer('_mec_self.execute("pass")')
916 930 result = 1000*min(timer.repeat(repeat,count))/count
917 931 benchmarks['all_engine_latency'] = (result,'msec')
918 932
919 933 try:
920 934 import numpy as np
921 935 except:
922 936 pass
923 937 else:
924 938 timer = timeit.Timer(
925 939 "_mec_self.push(d)",
926 940 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
927 941 )
928 942 result = min(timer.repeat(repeat,count))/count
929 943 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
930 944
931 945 try:
932 946 import numpy as np
933 947 except:
934 948 pass
935 949 else:
936 950 timer = timeit.Timer(
937 951 "_mec_self.push(d,0)",
938 952 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
939 953 )
940 954 result = min(timer.repeat(repeat,count))/count
941 955 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
942 956
943 957 return benchmarks
944 958
945 959
946 960 components.registerAdapter(FullBlockingMultiEngineClient,
947 961 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
948 962
949 963
950 964
951 965
@@ -1,180 +1,180 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
3 3
4 4 """
5 5 A blocking version of the task client.
6 6 """
7 7
8 8 __docformat__ = "restructuredtext en"
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Copyright (C) 2008 The IPython Development Team
12 12 #
13 13 # Distributed under the terms of the BSD License. The full license is in
14 14 # the file COPYING, distributed as part of this software.
15 15 #-------------------------------------------------------------------------------
16 16
17 17 #-------------------------------------------------------------------------------
18 18 # Imports
19 19 #-------------------------------------------------------------------------------
20 20
21 21 from zope.interface import Interface, implements
22 22 from twisted.python import components, log
23 23
24 24 from IPython.kernel.twistedutil import blockingCallFromThread
25 25 from IPython.kernel import task, error
26 26 from IPython.kernel.mapper import (
27 27 SynchronousTaskMapper,
28 28 ITaskMapperFactory,
29 29 IMapper
30 30 )
31 31 from IPython.kernel.parallelfunction import (
32 32 ParallelFunction,
33 33 ITaskParallelDecorator
34 34 )
35 35
36 36 #-------------------------------------------------------------------------------
37 37 # The task client
38 38 #-------------------------------------------------------------------------------
39 39
40 40 class IBlockingTaskClient(Interface):
41 41 """
42 42 A vague interface of the blocking task client
43 43 """
44 44 pass
45 45
46 46 class BlockingTaskClient(object):
47 47 """
48 48 A blocking task client that adapts a non-blocking one.
49 49 """
50 50
51 51 implements(
52 IBlockingTaskClient,
52 IBlockingTaskClient,
53 53 ITaskMapperFactory,
54 54 IMapper,
55 55 ITaskParallelDecorator
56 56 )
57 57
58 58 def __init__(self, task_controller):
59 59 self.task_controller = task_controller
60 60 self.block = True
61 61
62 62 def run(self, task, block=False):
63 63 """Run a task on the `TaskController`.
64 64
65 See the documentation of the `MapTask` and `StringTask` classes for
65 See the documentation of the `MapTask` and `StringTask` classes for
66 66 details on how to build a task of different types.
67 67
68 68 :Parameters:
69 69 task : an `ITask` implementer
70 70
71 71 :Returns: The int taskid of the submitted task. Pass this to
72 72 `get_task_result` to get the `TaskResult` object.
73 73 """
74 74 tid = blockingCallFromThread(self.task_controller.run, task)
75 75 if block:
76 76 return self.get_task_result(tid, block=True)
77 77 else:
78 78 return tid
79 79
80 80 def get_task_result(self, taskid, block=False):
81 81 """
82 82 Get a task result by taskid.
83 83
84 84 :Parameters:
85 85 taskid : int
86 86 The taskid of the task to be retrieved.
87 87 block : boolean
88 88 Should I block until the task is done?
89 89
90 90 :Returns: A `TaskResult` object that encapsulates the task result.
91 91 """
92 92 return blockingCallFromThread(self.task_controller.get_task_result,
93 93 taskid, block)
94 94
95 95 def abort(self, taskid):
96 96 """
97 97 Abort a task by taskid.
98 98
99 99 :Parameters:
100 100 taskid : int
101 101 The taskid of the task to be aborted.
102 102 """
103 103 return blockingCallFromThread(self.task_controller.abort, taskid)
104 104
105 105 def barrier(self, taskids):
106 106 """Block until a set of tasks are completed.
107 107
108 108 :Parameters:
109 109 taskids : list, tuple
110 110 A sequence of taskids to block on.
111 111 """
112 112 return blockingCallFromThread(self.task_controller.barrier, taskids)
113 113
114 114 def spin(self):
115 115 """
116 116 Touch the scheduler, to resume scheduling without submitting a task.
117 117
118 118 This method only needs to be called in unusual situations where the
119 119 scheduler is idle for some reason.
120 120 """
121 121 return blockingCallFromThread(self.task_controller.spin)
122 122
123 123 def queue_status(self, verbose=False):
124 124 """
125 125 Get a dictionary with the current state of the task queue.
126 126
127 127 :Parameters:
128 128 verbose : boolean
129 129 If True, return a list of taskids. If False, simply give
130 130 the number of tasks with each status.
131 131
132 132 :Returns:
133 133 A dict with the queue status.
134 134 """
135 135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
136 136
137 137 def clear(self):
138 138 """
139 139 Clear all previously run tasks from the task controller.
140 140
141 141 This is needed because the task controller keep all task results
142 142 in memory. This can be a problem is there are many completed
143 143 tasks. Users should call this periodically to clean out these
144 144 cached task results.
145 145 """
146 146 return blockingCallFromThread(self.task_controller.clear)
147 147
148 148 def map(self, func, *sequences):
149 149 """
150 150 Apply func to *sequences elementwise. Like Python's builtin map.
151 151
152 152 This version is load balanced.
153 153 """
154 154 return self.mapper().map(func, *sequences)
155 155
156 156 def mapper(self, clear_before=False, clear_after=False, retries=0,
157 157 recovery_task=None, depend=None, block=True):
158 158 """
159 159 Create an `IMapper` implementer with a given set of arguments.
160 160
161 161 The `IMapper` created using a task controller is load balanced.
162 162
163 163 See the documentation for `IPython.kernel.task.BaseTask` for
164 164 documentation on the arguments to this method.
165 165 """
166 166 return SynchronousTaskMapper(self, clear_before=clear_before,
167 167 clear_after=clear_after, retries=retries,
168 168 recovery_task=recovery_task, depend=depend, block=block)
169 169
170 170 def parallel(self, clear_before=False, clear_after=False, retries=0,
171 171 recovery_task=None, depend=None, block=True):
172 172 mapper = self.mapper(clear_before, clear_after, retries,
173 173 recovery_task, depend, block)
174 174 pf = ParallelFunction(mapper)
175 175 return pf
176 176
177 177 components.registerAdapter(BlockingTaskClient,
178 178 task.ITaskController, IBlockingTaskClient)
179 179
180 180
@@ -1,246 +1,250 b''
1 1 ==================================
2 2 IPython/Vision Beam Pattern Demo
3 3 ==================================
4 4
5 .. note::
6
7 This page has not been updated to reflect the recent work on ipcluster.
8 This work makes it much easier to use IPython on a cluster.
5 9
6 10 Installing and testing IPython at OSC systems
7 11 =============================================
8 12
9 13 All components were installed from source and I have my environment set up to
10 14 include ~/usr/local in my various necessary paths ($PATH, $PYTHONPATH, etc).
11 15 Other than a slow filesystem for unpacking tarballs, the install went without a
12 16 hitch. For each needed component, I just downloaded the source tarball,
13 17 unpacked it via::
14 18
15 19 tar xzf (or xjf if it's bz2) filename.tar.{gz,bz2}
16 20
17 21 and then installed them (including IPython itself) with::
18 22
19 23 cd dirname/ # path to unpacked tarball
20 24 python setup.py install --prefix=~/usr/local/
21 25
22 26 The components I installed are listed below. For each one I give the main
23 27 project link as well as a direct one to the file I actually dowloaded and used.
24 28
25 29 - nose, used for testing:
26 30 http://somethingaboutorange.com/mrl/projects/nose/
27 31 http://somethingaboutorange.com/mrl/projects/nose/nose-0.10.3.tar.gz
28 32
29 33 - Zope interface, used to declare interfaces in twisted and ipython. Note:
30 34 you must get this from the page linked below and not fro the defaul
31 35 one(http://www.zope.org/Products/ZopeInterface) because the latter has an
32 36 older version, it hasn't been updated in a long time. This pypi link has
33 37 the current release (3.4.1 as of this writing):
34 38 http://pypi.python.org/pypi/zope.interface
35 39 http://pypi.python.org/packages/source/z/zope.interface/zope.interface-3.4.1.tar.gz
36 40
37 41 - pyopenssl, security layer used by foolscap. Note: version 0.7 *must* be
38 42 used:
39 43 http://sourceforge.net/projects/pyopenssl/
40 44 http://downloads.sourceforge.net/pyopenssl/pyOpenSSL-0.6.tar.gz?modtime=1212595285&big_mirror=0
41 45
42 46
43 47 - Twisted, used for all networking:
44 48 http://twistedmatrix.com/trac/wiki/Downloads
45 49 http://tmrc.mit.edu/mirror/twisted/Twisted/8.1/Twisted-8.1.0.tar.bz2
46 50
47 51 - Foolscap, used for managing connections securely:
48 52 http://foolscap.lothar.com/trac
49 53 http://foolscap.lothar.com/releases/foolscap-0.3.1.tar.gz
50 54
51 55
52 56 - IPython itself:
53 57 http://ipython.scipy.org/
54 58 http://ipython.scipy.org/dist/ipython-0.9.1.tar.gz
55 59
56 60
57 61 I then ran the ipython test suite via::
58 62
59 63 iptest -vv
60 64
61 65 and it passed with only::
62 66
63 67 ======================================================================
64 68 ERROR: testGetResult_2
65 69 ----------------------------------------------------------------------
66 70 DirtyReactorAggregateError: Reactor was unclean.
67 71 Selectables:
68 72 <Negotiation #0 on 10105>
69 73
70 74 ----------------------------------------------------------------------
71 75 Ran 419 tests in 33.971s
72 76
73 77 FAILED (SKIP=4, errors=1)
74 78
75 79 In three more runs of the test suite I was able to reproduce this error
76 80 sometimes but not always; for now I think we can move on but we need to
77 81 investigate further. Especially if we start seeing problems in real use (the
78 82 test suite stresses the networking layer in particular ways that aren't
79 83 necessarily typical of normal use).
80 84
81 85 Next, I started an 8-engine cluster via::
82 86
83 87 perez@opt-login01[~]> ipcluster -n 8
84 88 Starting controller: Controller PID: 30845
85 89 ^X Starting engines: Engines PIDs: [30846, 30847, 30848, 30849,
86 90 30850, 30851, 30852, 30853]
87 91 Log files: /home/perez/.ipython/log/ipcluster-30845-*
88 92
89 93 Your cluster is up and running.
90 94
91 95 [... etc]
92 96
93 97 and in a separate ipython session checked that the cluster is running and I can
94 98 access all the engines::
95 99
96 100 In [1]: from IPython.kernel import client
97 101
98 102 In [2]: mec = client.MultiEngineClient()
99 103
100 104 In [3]: mec.get_ids()
101 105 Out[3]: [0, 1, 2, 3, 4, 5, 6, 7]
102 106
103 107 and run trivial code in them (after importing the ``random`` module in all
104 108 engines)::
105 109
106 110 In [11]: mec.execute("x=random.randint(0,10)")
107 111 Out[11]:
108 112 <Results List>
109 113 [0] In [3]: x=random.randint(0,10)
110 114 [1] In [3]: x=random.randint(0,10)
111 115 [2] In [3]: x=random.randint(0,10)
112 116 [3] In [3]: x=random.randint(0,10)
113 117 [4] In [3]: x=random.randint(0,10)
114 118 [5] In [3]: x=random.randint(0,10)
115 119 [6] In [3]: x=random.randint(0,10)
116 120 [7] In [3]: x=random.randint(0,10)
117 121
118 122 In [12]: mec.pull('x')
119 123 Out[12]: [10, 0, 8, 10, 2, 9, 10, 7]
120 124
121 125
122 126 We'll continue conducting more complex tests later, including instaling Vision
123 127 locally and running the beam demo.
124 128
125 129
126 130 Michel's original instructions
127 131 ==============================
128 132
129 133 I got a Vision network that reproduces the beam pattern demo working:
130 134
131 135 .. image:: vision_beam_pattern.png
132 136 :width: 400
133 137 :target: vision_beam_pattern.png
134 138 :align: center
135 139
136 140
137 141 I created a package called beamPattern that provides the function run() in its
138 142 __init__.py file.
139 143
140 144 A subpackage beamPattern/VisionInterface provides Vision nodes for:
141 145
142 146 - computing Elevation and Azimuth from a 3D vector
143 147
144 148 - Reading .mat files
145 149
146 150 - taking the results gathered from the engines and creating the output that a
147 151 single engine would have had produced
148 152
149 153 The Mec node connect to a controller. In my network it was local but an furl
150 154 can be specified to connect to a remote controller.
151 155
152 156 The PRun Func node is from the IPython library of nodes. the import statement
153 157 is used to get the run function from the beamPattern package and bu puting
154 158 "run" in the function entry of this node we push this function to the engines.
155 159 In addition to the node will create input ports for all arguments of the
156 160 function being pushed (i.e. the run function)
157 161
158 162 The second input port on PRun Fun take an integer specifying the rank of the
159 163 argument we want to scatter. All other arguments will be pushed to the engines.
160 164
161 165 The ElevAzim node has a 3D vector widget and computes the El And Az values
162 166 which are passed into the PRun Fun node through the ports created
163 167 automatically. The Mat node allows to select the .mat file, reads it and passed
164 168 the data to the locdata port created automatically on PRun Func
165 169
166 170 The calculation is executed in parallel, and the results are gathered and
167 171 output. Instead of having a list of 3 vectors we nd up with a list of n*3
168 172 vectors where n is the number of engines. unpackDectorResults will turn it into
169 173 a list of 3. We then plot x, y, and 10*log10(z)
170 174
171 175
172 176 Installation
173 177 ------------
174 178
175 179 - inflate beamPattern into the site-packages directory for the MGL tools.
176 180
177 181 - place the appended IPythonNodes.py and StandardNodes.py into the Vision
178 182 package of the MGL tools.
179 183
180 184 - place the appended items.py in the NetworkEditor package of the MGL tools
181 185
182 186 - run vision for the network beamPat5_net.py::
183 187
184 188 vision beamPat5_net.py
185 189
186 190 Once the network is running, you can:
187 191
188 192 - double click on the MEC node and either use an emptty string for the furl to
189 193 connect to a local engine or cut and paste the furl to the engine you want to
190 194 use
191 195
192 196 - click on the yellow lighting bold to run the network.
193 197
194 198 - Try modifying the MAT file or change the Vector used top compute elevation
195 199 and Azimut.
196 200
197 201
198 202 Fernando's notes
199 203 ================
200 204
201 205 - I had to install IPython and all its dependencies for the python used by the
202 206 MGL tools.
203 207
204 208 - Then I had to install scipy 0.6.0 for it, since the nodes needed Scipy. To
205 209 do this I sourced the mglenv.sh script and then ran::
206 210
207 211 python setup.py install --prefix=~/usr/opt/mgl
208 212
209 213
210 214 Using PBS
211 215 =========
212 216
213 217 The following PBS script can be used to start the engines::
214 218
215 219 #PBS -N bgranger-ipython
216 220 #PBS -j oe
217 221 #PBS -l walltime=00:10:00
218 222 #PBS -l nodes=4:ppn=4
219 223
220 224 cd $PBS_O_WORKDIR
221 225 export PATH=$HOME/usr/local/bin
222 226 export PYTHONPATH=$HOME/usr/local/lib/python2.4/site-packages
223 227 /usr/local/bin/mpiexec -n 16 ipengine
224 228
225 229
226 230 If this file is called ``ipython_pbs.sh``, then the in one login windows
227 231 (i.e. on the head-node -- ``opt-login01.osc.edu``), run ``ipcontroller``. In
228 232 another login window on the same node, run the above script::
229 233
230 234 qsub ipython_pbs.sh
231 235
232 236 If you look at the first window, you will see some diagnostic output
233 237 from ipcontroller. You can then get the furl from your own
234 238 ``~/.ipython/security`` directory and then connect to it remotely.
235 239
236 240 You might need to set up an SSH tunnel, however; if this doesn't work as
237 241 advertised::
238 242
239 243 ssh -L 10115:localhost:10105 bic
240 244
241 245
242 246 Links to other resources
243 247 ========================
244 248
245 249 - http://www.osc.edu/~unpingco/glenn_NewLynx2_Demo.avi
246 250
General Comments 0
You need to be logged in to leave comments. Login now