##// END OF EJS Templates
client.run is now like %run, not TaskClient.run
MinRK -
Show More
@@ -1,1275 +1,1281
1 1 """A semi-synchronous Client for the ZMQ controller"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 import os
14 14 import time
15 15 from getpass import getpass
16 16 from pprint import pprint
17 17 from datetime import datetime
18 18 import json
19 19 pjoin = os.path.join
20 20
21 21 import zmq
22 22 from zmq.eventloop import ioloop, zmqstream
23 23
24 24 from IPython.utils.path import get_ipython_dir
25 25 from IPython.external.decorator import decorator
26 26 from IPython.external.ssh import tunnel
27 27
28 28 import streamsession as ss
29 29 from clusterdir import ClusterDir, ClusterDirError
30 30 # from remotenamespace import RemoteNamespace
31 31 from view import DirectView, LoadBalancedView
32 32 from dependency import Dependency, depend, require
33 33 import error
34 34 import map as Map
35 35 from asyncresult import AsyncResult, AsyncMapResult
36 36 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
37 37 from util import ReverseDict, disambiguate_url, validate_url
38 38
39 39 #--------------------------------------------------------------------------
40 40 # helpers for implementing old MEC API via client.apply
41 41 #--------------------------------------------------------------------------
42 42
43 43 def _push(ns):
44 44 """helper method for implementing `client.push` via `client.apply`"""
45 45 globals().update(ns)
46 46
47 47 def _pull(keys):
48 48 """helper method for implementing `client.pull` via `client.apply`"""
49 49 g = globals()
50 50 if isinstance(keys, (list,tuple, set)):
51 51 for key in keys:
52 52 if not g.has_key(key):
53 53 raise NameError("name '%s' is not defined"%key)
54 54 return map(g.get, keys)
55 55 else:
56 56 if not g.has_key(keys):
57 57 raise NameError("name '%s' is not defined"%keys)
58 58 return g.get(keys)
59 59
60 60 def _clear():
61 61 """helper method for implementing `client.clear` via `client.apply`"""
62 62 globals().clear()
63 63
64 64 def _execute(code):
65 65 """helper method for implementing `client.execute` via `client.apply`"""
66 66 exec code in globals()
67 67
68 68
69 69 #--------------------------------------------------------------------------
70 70 # Decorators for Client methods
71 71 #--------------------------------------------------------------------------
72 72
73 73 @decorator
74 74 def spinfirst(f, self, *args, **kwargs):
75 75 """Call spin() to sync state prior to calling the method."""
76 76 self.spin()
77 77 return f(self, *args, **kwargs)
78 78
79 79 @decorator
80 80 def defaultblock(f, self, *args, **kwargs):
81 81 """Default to self.block; preserve self.block."""
82 82 block = kwargs.get('block',None)
83 83 block = self.block if block is None else block
84 84 saveblock = self.block
85 85 self.block = block
86 86 try:
87 87 ret = f(self, *args, **kwargs)
88 88 finally:
89 89 self.block = saveblock
90 90 return ret
91 91
92 92
93 93 #--------------------------------------------------------------------------
94 94 # Classes
95 95 #--------------------------------------------------------------------------
96 96
97 97 class Metadata(dict):
98 98 """Subclass of dict for initializing metadata values.
99 99
100 100 Attribute access works on keys.
101 101
102 102 These objects have a strict set of keys - errors will raise if you try
103 103 to add new keys.
104 104 """
105 105 def __init__(self, *args, **kwargs):
106 106 dict.__init__(self)
107 107 md = {'msg_id' : None,
108 108 'submitted' : None,
109 109 'started' : None,
110 110 'completed' : None,
111 111 'received' : None,
112 112 'engine_uuid' : None,
113 113 'engine_id' : None,
114 114 'follow' : None,
115 115 'after' : None,
116 116 'status' : None,
117 117
118 118 'pyin' : None,
119 119 'pyout' : None,
120 120 'pyerr' : None,
121 121 'stdout' : '',
122 122 'stderr' : '',
123 123 }
124 124 self.update(md)
125 125 self.update(dict(*args, **kwargs))
126 126
127 127 def __getattr__(self, key):
128 128 """getattr aliased to getitem"""
129 129 if key in self.iterkeys():
130 130 return self[key]
131 131 else:
132 132 raise AttributeError(key)
133 133
134 134 def __setattr__(self, key, value):
135 135 """setattr aliased to setitem, with strict"""
136 136 if key in self.iterkeys():
137 137 self[key] = value
138 138 else:
139 139 raise AttributeError(key)
140 140
141 141 def __setitem__(self, key, value):
142 142 """strict static key enforcement"""
143 143 if key in self.iterkeys():
144 144 dict.__setitem__(self, key, value)
145 145 else:
146 146 raise KeyError(key)
147 147
148 148
149 149 class Client(object):
150 150 """A semi-synchronous client to the IPython ZMQ controller
151 151
152 152 Parameters
153 153 ----------
154 154
155 155 url_or_file : bytes; zmq url or path to ipcontroller-client.json
156 156 Connection information for the Hub's registration. If a json connector
157 157 file is given, then likely no further configuration is necessary.
158 [Default: None]
158 [Default: use profile]
159 profile : bytes
160 The name of the Cluster profile to be used to find connector information.
161 [Default: 'default']
159 162 context : zmq.Context
160 Pass an existing zmq.Context instance, otherwise the client will create its own
163 Pass an existing zmq.Context instance, otherwise the client will create its own.
161 164 username : bytes
162 165 set username to be passed to the Session object
163 166 debug : bool
164 167 flag for lots of message printing for debug purposes
165 168
166 169 #-------------- ssh related args ----------------
167 170 # These are args for configuring the ssh tunnel to be used
168 171 # credentials are used to forward connections over ssh to the Controller
169 172 # Note that the ip given in `addr` needs to be relative to sshserver
170 173 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
171 174 # and set sshserver as the same machine the Controller is on. However,
172 175 # the only requirement is that sshserver is able to see the Controller
173 176 # (i.e. is within the same trusted network).
174 177
175 178 sshserver : str
176 179 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
177 180 If keyfile or password is specified, and this is not, it will default to
178 181 the ip given in addr.
179 182 sshkey : str; path to public ssh key file
180 183 This specifies a key to be used in ssh login, default None.
181 184 Regular default ssh keys will be used without specifying this argument.
182 185 password : str
183 186 Your ssh password to sshserver. Note that if this is left None,
184 187 you will be prompted for it if passwordless key based login is unavailable.
185 188 paramiko : bool
186 189 flag for whether to use paramiko instead of shell ssh for tunneling.
187 190 [default: True on win32, False else]
188 191
189 192 #------- exec authentication args -------
190 193 # If even localhost is untrusted, you can have some protection against
191 194 # unauthorized execution by using a key. Messages are still sent
192 195 # as cleartext, so if someone can snoop your loopback traffic this will
193 196 # not help against malicious attacks.
194 197
195 198 exec_key : str
196 199 an authentication key or file containing a key
197 200 default: None
198 201
199 202
200 203 Attributes
201 204 ----------
202 205 ids : set of int engine IDs
203 206 requesting the ids attribute always synchronizes
204 207 the registration state. To request ids without synchronization,
205 208 use semi-private _ids attributes.
206 209
207 210 history : list of msg_ids
208 211 a list of msg_ids, keeping track of all the execution
209 212 messages you have submitted in order.
210 213
211 214 outstanding : set of msg_ids
212 215 a set of msg_ids that have been submitted, but whose
213 216 results have not yet been received.
214 217
215 218 results : dict
216 219 a dict of all our results, keyed by msg_id
217 220
218 221 block : bool
219 222 determines default behavior when block not specified
220 223 in execution methods
221 224
222 225 Methods
223 226 -------
224 227 spin : flushes incoming results and registration state changes
225 228 control methods spin, and requesting `ids` also ensures up to date
226 229
227 230 barrier : wait on one or more msg_ids
228 231
229 232 execution methods: apply/apply_bound/apply_to/apply_bound
230 233 legacy: execute, run
231 234
232 235 query methods: queue_status, get_result, purge
233 236
234 237 control methods: abort, kill
235 238
236 239 """
237 240
238 241
239 242 _connected=False
240 243 _ssh=False
241 244 _engines=None
242 245 _registration_socket=None
243 246 _query_socket=None
244 247 _control_socket=None
245 248 _iopub_socket=None
246 249 _notification_socket=None
247 250 _mux_socket=None
248 251 _task_socket=None
249 252 block = False
250 253 outstanding=None
251 254 results = None
252 255 history = None
253 256 debug = False
254 257 targets = None
255 258
256 259 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
257 260 context=None, username=None, debug=False, exec_key=None,
258 261 sshserver=None, sshkey=None, password=None, paramiko=None,
259 262 ):
260 263 if context is None:
261 264 context = zmq.Context()
262 265 self.context = context
263 266 self.targets = 'all'
264 267
265 268 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
266 269 if self._cd is not None:
267 270 if url_or_file is None:
268 271 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
269 272 assert url_or_file is not None, "I can't find enough information to connect to a controller!"\
270 273 " Please specify at least one of url_or_file or profile."
271 274
272 275 try:
273 276 validate_url(url_or_file)
274 277 except AssertionError:
275 278 if not os.path.exists(url_or_file):
276 279 if self._cd:
277 280 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
278 281 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
279 282 with open(url_or_file) as f:
280 283 cfg = json.loads(f.read())
281 284 else:
282 285 cfg = {'url':url_or_file}
283 286
284 287 # sync defaults from args, json:
285 288 if sshserver:
286 289 cfg['ssh'] = sshserver
287 290 if exec_key:
288 291 cfg['exec_key'] = exec_key
289 292 exec_key = cfg['exec_key']
290 293 sshserver=cfg['ssh']
291 294 url = cfg['url']
292 295 location = cfg.setdefault('location', None)
293 296 cfg['url'] = disambiguate_url(cfg['url'], location)
294 297 url = cfg['url']
295 298
296 299 self._config = cfg
297 300
298 301
299 302 self._ssh = bool(sshserver or sshkey or password)
300 303 if self._ssh and sshserver is None:
301 304 # default to ssh via localhost
302 305 sshserver = url.split('://')[1].split(':')[0]
303 306 if self._ssh and password is None:
304 307 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
305 308 password=False
306 309 else:
307 310 password = getpass("SSH Password for %s: "%sshserver)
308 311 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
309 312 if exec_key is not None and os.path.isfile(exec_key):
310 313 arg = 'keyfile'
311 314 else:
312 315 arg = 'key'
313 316 key_arg = {arg:exec_key}
314 317 if username is None:
315 318 self.session = ss.StreamSession(**key_arg)
316 319 else:
317 320 self.session = ss.StreamSession(username, **key_arg)
318 321 self._registration_socket = self.context.socket(zmq.XREQ)
319 322 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
320 323 if self._ssh:
321 324 tunnel.tunnel_connection(self._registration_socket, url, sshserver, **ssh_kwargs)
322 325 else:
323 326 self._registration_socket.connect(url)
324 327 self._engines = ReverseDict()
325 328 self._ids = set()
326 329 self.outstanding=set()
327 330 self.results = {}
328 331 self.metadata = {}
329 332 self.history = []
330 333 self.debug = debug
331 334 self.session.debug = debug
332 335
333 336 self._notification_handlers = {'registration_notification' : self._register_engine,
334 337 'unregistration_notification' : self._unregister_engine,
335 338 }
336 339 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
337 340 'apply_reply' : self._handle_apply_reply}
338 341 self._connect(sshserver, ssh_kwargs)
339 342
340 343
341 344 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
342 345 if ipython_dir is None:
343 346 ipython_dir = get_ipython_dir()
344 347 if cluster_dir is not None:
345 348 try:
346 349 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
347 350 except ClusterDirError:
348 351 pass
349 352 elif profile is not None:
350 353 try:
351 354 self._cd = ClusterDir.find_cluster_dir_by_profile(
352 355 ipython_dir, profile)
353 356 except ClusterDirError:
354 357 pass
355 358 else:
356 359 self._cd = None
357 360
358 361 @property
359 362 def ids(self):
360 363 """Always up to date ids property."""
361 364 self._flush_notifications()
362 365 return self._ids
363 366
364 367 def _update_engines(self, engines):
365 368 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
366 369 for k,v in engines.iteritems():
367 370 eid = int(k)
368 371 self._engines[eid] = bytes(v) # force not unicode
369 372 self._ids.add(eid)
370 373
371 374 def _build_targets(self, targets):
372 375 """Turn valid target IDs or 'all' into two lists:
373 376 (int_ids, uuids).
374 377 """
375 378 if targets is None:
376 379 targets = self._ids
377 380 elif isinstance(targets, str):
378 381 if targets.lower() == 'all':
379 382 targets = self._ids
380 383 else:
381 384 raise TypeError("%r not valid str target, must be 'all'"%(targets))
382 385 elif isinstance(targets, int):
383 386 targets = [targets]
384 387 return [self._engines[t] for t in targets], list(targets)
385 388
386 389 def _connect(self, sshserver, ssh_kwargs):
387 390 """setup all our socket connections to the controller. This is called from
388 391 __init__."""
389 392 if self._connected:
390 393 return
391 394 self._connected=True
392 395
393 396 def connect_socket(s, url):
394 397 url = disambiguate_url(url, self._config['location'])
395 398 if self._ssh:
396 399 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
397 400 else:
398 401 return s.connect(url)
399 402
400 403 self.session.send(self._registration_socket, 'connection_request')
401 404 idents,msg = self.session.recv(self._registration_socket,mode=0)
402 405 if self.debug:
403 406 pprint(msg)
404 407 msg = ss.Message(msg)
405 408 content = msg.content
406 409 if content.status == 'ok':
407 410 if content.mux:
408 411 self._mux_socket = self.context.socket(zmq.PAIR)
409 412 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
410 413 connect_socket(self._mux_socket, content.mux)
411 414 if content.task:
412 415 self._task_socket = self.context.socket(zmq.PAIR)
413 416 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
414 417 connect_socket(self._task_socket, content.task)
415 418 if content.notification:
416 419 self._notification_socket = self.context.socket(zmq.SUB)
417 420 connect_socket(self._notification_socket, content.notification)
418 421 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
419 422 if content.query:
420 423 self._query_socket = self.context.socket(zmq.PAIR)
421 424 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
422 425 connect_socket(self._query_socket, content.query)
423 426 if content.control:
424 427 self._control_socket = self.context.socket(zmq.PAIR)
425 428 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
426 429 connect_socket(self._control_socket, content.control)
427 430 if content.iopub:
428 431 self._iopub_socket = self.context.socket(zmq.SUB)
429 432 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
430 433 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
431 434 connect_socket(self._iopub_socket, content.iopub)
432 435 self._update_engines(dict(content.engines))
433 436
434 437 else:
435 438 self._connected = False
436 439 raise Exception("Failed to connect!")
437 440
438 441 #--------------------------------------------------------------------------
439 442 # handlers and callbacks for incoming messages
440 443 #--------------------------------------------------------------------------
441 444
442 445 def _register_engine(self, msg):
443 446 """Register a new engine, and update our connection info."""
444 447 content = msg['content']
445 448 eid = content['id']
446 449 d = {eid : content['queue']}
447 450 self._update_engines(d)
448 451 self._ids.add(int(eid))
449 452
450 453 def _unregister_engine(self, msg):
451 454 """Unregister an engine that has died."""
452 455 content = msg['content']
453 456 eid = int(content['id'])
454 457 if eid in self._ids:
455 458 self._ids.remove(eid)
456 459 self._engines.pop(eid)
457 460
458 461 def _extract_metadata(self, header, parent, content):
459 462 md = {'msg_id' : parent['msg_id'],
460 463 'received' : datetime.now(),
461 464 'engine_uuid' : header.get('engine', None),
462 465 'follow' : parent.get('follow', []),
463 466 'after' : parent.get('after', []),
464 467 'status' : content['status'],
465 468 }
466 469
467 470 if md['engine_uuid'] is not None:
468 471 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
469 472
470 473 if 'date' in parent:
471 474 md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601)
472 475 if 'started' in header:
473 476 md['started'] = datetime.strptime(header['started'], ss.ISO8601)
474 477 if 'date' in header:
475 478 md['completed'] = datetime.strptime(header['date'], ss.ISO8601)
476 479 return md
477 480
478 481 def _handle_execute_reply(self, msg):
479 482 """Save the reply to an execute_request into our results.
480 483
481 484 execute messages are never actually used. apply is used instead.
482 485 """
483 486
484 487 parent = msg['parent_header']
485 488 msg_id = parent['msg_id']
486 489 if msg_id not in self.outstanding:
487 490 if msg_id in self.history:
488 491 print ("got stale result: %s"%msg_id)
489 492 else:
490 493 print ("got unknown result: %s"%msg_id)
491 494 else:
492 495 self.outstanding.remove(msg_id)
493 496 self.results[msg_id] = ss.unwrap_exception(msg['content'])
494 497
495 498 def _handle_apply_reply(self, msg):
496 499 """Save the reply to an apply_request into our results."""
497 500 parent = msg['parent_header']
498 501 msg_id = parent['msg_id']
499 502 if msg_id not in self.outstanding:
500 503 if msg_id in self.history:
501 504 print ("got stale result: %s"%msg_id)
502 505 print self.results[msg_id]
503 506 print msg
504 507 else:
505 508 print ("got unknown result: %s"%msg_id)
506 509 else:
507 510 self.outstanding.remove(msg_id)
508 511 content = msg['content']
509 512 header = msg['header']
510 513
511 514 # construct metadata:
512 515 md = self.metadata.setdefault(msg_id, Metadata())
513 516 md.update(self._extract_metadata(header, parent, content))
514 517 self.metadata[msg_id] = md
515 518
516 519 # construct result:
517 520 if content['status'] == 'ok':
518 521 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
519 522 elif content['status'] == 'aborted':
520 523 self.results[msg_id] = error.AbortedTask(msg_id)
521 524 elif content['status'] == 'resubmitted':
522 525 # TODO: handle resubmission
523 526 pass
524 527 else:
525 528 e = ss.unwrap_exception(content)
526 529 if e.engine_info:
527 530 e_uuid = e.engine_info['engineid']
528 531 eid = self._engines[e_uuid]
529 532 e.engine_info['engineid'] = eid
530 533 self.results[msg_id] = e
531 534
532 535 def _flush_notifications(self):
533 536 """Flush notifications of engine registrations waiting
534 537 in ZMQ queue."""
535 538 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
536 539 while msg is not None:
537 540 if self.debug:
538 541 pprint(msg)
539 542 msg = msg[-1]
540 543 msg_type = msg['msg_type']
541 544 handler = self._notification_handlers.get(msg_type, None)
542 545 if handler is None:
543 546 raise Exception("Unhandled message type: %s"%msg.msg_type)
544 547 else:
545 548 handler(msg)
546 549 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
547 550
548 551 def _flush_results(self, sock):
549 552 """Flush task or queue results waiting in ZMQ queue."""
550 553 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
551 554 while msg is not None:
552 555 if self.debug:
553 556 pprint(msg)
554 557 msg = msg[-1]
555 558 msg_type = msg['msg_type']
556 559 handler = self._queue_handlers.get(msg_type, None)
557 560 if handler is None:
558 561 raise Exception("Unhandled message type: %s"%msg.msg_type)
559 562 else:
560 563 handler(msg)
561 564 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
562 565
563 566 def _flush_control(self, sock):
564 567 """Flush replies from the control channel waiting
565 568 in the ZMQ queue.
566 569
567 570 Currently: ignore them."""
568 571 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
569 572 while msg is not None:
570 573 if self.debug:
571 574 pprint(msg)
572 575 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
573 576
574 577 def _flush_iopub(self, sock):
575 578 """Flush replies from the iopub channel waiting
576 579 in the ZMQ queue.
577 580 """
578 581 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
579 582 while msg is not None:
580 583 if self.debug:
581 584 pprint(msg)
582 585 msg = msg[-1]
583 586 parent = msg['parent_header']
584 587 msg_id = parent['msg_id']
585 588 content = msg['content']
586 589 header = msg['header']
587 590 msg_type = msg['msg_type']
588 591
589 592 # init metadata:
590 593 md = self.metadata.setdefault(msg_id, Metadata())
591 594
592 595 if msg_type == 'stream':
593 596 name = content['name']
594 597 s = md[name] or ''
595 598 md[name] = s + content['data']
596 599 elif msg_type == 'pyerr':
597 600 md.update({'pyerr' : ss.unwrap_exception(content)})
598 601 else:
599 602 md.update({msg_type : content['data']})
600 603
601 604 self.metadata[msg_id] = md
602 605
603 606 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
604 607
605 608 #--------------------------------------------------------------------------
606 609 # getitem
607 610 #--------------------------------------------------------------------------
608 611
609 612 def __getitem__(self, key):
610 613 """Dict access returns DirectView multiplexer objects or,
611 614 if key is None, a LoadBalancedView."""
612 615 if key is None:
613 616 return LoadBalancedView(self)
614 617 if isinstance(key, int):
615 618 if key not in self.ids:
616 619 raise IndexError("No such engine: %i"%key)
617 620 return DirectView(self, key)
618 621
619 622 if isinstance(key, slice):
620 623 indices = range(len(self.ids))[key]
621 624 ids = sorted(self._ids)
622 625 key = [ ids[i] for i in indices ]
623 626 # newkeys = sorted(self._ids)[thekeys[k]]
624 627
625 628 if isinstance(key, (tuple, list, xrange)):
626 629 _,targets = self._build_targets(list(key))
627 630 return DirectView(self, targets)
628 631 else:
629 632 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
630 633
631 634 #--------------------------------------------------------------------------
632 635 # Begin public methods
633 636 #--------------------------------------------------------------------------
634 637
635 638 @property
636 639 def remote(self):
637 640 """property for convenient RemoteFunction generation.
638 641
639 642 >>> @client.remote
640 643 ... def f():
641 644 import os
642 645 print (os.getpid())
643 646 """
644 647 return remote(self, block=self.block)
645 648
646 649 def spin(self):
647 650 """Flush any registration notifications and execution results
648 651 waiting in the ZMQ queue.
649 652 """
650 653 if self._notification_socket:
651 654 self._flush_notifications()
652 655 if self._mux_socket:
653 656 self._flush_results(self._mux_socket)
654 657 if self._task_socket:
655 658 self._flush_results(self._task_socket)
656 659 if self._control_socket:
657 660 self._flush_control(self._control_socket)
658 661 if self._iopub_socket:
659 662 self._flush_iopub(self._iopub_socket)
660 663
661 664 def barrier(self, msg_ids=None, timeout=-1):
662 665 """waits on one or more `msg_ids`, for up to `timeout` seconds.
663 666
664 667 Parameters
665 668 ----------
666 669 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
667 670 ints are indices to self.history
668 671 strs are msg_ids
669 672 default: wait on all outstanding messages
670 673 timeout : float
671 674 a time in seconds, after which to give up.
672 675 default is -1, which means no timeout
673 676
674 677 Returns
675 678 -------
676 679 True : when all msg_ids are done
677 680 False : timeout reached, some msg_ids still outstanding
678 681 """
679 682 tic = time.time()
680 683 if msg_ids is None:
681 684 theids = self.outstanding
682 685 else:
683 686 if isinstance(msg_ids, (int, str, AsyncResult)):
684 687 msg_ids = [msg_ids]
685 688 theids = set()
686 689 for msg_id in msg_ids:
687 690 if isinstance(msg_id, int):
688 691 msg_id = self.history[msg_id]
689 692 elif isinstance(msg_id, AsyncResult):
690 693 map(theids.add, msg_id.msg_ids)
691 694 continue
692 695 theids.add(msg_id)
693 696 if not theids.intersection(self.outstanding):
694 697 return True
695 698 self.spin()
696 699 while theids.intersection(self.outstanding):
697 700 if timeout >= 0 and ( time.time()-tic ) > timeout:
698 701 break
699 702 time.sleep(1e-3)
700 703 self.spin()
701 704 return len(theids.intersection(self.outstanding)) == 0
702 705
703 706 #--------------------------------------------------------------------------
704 707 # Control methods
705 708 #--------------------------------------------------------------------------
706 709
707 710 @spinfirst
708 711 @defaultblock
709 712 def clear(self, targets=None, block=None):
710 713 """Clear the namespace in target(s)."""
711 714 targets = self._build_targets(targets)[0]
712 715 for t in targets:
713 716 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
714 717 error = False
715 718 if self.block:
716 719 for i in range(len(targets)):
717 720 idents,msg = self.session.recv(self._control_socket,0)
718 721 if self.debug:
719 722 pprint(msg)
720 723 if msg['content']['status'] != 'ok':
721 724 error = ss.unwrap_exception(msg['content'])
722 725 if error:
723 726 return error
724 727
725 728
726 729 @spinfirst
727 730 @defaultblock
728 731 def abort(self, msg_ids = None, targets=None, block=None):
729 732 """Abort the execution queues of target(s)."""
730 733 targets = self._build_targets(targets)[0]
731 734 if isinstance(msg_ids, basestring):
732 735 msg_ids = [msg_ids]
733 736 content = dict(msg_ids=msg_ids)
734 737 for t in targets:
735 738 self.session.send(self._control_socket, 'abort_request',
736 739 content=content, ident=t)
737 740 error = False
738 741 if self.block:
739 742 for i in range(len(targets)):
740 743 idents,msg = self.session.recv(self._control_socket,0)
741 744 if self.debug:
742 745 pprint(msg)
743 746 if msg['content']['status'] != 'ok':
744 747 error = ss.unwrap_exception(msg['content'])
745 748 if error:
746 749 return error
747 750
748 751 @spinfirst
749 752 @defaultblock
750 753 def shutdown(self, targets=None, restart=False, controller=False, block=None):
751 754 """Terminates one or more engine processes, optionally including the controller."""
752 755 if controller:
753 756 targets = 'all'
754 757 targets = self._build_targets(targets)[0]
755 758 for t in targets:
756 759 self.session.send(self._control_socket, 'shutdown_request',
757 760 content={'restart':restart},ident=t)
758 761 error = False
759 762 if block or controller:
760 763 for i in range(len(targets)):
761 764 idents,msg = self.session.recv(self._control_socket,0)
762 765 if self.debug:
763 766 pprint(msg)
764 767 if msg['content']['status'] != 'ok':
765 768 error = ss.unwrap_exception(msg['content'])
766 769
767 770 if controller:
768 771 time.sleep(0.25)
769 772 self.session.send(self._query_socket, 'shutdown_request')
770 773 idents,msg = self.session.recv(self._query_socket, 0)
771 774 if self.debug:
772 775 pprint(msg)
773 776 if msg['content']['status'] != 'ok':
774 777 error = ss.unwrap_exception(msg['content'])
775 778
776 779 if error:
777 780 raise error
778 781
779 782 #--------------------------------------------------------------------------
780 783 # Execution methods
781 784 #--------------------------------------------------------------------------
782 785
783 786 @defaultblock
784 787 def execute(self, code, targets='all', block=None):
785 788 """Executes `code` on `targets` in blocking or nonblocking manner.
786 789
787 790 ``execute`` is always `bound` (affects engine namespace)
788 791
789 792 Parameters
790 793 ----------
791 794 code : str
792 795 the code string to be executed
793 796 targets : int/str/list of ints/strs
794 797 the engines on which to execute
795 798 default : all
796 799 block : bool
797 800 whether or not to wait until done to return
798 801 default: self.block
799 802 """
800 803 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
801 804 return result
802 805
803 def run(self, code, block=None):
804 """Runs `code` on an engine.
806 def run(self, filename, targets='all', block=None):
807 """Execute contents of `filename` on engine(s).
805 808
806 Calls to this are load-balanced.
807
808 ``run`` is never `bound` (no effect on engine namespace)
809 This simply reads the contents of the file and calls `execute`.
809 810
810 811 Parameters
811 812 ----------
812 code : str
813 the code string to be executed
813 filename : str
814 The path to the file
815 targets : int/str/list of ints/strs
816 the engines on which to execute
817 default : all
814 818 block : bool
815 819 whether or not to wait until done
820 default: self.block
816 821
817 822 """
818 result = self.apply(_execute, (code,), targets=None, block=block, bound=False)
819 return result
823 with open(filename, 'rb') as f:
824 code = f.read()
825 return self.execute(code, targets=targets, block=block)
820 826
821 827 def _maybe_raise(self, result):
822 828 """wrapper for maybe raising an exception if apply failed."""
823 829 if isinstance(result, error.RemoteError):
824 830 raise result
825 831
826 832 return result
827 833
828 834 def _build_dependency(self, dep):
829 835 """helper for building jsonable dependencies from various input forms"""
830 836 if isinstance(dep, Dependency):
831 837 return dep.as_dict()
832 838 elif isinstance(dep, AsyncResult):
833 839 return dep.msg_ids
834 840 elif dep is None:
835 841 return []
836 842 elif isinstance(dep, set):
837 843 return list(dep)
838 844 elif isinstance(dep, (list,dict)):
839 845 return dep
840 846 elif isinstance(dep, str):
841 847 return [dep]
842 848 else:
843 849 raise TypeError("Dependency may be: set,list,dict,Dependency or AsyncResult, not %r"%type(dep))
844 850
845 851 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
846 852 after=None, follow=None, timeout=None):
847 853 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
848 854
849 855 This is the central execution command for the client.
850 856
851 857 Parameters
852 858 ----------
853 859
854 860 f : function
855 861 The fuction to be called remotely
856 862 args : tuple/list
857 863 The positional arguments passed to `f`
858 864 kwargs : dict
859 865 The keyword arguments passed to `f`
860 866 bound : bool (default: True)
861 867 Whether to execute in the Engine(s) namespace, or in a clean
862 868 namespace not affecting the engine.
863 869 block : bool (default: self.block)
864 870 Whether to wait for the result, or return immediately.
865 871 False:
866 872 returns msg_id(s)
867 873 if multiple targets:
868 874 list of ids
869 875 True:
870 876 returns actual result(s) of f(*args, **kwargs)
871 877 if multiple targets:
872 878 dict of results, by engine ID
873 879 targets : int,list of ints, 'all', None
874 880 Specify the destination of the job.
875 881 if None:
876 882 Submit via Task queue for load-balancing.
877 883 if 'all':
878 884 Run on all active engines
879 885 if list:
880 886 Run on each specified engine
881 887 if int:
882 888 Run on single engine
883 889
884 890 after : Dependency or collection of msg_ids
885 891 Only for load-balanced execution (targets=None)
886 892 Specify a list of msg_ids as a time-based dependency.
887 893 This job will only be run *after* the dependencies
888 894 have been met.
889 895
890 896 follow : Dependency or collection of msg_ids
891 897 Only for load-balanced execution (targets=None)
892 898 Specify a list of msg_ids as a location-based dependency.
893 899 This job will only be run on an engine where this dependency
894 900 is met.
895 901
896 902 timeout : float or None
897 903 Only for load-balanced execution (targets=None)
898 904 Specify an amount of time (in seconds)
899 905
900 906 Returns
901 907 -------
902 908 if block is False:
903 909 if single target:
904 910 return msg_id
905 911 else:
906 912 return list of msg_ids
907 913 ? (should this be dict like block=True) ?
908 914 else:
909 915 if single target:
910 916 return result of f(*args, **kwargs)
911 917 else:
912 918 return dict of results, keyed by engine
913 919 """
914 920
915 921 # defaults:
916 922 block = block if block is not None else self.block
917 923 args = args if args is not None else []
918 924 kwargs = kwargs if kwargs is not None else {}
919 925
920 926 # enforce types of f,args,kwrags
921 927 if not callable(f):
922 928 raise TypeError("f must be callable, not %s"%type(f))
923 929 if not isinstance(args, (tuple, list)):
924 930 raise TypeError("args must be tuple or list, not %s"%type(args))
925 931 if not isinstance(kwargs, dict):
926 932 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
927 933
928 934 after = self._build_dependency(after)
929 935 follow = self._build_dependency(follow)
930 936
931 937 options = dict(bound=bound, block=block)
932 938
933 939 if targets is None:
934 940 return self._apply_balanced(f, args, kwargs, timeout=timeout,
935 941 after=after, follow=follow, **options)
936 942 else:
937 943 return self._apply_direct(f, args, kwargs, targets=targets, **options)
938 944
939 945 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
940 946 after=None, follow=None, timeout=None):
941 947 """The underlying method for applying functions in a load balanced
942 948 manner, via the task queue."""
943 949 subheader = dict(after=after, follow=follow, timeout=timeout)
944 950 bufs = ss.pack_apply_message(f,args,kwargs)
945 951 content = dict(bound=bound)
946 952
947 953 msg = self.session.send(self._task_socket, "apply_request",
948 954 content=content, buffers=bufs, subheader=subheader)
949 955 msg_id = msg['msg_id']
950 956 self.outstanding.add(msg_id)
951 957 self.history.append(msg_id)
952 958 ar = AsyncResult(self, [msg_id], fname=f.__name__)
953 959 if block:
954 960 return ar.get()
955 961 else:
956 962 return ar
957 963
958 964 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
959 965 """Then underlying method for applying functions to specific engines
960 966 via the MUX queue."""
961 967
962 968 queues,targets = self._build_targets(targets)
963 969
964 970 subheader = {}
965 971 content = dict(bound=bound)
966 972 bufs = ss.pack_apply_message(f,args,kwargs)
967 973
968 974 msg_ids = []
969 975 for queue in queues:
970 976 msg = self.session.send(self._mux_socket, "apply_request",
971 977 content=content, buffers=bufs,ident=queue, subheader=subheader)
972 978 msg_id = msg['msg_id']
973 979 self.outstanding.add(msg_id)
974 980 self.history.append(msg_id)
975 981 msg_ids.append(msg_id)
976 982 ar = AsyncResult(self, msg_ids, fname=f.__name__)
977 983 if block:
978 984 return ar.get()
979 985 else:
980 986 return ar
981 987
982 988 #--------------------------------------------------------------------------
983 989 # Map and decorators
984 990 #--------------------------------------------------------------------------
985 991
986 992 def map(self, f, *sequences):
987 993 """Parallel version of builtin `map`, using all our engines."""
988 994 pf = ParallelFunction(self, f, block=self.block,
989 995 bound=True, targets='all')
990 996 return pf.map(*sequences)
991 997
992 998 def parallel(self, bound=True, targets='all', block=True):
993 999 """Decorator for making a ParallelFunction."""
994 1000 return parallel(self, bound=bound, targets=targets, block=block)
995 1001
996 1002 def remote(self, bound=True, targets='all', block=True):
997 1003 """Decorator for making a RemoteFunction."""
998 1004 return remote(self, bound=bound, targets=targets, block=block)
999 1005
1000 1006 #--------------------------------------------------------------------------
1001 1007 # Data movement
1002 1008 #--------------------------------------------------------------------------
1003 1009
1004 1010 @defaultblock
1005 1011 def push(self, ns, targets='all', block=None):
1006 1012 """Push the contents of `ns` into the namespace on `target`"""
1007 1013 if not isinstance(ns, dict):
1008 1014 raise TypeError("Must be a dict, not %s"%type(ns))
1009 1015 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
1010 1016 return result
1011 1017
1012 1018 @defaultblock
1013 1019 def pull(self, keys, targets='all', block=None):
1014 1020 """Pull objects from `target`'s namespace by `keys`"""
1015 1021 if isinstance(keys, str):
1016 1022 pass
1017 1023 elif isinstance(keys, (list,tuple,set)):
1018 1024 for key in keys:
1019 1025 if not isinstance(key, str):
1020 1026 raise TypeError
1021 1027 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
1022 1028 return result
1023 1029
1024 1030 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1025 1031 """
1026 1032 Partition a Python sequence and send the partitions to a set of engines.
1027 1033 """
1028 1034 block = block if block is not None else self.block
1029 1035 targets = self._build_targets(targets)[-1]
1030 1036 mapObject = Map.dists[dist]()
1031 1037 nparts = len(targets)
1032 1038 msg_ids = []
1033 1039 for index, engineid in enumerate(targets):
1034 1040 partition = mapObject.getPartition(seq, index, nparts)
1035 1041 if flatten and len(partition) == 1:
1036 1042 r = self.push({key: partition[0]}, targets=engineid, block=False)
1037 1043 else:
1038 1044 r = self.push({key: partition}, targets=engineid, block=False)
1039 1045 msg_ids.extend(r.msg_ids)
1040 1046 r = AsyncResult(self, msg_ids, fname='scatter')
1041 1047 if block:
1042 1048 return r.get()
1043 1049 else:
1044 1050 return r
1045 1051
1046 1052 def gather(self, key, dist='b', targets='all', block=None):
1047 1053 """
1048 1054 Gather a partitioned sequence on a set of engines as a single local seq.
1049 1055 """
1050 1056 block = block if block is not None else self.block
1051 1057
1052 1058 targets = self._build_targets(targets)[-1]
1053 1059 mapObject = Map.dists[dist]()
1054 1060 msg_ids = []
1055 1061 for index, engineid in enumerate(targets):
1056 1062 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1057 1063
1058 1064 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1059 1065 if block:
1060 1066 return r.get()
1061 1067 else:
1062 1068 return r
1063 1069
1064 1070 #--------------------------------------------------------------------------
1065 1071 # Query methods
1066 1072 #--------------------------------------------------------------------------
1067 1073
1068 1074 @spinfirst
1069 1075 def get_results(self, msg_ids, status_only=False):
1070 1076 """Returns the result of the execute or task request with `msg_ids`.
1071 1077
1072 1078 Parameters
1073 1079 ----------
1074 1080 msg_ids : list of ints or msg_ids
1075 1081 if int:
1076 1082 Passed as index to self.history for convenience.
1077 1083 status_only : bool (default: False)
1078 1084 if False:
1079 1085 return the actual results
1080 1086
1081 1087 Returns
1082 1088 -------
1083 1089
1084 1090 results : dict
1085 1091 There will always be the keys 'pending' and 'completed', which will
1086 1092 be lists of msg_ids.
1087 1093 """
1088 1094 if not isinstance(msg_ids, (list,tuple)):
1089 1095 msg_ids = [msg_ids]
1090 1096 theids = []
1091 1097 for msg_id in msg_ids:
1092 1098 if isinstance(msg_id, int):
1093 1099 msg_id = self.history[msg_id]
1094 1100 if not isinstance(msg_id, str):
1095 1101 raise TypeError("msg_ids must be str, not %r"%msg_id)
1096 1102 theids.append(msg_id)
1097 1103
1098 1104 completed = []
1099 1105 local_results = {}
1100 1106 # temporarily disable local shortcut
1101 1107 # for msg_id in list(theids):
1102 1108 # if msg_id in self.results:
1103 1109 # completed.append(msg_id)
1104 1110 # local_results[msg_id] = self.results[msg_id]
1105 1111 # theids.remove(msg_id)
1106 1112
1107 1113 if theids: # some not locally cached
1108 1114 content = dict(msg_ids=theids, status_only=status_only)
1109 1115 msg = self.session.send(self._query_socket, "result_request", content=content)
1110 1116 zmq.select([self._query_socket], [], [])
1111 1117 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1112 1118 if self.debug:
1113 1119 pprint(msg)
1114 1120 content = msg['content']
1115 1121 if content['status'] != 'ok':
1116 1122 raise ss.unwrap_exception(content)
1117 1123 buffers = msg['buffers']
1118 1124 else:
1119 1125 content = dict(completed=[],pending=[])
1120 1126
1121 1127 content['completed'].extend(completed)
1122 1128
1123 1129 if status_only:
1124 1130 return content
1125 1131
1126 1132 failures = []
1127 1133 # load cached results into result:
1128 1134 content.update(local_results)
1129 1135 # update cache with results:
1130 1136 for msg_id in sorted(theids):
1131 1137 if msg_id in content['completed']:
1132 1138 rec = content[msg_id]
1133 1139 parent = rec['header']
1134 1140 header = rec['result_header']
1135 1141 rcontent = rec['result_content']
1136 1142 iodict = rec['io']
1137 1143 if isinstance(rcontent, str):
1138 1144 rcontent = self.session.unpack(rcontent)
1139 1145
1140 1146 md = self.metadata.setdefault(msg_id, Metadata())
1141 1147 md.update(self._extract_metadata(header, parent, rcontent))
1142 1148 md.update(iodict)
1143 1149
1144 1150 if rcontent['status'] == 'ok':
1145 1151 res,buffers = ss.unserialize_object(buffers)
1146 1152 else:
1147 1153 res = ss.unwrap_exception(rcontent)
1148 1154 failures.append(res)
1149 1155
1150 1156 self.results[msg_id] = res
1151 1157 content[msg_id] = res
1152 1158
1153 1159 error.collect_exceptions(failures, "get_results")
1154 1160 return content
1155 1161
1156 1162 @spinfirst
1157 1163 def queue_status(self, targets=None, verbose=False):
1158 1164 """Fetch the status of engine queues.
1159 1165
1160 1166 Parameters
1161 1167 ----------
1162 1168 targets : int/str/list of ints/strs
1163 1169 the engines on which to execute
1164 1170 default : all
1165 1171 verbose : bool
1166 1172 Whether to return lengths only, or lists of ids for each element
1167 1173 """
1168 1174 targets = self._build_targets(targets)[1]
1169 1175 content = dict(targets=targets, verbose=verbose)
1170 1176 self.session.send(self._query_socket, "queue_request", content=content)
1171 1177 idents,msg = self.session.recv(self._query_socket, 0)
1172 1178 if self.debug:
1173 1179 pprint(msg)
1174 1180 content = msg['content']
1175 1181 status = content.pop('status')
1176 1182 if status != 'ok':
1177 1183 raise ss.unwrap_exception(content)
1178 1184 return ss.rekey(content)
1179 1185
1180 1186 @spinfirst
1181 1187 def purge_results(self, msg_ids=[], targets=[]):
1182 1188 """Tell the controller to forget results.
1183 1189
1184 1190 Individual results can be purged by msg_id, or the entire
1185 1191 history of specific targets can be purged.
1186 1192
1187 1193 Parameters
1188 1194 ----------
1189 1195 msg_ids : str or list of strs
1190 1196 the msg_ids whose results should be forgotten.
1191 1197 targets : int/str/list of ints/strs
1192 1198 The targets, by uuid or int_id, whose entire history is to be purged.
1193 1199 Use `targets='all'` to scrub everything from the controller's memory.
1194 1200
1195 1201 default : None
1196 1202 """
1197 1203 if not targets and not msg_ids:
1198 1204 raise ValueError
1199 1205 if targets:
1200 1206 targets = self._build_targets(targets)[1]
1201 1207 content = dict(targets=targets, msg_ids=msg_ids)
1202 1208 self.session.send(self._query_socket, "purge_request", content=content)
1203 1209 idents, msg = self.session.recv(self._query_socket, 0)
1204 1210 if self.debug:
1205 1211 pprint(msg)
1206 1212 content = msg['content']
1207 1213 if content['status'] != 'ok':
1208 1214 raise ss.unwrap_exception(content)
1209 1215
1210 1216 #----------------------------------------
1211 1217 # activate for %px,%autopx magics
1212 1218 #----------------------------------------
1213 1219 def activate(self):
1214 1220 """Make this `View` active for parallel magic commands.
1215 1221
1216 1222 IPython has a magic command syntax to work with `MultiEngineClient` objects.
1217 1223 In a given IPython session there is a single active one. While
1218 1224 there can be many `Views` created and used by the user,
1219 1225 there is only one active one. The active `View` is used whenever
1220 1226 the magic commands %px and %autopx are used.
1221 1227
1222 1228 The activate() method is called on a given `View` to make it
1223 1229 active. Once this has been done, the magic commands can be used.
1224 1230 """
1225 1231
1226 1232 try:
1227 1233 # This is injected into __builtins__.
1228 1234 ip = get_ipython()
1229 1235 except NameError:
1230 1236 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1231 1237 else:
1232 1238 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1233 1239 if pmagic is not None:
1234 1240 pmagic.active_multiengine_client = self
1235 1241 else:
1236 1242 print "You must first load the parallelmagic extension " \
1237 1243 "by doing '%load_ext parallelmagic'"
1238 1244
1239 1245 class AsynClient(Client):
1240 1246 """An Asynchronous client, using the Tornado Event Loop.
1241 1247 !!!unfinished!!!"""
1242 1248 io_loop = None
1243 1249 _queue_stream = None
1244 1250 _notifier_stream = None
1245 1251 _task_stream = None
1246 1252 _control_stream = None
1247 1253
1248 1254 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1249 1255 Client.__init__(self, addr, context, username, debug)
1250 1256 if io_loop is None:
1251 1257 io_loop = ioloop.IOLoop.instance()
1252 1258 self.io_loop = io_loop
1253 1259
1254 1260 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1255 1261 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1256 1262 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1257 1263 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1258 1264
1259 1265 def spin(self):
1260 1266 for stream in (self.queue_stream, self.notifier_stream,
1261 1267 self.task_stream, self.control_stream):
1262 1268 stream.flush()
1263 1269
1264 1270 __all__ = [ 'Client',
1265 1271 'depend',
1266 1272 'require',
1267 1273 'remote',
1268 1274 'parallel',
1269 1275 'RemoteFunction',
1270 1276 'ParallelFunction',
1271 1277 'DirectView',
1272 1278 'LoadBalancedView',
1273 1279 'AsyncResult',
1274 1280 'AsyncMapResult'
1275 1281 ]
General Comments 0
You need to be logged in to leave comments. Login now