##// END OF EJS Templates
split get_results into get_result/result_status, add AsyncHubResult
MinRK -
Show More
@@ -10,6 +10,8 b''
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import time
14
13 from IPython.external.decorator import decorator
15 from IPython.external.decorator import decorator
14 import error
16 import error
15
17
@@ -189,6 +191,23 b' class AsyncResult(object):'
189 raise AttributeError("%r object has no attribute %r"%(
191 raise AttributeError("%r object has no attribute %r"%(
190 self.__class__.__name__, key))
192 self.__class__.__name__, key))
191 return self.__getitem__(key)
193 return self.__getitem__(key)
194
195 # asynchronous iterator:
196 def __iter__(self):
197 if self._single_result:
198 raise TypeError("AsyncResults with a single result are not iterable.")
199 try:
200 rlist = self.get(0)
201 except error.TimeoutError:
202 # wait for each result individually
203 for msg_id in self.msg_ids:
204 ar = AsyncResult(self._client, msg_id, self._fname)
205 yield ar.get()
206 else:
207 # already done
208 for r in rlist:
209 yield r
210
192
211
193
212
194 class AsyncMapResult(AsyncResult):
213 class AsyncMapResult(AsyncResult):
@@ -227,6 +246,49 b' class AsyncMapResult(AsyncResult):'
227 # already done
246 # already done
228 for r in rlist:
247 for r in rlist:
229 yield r
248 yield r
249
250
251 class AsyncHubResult(AsyncResult):
252 """Class to wrap pending results that must be requested from the Hub"""
230
253
254 def wait(self, timeout=-1):
255 """wait for result to complete."""
256 start = time.time()
257 if self._ready:
258 return
259 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
260 local_ready = self._client.barrier(local_ids, timeout)
261 if local_ready:
262 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
263 if not remote_ids:
264 self._ready = True
265 else:
266 rdict = self._client.result_status(remote_ids, status_only=False)
267 pending = rdict['pending']
268 while pending and time.time() < start+timeout:
269 rdict = self._client.result_status(remote_ids, status_only=False)
270 pending = rdict['pending']
271 if pending:
272 time.sleep(0.1)
273 if not pending:
274 self._ready = True
275 if self._ready:
276 try:
277 results = map(self._client.results.get, self.msg_ids)
278 self._result = results
279 if self._single_result:
280 r = results[0]
281 if isinstance(r, Exception):
282 raise r
283 else:
284 results = error.collect_exceptions(results, self._fname)
285 self._result = self._reconstruct_result(results)
286 except Exception, e:
287 self._exception = e
288 self._success = False
289 else:
290 self._success = True
291 finally:
292 self._metadata = map(self._client.metadata.get, self.msg_ids)
231
293
232 __all__ = ['AsyncResult', 'AsyncMapResult'] No newline at end of file
294 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] No newline at end of file
@@ -32,7 +32,7 b' from IPython.external.ssh import tunnel'
32 import error
32 import error
33 import map as Map
33 import map as Map
34 import streamsession as ss
34 import streamsession as ss
35 from asyncresult import AsyncResult, AsyncMapResult
35 from asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult
36 from clusterdir import ClusterDir, ClusterDirError
36 from clusterdir import ClusterDir, ClusterDirError
37 from dependency import Dependency, depend, require, dependent
37 from dependency import Dependency, depend, require, dependent
38 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
38 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
@@ -485,6 +485,15 b' class Client(HasTraits):'
485 # handlers and callbacks for incoming messages
485 # handlers and callbacks for incoming messages
486 #--------------------------------------------------------------------------
486 #--------------------------------------------------------------------------
487
487
488 def _unwrap_exception(self, content):
489 """unwrap exception, and remap engineid to int."""
490 e = ss.unwrap_exception(content)
491 if e.engine_info:
492 e_uuid = e.engine_info['engineid']
493 eid = self._engines[e_uuid]
494 e.engine_info['engineid'] = eid
495 return e
496
488 def _register_engine(self, msg):
497 def _register_engine(self, msg):
489 """Register a new engine, and update our connection info."""
498 """Register a new engine, and update our connection info."""
490 content = msg['content']
499 content = msg['content']
@@ -537,7 +546,7 b' class Client(HasTraits):'
537 print ("got unknown result: %s"%msg_id)
546 print ("got unknown result: %s"%msg_id)
538 else:
547 else:
539 self.outstanding.remove(msg_id)
548 self.outstanding.remove(msg_id)
540 self.results[msg_id] = ss.unwrap_exception(msg['content'])
549 self.results[msg_id] = self._unwrap_exception(msg['content'])
541
550
542 def _handle_apply_reply(self, msg):
551 def _handle_apply_reply(self, msg):
543 """Save the reply to an apply_request into our results."""
552 """Save the reply to an apply_request into our results."""
@@ -569,12 +578,7 b' class Client(HasTraits):'
569 # TODO: handle resubmission
578 # TODO: handle resubmission
570 pass
579 pass
571 else:
580 else:
572 e = ss.unwrap_exception(content)
581 self.results[msg_id] = self._unwrap_exception(content)
573 if e.engine_info:
574 e_uuid = e.engine_info['engineid']
575 eid = self._engines[e_uuid]
576 e.engine_info['engineid'] = eid
577 self.results[msg_id] = e
578
582
579 def _flush_notifications(self):
583 def _flush_notifications(self):
580 """Flush notifications of engine registrations waiting
584 """Flush notifications of engine registrations waiting
@@ -641,7 +645,7 b' class Client(HasTraits):'
641 s = md[name] or ''
645 s = md[name] or ''
642 md[name] = s + content['data']
646 md[name] = s + content['data']
643 elif msg_type == 'pyerr':
647 elif msg_type == 'pyerr':
644 md.update({'pyerr' : ss.unwrap_exception(content)})
648 md.update({'pyerr' : self._unwrap_exception(content)})
645 else:
649 else:
646 md.update({msg_type : content['data']})
650 md.update({msg_type : content['data']})
647
651
@@ -685,13 +689,13 b' class Client(HasTraits):'
685 if self._iopub_socket:
689 if self._iopub_socket:
686 self._flush_iopub(self._iopub_socket)
690 self._flush_iopub(self._iopub_socket)
687
691
688 def barrier(self, msg_ids=None, timeout=-1):
692 def barrier(self, jobs=None, timeout=-1):
689 """waits on one or more `msg_ids`, for up to `timeout` seconds.
693 """waits on one or more `jobs`, for up to `timeout` seconds.
690
694
691 Parameters
695 Parameters
692 ----------
696 ----------
693
697
694 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
698 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
695 ints are indices to self.history
699 ints are indices to self.history
696 strs are msg_ids
700 strs are msg_ids
697 default: wait on all outstanding messages
701 default: wait on all outstanding messages
@@ -706,19 +710,20 b' class Client(HasTraits):'
706 False : timeout reached, some msg_ids still outstanding
710 False : timeout reached, some msg_ids still outstanding
707 """
711 """
708 tic = time.time()
712 tic = time.time()
709 if msg_ids is None:
713 if jobs is None:
710 theids = self.outstanding
714 theids = self.outstanding
711 else:
715 else:
712 if isinstance(msg_ids, (int, str, AsyncResult)):
716 if isinstance(jobs, (int, str, AsyncResult)):
713 msg_ids = [msg_ids]
717 jobs = [jobs]
714 theids = set()
718 theids = set()
715 for msg_id in msg_ids:
719 for job in jobs:
716 if isinstance(msg_id, int):
720 if isinstance(job, int):
717 msg_id = self.history[msg_id]
721 # index access
718 elif isinstance(msg_id, AsyncResult):
722 job = self.history[job]
719 map(theids.add, msg_id.msg_ids)
723 elif isinstance(job, AsyncResult):
724 map(theids.add, job.msg_ids)
720 continue
725 continue
721 theids.add(msg_id)
726 theids.add(job)
722 if not theids.intersection(self.outstanding):
727 if not theids.intersection(self.outstanding):
723 return True
728 return True
724 self.spin()
729 self.spin()
@@ -747,18 +752,39 b' class Client(HasTraits):'
747 if self.debug:
752 if self.debug:
748 pprint(msg)
753 pprint(msg)
749 if msg['content']['status'] != 'ok':
754 if msg['content']['status'] != 'ok':
750 error = ss.unwrap_exception(msg['content'])
755 error = self._unwrap_exception(msg['content'])
751 if error:
756 if error:
752 return error
757 return error
753
758
754
759
755 @spinfirst
760 @spinfirst
756 @defaultblock
761 @defaultblock
757 def abort(self, msg_ids = None, targets=None, block=None):
762 def abort(self, jobs=None, targets=None, block=None):
758 """Abort the execution queues of target(s)."""
763 """Abort specific jobs from the execution queues of target(s).
764
765 This is a mechanism to prevent jobs that have already been submitted
766 from executing.
767
768 Parameters
769 ----------
770
771 jobs : msg_id, list of msg_ids, or AsyncResult
772 The jobs to be aborted
773
774
775 """
759 targets = self._build_targets(targets)[0]
776 targets = self._build_targets(targets)[0]
760 if isinstance(msg_ids, basestring):
777 msg_ids = []
761 msg_ids = [msg_ids]
778 if isinstance(jobs, (basestring,AsyncResult)):
779 jobs = [jobs]
780 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
781 if bad_ids:
782 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
783 for j in jobs:
784 if isinstance(j, AsyncResult):
785 msg_ids.extend(j.msg_ids)
786 else:
787 msg_ids.append(j)
762 content = dict(msg_ids=msg_ids)
788 content = dict(msg_ids=msg_ids)
763 for t in targets:
789 for t in targets:
764 self.session.send(self._control_socket, 'abort_request',
790 self.session.send(self._control_socket, 'abort_request',
@@ -770,7 +796,7 b' class Client(HasTraits):'
770 if self.debug:
796 if self.debug:
771 pprint(msg)
797 pprint(msg)
772 if msg['content']['status'] != 'ok':
798 if msg['content']['status'] != 'ok':
773 error = ss.unwrap_exception(msg['content'])
799 error = self._unwrap_exception(msg['content'])
774 if error:
800 if error:
775 return error
801 return error
776
802
@@ -791,7 +817,7 b' class Client(HasTraits):'
791 if self.debug:
817 if self.debug:
792 pprint(msg)
818 pprint(msg)
793 if msg['content']['status'] != 'ok':
819 if msg['content']['status'] != 'ok':
794 error = ss.unwrap_exception(msg['content'])
820 error = self._unwrap_exception(msg['content'])
795
821
796 if controller:
822 if controller:
797 time.sleep(0.25)
823 time.sleep(0.25)
@@ -800,7 +826,7 b' class Client(HasTraits):'
800 if self.debug:
826 if self.debug:
801 pprint(msg)
827 pprint(msg)
802 if msg['content']['status'] != 'ok':
828 if msg['content']['status'] != 'ok':
803 error = ss.unwrap_exception(msg['content'])
829 error = self._unwrap_exception(msg['content'])
804
830
805 if error:
831 if error:
806 raise error
832 raise error
@@ -827,8 +853,9 b' class Client(HasTraits):'
827 whether or not to wait until done to return
853 whether or not to wait until done to return
828 default: self.block
854 default: self.block
829 """
855 """
830 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True, balanced=False)
856 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False)
831 return result
857 if not block:
858 return result
832
859
833 def run(self, filename, targets='all', block=None):
860 def run(self, filename, targets='all', block=None):
834 """Execute contents of `filename` on engine(s).
861 """Execute contents of `filename` on engine(s).
@@ -1134,6 +1161,8 b' class Client(HasTraits):'
1134 targets = slice(None)
1161 targets = slice(None)
1135
1162
1136 if isinstance(targets, int):
1163 if isinstance(targets, int):
1164 if targets < 0:
1165 targets = self.ids[targets]
1137 if targets not in self.ids:
1166 if targets not in self.ids:
1138 raise IndexError("No such engine: %i"%targets)
1167 raise IndexError("No such engine: %i"%targets)
1139 return self._cache_view(targets, balanced)
1168 return self._cache_view(targets, balanced)
@@ -1159,7 +1188,8 b' class Client(HasTraits):'
1159 if not isinstance(ns, dict):
1188 if not isinstance(ns, dict):
1160 raise TypeError("Must be a dict, not %s"%type(ns))
1189 raise TypeError("Must be a dict, not %s"%type(ns))
1161 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False)
1190 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False)
1162 return result
1191 if not block:
1192 return result
1163
1193
1164 @defaultblock
1194 @defaultblock
1165 def pull(self, keys, targets='all', block=None):
1195 def pull(self, keys, targets='all', block=None):
@@ -1191,7 +1221,7 b' class Client(HasTraits):'
1191 msg_ids.extend(r.msg_ids)
1221 msg_ids.extend(r.msg_ids)
1192 r = AsyncResult(self, msg_ids, fname='scatter')
1222 r = AsyncResult(self, msg_ids, fname='scatter')
1193 if block:
1223 if block:
1194 return r.get()
1224 r.get()
1195 else:
1225 else:
1196 return r
1226 return r
1197
1227
@@ -1218,33 +1248,104 b' class Client(HasTraits):'
1218 #--------------------------------------------------------------------------
1248 #--------------------------------------------------------------------------
1219
1249
1220 @spinfirst
1250 @spinfirst
1221 def get_results(self, msg_ids, status_only=False):
1251 @defaultblock
1222 """Returns the result of the execute or task request with `msg_ids`.
1252 def get_result(self, indices_or_msg_ids=None, block=None):
1253 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1254
1255 If the client already has the results, no request to the Hub will be made.
1256
1257 This is a convenient way to construct AsyncResult objects, which are wrappers
1258 that include metadata about execution, and allow for awaiting results that
1259 were not submitted by this Client.
1260
1261 It can also be a convenient way to retrieve the metadata associated with
1262 blocking execution, since it always retrieves
1263
1264 Examples
1265 --------
1266 ::
1267
1268 In [10]: r = client.apply()
1223
1269
1224 Parameters
1270 Parameters
1225 ----------
1271 ----------
1226
1272
1227 msg_ids : list of ints or msg_ids
1273 indices_or_msg_ids : integer history index, str msg_id, or list of either
1274 The indices or msg_ids of indices to be retrieved
1275
1276 block : bool
1277 Whether to wait for the result to be done
1278
1279 Returns
1280 -------
1281
1282 AsyncResult
1283 A single AsyncResult object will always be returned.
1284
1285 AsyncHubResult
1286 A subclass of AsyncResult that retrieves results from the Hub
1287
1288 """
1289 if indices_or_msg_ids is None:
1290 indices_or_msg_ids = -1
1291
1292 if not isinstance(indices_or_msg_ids, (list,tuple)):
1293 indices_or_msg_ids = [indices_or_msg_ids]
1294
1295 theids = []
1296 for id in indices_or_msg_ids:
1297 if isinstance(id, int):
1298 id = self.history[id]
1299 if not isinstance(id, str):
1300 raise TypeError("indices must be str or int, not %r"%id)
1301 theids.append(id)
1302
1303 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1304 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1305
1306 if remote_ids:
1307 ar = AsyncHubResult(self, msg_ids=theids)
1308 else:
1309 ar = AsyncResult(self, msg_ids=theids)
1310
1311 if block:
1312 ar.wait()
1313
1314 return ar
1315
1316 @spinfirst
1317 def result_status(self, msg_ids, status_only=True):
1318 """Check on the status of the result(s) of the apply request with `msg_ids`.
1319
1320 If status_only is False, then the actual results will be retrieved, else
1321 only the status of the results will be checked.
1322
1323 Parameters
1324 ----------
1325
1326 msg_ids : list of msg_ids
1228 if int:
1327 if int:
1229 Passed as index to self.history for convenience.
1328 Passed as index to self.history for convenience.
1230 status_only : bool (default: False)
1329 status_only : bool (default: True)
1231 if False:
1330 if False:
1232 return the actual results
1331 Retrieve the actual results of completed tasks.
1233
1332
1234 Returns
1333 Returns
1235 -------
1334 -------
1236
1335
1237 results : dict
1336 results : dict
1238 There will always be the keys 'pending' and 'completed', which will
1337 There will always be the keys 'pending' and 'completed', which will
1239 be lists of msg_ids.
1338 be lists of msg_ids that are incomplete or complete. If `status_only`
1339 is False, then completed results will be keyed by their `msg_id`.
1240 """
1340 """
1241 if not isinstance(msg_ids, (list,tuple)):
1341 if not isinstance(indices_or_msg_ids, (list,tuple)):
1242 msg_ids = [msg_ids]
1342 indices_or_msg_ids = [indices_or_msg_ids]
1343
1243 theids = []
1344 theids = []
1244 for msg_id in msg_ids:
1345 for msg_id in indices_or_msg_ids:
1245 if isinstance(msg_id, int):
1346 if isinstance(msg_id, int):
1246 msg_id = self.history[msg_id]
1347 msg_id = self.history[msg_id]
1247 if not isinstance(msg_id, str):
1348 if not isinstance(msg_id, basestring):
1248 raise TypeError("msg_ids must be str, not %r"%msg_id)
1349 raise TypeError("msg_ids must be str, not %r"%msg_id)
1249 theids.append(msg_id)
1350 theids.append(msg_id)
1250
1351
@@ -1252,7 +1353,7 b' class Client(HasTraits):'
1252 local_results = {}
1353 local_results = {}
1253
1354
1254 # comment this block out to temporarily disable local shortcut:
1355 # comment this block out to temporarily disable local shortcut:
1255 for msg_id in list(theids):
1356 for msg_id in theids:
1256 if msg_id in self.results:
1357 if msg_id in self.results:
1257 completed.append(msg_id)
1358 completed.append(msg_id)
1258 local_results[msg_id] = self.results[msg_id]
1359 local_results[msg_id] = self.results[msg_id]
@@ -1267,7 +1368,7 b' class Client(HasTraits):'
1267 pprint(msg)
1368 pprint(msg)
1268 content = msg['content']
1369 content = msg['content']
1269 if content['status'] != 'ok':
1370 if content['status'] != 'ok':
1270 raise ss.unwrap_exception(content)
1371 raise self._unwrap_exception(content)
1271 buffers = msg['buffers']
1372 buffers = msg['buffers']
1272 else:
1373 else:
1273 content = dict(completed=[],pending=[])
1374 content = dict(completed=[],pending=[])
@@ -1298,13 +1399,17 b' class Client(HasTraits):'
1298 if rcontent['status'] == 'ok':
1399 if rcontent['status'] == 'ok':
1299 res,buffers = ss.unserialize_object(buffers)
1400 res,buffers = ss.unserialize_object(buffers)
1300 else:
1401 else:
1301 res = ss.unwrap_exception(rcontent)
1402 print rcontent
1403 res = self._unwrap_exception(rcontent)
1302 failures.append(res)
1404 failures.append(res)
1303
1405
1304 self.results[msg_id] = res
1406 self.results[msg_id] = res
1305 content[msg_id] = res
1407 content[msg_id] = res
1306
1408
1307 error.collect_exceptions(failures, "get_results")
1409 if len(theids) == 1 and failures:
1410 raise failures[0]
1411
1412 error.collect_exceptions(failures, "result_status")
1308 return content
1413 return content
1309
1414
1310 @spinfirst
1415 @spinfirst
@@ -1329,11 +1434,11 b' class Client(HasTraits):'
1329 content = msg['content']
1434 content = msg['content']
1330 status = content.pop('status')
1435 status = content.pop('status')
1331 if status != 'ok':
1436 if status != 'ok':
1332 raise ss.unwrap_exception(content)
1437 raise self._unwrap_exception(content)
1333 return ss.rekey(content)
1438 return ss.rekey(content)
1334
1439
1335 @spinfirst
1440 @spinfirst
1336 def purge_results(self, msg_ids=[], targets=[]):
1441 def purge_results(self, jobs=[], targets=[]):
1337 """Tell the controller to forget results.
1442 """Tell the controller to forget results.
1338
1443
1339 Individual results can be purged by msg_id, or the entire
1444 Individual results can be purged by msg_id, or the entire
@@ -1342,7 +1447,7 b' class Client(HasTraits):'
1342 Parameters
1447 Parameters
1343 ----------
1448 ----------
1344
1449
1345 msg_ids : str or list of strs
1450 jobs : str or list of strs or AsyncResult objects
1346 the msg_ids whose results should be forgotten.
1451 the msg_ids whose results should be forgotten.
1347 targets : int/str/list of ints/strs
1452 targets : int/str/list of ints/strs
1348 The targets, by uuid or int_id, whose entire history is to be purged.
1453 The targets, by uuid or int_id, whose entire history is to be purged.
@@ -1350,10 +1455,24 b' class Client(HasTraits):'
1350
1455
1351 default : None
1456 default : None
1352 """
1457 """
1353 if not targets and not msg_ids:
1458 if not targets and not jobs:
1354 raise ValueError
1459 raise ValueError("Must specify at least one of `targets` and `jobs`")
1355 if targets:
1460 if targets:
1356 targets = self._build_targets(targets)[1]
1461 targets = self._build_targets(targets)[1]
1462
1463 # construct msg_ids from jobs
1464 msg_ids = []
1465 if isinstance(jobs, (basestring,AsyncResult)):
1466 jobs = [jobs]
1467 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1468 if bad_ids:
1469 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1470 for j in jobs:
1471 if isinstance(j, AsyncResult):
1472 msg_ids.extend(j.msg_ids)
1473 else:
1474 msg_ids.append(j)
1475
1357 content = dict(targets=targets, msg_ids=msg_ids)
1476 content = dict(targets=targets, msg_ids=msg_ids)
1358 self.session.send(self._query_socket, "purge_request", content=content)
1477 self.session.send(self._query_socket, "purge_request", content=content)
1359 idents, msg = self.session.recv(self._query_socket, 0)
1478 idents, msg = self.session.recv(self._query_socket, 0)
@@ -1361,7 +1480,7 b' class Client(HasTraits):'
1361 pprint(msg)
1480 pprint(msg)
1362 content = msg['content']
1481 content = msg['content']
1363 if content['status'] != 'ok':
1482 if content['status'] != 'ok':
1364 raise ss.unwrap_exception(content)
1483 raise self._unwrap_exception(content)
1365
1484
1366
1485
1367 __all__ = [ 'Client',
1486 __all__ = [ 'Client',
@@ -128,7 +128,7 b' class ParallelFunction(RemoteFunction):'
128 args = []
128 args = []
129 for seq in sequences:
129 for seq in sequences:
130 part = self.mapObject.getPartition(seq, index, nparts)
130 part = self.mapObject.getPartition(seq, index, nparts)
131 if not part:
131 if len(part) == 0:
132 continue
132 continue
133 else:
133 else:
134 args.append(part)
134 args.append(part)
@@ -15,7 +15,7 b' from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Insta'
15 from IPython.external.decorator import decorator
15 from IPython.external.decorator import decorator
16 from IPython.zmq.parallel.asyncresult import AsyncResult
16 from IPython.zmq.parallel.asyncresult import AsyncResult
17 from IPython.zmq.parallel.dependency import Dependency
17 from IPython.zmq.parallel.dependency import Dependency
18 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
18 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel, remote
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Decorators
21 # Decorators
@@ -91,6 +91,8 b' class View(HasTraits):'
91 for name in self._default_names:
91 for name in self._default_names:
92 setattr(self, name, getattr(self, name, None))
92 setattr(self, name, getattr(self, name, None))
93
93
94 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
95
94
96
95 def __repr__(self):
97 def __repr__(self):
96 strtargets = str(self._targets)
98 strtargets = str(self._targets)
@@ -106,9 +108,17 b' class View(HasTraits):'
106 def targets(self, value):
108 def targets(self, value):
107 raise AttributeError("Cannot set View `targets` after construction!")
109 raise AttributeError("Cannot set View `targets` after construction!")
108
110
111 @property
112 def balanced(self):
113 return self._balanced
114
115 @balanced.setter
116 def balanced(self, value):
117 raise AttributeError("Cannot set View `balanced` after construction!")
118
109 def _defaults(self, *excludes):
119 def _defaults(self, *excludes):
110 """return dict of our default attributes, excluding names given."""
120 """return dict of our default attributes, excluding names given."""
111 d = dict(balanced=self._balanced, targets=self.targets)
121 d = dict(balanced=self._balanced, targets=self._targets)
112 for name in self._default_names:
122 for name in self._default_names:
113 if name not in excludes:
123 if name not in excludes:
114 d[name] = getattr(self, name)
124 d[name] = getattr(self, name)
@@ -182,22 +192,22 b' class View(HasTraits):'
182 d = self._defaults('block', 'bound')
192 d = self._defaults('block', 'bound')
183 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
193 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
184
194
185 @sync_results
195 # @sync_results
186 @save_ids
196 # @save_ids
187 def apply_bound(self, f, *args, **kwargs):
197 # def apply_bound(self, f, *args, **kwargs):
188 """calls f(*args, **kwargs) bound to engine namespace(s).
198 # """calls f(*args, **kwargs) bound to engine namespace(s).
189
199 #
190 if self.block is False:
200 # if self.block is False:
191 returns msg_id
201 # returns msg_id
192 else:
202 # else:
193 returns actual result of f(*args, **kwargs)
203 # returns actual result of f(*args, **kwargs)
194
204 #
195 This method has access to the targets' globals
205 # This method has access to the targets' namespace via globals()
196
206 #
197 """
207 # """
198 d = self._defaults('bound')
208 # d = self._defaults('bound')
199 return self.client.apply(f, args, kwargs, bound=True, **d)
209 # return self.client.apply(f, args, kwargs, bound=True, **d)
200
210 #
201 @sync_results
211 @sync_results
202 @save_ids
212 @save_ids
203 def apply_async_bound(self, f, *args, **kwargs):
213 def apply_async_bound(self, f, *args, **kwargs):
@@ -206,7 +216,7 b' class View(HasTraits):'
206
216
207 returns: msg_id
217 returns: msg_id
208
218
209 This method has access to the targets' globals
219 This method has access to the targets' namespace via globals()
210
220
211 """
221 """
212 d = self._defaults('block', 'bound')
222 d = self._defaults('block', 'bound')
@@ -219,35 +229,54 b' class View(HasTraits):'
219
229
220 returns: actual result of f(*args, **kwargs)
230 returns: actual result of f(*args, **kwargs)
221
231
222 This method has access to the targets' globals
232 This method has access to the targets' namespace via globals()
223
233
224 """
234 """
225 d = self._defaults('block', 'bound')
235 d = self._defaults('block', 'bound')
226 return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
236 return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
227
237
228 def abort(self, msg_ids=None, block=None):
238 def abort(self, jobs=None, block=None):
229 """Abort jobs on my engines.
239 """Abort jobs on my engines.
230
240
231 Parameters
241 Parameters
232 ----------
242 ----------
233
243
234 msg_ids : None, str, list of strs, optional
244 jobs : None, str, list of strs, optional
235 if None: abort all jobs.
245 if None: abort all jobs.
236 else: abort specific msg_id(s).
246 else: abort specific msg_id(s).
237 """
247 """
238 block = block if block is not None else self.block
248 block = block if block is not None else self.block
239 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
249 return self.client.abort(jobs=jobs, targets=self._targets, block=block)
240
250
241 def queue_status(self, verbose=False):
251 def queue_status(self, verbose=False):
242 """Fetch the Queue status of my engines"""
252 """Fetch the Queue status of my engines"""
243 return self.client.queue_status(targets=self.targets, verbose=verbose)
253 return self.client.queue_status(targets=self._targets, verbose=verbose)
244
254
245 def purge_results(self, msg_ids=[], targets=[]):
255 def purge_results(self, jobs=[], targets=[]):
246 """Instruct the controller to forget specific results."""
256 """Instruct the controller to forget specific results."""
247 if targets is None or targets == 'all':
257 if targets is None or targets == 'all':
248 targets = self.targets
258 targets = self._targets
249 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
259 return self.client.purge_results(jobs=jobs, targets=targets)
260
261 @spin_after
262 def get_result(self, indices_or_msg_ids=None):
263 """return one or more results, specified by history index or msg_id.
250
264
265 See client.get_result for details.
266
267 """
268
269 if indices_or_msg_ids is None:
270 indices_or_msg_ids = -1
271 if isinstance(indices_or_msg_ids, int):
272 indices_or_msg_ids = self.history[indices_or_msg_ids]
273 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
274 indices_or_msg_ids = list(indices_or_msg_ids)
275 for i,index in enumerate(indices_or_msg_ids):
276 if isinstance(index, int):
277 indices_or_msg_ids[i] = self.history[index]
278 return self.client.get_result(indices_or_msg_ids)
279
251 #-------------------------------------------------------------------
280 #-------------------------------------------------------------------
252 # Map
281 # Map
253 #-------------------------------------------------------------------
282 #-------------------------------------------------------------------
@@ -261,7 +290,7 b' class View(HasTraits):'
261
290
262 This is equivalent to map(...block=False)
291 This is equivalent to map(...block=False)
263
292
264 See `map` for details.
293 See `self.map` for details.
265 """
294 """
266 if 'block' in kwargs:
295 if 'block' in kwargs:
267 raise TypeError("map_async doesn't take a `block` keyword argument.")
296 raise TypeError("map_async doesn't take a `block` keyword argument.")
@@ -273,25 +302,33 b' class View(HasTraits):'
273
302
274 This is equivalent to map(...block=True)
303 This is equivalent to map(...block=True)
275
304
276 See `map` for details.
305 See `self.map` for details.
277 """
306 """
278 if 'block' in kwargs:
307 if 'block' in kwargs:
279 raise TypeError("map_sync doesn't take a `block` keyword argument.")
308 raise TypeError("map_sync doesn't take a `block` keyword argument.")
280 kwargs['block'] = True
309 kwargs['block'] = True
281 return self.map(f,*sequences,**kwargs)
310 return self.map(f,*sequences,**kwargs)
282
311
312 def imap(self, f, *sequences, **kwargs):
313 """Parallel version of `itertools.imap`.
314
315 See `self.map` for details.
316 """
317
318 return iter(self.map_async(f,*sequences, **kwargs))
319
283 #-------------------------------------------------------------------
320 #-------------------------------------------------------------------
284 # Decorators
321 # Decorators
285 #-------------------------------------------------------------------
322 #-------------------------------------------------------------------
286
323
287 def remote(self, bound=True, block=True):
324 def remote(self, bound=True, block=True):
288 """Decorator for making a RemoteFunction"""
325 """Decorator for making a RemoteFunction"""
289 return remote(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
326 return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
290
327
291 def parallel(self, dist='b', bound=True, block=None):
328 def parallel(self, dist='b', bound=True, block=None):
292 """Decorator for making a ParallelFunction"""
329 """Decorator for making a ParallelFunction"""
293 block = self.block if block is None else block
330 block = self.block if block is None else block
294 return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
331 return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
295
332
296
333
297 class DirectView(View):
334 class DirectView(View):
@@ -320,7 +357,9 b' class DirectView(View):'
320 @spin_after
357 @spin_after
321 @save_ids
358 @save_ids
322 def map(self, f, *sequences, **kwargs):
359 def map(self, f, *sequences, **kwargs):
323 """Parallel version of builtin `map`, using this View's `targets`.
360 """view.map(f, *sequences, block=self.block, bound=self.bound) => list|AsyncMapResult
361
362 Parallel version of builtin `map`, using this View's `targets`.
324
363
325 There will be one task per target, so work will be chunked
364 There will be one task per target, so work will be chunked
326 if the sequences are longer than `targets`.
365 if the sequences are longer than `targets`.
@@ -337,7 +376,7 b' class DirectView(View):'
337 block : bool
376 block : bool
338 whether to wait for the result or not [default self.block]
377 whether to wait for the result or not [default self.block]
339 bound : bool
378 bound : bool
340 whether to wait for the result or not [default self.bound]
379 whether to have access to the engines' namespaces [default self.bound]
341
380
342 Returns
381 Returns
343 -------
382 -------
@@ -347,7 +386,8 b' class DirectView(View):'
347 An object like AsyncResult, but which reassembles the sequence of results
386 An object like AsyncResult, but which reassembles the sequence of results
348 into a single list. AsyncMapResults can be iterated through before all
387 into a single list. AsyncMapResults can be iterated through before all
349 results are complete.
388 results are complete.
350 else:
389 else:
390 list
351 the result of map(f,*sequences)
391 the result of map(f,*sequences)
352 """
392 """
353
393
@@ -359,18 +399,18 b' class DirectView(View):'
359
399
360 assert len(sequences) > 0, "must have some sequences to map onto!"
400 assert len(sequences) > 0, "must have some sequences to map onto!"
361 pf = ParallelFunction(self.client, f, block=block, bound=bound,
401 pf = ParallelFunction(self.client, f, block=block, bound=bound,
362 targets=self.targets, balanced=False)
402 targets=self._targets, balanced=False)
363 return pf.map(*sequences)
403 return pf.map(*sequences)
364
404
365 @sync_results
405 @sync_results
366 @save_ids
406 @save_ids
367 def execute(self, code, block=True):
407 def execute(self, code, block=True):
368 """execute some code on my targets."""
408 """execute some code on my targets."""
369 return self.client.execute(code, block=block, targets=self.targets)
409 return self.client.execute(code, block=block, targets=self._targets)
370
410
371 def update(self, ns):
411 def update(self, ns):
372 """update remote namespace with dict `ns`"""
412 """update remote namespace with dict `ns`"""
373 return self.client.push(ns, targets=self.targets, block=self.block)
413 return self.client.push(ns, targets=self._targets, block=self.block)
374
414
375 push = update
415 push = update
376
416
@@ -379,7 +419,7 b' class DirectView(View):'
379 will return one object if it is a key.
419 will return one object if it is a key.
380 It also takes a list of keys, and will return a list of objects."""
420 It also takes a list of keys, and will return a list of objects."""
381 # block = block if block is not None else self.block
421 # block = block if block is not None else self.block
382 return self.client.pull(key_s, block=True, targets=self.targets)
422 return self.client.pull(key_s, block=True, targets=self._targets)
383
423
384 @sync_results
424 @sync_results
385 @save_ids
425 @save_ids
@@ -388,14 +428,14 b' class DirectView(View):'
388 will return one object if it is a key.
428 will return one object if it is a key.
389 It also takes a list of keys, and will return a list of objects."""
429 It also takes a list of keys, and will return a list of objects."""
390 block = block if block is not None else self.block
430 block = block if block is not None else self.block
391 return self.client.pull(key_s, block=block, targets=self.targets)
431 return self.client.pull(key_s, block=block, targets=self._targets)
392
432
393 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
433 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
394 """
434 """
395 Partition a Python sequence and send the partitions to a set of engines.
435 Partition a Python sequence and send the partitions to a set of engines.
396 """
436 """
397 block = block if block is not None else self.block
437 block = block if block is not None else self.block
398 targets = targets if targets is not None else self.targets
438 targets = targets if targets is not None else self._targets
399
439
400 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
440 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
401 targets=targets, block=block)
441 targets=targets, block=block)
@@ -407,7 +447,7 b' class DirectView(View):'
407 Gather a partitioned sequence on a set of engines as a single local seq.
447 Gather a partitioned sequence on a set of engines as a single local seq.
408 """
448 """
409 block = block if block is not None else self.block
449 block = block if block is not None else self.block
410 targets = targets if targets is not None else self.targets
450 targets = targets if targets is not None else self._targets
411
451
412 return self.client.gather(key, dist=dist, targets=targets, block=block)
452 return self.client.gather(key, dist=dist, targets=targets, block=block)
413
453
@@ -420,12 +460,12 b' class DirectView(View):'
420 def clear(self, block=False):
460 def clear(self, block=False):
421 """Clear the remote namespaces on my engines."""
461 """Clear the remote namespaces on my engines."""
422 block = block if block is not None else self.block
462 block = block if block is not None else self.block
423 return self.client.clear(targets=self.targets, block=block)
463 return self.client.clear(targets=self._targets, block=block)
424
464
425 def kill(self, block=True):
465 def kill(self, block=True):
426 """Kill my engines."""
466 """Kill my engines."""
427 block = block if block is not None else self.block
467 block = block if block is not None else self.block
428 return self.client.kill(targets=self.targets, block=block)
468 return self.client.kill(targets=self._targets, block=block)
429
469
430 #----------------------------------------
470 #----------------------------------------
431 # activate for %px,%autopx magics
471 # activate for %px,%autopx magics
@@ -504,9 +544,9 b' class LoadBalancedView(View):'
504 def set_flags(self, **kwargs):
544 def set_flags(self, **kwargs):
505 """set my attribute flags by keyword.
545 """set my attribute flags by keyword.
506
546
507 A View is a wrapper for the Client's apply method, but
547 A View is a wrapper for the Client's apply method, but with attributes
508 with attributes that specify keyword arguments, those attributes
548 that specify keyword arguments, those attributes can be set by keyword
509 can be set by keyword argument with this method.
549 argument with this method.
510
550
511 Parameters
551 Parameters
512 ----------
552 ----------
@@ -543,10 +583,15 b' class LoadBalancedView(View):'
543 @spin_after
583 @spin_after
544 @save_ids
584 @save_ids
545 def map(self, f, *sequences, **kwargs):
585 def map(self, f, *sequences, **kwargs):
546 """Parallel version of builtin `map`, load-balanced by this View.
586 """view.map(f, *sequences, block=self.block, bound=self.bound, chunk_size=1) => list|AsyncMapResult
587
588 Parallel version of builtin `map`, load-balanced by this View.
547
589
548 Each element will be a separate task, and will be load-balanced. This
590 `block`, `bound`, and `chunk_size` can be specified by keyword only.
549 lets individual elements be available for iteration as soon as they arrive.
591
592 Each `chunk_size` elements will be a separate task, and will be
593 load-balanced. This lets individual elements be available for iteration
594 as soon as they arrive.
550
595
551 Parameters
596 Parameters
552 ----------
597 ----------
@@ -558,7 +603,9 b' class LoadBalancedView(View):'
558 block : bool
603 block : bool
559 whether to wait for the result or not [default self.block]
604 whether to wait for the result or not [default self.block]
560 bound : bool
605 bound : bool
561 whether to use the engine's namespace
606 whether to use the engine's namespace [default self.bound]
607 chunk_size : int
608 how many elements should be in each task [default 1]
562
609
563 Returns
610 Returns
564 -------
611 -------
@@ -586,7 +633,7 b' class LoadBalancedView(View):'
586 assert len(sequences) > 0, "must have some sequences to map onto!"
633 assert len(sequences) > 0, "must have some sequences to map onto!"
587
634
588 pf = ParallelFunction(self.client, f, block=block, bound=bound,
635 pf = ParallelFunction(self.client, f, block=block, bound=bound,
589 targets=self.targets, balanced=True,
636 targets=self._targets, balanced=True,
590 chunk_size=chunk_size)
637 chunk_size=chunk_size)
591 return pf.map(*sequences)
638 return pf.map(*sequences)
592
639
@@ -59,13 +59,24 b' of engine ids:'
59
59
60 Here we see that there are four engines ready to do work for us.
60 Here we see that there are four engines ready to do work for us.
61
61
62 For direct execution, we will make use of a :class:`DirectView` object, which can be
63 constructed via list-access to the client:
64
65 .. sourcecode::
66
67 In [4]: dview = rc[:] # use all engines
68
69 .. seealso::
70
71 For more information, see the in-depth explanation of :ref:`Views <parallel_view>`.
72
73
62 Quick and easy parallelism
74 Quick and easy parallelism
63 ==========================
75 ==========================
64
76
65 In many cases, you simply want to apply a Python function to a sequence of
77 In many cases, you simply want to apply a Python function to a sequence of
66 objects, but *in parallel*. The client interface provides a simple way
78 objects, but *in parallel*. The client interface provides a simple way
67 of accomplishing this: using the builtin :func:`map` and the ``@remote``
79 of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
68 function decorator, or the client's :meth:`map` method.
69
80
70 Parallel map
81 Parallel map
71 ------------
82 ------------
@@ -79,44 +90,67 b" DirectView's :meth:`map` method:"
79 .. sourcecode:: ipython
90 .. sourcecode:: ipython
80
91
81 In [62]: serial_result = map(lambda x:x**10, range(32))
92 In [62]: serial_result = map(lambda x:x**10, range(32))
93
94 In [63]: dview.block = True
95
96 In [66]: parallel_result = dview.map(lambda x: x**10, range(32))
82
97
83 In [66]: parallel_result = rc[:].map(lambda x: x**10, range(32))
98 In [67]: serial_result==parallel_result
84
85 In [67]: serial_result==parallel_result.get()
86 Out[67]: True
99 Out[67]: True
87
100
88
101
89 .. note::
102 .. note::
90
103
91 The :class:`DirectView`'s version of :meth:`map` does
104 The :class:`DirectView`'s version of :meth:`map` does
92 not do any load balancing. For a load balanced version, use a
105 not do dynamic load balancing. For a load balanced version, use a
93 :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
106 :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
94 `balanced=True`.
107 `balanced=True`.
95
108
96 .. seealso::
109 .. seealso::
97
110
98 :meth:`map` is implemented via :class:`.ParallelFunction`.
111 :meth:`map` is implemented via :class:`ParallelFunction`.
99
112
100 Remote function decorator
113 Remote function decorators
101 -------------------------
114 --------------------------
102
115
103 Remote functions are just like normal functions, but when they are called,
116 Remote functions are just like normal functions, but when they are called,
104 they execute on one or more engines, rather than locally. IPython provides
117 they execute on one or more engines, rather than locally. IPython provides
105 some decorators:
118 two decorators:
106
119
107 .. sourcecode:: ipython
120 .. sourcecode:: ipython
108
121
109 In [10]: @rc.remote(block=True, targets=0)
122 In [10]: @rc.remote(block=True, targets='all')
110 ....: def f(x):
123 ...: def getpid():
111 ....: return 10.0*x**4
124 ...: import os
112 ....:
125 ...: return os.getpid()
126 ...:
127
128 In [11]: getpid()
129 Out[11]: [12345, 12346, 12347, 12348]
113
130
114 In [11]: map(f, range(32)) # this is done on engine 0
131 A ``@parallel`` decorator creates parallel functions, that break up an element-wise
115 Out[11]: [0.0,10.0,160.0,...]
132 operations and distribute them, reconstructing the result.
133
134 .. sourcecode:: ipython
135
136 In [12]: import numpy as np
137
138 In [13]: A = np.random.random((64,48))
139
140 In [14]: @rc.parallel(block=True, targets='all')
141 ...: def pmul(A,B):
142 ...: return A*B
143
144 In [15]: C_local = A*A
145
146 In [16]: C_remote_partial = pmul(A,A)
147
148 In [17]: (C_local == C_remote).all()
149 Out[17]: True
116
150
117 .. seealso::
151 .. seealso::
118
152
119 See the docstring for the :func:`parallel` and :func:`remote` decorators for
153 See the docstrings for the :func:`parallel` and :func:`remote` decorators for
120 options.
154 options.
121
155
122 Calling Python functions
156 Calling Python functions
@@ -152,7 +186,7 b' the extra arguments. For instance, performing index-access on a client creates a'
152 Out[4]: <DirectView [1, 2]>
186 Out[4]: <DirectView [1, 2]>
153
187
154 In [5]: view.apply<tab>
188 In [5]: view.apply<tab>
155 view.apply view.apply_async view.apply_async_bound view.apply_bound view.apply_sync view.apply_sync_bound
189 view.apply view.apply_async view.apply_async_bound view.apply_sync view.apply_sync_bound
156
190
157 A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound`
191 A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound`
158 and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x`
192 and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x`
@@ -180,8 +214,8 b' blocks until the engines are done executing the command:'
180
214
181 .. sourcecode:: ipython
215 .. sourcecode:: ipython
182
216
183 In [2]: rc.block=True
217 In [2]: dview = rc[:] # A DirectView of all engines
184 In [3]: dview = rc[:] # A DirectView of all engines
218 In [3]: dview.block=True
185 In [4]: dview['a'] = 5
219 In [4]: dview['a'] = 5
186
220
187 In [5]: dview['b'] = 10
221 In [5]: dview['b'] = 10
@@ -189,13 +223,13 b' blocks until the engines are done executing the command:'
189 In [6]: dview.apply_bound(lambda x: a+b+x, 27)
223 In [6]: dview.apply_bound(lambda x: a+b+x, 27)
190 Out[6]: [42, 42, 42, 42]
224 Out[6]: [42, 42, 42, 42]
191
225
192 Python commands can be executed on specific engines by calling execute using
226 Python commands can be executed on specific engines by calling execute using the ``targets``
193 the ``targets`` keyword argument, or creating a :class:`DirectView` instance
227 keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by
194 by index-access to the client:
228 index-access to the client:
195
229
196 .. sourcecode:: ipython
230 .. sourcecode:: ipython
197
231
198 In [6]: rc[::2].execute('c=a+b') # shorthand for rc.execute('c=a+b',targets=[0,2])
232 In [6]: rc.execute('c=a+b', targets=[0,2])
199
233
200 In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
234 In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
201
235
@@ -214,10 +248,12 b' by index-access to the client:'
214 :meth:`View.apply(f,*args,**kwargs)`, which simply calls
248 :meth:`View.apply(f,*args,**kwargs)`, which simply calls
215 ``f(*args,**kwargs)`` remotely.
249 ``f(*args,**kwargs)`` remotely.
216
250
217 This example also shows one of the most important things about the IPython
251 Bound and unbound execution
252 ---------------------------
253
254 The previous example also shows one of the most important things about the IPython
218 engines: they have a persistent user namespaces. The :meth:`apply` method can
255 engines: they have a persistent user namespaces. The :meth:`apply` method can
219 be run in either a bound or unbound way. The default for a View is to be
256 be run in either a bound or unbound manner:
220 unbound, unless called by the :meth:`apply_bound` method:
221
257
222 .. sourcecode:: ipython
258 .. sourcecode:: ipython
223
259
@@ -225,10 +261,10 b' unbound, unless called by the :meth:`apply_bound` method:'
225
261
226 In [10]: v0 = rc[0]
262 In [10]: v0 = rc[0]
227
263
228 In [12]: v0.apply_bound(lambda : b)
264 In [12]: v0.apply_sync_bound(lambda : b)
229 Out[12]: 5
265 Out[12]: 5
230
266
231 In [13]: v0.apply(lambda : b)
267 In [13]: v0.apply_sync(lambda : b)
232 ---------------------------------------------------------------------------
268 ---------------------------------------------------------------------------
233 RemoteError Traceback (most recent call last)
269 RemoteError Traceback (most recent call last)
234 /home/you/<ipython-input-34-21a468eb10f0> in <module>()
270 /home/you/<ipython-input-34-21a468eb10f0> in <module>()
@@ -244,10 +280,10 b' unbound, unless called by the :meth:`apply_bound` method:'
244
280
245
281
246 Specifically, `bound=True` specifies that the engine's namespace is to be used
282 Specifically, `bound=True` specifies that the engine's namespace is to be used
247 for execution, and `bound=False` specifies that the engine's namespace is not
283 as the `globals` when the function is called, and `bound=False` specifies that
248 to be used (hence, 'b' is undefined during unbound execution, since the
284 the engine's namespace is not to be used (hence, 'b' is undefined during unbound
249 function is called in an empty namespace). Unbound execution is often useful
285 execution, since the function is called in an empty namespace). Unbound execution is
250 for large numbers of atomic tasks, which prevents bloating the engine's
286 often useful for large numbers of atomic tasks, which prevents bloating the engine's
251 memory, while bound execution lets you build on your previous work.
287 memory, while bound execution lets you build on your previous work.
252
288
253
289
@@ -257,7 +293,7 b' Non-blocking execution'
257 In non-blocking mode, :meth:`apply` submits the command to be executed and
293 In non-blocking mode, :meth:`apply` submits the command to be executed and
258 then returns a :class:`AsyncResult` object immediately. The
294 then returns a :class:`AsyncResult` object immediately. The
259 :class:`AsyncResult` object gives you a way of getting a result at a later
295 :class:`AsyncResult` object gives you a way of getting a result at a later
260 time through its :meth:`get` method.
296 time through its :meth:`get` method.
261
297
262 .. Note::
298 .. Note::
263
299
@@ -280,25 +316,25 b' local Python/IPython session:'
280 ...: return time.time()-tic
316 ...: return time.time()-tic
281
317
282 # In non-blocking mode
318 # In non-blocking mode
283 In [7]: pr = dview.apply_async(wait, 2)
319 In [7]: ar = dview.apply_async(wait, 2)
284
320
285 # Now block for the result
321 # Now block for the result
286 In [8]: pr.get()
322 In [8]: ar.get()
287 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
323 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
288
324
289 # Again in non-blocking mode
325 # Again in non-blocking mode
290 In [9]: pr = dview.apply_async(wait, 10)
326 In [9]: ar = dview.apply_async(wait, 10)
291
327
292 # Poll to see if the result is ready
328 # Poll to see if the result is ready
293 In [10]: pr.ready()
329 In [10]: ar.ready()
294 Out[10]: False
330 Out[10]: False
295
331
296 # ask for the result, but wait a maximum of 1 second:
332 # ask for the result, but wait a maximum of 1 second:
297 In [45]: pr.get(1)
333 In [45]: ar.get(1)
298 ---------------------------------------------------------------------------
334 ---------------------------------------------------------------------------
299 TimeoutError Traceback (most recent call last)
335 TimeoutError Traceback (most recent call last)
300 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
336 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
301 ----> 1 pr.get(1)
337 ----> 1 ar.get(1)
302
338
303 /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
339 /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
304 62 raise self._exception
340 62 raise self._exception
@@ -316,8 +352,8 b' local Python/IPython session:'
316
352
317 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
353 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
318 are done. For this, there is a the method :meth:`barrier`. This method takes a
354 are done. For this, there is a the method :meth:`barrier`. This method takes a
319 tuple of :class:`AsyncResult` objects (or `msg_ids`) and blocks until all of the
355 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
320 associated results are ready:
356 and blocks until all of the associated results are ready:
321
357
322 .. sourcecode:: ipython
358 .. sourcecode:: ipython
323
359
@@ -338,15 +374,17 b' associated results are ready:'
338 The ``block`` keyword argument and attributes
374 The ``block`` keyword argument and attributes
339 ---------------------------------------------
375 ---------------------------------------------
340
376
341 Most methods(like :meth:`apply`) accept
377 Most client methods(like :meth:`apply`) accept
342 ``block`` as a keyword argument. As we have seen above, these
378 ``block`` as a keyword argument. As we have seen above, these
343 keyword arguments control the blocking mode . The :class:`Client` class also has
379 keyword arguments control the blocking mode. The :class:`Client` class also has
344 a :attr:`block` attribute that controls the default behavior when the keyword
380 a :attr:`block` attribute that controls the default behavior when the keyword
345 argument is not provided. Thus the following logic is used for :attr:`block`:
381 argument is not provided. Thus the following logic is used for :attr:`block`:
346
382
347 * If no keyword argument is provided, the instance attributes are used.
383 * If no keyword argument is provided, the instance attributes are used.
348 * Keyword argument, if provided override the instance attributes for
384 * Keyword argument, if provided override the instance attributes for
349 the duration of a single call.
385 the duration of a single call.
386
387 DirectView objects also have a ``bound`` attribute, which is used in the same way.
350
388
351 The following examples demonstrate how to use the instance attributes:
389 The following examples demonstrate how to use the instance attributes:
352
390
@@ -365,7 +403,7 b' The following examples demonstrate how to use the instance attributes:'
365 In [22]: rc.apply(lambda : 42, targets='all')
403 In [22]: rc.apply(lambda : 42, targets='all')
366 Out[22]: [42, 42, 42, 42]
404 Out[22]: [42, 42, 42, 42]
367
405
368 The :attr:`block` and :attr:`targets` instance attributes of the
406 The :attr:`block`, :attr:`bound`, and :attr:`targets` instance attributes of the
369 :class:`.DirectView` also determine the behavior of the parallel magic commands.
407 :class:`.DirectView` also determine the behavior of the parallel magic commands.
370
408
371
409
@@ -381,9 +419,9 b' Parallel magic commands'
381 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
419 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
382 that make it more pleasant to execute Python commands on the engines
420 that make it more pleasant to execute Python commands on the engines
383 interactively. These are simply shortcuts to :meth:`execute` and
421 interactively. These are simply shortcuts to :meth:`execute` and
384 :meth:`get_result`. The ``%px`` magic executes a single Python command on the
422 :meth:`get_result` of the :class:`DirectView`. The ``%px`` magic executes a single
385 engines specified by the :attr:`targets` attribute of the
423 Python command on the engines specified by the :attr:`targets` attribute of the
386 :class:`MultiEngineClient` instance (by default this is ``'all'``):
424 :class:`DirectView` instance:
387
425
388 .. sourcecode:: ipython
426 .. sourcecode:: ipython
389
427
@@ -399,7 +437,6 b' engines specified by the :attr:`targets` attribute of the'
399
437
400 In [26]: %px import numpy
438 In [26]: %px import numpy
401 Parallel execution on engines: [0, 1, 2, 3]
439 Parallel execution on engines: [0, 1, 2, 3]
402 Out[26]:[None,None,None,None]
403
440
404 In [27]: %px a = numpy.random.rand(2,2)
441 In [27]: %px a = numpy.random.rand(2,2)
405 Parallel execution on engines: [0, 1, 2, 3]
442 Parallel execution on engines: [0, 1, 2, 3]
@@ -408,36 +445,25 b' engines specified by the :attr:`targets` attribute of the'
408 Parallel execution on engines: [0, 1, 2, 3]
445 Parallel execution on engines: [0, 1, 2, 3]
409
446
410 In [28]: dv['ev']
447 In [28]: dv['ev']
411 Out[44]: [ array([ 1.09522024, -0.09645227]),
448 Out[28]: [ array([ 1.09522024, -0.09645227]),
412 array([ 1.21435496, -0.35546712]),
449 array([ 1.21435496, -0.35546712]),
413 array([ 0.72180653, 0.07133042]),
450 array([ 0.72180653, 0.07133042]),
414 array([ 1.46384341e+00, 1.04353244e-04])
451 array([ 1.46384341e+00, 1.04353244e-04])
415 ]
452 ]
416
453
417 .. Note::
454 The ``%result`` magic gets the most recent result, or takes an argument
418
455 specifying the index of the result to be requested. It is simply a shortcut to the
419 ``%result`` doesn't work
420
421 The ``%result`` magic gets and prints the stdin/stdout/stderr of the last
422 command executed on each engine. It is simply a shortcut to the
423 :meth:`get_result` method:
456 :meth:`get_result` method:
424
457
425 .. sourcecode:: ipython
458 .. sourcecode:: ipython
426
459
427 In [29]: %result
460 In [29]: dv.apply_async_bound(lambda : ev)
428 Out[29]:
461
429 <Results List>
462 In [30]: %result
430 [0] In [10]: print numpy.linalg.eigvals(a)
463 Out[30]: [ [ 1.28167017 0.14197338],
431 [0] Out[10]: [ 1.28167017 0.14197338]
464 [-0.14093616 1.27877273],
432
465 [-0.37023573 1.06779409],
433 [1] In [9]: print numpy.linalg.eigvals(a)
466 [ 0.83664764 -0.25602658] ]
434 [1] Out[9]: [-0.14093616 1.27877273]
435
436 [2] In [10]: print numpy.linalg.eigvals(a)
437 [2] Out[10]: [-0.37023573 1.06779409]
438
439 [3] In [9]: print numpy.linalg.eigvals(a)
440 [3] Out[9]: [ 0.83664764 -0.25602658]
441
467
442 The ``%autopx`` magic switches to a mode where everything you type is executed
468 The ``%autopx`` magic switches to a mode where everything you type is executed
443 on the engines given by the :attr:`targets` attribute:
469 on the engines given by the :attr:`targets` attribute:
@@ -477,12 +503,6 b' on the engines given by the :attr:`targets` attribute:'
477 'Average max eigenvalue is: 10.1158837784',]
503 'Average max eigenvalue is: 10.1158837784',]
478
504
479
505
480 .. Note::
481
482 Multiline ``%autpx`` gets fouled up by NameErrors, because IPython
483 currently introspects too much.
484
485
486 Moving Python objects around
506 Moving Python objects around
487 ============================
507 ============================
488
508
@@ -524,14 +544,12 b' In non-blocking mode :meth:`push` and :meth:`pull` also return'
524
544
525 In [47]: rc.block=False
545 In [47]: rc.block=False
526
546
527 In [48]: pr = rc.pull('a')
547 In [48]: ar = rc.pull('a')
528
548
529 In [49]: pr.get()
549 In [49]: ar.get()
530 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
550 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
531
551
532
552
533
534
535 Dictionary interface
553 Dictionary interface
536 --------------------
554 --------------------
537
555
@@ -751,9 +769,9 b' All of this same error handling magic even works in non-blocking mode:'
751
769
752 In [83]: rc.block=False
770 In [83]: rc.block=False
753
771
754 In [84]: pr = rc.execute('1/0')
772 In [84]: ar = rc.execute('1/0')
755
773
756 In [85]: pr.get()
774 In [85]: ar.get()
757 ---------------------------------------------------------------------------
775 ---------------------------------------------------------------------------
758 CompositeError Traceback (most recent call last)
776 CompositeError Traceback (most recent call last)
759
777
@@ -33,7 +33,8 b' Creating a ``Client`` instance'
33 ==============================
33 ==============================
34
34
35 The first step is to import the IPython :mod:`IPython.zmq.parallel.client`
35 The first step is to import the IPython :mod:`IPython.zmq.parallel.client`
36 module and then create a :class:`.Client` instance:
36 module and then create a :class:`.Client` instance, and we will also be using
37 a :class:`LoadBalancedView`, here called `lview`:
37
38
38 .. sourcecode:: ipython
39 .. sourcecode:: ipython
39
40
@@ -41,8 +42,7 b' module and then create a :class:`.Client` instance:'
41
42
42 In [2]: rc = client.Client()
43 In [2]: rc = client.Client()
43
44
44 In [3]: lview = rc.view(balanced=True)
45 In [3]: lview = rc.view()
45 Out[3]: <LoadBalancedView None>
46
46
47
47
48 This form assumes that the controller was started on localhost with default
48 This form assumes that the controller was started on localhost with default
@@ -73,14 +73,15 b' the task interface.'
73 Parallel map
73 Parallel map
74 ------------
74 ------------
75
75
76 To load-balance :meth:`map`,simply use a LoadBalancedView, created by asking
76 To load-balance :meth:`map`,simply use a LoadBalancedView:
77 for the ``None`` element:
78
77
79 .. sourcecode:: ipython
78 .. sourcecode:: ipython
80
79
80 In [62]: lview.block = True
81
81 In [63]: serial_result = map(lambda x:x**10, range(32))
82 In [63]: serial_result = map(lambda x:x**10, range(32))
82
83
83 In [64]: parallel_result = lview.map(lambda x:x**10, range(32), block=True)
84 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
84
85
85 In [65]: serial_result==parallel_result
86 In [65]: serial_result==parallel_result
86 Out[65]: True
87 Out[65]: True
General Comments 0
You need to be logged in to leave comments. Login now