##// END OF EJS Templates
split get_results into get_result/result_status, add AsyncHubResult
MinRK -
Show More
@@ -10,6 +10,8 b''
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 import time
14
13 15 from IPython.external.decorator import decorator
14 16 import error
15 17
@@ -189,6 +191,23 b' class AsyncResult(object):'
189 191 raise AttributeError("%r object has no attribute %r"%(
190 192 self.__class__.__name__, key))
191 193 return self.__getitem__(key)
194
195 # asynchronous iterator:
196 def __iter__(self):
197 if self._single_result:
198 raise TypeError("AsyncResults with a single result are not iterable.")
199 try:
200 rlist = self.get(0)
201 except error.TimeoutError:
202 # wait for each result individually
203 for msg_id in self.msg_ids:
204 ar = AsyncResult(self._client, msg_id, self._fname)
205 yield ar.get()
206 else:
207 # already done
208 for r in rlist:
209 yield r
210
192 211
193 212
194 213 class AsyncMapResult(AsyncResult):
@@ -227,6 +246,49 b' class AsyncMapResult(AsyncResult):'
227 246 # already done
228 247 for r in rlist:
229 248 yield r
249
250
251 class AsyncHubResult(AsyncResult):
252 """Class to wrap pending results that must be requested from the Hub"""
230 253
254 def wait(self, timeout=-1):
255 """wait for result to complete."""
256 start = time.time()
257 if self._ready:
258 return
259 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
260 local_ready = self._client.barrier(local_ids, timeout)
261 if local_ready:
262 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
263 if not remote_ids:
264 self._ready = True
265 else:
266 rdict = self._client.result_status(remote_ids, status_only=False)
267 pending = rdict['pending']
268 while pending and time.time() < start+timeout:
269 rdict = self._client.result_status(remote_ids, status_only=False)
270 pending = rdict['pending']
271 if pending:
272 time.sleep(0.1)
273 if not pending:
274 self._ready = True
275 if self._ready:
276 try:
277 results = map(self._client.results.get, self.msg_ids)
278 self._result = results
279 if self._single_result:
280 r = results[0]
281 if isinstance(r, Exception):
282 raise r
283 else:
284 results = error.collect_exceptions(results, self._fname)
285 self._result = self._reconstruct_result(results)
286 except Exception, e:
287 self._exception = e
288 self._success = False
289 else:
290 self._success = True
291 finally:
292 self._metadata = map(self._client.metadata.get, self.msg_ids)
231 293
232 __all__ = ['AsyncResult', 'AsyncMapResult'] No newline at end of file
294 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] No newline at end of file
@@ -32,7 +32,7 b' from IPython.external.ssh import tunnel'
32 32 import error
33 33 import map as Map
34 34 import streamsession as ss
35 from asyncresult import AsyncResult, AsyncMapResult
35 from asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult
36 36 from clusterdir import ClusterDir, ClusterDirError
37 37 from dependency import Dependency, depend, require, dependent
38 38 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
@@ -485,6 +485,15 b' class Client(HasTraits):'
485 485 # handlers and callbacks for incoming messages
486 486 #--------------------------------------------------------------------------
487 487
488 def _unwrap_exception(self, content):
489 """unwrap exception, and remap engineid to int."""
490 e = ss.unwrap_exception(content)
491 if e.engine_info:
492 e_uuid = e.engine_info['engineid']
493 eid = self._engines[e_uuid]
494 e.engine_info['engineid'] = eid
495 return e
496
488 497 def _register_engine(self, msg):
489 498 """Register a new engine, and update our connection info."""
490 499 content = msg['content']
@@ -537,7 +546,7 b' class Client(HasTraits):'
537 546 print ("got unknown result: %s"%msg_id)
538 547 else:
539 548 self.outstanding.remove(msg_id)
540 self.results[msg_id] = ss.unwrap_exception(msg['content'])
549 self.results[msg_id] = self._unwrap_exception(msg['content'])
541 550
542 551 def _handle_apply_reply(self, msg):
543 552 """Save the reply to an apply_request into our results."""
@@ -569,12 +578,7 b' class Client(HasTraits):'
569 578 # TODO: handle resubmission
570 579 pass
571 580 else:
572 e = ss.unwrap_exception(content)
573 if e.engine_info:
574 e_uuid = e.engine_info['engineid']
575 eid = self._engines[e_uuid]
576 e.engine_info['engineid'] = eid
577 self.results[msg_id] = e
581 self.results[msg_id] = self._unwrap_exception(content)
578 582
579 583 def _flush_notifications(self):
580 584 """Flush notifications of engine registrations waiting
@@ -641,7 +645,7 b' class Client(HasTraits):'
641 645 s = md[name] or ''
642 646 md[name] = s + content['data']
643 647 elif msg_type == 'pyerr':
644 md.update({'pyerr' : ss.unwrap_exception(content)})
648 md.update({'pyerr' : self._unwrap_exception(content)})
645 649 else:
646 650 md.update({msg_type : content['data']})
647 651
@@ -685,13 +689,13 b' class Client(HasTraits):'
685 689 if self._iopub_socket:
686 690 self._flush_iopub(self._iopub_socket)
687 691
688 def barrier(self, msg_ids=None, timeout=-1):
689 """waits on one or more `msg_ids`, for up to `timeout` seconds.
692 def barrier(self, jobs=None, timeout=-1):
693 """waits on one or more `jobs`, for up to `timeout` seconds.
690 694
691 695 Parameters
692 696 ----------
693 697
694 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
698 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
695 699 ints are indices to self.history
696 700 strs are msg_ids
697 701 default: wait on all outstanding messages
@@ -706,19 +710,20 b' class Client(HasTraits):'
706 710 False : timeout reached, some msg_ids still outstanding
707 711 """
708 712 tic = time.time()
709 if msg_ids is None:
713 if jobs is None:
710 714 theids = self.outstanding
711 715 else:
712 if isinstance(msg_ids, (int, str, AsyncResult)):
713 msg_ids = [msg_ids]
716 if isinstance(jobs, (int, str, AsyncResult)):
717 jobs = [jobs]
714 718 theids = set()
715 for msg_id in msg_ids:
716 if isinstance(msg_id, int):
717 msg_id = self.history[msg_id]
718 elif isinstance(msg_id, AsyncResult):
719 map(theids.add, msg_id.msg_ids)
719 for job in jobs:
720 if isinstance(job, int):
721 # index access
722 job = self.history[job]
723 elif isinstance(job, AsyncResult):
724 map(theids.add, job.msg_ids)
720 725 continue
721 theids.add(msg_id)
726 theids.add(job)
722 727 if not theids.intersection(self.outstanding):
723 728 return True
724 729 self.spin()
@@ -747,18 +752,39 b' class Client(HasTraits):'
747 752 if self.debug:
748 753 pprint(msg)
749 754 if msg['content']['status'] != 'ok':
750 error = ss.unwrap_exception(msg['content'])
755 error = self._unwrap_exception(msg['content'])
751 756 if error:
752 757 return error
753 758
754 759
755 760 @spinfirst
756 761 @defaultblock
757 def abort(self, msg_ids = None, targets=None, block=None):
758 """Abort the execution queues of target(s)."""
762 def abort(self, jobs=None, targets=None, block=None):
763 """Abort specific jobs from the execution queues of target(s).
764
765 This is a mechanism to prevent jobs that have already been submitted
766 from executing.
767
768 Parameters
769 ----------
770
771 jobs : msg_id, list of msg_ids, or AsyncResult
772 The jobs to be aborted
773
774
775 """
759 776 targets = self._build_targets(targets)[0]
760 if isinstance(msg_ids, basestring):
761 msg_ids = [msg_ids]
777 msg_ids = []
778 if isinstance(jobs, (basestring,AsyncResult)):
779 jobs = [jobs]
780 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
781 if bad_ids:
782 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
783 for j in jobs:
784 if isinstance(j, AsyncResult):
785 msg_ids.extend(j.msg_ids)
786 else:
787 msg_ids.append(j)
762 788 content = dict(msg_ids=msg_ids)
763 789 for t in targets:
764 790 self.session.send(self._control_socket, 'abort_request',
@@ -770,7 +796,7 b' class Client(HasTraits):'
770 796 if self.debug:
771 797 pprint(msg)
772 798 if msg['content']['status'] != 'ok':
773 error = ss.unwrap_exception(msg['content'])
799 error = self._unwrap_exception(msg['content'])
774 800 if error:
775 801 return error
776 802
@@ -791,7 +817,7 b' class Client(HasTraits):'
791 817 if self.debug:
792 818 pprint(msg)
793 819 if msg['content']['status'] != 'ok':
794 error = ss.unwrap_exception(msg['content'])
820 error = self._unwrap_exception(msg['content'])
795 821
796 822 if controller:
797 823 time.sleep(0.25)
@@ -800,7 +826,7 b' class Client(HasTraits):'
800 826 if self.debug:
801 827 pprint(msg)
802 828 if msg['content']['status'] != 'ok':
803 error = ss.unwrap_exception(msg['content'])
829 error = self._unwrap_exception(msg['content'])
804 830
805 831 if error:
806 832 raise error
@@ -827,8 +853,9 b' class Client(HasTraits):'
827 853 whether or not to wait until done to return
828 854 default: self.block
829 855 """
830 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True, balanced=False)
831 return result
856 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False)
857 if not block:
858 return result
832 859
833 860 def run(self, filename, targets='all', block=None):
834 861 """Execute contents of `filename` on engine(s).
@@ -1134,6 +1161,8 b' class Client(HasTraits):'
1134 1161 targets = slice(None)
1135 1162
1136 1163 if isinstance(targets, int):
1164 if targets < 0:
1165 targets = self.ids[targets]
1137 1166 if targets not in self.ids:
1138 1167 raise IndexError("No such engine: %i"%targets)
1139 1168 return self._cache_view(targets, balanced)
@@ -1159,7 +1188,8 b' class Client(HasTraits):'
1159 1188 if not isinstance(ns, dict):
1160 1189 raise TypeError("Must be a dict, not %s"%type(ns))
1161 1190 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False)
1162 return result
1191 if not block:
1192 return result
1163 1193
1164 1194 @defaultblock
1165 1195 def pull(self, keys, targets='all', block=None):
@@ -1191,7 +1221,7 b' class Client(HasTraits):'
1191 1221 msg_ids.extend(r.msg_ids)
1192 1222 r = AsyncResult(self, msg_ids, fname='scatter')
1193 1223 if block:
1194 return r.get()
1224 r.get()
1195 1225 else:
1196 1226 return r
1197 1227
@@ -1218,33 +1248,104 b' class Client(HasTraits):'
1218 1248 #--------------------------------------------------------------------------
1219 1249
1220 1250 @spinfirst
1221 def get_results(self, msg_ids, status_only=False):
1222 """Returns the result of the execute or task request with `msg_ids`.
1251 @defaultblock
1252 def get_result(self, indices_or_msg_ids=None, block=None):
1253 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1254
1255 If the client already has the results, no request to the Hub will be made.
1256
1257 This is a convenient way to construct AsyncResult objects, which are wrappers
1258 that include metadata about execution, and allow for awaiting results that
1259 were not submitted by this Client.
1260
1261 It can also be a convenient way to retrieve the metadata associated with
1262 blocking execution, since it always retrieves
1263
1264 Examples
1265 --------
1266 ::
1267
1268 In [10]: r = client.apply()
1223 1269
1224 1270 Parameters
1225 1271 ----------
1226 1272
1227 msg_ids : list of ints or msg_ids
1273 indices_or_msg_ids : integer history index, str msg_id, or list of either
1274 The indices or msg_ids of indices to be retrieved
1275
1276 block : bool
1277 Whether to wait for the result to be done
1278
1279 Returns
1280 -------
1281
1282 AsyncResult
1283 A single AsyncResult object will always be returned.
1284
1285 AsyncHubResult
1286 A subclass of AsyncResult that retrieves results from the Hub
1287
1288 """
1289 if indices_or_msg_ids is None:
1290 indices_or_msg_ids = -1
1291
1292 if not isinstance(indices_or_msg_ids, (list,tuple)):
1293 indices_or_msg_ids = [indices_or_msg_ids]
1294
1295 theids = []
1296 for id in indices_or_msg_ids:
1297 if isinstance(id, int):
1298 id = self.history[id]
1299 if not isinstance(id, str):
1300 raise TypeError("indices must be str or int, not %r"%id)
1301 theids.append(id)
1302
1303 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1304 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1305
1306 if remote_ids:
1307 ar = AsyncHubResult(self, msg_ids=theids)
1308 else:
1309 ar = AsyncResult(self, msg_ids=theids)
1310
1311 if block:
1312 ar.wait()
1313
1314 return ar
1315
1316 @spinfirst
1317 def result_status(self, msg_ids, status_only=True):
1318 """Check on the status of the result(s) of the apply request with `msg_ids`.
1319
1320 If status_only is False, then the actual results will be retrieved, else
1321 only the status of the results will be checked.
1322
1323 Parameters
1324 ----------
1325
1326 msg_ids : list of msg_ids
1228 1327 if int:
1229 1328 Passed as index to self.history for convenience.
1230 status_only : bool (default: False)
1329 status_only : bool (default: True)
1231 1330 if False:
1232 return the actual results
1331 Retrieve the actual results of completed tasks.
1233 1332
1234 1333 Returns
1235 1334 -------
1236 1335
1237 1336 results : dict
1238 1337 There will always be the keys 'pending' and 'completed', which will
1239 be lists of msg_ids.
1338 be lists of msg_ids that are incomplete or complete. If `status_only`
1339 is False, then completed results will be keyed by their `msg_id`.
1240 1340 """
1241 if not isinstance(msg_ids, (list,tuple)):
1242 msg_ids = [msg_ids]
1341 if not isinstance(indices_or_msg_ids, (list,tuple)):
1342 indices_or_msg_ids = [indices_or_msg_ids]
1343
1243 1344 theids = []
1244 for msg_id in msg_ids:
1345 for msg_id in indices_or_msg_ids:
1245 1346 if isinstance(msg_id, int):
1246 1347 msg_id = self.history[msg_id]
1247 if not isinstance(msg_id, str):
1348 if not isinstance(msg_id, basestring):
1248 1349 raise TypeError("msg_ids must be str, not %r"%msg_id)
1249 1350 theids.append(msg_id)
1250 1351
@@ -1252,7 +1353,7 b' class Client(HasTraits):'
1252 1353 local_results = {}
1253 1354
1254 1355 # comment this block out to temporarily disable local shortcut:
1255 for msg_id in list(theids):
1356 for msg_id in theids:
1256 1357 if msg_id in self.results:
1257 1358 completed.append(msg_id)
1258 1359 local_results[msg_id] = self.results[msg_id]
@@ -1267,7 +1368,7 b' class Client(HasTraits):'
1267 1368 pprint(msg)
1268 1369 content = msg['content']
1269 1370 if content['status'] != 'ok':
1270 raise ss.unwrap_exception(content)
1371 raise self._unwrap_exception(content)
1271 1372 buffers = msg['buffers']
1272 1373 else:
1273 1374 content = dict(completed=[],pending=[])
@@ -1298,13 +1399,17 b' class Client(HasTraits):'
1298 1399 if rcontent['status'] == 'ok':
1299 1400 res,buffers = ss.unserialize_object(buffers)
1300 1401 else:
1301 res = ss.unwrap_exception(rcontent)
1402 print rcontent
1403 res = self._unwrap_exception(rcontent)
1302 1404 failures.append(res)
1303 1405
1304 1406 self.results[msg_id] = res
1305 1407 content[msg_id] = res
1306 1408
1307 error.collect_exceptions(failures, "get_results")
1409 if len(theids) == 1 and failures:
1410 raise failures[0]
1411
1412 error.collect_exceptions(failures, "result_status")
1308 1413 return content
1309 1414
1310 1415 @spinfirst
@@ -1329,11 +1434,11 b' class Client(HasTraits):'
1329 1434 content = msg['content']
1330 1435 status = content.pop('status')
1331 1436 if status != 'ok':
1332 raise ss.unwrap_exception(content)
1437 raise self._unwrap_exception(content)
1333 1438 return ss.rekey(content)
1334 1439
1335 1440 @spinfirst
1336 def purge_results(self, msg_ids=[], targets=[]):
1441 def purge_results(self, jobs=[], targets=[]):
1337 1442 """Tell the controller to forget results.
1338 1443
1339 1444 Individual results can be purged by msg_id, or the entire
@@ -1342,7 +1447,7 b' class Client(HasTraits):'
1342 1447 Parameters
1343 1448 ----------
1344 1449
1345 msg_ids : str or list of strs
1450 jobs : str or list of strs or AsyncResult objects
1346 1451 the msg_ids whose results should be forgotten.
1347 1452 targets : int/str/list of ints/strs
1348 1453 The targets, by uuid or int_id, whose entire history is to be purged.
@@ -1350,10 +1455,24 b' class Client(HasTraits):'
1350 1455
1351 1456 default : None
1352 1457 """
1353 if not targets and not msg_ids:
1354 raise ValueError
1458 if not targets and not jobs:
1459 raise ValueError("Must specify at least one of `targets` and `jobs`")
1355 1460 if targets:
1356 1461 targets = self._build_targets(targets)[1]
1462
1463 # construct msg_ids from jobs
1464 msg_ids = []
1465 if isinstance(jobs, (basestring,AsyncResult)):
1466 jobs = [jobs]
1467 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1468 if bad_ids:
1469 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1470 for j in jobs:
1471 if isinstance(j, AsyncResult):
1472 msg_ids.extend(j.msg_ids)
1473 else:
1474 msg_ids.append(j)
1475
1357 1476 content = dict(targets=targets, msg_ids=msg_ids)
1358 1477 self.session.send(self._query_socket, "purge_request", content=content)
1359 1478 idents, msg = self.session.recv(self._query_socket, 0)
@@ -1361,7 +1480,7 b' class Client(HasTraits):'
1361 1480 pprint(msg)
1362 1481 content = msg['content']
1363 1482 if content['status'] != 'ok':
1364 raise ss.unwrap_exception(content)
1483 raise self._unwrap_exception(content)
1365 1484
1366 1485
1367 1486 __all__ = [ 'Client',
@@ -128,7 +128,7 b' class ParallelFunction(RemoteFunction):'
128 128 args = []
129 129 for seq in sequences:
130 130 part = self.mapObject.getPartition(seq, index, nparts)
131 if not part:
131 if len(part) == 0:
132 132 continue
133 133 else:
134 134 args.append(part)
@@ -15,7 +15,7 b' from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Insta'
15 15 from IPython.external.decorator import decorator
16 16 from IPython.zmq.parallel.asyncresult import AsyncResult
17 17 from IPython.zmq.parallel.dependency import Dependency
18 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
18 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel, remote
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Decorators
@@ -91,6 +91,8 b' class View(HasTraits):'
91 91 for name in self._default_names:
92 92 setattr(self, name, getattr(self, name, None))
93 93
94 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
95
94 96
95 97 def __repr__(self):
96 98 strtargets = str(self._targets)
@@ -106,9 +108,17 b' class View(HasTraits):'
106 108 def targets(self, value):
107 109 raise AttributeError("Cannot set View `targets` after construction!")
108 110
111 @property
112 def balanced(self):
113 return self._balanced
114
115 @balanced.setter
116 def balanced(self, value):
117 raise AttributeError("Cannot set View `balanced` after construction!")
118
109 119 def _defaults(self, *excludes):
110 120 """return dict of our default attributes, excluding names given."""
111 d = dict(balanced=self._balanced, targets=self.targets)
121 d = dict(balanced=self._balanced, targets=self._targets)
112 122 for name in self._default_names:
113 123 if name not in excludes:
114 124 d[name] = getattr(self, name)
@@ -182,22 +192,22 b' class View(HasTraits):'
182 192 d = self._defaults('block', 'bound')
183 193 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
184 194
185 @sync_results
186 @save_ids
187 def apply_bound(self, f, *args, **kwargs):
188 """calls f(*args, **kwargs) bound to engine namespace(s).
189
190 if self.block is False:
191 returns msg_id
192 else:
193 returns actual result of f(*args, **kwargs)
194
195 This method has access to the targets' globals
196
197 """
198 d = self._defaults('bound')
199 return self.client.apply(f, args, kwargs, bound=True, **d)
200
195 # @sync_results
196 # @save_ids
197 # def apply_bound(self, f, *args, **kwargs):
198 # """calls f(*args, **kwargs) bound to engine namespace(s).
199 #
200 # if self.block is False:
201 # returns msg_id
202 # else:
203 # returns actual result of f(*args, **kwargs)
204 #
205 # This method has access to the targets' namespace via globals()
206 #
207 # """
208 # d = self._defaults('bound')
209 # return self.client.apply(f, args, kwargs, bound=True, **d)
210 #
201 211 @sync_results
202 212 @save_ids
203 213 def apply_async_bound(self, f, *args, **kwargs):
@@ -206,7 +216,7 b' class View(HasTraits):'
206 216
207 217 returns: msg_id
208 218
209 This method has access to the targets' globals
219 This method has access to the targets' namespace via globals()
210 220
211 221 """
212 222 d = self._defaults('block', 'bound')
@@ -219,35 +229,54 b' class View(HasTraits):'
219 229
220 230 returns: actual result of f(*args, **kwargs)
221 231
222 This method has access to the targets' globals
232 This method has access to the targets' namespace via globals()
223 233
224 234 """
225 235 d = self._defaults('block', 'bound')
226 236 return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
227 237
228 def abort(self, msg_ids=None, block=None):
238 def abort(self, jobs=None, block=None):
229 239 """Abort jobs on my engines.
230 240
231 241 Parameters
232 242 ----------
233 243
234 msg_ids : None, str, list of strs, optional
244 jobs : None, str, list of strs, optional
235 245 if None: abort all jobs.
236 246 else: abort specific msg_id(s).
237 247 """
238 248 block = block if block is not None else self.block
239 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
249 return self.client.abort(jobs=jobs, targets=self._targets, block=block)
240 250
241 251 def queue_status(self, verbose=False):
242 252 """Fetch the Queue status of my engines"""
243 return self.client.queue_status(targets=self.targets, verbose=verbose)
253 return self.client.queue_status(targets=self._targets, verbose=verbose)
244 254
245 def purge_results(self, msg_ids=[], targets=[]):
255 def purge_results(self, jobs=[], targets=[]):
246 256 """Instruct the controller to forget specific results."""
247 257 if targets is None or targets == 'all':
248 targets = self.targets
249 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
258 targets = self._targets
259 return self.client.purge_results(jobs=jobs, targets=targets)
260
261 @spin_after
262 def get_result(self, indices_or_msg_ids=None):
263 """return one or more results, specified by history index or msg_id.
250 264
265 See client.get_result for details.
266
267 """
268
269 if indices_or_msg_ids is None:
270 indices_or_msg_ids = -1
271 if isinstance(indices_or_msg_ids, int):
272 indices_or_msg_ids = self.history[indices_or_msg_ids]
273 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
274 indices_or_msg_ids = list(indices_or_msg_ids)
275 for i,index in enumerate(indices_or_msg_ids):
276 if isinstance(index, int):
277 indices_or_msg_ids[i] = self.history[index]
278 return self.client.get_result(indices_or_msg_ids)
279
251 280 #-------------------------------------------------------------------
252 281 # Map
253 282 #-------------------------------------------------------------------
@@ -261,7 +290,7 b' class View(HasTraits):'
261 290
262 291 This is equivalent to map(...block=False)
263 292
264 See `map` for details.
293 See `self.map` for details.
265 294 """
266 295 if 'block' in kwargs:
267 296 raise TypeError("map_async doesn't take a `block` keyword argument.")
@@ -273,25 +302,33 b' class View(HasTraits):'
273 302
274 303 This is equivalent to map(...block=True)
275 304
276 See `map` for details.
305 See `self.map` for details.
277 306 """
278 307 if 'block' in kwargs:
279 308 raise TypeError("map_sync doesn't take a `block` keyword argument.")
280 309 kwargs['block'] = True
281 310 return self.map(f,*sequences,**kwargs)
282 311
312 def imap(self, f, *sequences, **kwargs):
313 """Parallel version of `itertools.imap`.
314
315 See `self.map` for details.
316 """
317
318 return iter(self.map_async(f,*sequences, **kwargs))
319
283 320 #-------------------------------------------------------------------
284 321 # Decorators
285 322 #-------------------------------------------------------------------
286 323
287 324 def remote(self, bound=True, block=True):
288 325 """Decorator for making a RemoteFunction"""
289 return remote(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
326 return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
290 327
291 328 def parallel(self, dist='b', bound=True, block=None):
292 329 """Decorator for making a ParallelFunction"""
293 330 block = self.block if block is None else block
294 return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
331 return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
295 332
296 333
297 334 class DirectView(View):
@@ -320,7 +357,9 b' class DirectView(View):'
320 357 @spin_after
321 358 @save_ids
322 359 def map(self, f, *sequences, **kwargs):
323 """Parallel version of builtin `map`, using this View's `targets`.
360 """view.map(f, *sequences, block=self.block, bound=self.bound) => list|AsyncMapResult
361
362 Parallel version of builtin `map`, using this View's `targets`.
324 363
325 364 There will be one task per target, so work will be chunked
326 365 if the sequences are longer than `targets`.
@@ -337,7 +376,7 b' class DirectView(View):'
337 376 block : bool
338 377 whether to wait for the result or not [default self.block]
339 378 bound : bool
340 whether to wait for the result or not [default self.bound]
379 whether to have access to the engines' namespaces [default self.bound]
341 380
342 381 Returns
343 382 -------
@@ -347,7 +386,8 b' class DirectView(View):'
347 386 An object like AsyncResult, but which reassembles the sequence of results
348 387 into a single list. AsyncMapResults can be iterated through before all
349 388 results are complete.
350 else:
389 else:
390 list
351 391 the result of map(f,*sequences)
352 392 """
353 393
@@ -359,18 +399,18 b' class DirectView(View):'
359 399
360 400 assert len(sequences) > 0, "must have some sequences to map onto!"
361 401 pf = ParallelFunction(self.client, f, block=block, bound=bound,
362 targets=self.targets, balanced=False)
402 targets=self._targets, balanced=False)
363 403 return pf.map(*sequences)
364 404
365 405 @sync_results
366 406 @save_ids
367 407 def execute(self, code, block=True):
368 408 """execute some code on my targets."""
369 return self.client.execute(code, block=block, targets=self.targets)
409 return self.client.execute(code, block=block, targets=self._targets)
370 410
371 411 def update(self, ns):
372 412 """update remote namespace with dict `ns`"""
373 return self.client.push(ns, targets=self.targets, block=self.block)
413 return self.client.push(ns, targets=self._targets, block=self.block)
374 414
375 415 push = update
376 416
@@ -379,7 +419,7 b' class DirectView(View):'
379 419 will return one object if it is a key.
380 420 It also takes a list of keys, and will return a list of objects."""
381 421 # block = block if block is not None else self.block
382 return self.client.pull(key_s, block=True, targets=self.targets)
422 return self.client.pull(key_s, block=True, targets=self._targets)
383 423
384 424 @sync_results
385 425 @save_ids
@@ -388,14 +428,14 b' class DirectView(View):'
388 428 will return one object if it is a key.
389 429 It also takes a list of keys, and will return a list of objects."""
390 430 block = block if block is not None else self.block
391 return self.client.pull(key_s, block=block, targets=self.targets)
431 return self.client.pull(key_s, block=block, targets=self._targets)
392 432
393 433 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
394 434 """
395 435 Partition a Python sequence and send the partitions to a set of engines.
396 436 """
397 437 block = block if block is not None else self.block
398 targets = targets if targets is not None else self.targets
438 targets = targets if targets is not None else self._targets
399 439
400 440 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
401 441 targets=targets, block=block)
@@ -407,7 +447,7 b' class DirectView(View):'
407 447 Gather a partitioned sequence on a set of engines as a single local seq.
408 448 """
409 449 block = block if block is not None else self.block
410 targets = targets if targets is not None else self.targets
450 targets = targets if targets is not None else self._targets
411 451
412 452 return self.client.gather(key, dist=dist, targets=targets, block=block)
413 453
@@ -420,12 +460,12 b' class DirectView(View):'
420 460 def clear(self, block=False):
421 461 """Clear the remote namespaces on my engines."""
422 462 block = block if block is not None else self.block
423 return self.client.clear(targets=self.targets, block=block)
463 return self.client.clear(targets=self._targets, block=block)
424 464
425 465 def kill(self, block=True):
426 466 """Kill my engines."""
427 467 block = block if block is not None else self.block
428 return self.client.kill(targets=self.targets, block=block)
468 return self.client.kill(targets=self._targets, block=block)
429 469
430 470 #----------------------------------------
431 471 # activate for %px,%autopx magics
@@ -504,9 +544,9 b' class LoadBalancedView(View):'
504 544 def set_flags(self, **kwargs):
505 545 """set my attribute flags by keyword.
506 546
507 A View is a wrapper for the Client's apply method, but
508 with attributes that specify keyword arguments, those attributes
509 can be set by keyword argument with this method.
547 A View is a wrapper for the Client's apply method, but with attributes
548 that specify keyword arguments, those attributes can be set by keyword
549 argument with this method.
510 550
511 551 Parameters
512 552 ----------
@@ -543,10 +583,15 b' class LoadBalancedView(View):'
543 583 @spin_after
544 584 @save_ids
545 585 def map(self, f, *sequences, **kwargs):
546 """Parallel version of builtin `map`, load-balanced by this View.
586 """view.map(f, *sequences, block=self.block, bound=self.bound, chunk_size=1) => list|AsyncMapResult
587
588 Parallel version of builtin `map`, load-balanced by this View.
547 589
548 Each element will be a separate task, and will be load-balanced. This
549 lets individual elements be available for iteration as soon as they arrive.
590 `block`, `bound`, and `chunk_size` can be specified by keyword only.
591
592 Each `chunk_size` elements will be a separate task, and will be
593 load-balanced. This lets individual elements be available for iteration
594 as soon as they arrive.
550 595
551 596 Parameters
552 597 ----------
@@ -558,7 +603,9 b' class LoadBalancedView(View):'
558 603 block : bool
559 604 whether to wait for the result or not [default self.block]
560 605 bound : bool
561 whether to use the engine's namespace
606 whether to use the engine's namespace [default self.bound]
607 chunk_size : int
608 how many elements should be in each task [default 1]
562 609
563 610 Returns
564 611 -------
@@ -586,7 +633,7 b' class LoadBalancedView(View):'
586 633 assert len(sequences) > 0, "must have some sequences to map onto!"
587 634
588 635 pf = ParallelFunction(self.client, f, block=block, bound=bound,
589 targets=self.targets, balanced=True,
636 targets=self._targets, balanced=True,
590 637 chunk_size=chunk_size)
591 638 return pf.map(*sequences)
592 639
@@ -59,13 +59,24 b' of engine ids:'
59 59
60 60 Here we see that there are four engines ready to do work for us.
61 61
62 For direct execution, we will make use of a :class:`DirectView` object, which can be
63 constructed via list-access to the client:
64
65 .. sourcecode::
66
67 In [4]: dview = rc[:] # use all engines
68
69 .. seealso::
70
71 For more information, see the in-depth explanation of :ref:`Views <parallel_view>`.
72
73
62 74 Quick and easy parallelism
63 75 ==========================
64 76
65 77 In many cases, you simply want to apply a Python function to a sequence of
66 78 objects, but *in parallel*. The client interface provides a simple way
67 of accomplishing this: using the builtin :func:`map` and the ``@remote``
68 function decorator, or the client's :meth:`map` method.
79 of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
69 80
70 81 Parallel map
71 82 ------------
@@ -79,44 +90,67 b" DirectView's :meth:`map` method:"
79 90 .. sourcecode:: ipython
80 91
81 92 In [62]: serial_result = map(lambda x:x**10, range(32))
93
94 In [63]: dview.block = True
95
96 In [66]: parallel_result = dview.map(lambda x: x**10, range(32))
82 97
83 In [66]: parallel_result = rc[:].map(lambda x: x**10, range(32))
84
85 In [67]: serial_result==parallel_result.get()
98 In [67]: serial_result==parallel_result
86 99 Out[67]: True
87 100
88 101
89 102 .. note::
90 103
91 104 The :class:`DirectView`'s version of :meth:`map` does
92 not do any load balancing. For a load balanced version, use a
105 not do dynamic load balancing. For a load balanced version, use a
93 106 :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
94 107 `balanced=True`.
95 108
96 109 .. seealso::
97 110
98 :meth:`map` is implemented via :class:`.ParallelFunction`.
111 :meth:`map` is implemented via :class:`ParallelFunction`.
99 112
100 Remote function decorator
101 -------------------------
113 Remote function decorators
114 --------------------------
102 115
103 116 Remote functions are just like normal functions, but when they are called,
104 117 they execute on one or more engines, rather than locally. IPython provides
105 some decorators:
118 two decorators:
106 119
107 120 .. sourcecode:: ipython
108 121
109 In [10]: @rc.remote(block=True, targets=0)
110 ....: def f(x):
111 ....: return 10.0*x**4
112 ....:
122 In [10]: @rc.remote(block=True, targets='all')
123 ...: def getpid():
124 ...: import os
125 ...: return os.getpid()
126 ...:
127
128 In [11]: getpid()
129 Out[11]: [12345, 12346, 12347, 12348]
113 130
114 In [11]: map(f, range(32)) # this is done on engine 0
115 Out[11]: [0.0,10.0,160.0,...]
131 A ``@parallel`` decorator creates parallel functions, that break up an element-wise
132 operations and distribute them, reconstructing the result.
133
134 .. sourcecode:: ipython
135
136 In [12]: import numpy as np
137
138 In [13]: A = np.random.random((64,48))
139
140 In [14]: @rc.parallel(block=True, targets='all')
141 ...: def pmul(A,B):
142 ...: return A*B
143
144 In [15]: C_local = A*A
145
146 In [16]: C_remote_partial = pmul(A,A)
147
148 In [17]: (C_local == C_remote).all()
149 Out[17]: True
116 150
117 151 .. seealso::
118 152
119 See the docstring for the :func:`parallel` and :func:`remote` decorators for
153 See the docstrings for the :func:`parallel` and :func:`remote` decorators for
120 154 options.
121 155
122 156 Calling Python functions
@@ -152,7 +186,7 b' the extra arguments. For instance, performing index-access on a client creates a'
152 186 Out[4]: <DirectView [1, 2]>
153 187
154 188 In [5]: view.apply<tab>
155 view.apply view.apply_async view.apply_async_bound view.apply_bound view.apply_sync view.apply_sync_bound
189 view.apply view.apply_async view.apply_async_bound view.apply_sync view.apply_sync_bound
156 190
157 191 A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound`
158 192 and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x`
@@ -180,8 +214,8 b' blocks until the engines are done executing the command:'
180 214
181 215 .. sourcecode:: ipython
182 216
183 In [2]: rc.block=True
184 In [3]: dview = rc[:] # A DirectView of all engines
217 In [2]: dview = rc[:] # A DirectView of all engines
218 In [3]: dview.block=True
185 219 In [4]: dview['a'] = 5
186 220
187 221 In [5]: dview['b'] = 10
@@ -189,13 +223,13 b' blocks until the engines are done executing the command:'
189 223 In [6]: dview.apply_bound(lambda x: a+b+x, 27)
190 224 Out[6]: [42, 42, 42, 42]
191 225
192 Python commands can be executed on specific engines by calling execute using
193 the ``targets`` keyword argument, or creating a :class:`DirectView` instance
194 by index-access to the client:
226 Python commands can be executed on specific engines by calling execute using the ``targets``
227 keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by
228 index-access to the client:
195 229
196 230 .. sourcecode:: ipython
197 231
198 In [6]: rc[::2].execute('c=a+b') # shorthand for rc.execute('c=a+b',targets=[0,2])
232 In [6]: rc.execute('c=a+b', targets=[0,2])
199 233
200 234 In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
201 235
@@ -214,10 +248,12 b' by index-access to the client:'
214 248 :meth:`View.apply(f,*args,**kwargs)`, which simply calls
215 249 ``f(*args,**kwargs)`` remotely.
216 250
217 This example also shows one of the most important things about the IPython
251 Bound and unbound execution
252 ---------------------------
253
254 The previous example also shows one of the most important things about the IPython
218 255 engines: they have a persistent user namespaces. The :meth:`apply` method can
219 be run in either a bound or unbound way. The default for a View is to be
220 unbound, unless called by the :meth:`apply_bound` method:
256 be run in either a bound or unbound manner:
221 257
222 258 .. sourcecode:: ipython
223 259
@@ -225,10 +261,10 b' unbound, unless called by the :meth:`apply_bound` method:'
225 261
226 262 In [10]: v0 = rc[0]
227 263
228 In [12]: v0.apply_bound(lambda : b)
264 In [12]: v0.apply_sync_bound(lambda : b)
229 265 Out[12]: 5
230 266
231 In [13]: v0.apply(lambda : b)
267 In [13]: v0.apply_sync(lambda : b)
232 268 ---------------------------------------------------------------------------
233 269 RemoteError Traceback (most recent call last)
234 270 /home/you/<ipython-input-34-21a468eb10f0> in <module>()
@@ -244,10 +280,10 b' unbound, unless called by the :meth:`apply_bound` method:'
244 280
245 281
246 282 Specifically, `bound=True` specifies that the engine's namespace is to be used
247 for execution, and `bound=False` specifies that the engine's namespace is not
248 to be used (hence, 'b' is undefined during unbound execution, since the
249 function is called in an empty namespace). Unbound execution is often useful
250 for large numbers of atomic tasks, which prevents bloating the engine's
283 as the `globals` when the function is called, and `bound=False` specifies that
284 the engine's namespace is not to be used (hence, 'b' is undefined during unbound
285 execution, since the function is called in an empty namespace). Unbound execution is
286 often useful for large numbers of atomic tasks, which prevents bloating the engine's
251 287 memory, while bound execution lets you build on your previous work.
252 288
253 289
@@ -257,7 +293,7 b' Non-blocking execution'
257 293 In non-blocking mode, :meth:`apply` submits the command to be executed and
258 294 then returns a :class:`AsyncResult` object immediately. The
259 295 :class:`AsyncResult` object gives you a way of getting a result at a later
260 time through its :meth:`get` method.
296 time through its :meth:`get` method.
261 297
262 298 .. Note::
263 299
@@ -280,25 +316,25 b' local Python/IPython session:'
280 316 ...: return time.time()-tic
281 317
282 318 # In non-blocking mode
283 In [7]: pr = dview.apply_async(wait, 2)
319 In [7]: ar = dview.apply_async(wait, 2)
284 320
285 321 # Now block for the result
286 In [8]: pr.get()
322 In [8]: ar.get()
287 323 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
288 324
289 325 # Again in non-blocking mode
290 In [9]: pr = dview.apply_async(wait, 10)
326 In [9]: ar = dview.apply_async(wait, 10)
291 327
292 328 # Poll to see if the result is ready
293 In [10]: pr.ready()
329 In [10]: ar.ready()
294 330 Out[10]: False
295 331
296 332 # ask for the result, but wait a maximum of 1 second:
297 In [45]: pr.get(1)
333 In [45]: ar.get(1)
298 334 ---------------------------------------------------------------------------
299 335 TimeoutError Traceback (most recent call last)
300 336 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
301 ----> 1 pr.get(1)
337 ----> 1 ar.get(1)
302 338
303 339 /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
304 340 62 raise self._exception
@@ -316,8 +352,8 b' local Python/IPython session:'
316 352
317 353 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
318 354 are done. For this, there is a the method :meth:`barrier`. This method takes a
319 tuple of :class:`AsyncResult` objects (or `msg_ids`) and blocks until all of the
320 associated results are ready:
355 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
356 and blocks until all of the associated results are ready:
321 357
322 358 .. sourcecode:: ipython
323 359
@@ -338,15 +374,17 b' associated results are ready:'
338 374 The ``block`` keyword argument and attributes
339 375 ---------------------------------------------
340 376
341 Most methods(like :meth:`apply`) accept
377 Most client methods(like :meth:`apply`) accept
342 378 ``block`` as a keyword argument. As we have seen above, these
343 keyword arguments control the blocking mode . The :class:`Client` class also has
379 keyword arguments control the blocking mode. The :class:`Client` class also has
344 380 a :attr:`block` attribute that controls the default behavior when the keyword
345 381 argument is not provided. Thus the following logic is used for :attr:`block`:
346 382
347 383 * If no keyword argument is provided, the instance attributes are used.
348 384 * Keyword argument, if provided override the instance attributes for
349 385 the duration of a single call.
386
387 DirectView objects also have a ``bound`` attribute, which is used in the same way.
350 388
351 389 The following examples demonstrate how to use the instance attributes:
352 390
@@ -365,7 +403,7 b' The following examples demonstrate how to use the instance attributes:'
365 403 In [22]: rc.apply(lambda : 42, targets='all')
366 404 Out[22]: [42, 42, 42, 42]
367 405
368 The :attr:`block` and :attr:`targets` instance attributes of the
406 The :attr:`block`, :attr:`bound`, and :attr:`targets` instance attributes of the
369 407 :class:`.DirectView` also determine the behavior of the parallel magic commands.
370 408
371 409
@@ -381,9 +419,9 b' Parallel magic commands'
381 419 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
382 420 that make it more pleasant to execute Python commands on the engines
383 421 interactively. These are simply shortcuts to :meth:`execute` and
384 :meth:`get_result`. The ``%px`` magic executes a single Python command on the
385 engines specified by the :attr:`targets` attribute of the
386 :class:`MultiEngineClient` instance (by default this is ``'all'``):
422 :meth:`get_result` of the :class:`DirectView`. The ``%px`` magic executes a single
423 Python command on the engines specified by the :attr:`targets` attribute of the
424 :class:`DirectView` instance:
387 425
388 426 .. sourcecode:: ipython
389 427
@@ -399,7 +437,6 b' engines specified by the :attr:`targets` attribute of the'
399 437
400 438 In [26]: %px import numpy
401 439 Parallel execution on engines: [0, 1, 2, 3]
402 Out[26]:[None,None,None,None]
403 440
404 441 In [27]: %px a = numpy.random.rand(2,2)
405 442 Parallel execution on engines: [0, 1, 2, 3]
@@ -408,36 +445,25 b' engines specified by the :attr:`targets` attribute of the'
408 445 Parallel execution on engines: [0, 1, 2, 3]
409 446
410 447 In [28]: dv['ev']
411 Out[44]: [ array([ 1.09522024, -0.09645227]),
448 Out[28]: [ array([ 1.09522024, -0.09645227]),
412 449 array([ 1.21435496, -0.35546712]),
413 450 array([ 0.72180653, 0.07133042]),
414 451 array([ 1.46384341e+00, 1.04353244e-04])
415 452 ]
416 453
417 .. Note::
418
419 ``%result`` doesn't work
420
421 The ``%result`` magic gets and prints the stdin/stdout/stderr of the last
422 command executed on each engine. It is simply a shortcut to the
454 The ``%result`` magic gets the most recent result, or takes an argument
455 specifying the index of the result to be requested. It is simply a shortcut to the
423 456 :meth:`get_result` method:
424 457
425 458 .. sourcecode:: ipython
426
427 In [29]: %result
428 Out[29]:
429 <Results List>
430 [0] In [10]: print numpy.linalg.eigvals(a)
431 [0] Out[10]: [ 1.28167017 0.14197338]
432
433 [1] In [9]: print numpy.linalg.eigvals(a)
434 [1] Out[9]: [-0.14093616 1.27877273]
435
436 [2] In [10]: print numpy.linalg.eigvals(a)
437 [2] Out[10]: [-0.37023573 1.06779409]
438
439 [3] In [9]: print numpy.linalg.eigvals(a)
440 [3] Out[9]: [ 0.83664764 -0.25602658]
459
460 In [29]: dv.apply_async_bound(lambda : ev)
461
462 In [30]: %result
463 Out[30]: [ [ 1.28167017 0.14197338],
464 [-0.14093616 1.27877273],
465 [-0.37023573 1.06779409],
466 [ 0.83664764 -0.25602658] ]
441 467
442 468 The ``%autopx`` magic switches to a mode where everything you type is executed
443 469 on the engines given by the :attr:`targets` attribute:
@@ -477,12 +503,6 b' on the engines given by the :attr:`targets` attribute:'
477 503 'Average max eigenvalue is: 10.1158837784',]
478 504
479 505
480 .. Note::
481
482 Multiline ``%autpx`` gets fouled up by NameErrors, because IPython
483 currently introspects too much.
484
485
486 506 Moving Python objects around
487 507 ============================
488 508
@@ -524,14 +544,12 b' In non-blocking mode :meth:`push` and :meth:`pull` also return'
524 544
525 545 In [47]: rc.block=False
526 546
527 In [48]: pr = rc.pull('a')
547 In [48]: ar = rc.pull('a')
528 548
529 In [49]: pr.get()
549 In [49]: ar.get()
530 550 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
531 551
532 552
533
534
535 553 Dictionary interface
536 554 --------------------
537 555
@@ -751,9 +769,9 b' All of this same error handling magic even works in non-blocking mode:'
751 769
752 770 In [83]: rc.block=False
753 771
754 In [84]: pr = rc.execute('1/0')
772 In [84]: ar = rc.execute('1/0')
755 773
756 In [85]: pr.get()
774 In [85]: ar.get()
757 775 ---------------------------------------------------------------------------
758 776 CompositeError Traceback (most recent call last)
759 777
@@ -33,7 +33,8 b' Creating a ``Client`` instance'
33 33 ==============================
34 34
35 35 The first step is to import the IPython :mod:`IPython.zmq.parallel.client`
36 module and then create a :class:`.Client` instance:
36 module and then create a :class:`.Client` instance, and we will also be using
37 a :class:`LoadBalancedView`, here called `lview`:
37 38
38 39 .. sourcecode:: ipython
39 40
@@ -41,8 +42,7 b' module and then create a :class:`.Client` instance:'
41 42
42 43 In [2]: rc = client.Client()
43 44
44 In [3]: lview = rc.view(balanced=True)
45 Out[3]: <LoadBalancedView None>
45 In [3]: lview = rc.view()
46 46
47 47
48 48 This form assumes that the controller was started on localhost with default
@@ -73,14 +73,15 b' the task interface.'
73 73 Parallel map
74 74 ------------
75 75
76 To load-balance :meth:`map`,simply use a LoadBalancedView, created by asking
77 for the ``None`` element:
76 To load-balance :meth:`map`,simply use a LoadBalancedView:
78 77
79 78 .. sourcecode:: ipython
80
79
80 In [62]: lview.block = True
81
81 82 In [63]: serial_result = map(lambda x:x**10, range(32))
82 83
83 In [64]: parallel_result = lview.map(lambda x:x**10, range(32), block=True)
84 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
84 85
85 86 In [65]: serial_result==parallel_result
86 87 Out[65]: True
General Comments 0
You need to be logged in to leave comments. Login now