##// END OF EJS Templates
Adding documentation to the new benchmark method of MultiEngineClient.
Brian Granger -
r1879:77f8166c merge
parent child Browse files
Show More
@@ -1,896 +1,951 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
3 3
4 4 """General Classes for IMultiEngine clients."""
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import cPickle as pickle
21 21 from types import FunctionType
22 22 import linecache
23 23
24 24 from twisted.internet import reactor
25 25 from twisted.python import components, log
26 26 from twisted.python.failure import Failure
27 27 from zope.interface import Interface, implements, Attribute
28 28
29 29 from IPython.ColorANSI import TermColors
30 30
31 31 from IPython.kernel.twistedutil import blockingCallFromThread
32 32 from IPython.kernel import error
33 33 from IPython.kernel.parallelfunction import ParallelFunction
34 34 from IPython.kernel.mapper import (
35 35 MultiEngineMapper,
36 36 IMultiEngineMapperFactory,
37 37 IMapper
38 38 )
39 39 from IPython.kernel import map as Map
40 40 from IPython.kernel import multiengine as me
41 41 from IPython.kernel.multiengine import (IFullMultiEngine,
42 42 IFullSynchronousMultiEngine)
43 43
44 44
45 45 #-------------------------------------------------------------------------------
46 46 # Pending Result things
47 47 #-------------------------------------------------------------------------------
48 48
49 49 class IPendingResult(Interface):
50 50 """A representation of a result that is pending.
51 51
52 52 This class is similar to Twisted's `Deferred` object, but is designed to be
53 53 used in a synchronous context.
54 54 """
55 55
56 56 result_id=Attribute("ID of the deferred on the other side")
57 57 client=Attribute("A client that I came from")
58 58 r=Attribute("An attribute that is a property that calls and returns get_result")
59 59
60 60 def get_result(default=None, block=True):
61 61 """
62 62 Get a result that is pending.
63 63
64 64 :Parameters:
65 65 default
66 66 The value to return if the result is not ready.
67 67 block : boolean
68 68 Should I block for the result.
69 69
70 70 :Returns: The actual result or the default value.
71 71 """
72 72
73 73 def add_callback(f, *args, **kwargs):
74 74 """
75 75 Add a callback that is called with the result.
76 76
77 77 If the original result is foo, adding a callback will cause
78 78 f(foo, *args, **kwargs) to be returned instead. If multiple
79 79 callbacks are registered, they are chained together: the result of
80 80 one is passed to the next and so on.
81 81
82 82 Unlike Twisted's Deferred object, there is no errback chain. Thus
83 83 any exception raised will not be caught and handled. User must
84 84 catch these by hand when calling `get_result`.
85 85 """
86 86
87 87
88 88 class PendingResult(object):
89 89 """A representation of a result that is not yet ready.
90 90
91 91 A user should not create a `PendingResult` instance by hand.
92 92
93 93 Methods
94 94 =======
95 95
96 96 * `get_result`
97 97 * `add_callback`
98 98
99 99 Properties
100 100 ==========
101 101 * `r`
102 102 """
103 103
104 104 def __init__(self, client, result_id):
105 105 """Create a PendingResult with a result_id and a client instance.
106 106
107 107 The client should implement `_getPendingResult(result_id, block)`.
108 108 """
109 109 self.client = client
110 110 self.result_id = result_id
111 111 self.called = False
112 112 self.raised = False
113 113 self.callbacks = []
114 114
115 115 def get_result(self, default=None, block=True):
116 116 """Get a result that is pending.
117 117
118 118 This method will connect to an IMultiEngine adapted controller
119 119 and see if the result is ready. If the action triggers an exception
120 120 raise it and record it. This method records the result/exception once it is
121 121 retrieved. Calling `get_result` again will get this cached result or will
122 122 re-raise the exception. The .r attribute is a property that calls
123 123 `get_result` with block=True.
124 124
125 125 :Parameters:
126 126 default
127 127 The value to return if the result is not ready.
128 128 block : boolean
129 129 Should I block for the result.
130 130
131 131 :Returns: The actual result or the default value.
132 132 """
133 133
134 134 if self.called:
135 135 if self.raised:
136 136 raise self.result[0], self.result[1], self.result[2]
137 137 else:
138 138 return self.result
139 139 try:
140 140 result = self.client.get_pending_deferred(self.result_id, block)
141 141 except error.ResultNotCompleted:
142 142 return default
143 143 except:
144 144 # Reraise other error, but first record them so they can be reraised
145 145 # later if .r or get_result is called again.
146 146 self.result = sys.exc_info()
147 147 self.called = True
148 148 self.raised = True
149 149 raise
150 150 else:
151 151 for cb in self.callbacks:
152 152 result = cb[0](result, *cb[1], **cb[2])
153 153 self.result = result
154 154 self.called = True
155 155 return result
156 156
157 157 def add_callback(self, f, *args, **kwargs):
158 158 """Add a callback that is called with the result.
159 159
160 160 If the original result is result, adding a callback will cause
161 161 f(result, *args, **kwargs) to be returned instead. If multiple
162 162 callbacks are registered, they are chained together: the result of
163 163 one is passed to the next and so on.
164 164
165 165 Unlike Twisted's Deferred object, there is no errback chain. Thus
166 166 any exception raised will not be caught and handled. User must
167 167 catch these by hand when calling `get_result`.
168 168 """
169 169 assert callable(f)
170 170 self.callbacks.append((f, args, kwargs))
171 171
172 172 def __cmp__(self, other):
173 173 if self.result_id < other.result_id:
174 174 return -1
175 175 else:
176 176 return 1
177 177
178 178 def _get_r(self):
179 179 return self.get_result(block=True)
180 180
181 181 r = property(_get_r)
182 182 """This property is a shortcut to a `get_result(block=True)`."""
183 183
184 184
185 185 #-------------------------------------------------------------------------------
186 186 # Pretty printing wrappers for certain lists
187 187 #-------------------------------------------------------------------------------
188 188
189 189 class ResultList(list):
190 190 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
191 191
192 192 def __repr__(self):
193 193 output = []
194 194 # These colored prompts were not working on Windows
195 195 if sys.platform == 'win32':
196 196 blue = normal = red = green = ''
197 197 else:
198 198 blue = TermColors.Blue
199 199 normal = TermColors.Normal
200 200 red = TermColors.Red
201 201 green = TermColors.Green
202 202 output.append("<Results List>\n")
203 203 for cmd in self:
204 204 if isinstance(cmd, Failure):
205 205 output.append(cmd)
206 206 else:
207 207 target = cmd.get('id',None)
208 208 cmd_num = cmd.get('number',None)
209 209 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
210 210 cmd_stdout = cmd.get('stdout', None)
211 211 cmd_stderr = cmd.get('stderr', None)
212 212 output.append("%s[%i]%s In [%i]:%s %s\n" % \
213 213 (green, target,
214 214 blue, cmd_num, normal, cmd_stdin))
215 215 if cmd_stdout:
216 216 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
217 217 (green, target,
218 218 red, cmd_num, normal, cmd_stdout))
219 219 if cmd_stderr:
220 220 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
221 221 (green, target,
222 222 red, cmd_num, normal, cmd_stderr))
223 223 return ''.join(output)
224 224
225 225
226 226 def wrapResultList(result):
227 227 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
228 228 if len(result) == 0:
229 229 result = [result]
230 230 return ResultList(result)
231 231
232 232
233 233 class QueueStatusList(list):
234 234 """A subclass of list that pretty prints the output of `queue_status`."""
235 235
236 236 def __repr__(self):
237 237 output = []
238 238 output.append("<Queue Status List>\n")
239 239 for e in self:
240 240 output.append("Engine: %s\n" % repr(e[0]))
241 241 output.append(" Pending: %s\n" % repr(e[1]['pending']))
242 242 for q in e[1]['queue']:
243 243 output.append(" Command: %s\n" % repr(q))
244 244 return ''.join(output)
245 245
246 246
247 247 #-------------------------------------------------------------------------------
248 248 # InteractiveMultiEngineClient
249 249 #-------------------------------------------------------------------------------
250 250
251 251 class InteractiveMultiEngineClient(object):
252 252 """A mixin class that add a few methods to a multiengine client.
253 253
254 254 The methods in this mixin class are designed for interactive usage.
255 255 """
256 256
257 257 def activate(self):
258 258 """Make this `MultiEngineClient` active for parallel magic commands.
259 259
260 260 IPython has a magic command syntax to work with `MultiEngineClient` objects.
261 261 In a given IPython session there is a single active one. While
262 262 there can be many `MultiEngineClient` created and used by the user,
263 263 there is only one active one. The active `MultiEngineClient` is used whenever
264 264 the magic commands %px and %autopx are used.
265 265
266 266 The activate() method is called on a given `MultiEngineClient` to make it
267 267 active. Once this has been done, the magic commands can be used.
268 268 """
269 269
270 270 try:
271 271 __IPYTHON__.activeController = self
272 272 except NameError:
273 273 print "The IPython Controller magics only work within IPython."
274 274
275 275 def __setitem__(self, key, value):
276 276 """Add a dictionary interface for pushing/pulling.
277 277
278 278 This functions as a shorthand for `push`.
279 279
280 280 :Parameters:
281 281 key : str
282 282 What to call the remote object.
283 283 value : object
284 284 The local Python object to push.
285 285 """
286 286 targets, block = self._findTargetsAndBlock()
287 287 return self.push({key:value}, targets=targets, block=block)
288 288
289 289 def __getitem__(self, key):
290 290 """Add a dictionary interface for pushing/pulling.
291 291
292 292 This functions as a shorthand to `pull`.
293 293
294 294 :Parameters:
295 295 - `key`: A string representing the key.
296 296 """
297 297 if isinstance(key, str):
298 298 targets, block = self._findTargetsAndBlock()
299 299 return self.pull(key, targets=targets, block=block)
300 300 else:
301 301 raise TypeError("__getitem__ only takes strs")
302 302
303 303 def __len__(self):
304 304 """Return the number of available engines."""
305 305 return len(self.get_ids())
306 306
307 307 #---------------------------------------------------------------------------
308 308 # Make this a context manager for with
309 309 #---------------------------------------------------------------------------
310 310
311 311 def findsource_file(self,f):
312 312 linecache.checkcache()
313 313 s = findsource(f.f_code)
314 314 lnum = f.f_lineno
315 315 wsource = s[0][f.f_lineno:]
316 316 return strip_whitespace(wsource)
317 317
318 318 def findsource_ipython(self,f):
319 319 from IPython import ipapi
320 320 self.ip = ipapi.get()
321 321 wsource = [l+'\n' for l in
322 322 self.ip.IP.input_hist_raw[-1].splitlines()[1:]]
323 323 return strip_whitespace(wsource)
324 324
325 325 def __enter__(self):
326 326 f = sys._getframe(1)
327 327 local_ns = f.f_locals
328 328 global_ns = f.f_globals
329 329 if f.f_code.co_filename == '<ipython console>':
330 330 s = self.findsource_ipython(f)
331 331 else:
332 332 s = self.findsource_file(f)
333 333
334 334 self._with_context_result = self.execute(s)
335 335
336 336 def __exit__ (self, etype, value, tb):
337 337 if issubclass(etype,error.StopLocalExecution):
338 338 return True
339 339
340 340
341 341 def remote():
342 342 m = 'Special exception to stop local execution of parallel code.'
343 343 raise error.StopLocalExecution(m)
344 344
345 345 def strip_whitespace(source):
346 346 # Expand tabs to avoid any confusion.
347 347 wsource = [l.expandtabs(4) for l in source]
348 348 # Detect the indentation level
349 349 done = False
350 350 for line in wsource:
351 351 if line.isspace():
352 352 continue
353 353 for col,char in enumerate(line):
354 354 if char != ' ':
355 355 done = True
356 356 break
357 357 if done:
358 358 break
359 359 # Now we know how much leading space there is in the code. Next, we
360 360 # extract up to the first line that has less indentation.
361 361 # WARNINGS: we skip comments that may be misindented, but we do NOT yet
362 362 # detect triple quoted strings that may have flush left text.
363 363 for lno,line in enumerate(wsource):
364 364 lead = line[:col]
365 365 if lead.isspace():
366 366 continue
367 367 else:
368 368 if not lead.lstrip().startswith('#'):
369 369 break
370 370 # The real 'with' source is up to lno
371 371 src_lines = [l[col:] for l in wsource[:lno+1]]
372 372
373 373 # Finally, check that the source's first non-comment line begins with the
374 374 # special call 'remote()'
375 375 for nline,line in enumerate(src_lines):
376 376 if line.isspace() or line.startswith('#'):
377 377 continue
378 378 if 'remote()' in line:
379 379 break
380 380 else:
381 381 raise ValueError('remote() call missing at the start of code')
382 382 src = ''.join(src_lines[nline+1:])
383 383 #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
384 384 return src
385 385
386 386
387 387 #-------------------------------------------------------------------------------
388 388 # The top-level MultiEngine client adaptor
389 389 #-------------------------------------------------------------------------------
390 390
391 391
392 392 class IFullBlockingMultiEngineClient(Interface):
393 393 pass
394 394
395 395
396 396 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
397 397 """
398 398 A blocking client to the `IMultiEngine` controller interface.
399 399
400 400 This class allows users to use a set of engines for a parallel
401 401 computation through the `IMultiEngine` interface. In this interface,
402 402 each engine has a specific id (an int) that is used to refer to the
403 403 engine, run code on it, etc.
404 404 """
405 405
406 406 implements(
407 407 IFullBlockingMultiEngineClient,
408 408 IMultiEngineMapperFactory,
409 409 IMapper
410 410 )
411 411
412 412 def __init__(self, smultiengine):
413 413 self.smultiengine = smultiengine
414 414 self.block = True
415 415 self.targets = 'all'
416 416
417 417 def _findBlock(self, block=None):
418 418 if block is None:
419 419 return self.block
420 420 else:
421 421 if block in (True, False):
422 422 return block
423 423 else:
424 424 raise ValueError("block must be True or False")
425 425
426 426 def _findTargets(self, targets=None):
427 427 if targets is None:
428 428 return self.targets
429 429 else:
430 430 if not isinstance(targets, (str,list,tuple,int)):
431 431 raise ValueError("targets must be a str, list, tuple or int")
432 432 return targets
433 433
434 434 def _findTargetsAndBlock(self, targets=None, block=None):
435 435 return self._findTargets(targets), self._findBlock(block)
436 436
437 437 def _blockFromThread(self, function, *args, **kwargs):
438 438 block = kwargs.get('block', None)
439 439 if block is None:
440 440 raise error.MissingBlockArgument("'block' keyword argument is missing")
441 441 result = blockingCallFromThread(function, *args, **kwargs)
442 442 if not block:
443 443 result = PendingResult(self, result)
444 444 return result
445 445
446 446 def get_pending_deferred(self, deferredID, block):
447 447 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
448 448
449 449 def barrier(self, pendingResults):
450 450 """Synchronize a set of `PendingResults`.
451 451
452 452 This method is a synchronization primitive that waits for a set of
453 453 `PendingResult` objects to complete. More specifically, barier does
454 454 the following.
455 455
456 456 * The `PendingResult`s are sorted by result_id.
457 457 * The `get_result` method is called for each `PendingResult` sequentially
458 458 with block=True.
459 459 * If a `PendingResult` gets a result that is an exception, it is
460 460 trapped and can be re-raised later by calling `get_result` again.
461 461 * The `PendingResult`s are flushed from the controller.
462 462
463 463 After barrier has been called on a `PendingResult`, its results can
464 464 be retrieved by calling `get_result` again or accesing the `r` attribute
465 465 of the instance.
466 466 """
467 467
468 468 # Convert to list for sorting and check class type
469 469 prList = list(pendingResults)
470 470 for pr in prList:
471 471 if not isinstance(pr, PendingResult):
472 472 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
473 473
474 474 # Sort the PendingResults so they are in order
475 475 prList.sort()
476 476 # Block on each PendingResult object
477 477 for pr in prList:
478 478 try:
479 479 result = pr.get_result(block=True)
480 480 except Exception:
481 481 pass
482 482
483 483 def flush(self):
484 484 """
485 485 Clear all pending deferreds/results from the controller.
486 486
487 487 For each `PendingResult` that is created by this client, the controller
488 488 holds on to the result for that `PendingResult`. This can be a problem
489 489 if there are a large number of `PendingResult` objects that are created.
490 490
491 491 Once the result of the `PendingResult` has been retrieved, the result
492 492 is removed from the controller, but if a user doesn't get a result (
493 493 they just ignore the `PendingResult`) the result is kept forever on the
494 494 controller. This method allows the user to clear out all un-retrieved
495 495 results on the controller.
496 496 """
497 497 r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
498 498 return r
499 499
500 500 clear_pending_results = flush
501 501
502 502 #---------------------------------------------------------------------------
503 503 # IEngineMultiplexer related methods
504 504 #---------------------------------------------------------------------------
505 505
506 506 def execute(self, lines, targets=None, block=None):
507 507 """
508 508 Execute code on a set of engines.
509 509
510 510 :Parameters:
511 511 lines : str
512 512 The Python code to execute as a string
513 513 targets : id or list of ids
514 514 The engine to use for the execution
515 515 block : boolean
516 516 If False, this method will return the actual result. If False,
517 517 a `PendingResult` is returned which can be used to get the result
518 518 at a later time.
519 519 """
520 520 targets, block = self._findTargetsAndBlock(targets, block)
521 521 result = blockingCallFromThread(self.smultiengine.execute, lines,
522 522 targets=targets, block=block)
523 523 if block:
524 524 result = ResultList(result)
525 525 else:
526 526 result = PendingResult(self, result)
527 527 result.add_callback(wrapResultList)
528 528 return result
529 529
530 530 def push(self, namespace, targets=None, block=None):
531 531 """
532 532 Push a dictionary of keys and values to engines namespace.
533 533
534 534 Each engine has a persistent namespace. This method is used to push
535 535 Python objects into that namespace.
536 536
537 537 The objects in the namespace must be pickleable.
538 538
539 539 :Parameters:
540 540 namespace : dict
541 541 A dict that contains Python objects to be injected into
542 542 the engine persistent namespace.
543 543 targets : id or list of ids
544 544 The engine to use for the execution
545 545 block : boolean
546 546 If False, this method will return the actual result. If False,
547 547 a `PendingResult` is returned which can be used to get the result
548 548 at a later time.
549 549 """
550 550 targets, block = self._findTargetsAndBlock(targets, block)
551 551 return self._blockFromThread(self.smultiengine.push, namespace,
552 552 targets=targets, block=block)
553 553
554 554 def pull(self, keys, targets=None, block=None):
555 555 """
556 556 Pull Python objects by key out of engines namespaces.
557 557
558 558 :Parameters:
559 559 keys : str or list of str
560 560 The names of the variables to be pulled
561 561 targets : id or list of ids
562 562 The engine to use for the execution
563 563 block : boolean
564 564 If False, this method will return the actual result. If False,
565 565 a `PendingResult` is returned which can be used to get the result
566 566 at a later time.
567 567 """
568 568 targets, block = self._findTargetsAndBlock(targets, block)
569 569 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
570 570
571 571 def push_function(self, namespace, targets=None, block=None):
572 572 """
573 573 Push a Python function to an engine.
574 574
575 575 This method is used to push a Python function to an engine. This
576 576 method can then be used in code on the engines. Closures are not supported.
577 577
578 578 :Parameters:
579 579 namespace : dict
580 580 A dict whose values are the functions to be pushed. The keys give
581 581 that names that the function will appear as in the engines
582 582 namespace.
583 583 targets : id or list of ids
584 584 The engine to use for the execution
585 585 block : boolean
586 586 If False, this method will return the actual result. If False,
587 587 a `PendingResult` is returned which can be used to get the result
588 588 at a later time.
589 589 """
590 590 targets, block = self._findTargetsAndBlock(targets, block)
591 591 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
592 592
593 593 def pull_function(self, keys, targets=None, block=None):
594 594 """
595 595 Pull a Python function from an engine.
596 596
597 597 This method is used to pull a Python function from an engine.
598 598 Closures are not supported.
599 599
600 600 :Parameters:
601 601 keys : str or list of str
602 602 The names of the functions to be pulled
603 603 targets : id or list of ids
604 604 The engine to use for the execution
605 605 block : boolean
606 606 If False, this method will return the actual result. If False,
607 607 a `PendingResult` is returned which can be used to get the result
608 608 at a later time.
609 609 """
610 610 targets, block = self._findTargetsAndBlock(targets, block)
611 611 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
612 612
613 613 def push_serialized(self, namespace, targets=None, block=None):
614 614 targets, block = self._findTargetsAndBlock(targets, block)
615 615 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
616 616
617 617 def pull_serialized(self, keys, targets=None, block=None):
618 618 targets, block = self._findTargetsAndBlock(targets, block)
619 619 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
620 620
621 621 def get_result(self, i=None, targets=None, block=None):
622 622 """
623 623 Get a previous result.
624 624
625 625 When code is executed in an engine, a dict is created and returned. This
626 626 method retrieves that dict for previous commands.
627 627
628 628 :Parameters:
629 629 i : int
630 630 The number of the result to get
631 631 targets : id or list of ids
632 632 The engine to use for the execution
633 633 block : boolean
634 634 If False, this method will return the actual result. If False,
635 635 a `PendingResult` is returned which can be used to get the result
636 636 at a later time.
637 637 """
638 638 targets, block = self._findTargetsAndBlock(targets, block)
639 639 result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
640 640 if block:
641 641 result = ResultList(result)
642 642 else:
643 643 result = PendingResult(self, result)
644 644 result.add_callback(wrapResultList)
645 645 return result
646 646
647 647 def reset(self, targets=None, block=None):
648 648 """
649 649 Reset an engine.
650 650
651 651 This method clears out the namespace of an engine.
652 652
653 653 :Parameters:
654 654 targets : id or list of ids
655 655 The engine to use for the execution
656 656 block : boolean
657 657 If False, this method will return the actual result. If False,
658 658 a `PendingResult` is returned which can be used to get the result
659 659 at a later time.
660 660 """
661 661 targets, block = self._findTargetsAndBlock(targets, block)
662 662 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
663 663
664 664 def keys(self, targets=None, block=None):
665 665 """
666 666 Get a list of all the variables in an engine's namespace.
667 667
668 668 :Parameters:
669 669 targets : id or list of ids
670 670 The engine to use for the execution
671 671 block : boolean
672 672 If False, this method will return the actual result. If False,
673 673 a `PendingResult` is returned which can be used to get the result
674 674 at a later time.
675 675 """
676 676 targets, block = self._findTargetsAndBlock(targets, block)
677 677 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
678 678
679 679 def kill(self, controller=False, targets=None, block=None):
680 680 """
681 681 Kill the engines and controller.
682 682
683 683 This method is used to stop the engine and controller by calling
684 684 `reactor.stop`.
685 685
686 686 :Parameters:
687 687 controller : boolean
688 688 If True, kill the engines and controller. If False, just the
689 689 engines
690 690 targets : id or list of ids
691 691 The engine to use for the execution
692 692 block : boolean
693 693 If False, this method will return the actual result. If False,
694 694 a `PendingResult` is returned which can be used to get the result
695 695 at a later time.
696 696 """
697 697 targets, block = self._findTargetsAndBlock(targets, block)
698 698 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
699 699
700 700 def clear_queue(self, targets=None, block=None):
701 701 """
702 702 Clear out the controller's queue for an engine.
703 703
704 704 The controller maintains a queue for each engine. This clear it out.
705 705
706 706 :Parameters:
707 707 targets : id or list of ids
708 708 The engine to use for the execution
709 709 block : boolean
710 710 If False, this method will return the actual result. If False,
711 711 a `PendingResult` is returned which can be used to get the result
712 712 at a later time.
713 713 """
714 714 targets, block = self._findTargetsAndBlock(targets, block)
715 715 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
716 716
717 717 def queue_status(self, targets=None, block=None):
718 718 """
719 719 Get the status of an engines queue.
720 720
721 721 :Parameters:
722 722 targets : id or list of ids
723 723 The engine to use for the execution
724 724 block : boolean
725 725 If False, this method will return the actual result. If False,
726 726 a `PendingResult` is returned which can be used to get the result
727 727 at a later time.
728 728 """
729 729 targets, block = self._findTargetsAndBlock(targets, block)
730 730 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
731 731
732 732 def set_properties(self, properties, targets=None, block=None):
733 733 targets, block = self._findTargetsAndBlock(targets, block)
734 734 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
735 735
736 736 def get_properties(self, keys=None, targets=None, block=None):
737 737 targets, block = self._findTargetsAndBlock(targets, block)
738 738 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
739 739
740 740 def has_properties(self, keys, targets=None, block=None):
741 741 targets, block = self._findTargetsAndBlock(targets, block)
742 742 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
743 743
744 744 def del_properties(self, keys, targets=None, block=None):
745 745 targets, block = self._findTargetsAndBlock(targets, block)
746 746 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
747 747
748 748 def clear_properties(self, targets=None, block=None):
749 749 targets, block = self._findTargetsAndBlock(targets, block)
750 750 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
751 751
752 752 #---------------------------------------------------------------------------
753 753 # IMultiEngine related methods
754 754 #---------------------------------------------------------------------------
755 755
756 756 def get_ids(self):
757 757 """
758 758 Returns the ids of currently registered engines.
759 759 """
760 760 result = blockingCallFromThread(self.smultiengine.get_ids)
761 761 return result
762 762
763 763 #---------------------------------------------------------------------------
764 764 # IMultiEngineCoordinator
765 765 #---------------------------------------------------------------------------
766 766
767 767 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
768 768 """
769 769 Partition a Python sequence and send the partitions to a set of engines.
770 770 """
771 771 targets, block = self._findTargetsAndBlock(targets, block)
772 772 return self._blockFromThread(self.smultiengine.scatter, key, seq,
773 773 dist, flatten, targets=targets, block=block)
774 774
775 775 def gather(self, key, dist='b', targets=None, block=None):
776 776 """
777 777 Gather a partitioned sequence on a set of engines as a single local seq.
778 778 """
779 779 targets, block = self._findTargetsAndBlock(targets, block)
780 780 return self._blockFromThread(self.smultiengine.gather, key, dist,
781 781 targets=targets, block=block)
782 782
783 783 def raw_map(self, func, seq, dist='b', targets=None, block=None):
784 784 """
785 785 A parallelized version of Python's builtin map.
786 786
787 787 This has a slightly different syntax than the builtin `map`.
788 788 This is needed because we need to have keyword arguments and thus
789 789 can't use *args to capture all the sequences. Instead, they must
790 790 be passed in a list or tuple.
791 791
792 792 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
793 793
794 794 Most users will want to use parallel functions or the `mapper`
795 795 and `map` methods for an API that follows that of the builtin
796 796 `map`.
797 797 """
798 798 targets, block = self._findTargetsAndBlock(targets, block)
799 799 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
800 800 dist, targets=targets, block=block)
801 801
802 802 def map(self, func, *sequences):
803 803 """
804 804 A parallel version of Python's builtin `map` function.
805 805
806 806 This method applies a function to sequences of arguments. It
807 807 follows the same syntax as the builtin `map`.
808 808
809 809 This method creates a mapper objects by calling `self.mapper` with
810 810 no arguments and then uses that mapper to do the mapping. See
811 811 the documentation of `mapper` for more details.
812 812 """
813 813 return self.mapper().map(func, *sequences)
814 814
815 815 def mapper(self, dist='b', targets='all', block=None):
816 816 """
817 817 Create a mapper object that has a `map` method.
818 818
819 819 This method returns an object that implements the `IMapper`
820 820 interface. This method is a factory that is used to control how
821 821 the map happens.
822 822
823 823 :Parameters:
824 824 dist : str
825 825 What decomposition to use, 'b' is the only one supported
826 826 currently
827 827 targets : str, int, sequence of ints
828 828 Which engines to use for the map
829 829 block : boolean
830 830 Should calls to `map` block or not
831 831 """
832 832 return MultiEngineMapper(self, dist, targets, block)
833 833
834 834 def parallel(self, dist='b', targets=None, block=None):
835 835 """
836 836 A decorator that turns a function into a parallel function.
837 837
838 838 This can be used as:
839 839
840 840 @parallel()
841 841 def f(x, y)
842 842 ...
843 843
844 844 f(range(10), range(10))
845 845
846 846 This causes f(0,0), f(1,1), ... to be called in parallel.
847 847
848 848 :Parameters:
849 849 dist : str
850 850 What decomposition to use, 'b' is the only one supported
851 851 currently
852 852 targets : str, int, sequence of ints
853 853 Which engines to use for the map
854 854 block : boolean
855 855 Should calls to `map` block or not
856 856 """
857 857 targets, block = self._findTargetsAndBlock(targets, block)
858 858 mapper = self.mapper(dist, targets, block)
859 859 pf = ParallelFunction(mapper)
860 860 return pf
861 861
862 862 #---------------------------------------------------------------------------
863 863 # IMultiEngineExtras
864 864 #---------------------------------------------------------------------------
865 865
866 866 def zip_pull(self, keys, targets=None, block=None):
867 867 targets, block = self._findTargetsAndBlock(targets, block)
868 868 return self._blockFromThread(self.smultiengine.zip_pull, keys,
869 869 targets=targets, block=block)
870 870
871 871 def run(self, filename, targets=None, block=None):
872 872 """
873 873 Run a Python code in a file on the engines.
874 874
875 875 :Parameters:
876 876 filename : str
877 877 The name of the local file to run
878 878 targets : id or list of ids
879 879 The engine to use for the execution
880 880 block : boolean
881 881 If False, this method will return the actual result. If False,
882 882 a `PendingResult` is returned which can be used to get the result
883 883 at a later time.
884 884 """
885 885 targets, block = self._findTargetsAndBlock(targets, block)
886 886 return self._blockFromThread(self.smultiengine.run, filename,
887 887 targets=targets, block=block)
888
889 def benchmark(self, push_size=10000):
890 """
891 Run performance benchmarks for the current IPython cluster.
892
893 This method tests both the latency of sending command and data to the
894 engines as well as the throughput of sending large objects to the
895 engines using push. The latency is measured by having one or more
896 engines execute the command 'pass'. The throughput is measure by
897 sending an NumPy array of size `push_size` to one or more engines.
898
899 These benchmarks will vary widely on different hardware and networks
900 and thus can be used to get an idea of the performance characteristics
901 of a particular configuration of an IPython controller and engines.
902
903 This function is not testable within our current testing framework.
904 """
905 import timeit, __builtin__
906 __builtin__._mec_self = self
907 benchmarks = {}
908 repeat = 3
909 count = 10
910
911 timer = timeit.Timer('_mec_self.execute("pass",0)')
912 result = 1000*min(timer.repeat(repeat,count))/count
913 benchmarks['single_engine_latency'] = (result,'msec')
914
915 timer = timeit.Timer('_mec_self.execute("pass")')
916 result = 1000*min(timer.repeat(repeat,count))/count
917 benchmarks['all_engine_latency'] = (result,'msec')
888 918
919 try:
920 import numpy as np
921 except:
922 pass
923 else:
924 timer = timeit.Timer(
925 "_mec_self.push(d)",
926 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
927 )
928 result = min(timer.repeat(repeat,count))/count
929 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
930
931 try:
932 import numpy as np
933 except:
934 pass
935 else:
936 timer = timeit.Timer(
937 "_mec_self.push(d,0)",
938 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
939 )
940 result = min(timer.repeat(repeat,count))/count
941 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
942
943 return benchmarks
889 944
890 945
891 946 components.registerAdapter(FullBlockingMultiEngineClient,
892 947 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
893 948
894 949
895 950
896 951
General Comments 0
You need to be logged in to leave comments. Login now