##// END OF EJS Templates
add DirectView.use_dill
MinRK -
Show More
@@ -1,1119 +1,1130 b''
1 """Views of remote engines.
1 """Views of remote engines.
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 from __future__ import print_function
7 from __future__ import print_function
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2010-2011 The IPython Development Team
9 # Copyright (C) 2010-2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 import imp
19 import imp
20 import sys
20 import sys
21 import warnings
21 import warnings
22 from contextlib import contextmanager
22 from contextlib import contextmanager
23 from types import ModuleType
23 from types import ModuleType
24
24
25 import zmq
25 import zmq
26
26
27 from IPython.testing.skipdoctest import skip_doctest
27 from IPython.testing.skipdoctest import skip_doctest
28 from IPython.utils import pickleutil
28 from IPython.utils.traitlets import (
29 from IPython.utils.traitlets import (
29 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
30 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
30 )
31 )
31 from IPython.external.decorator import decorator
32 from IPython.external.decorator import decorator
32
33
33 from IPython.parallel import util
34 from IPython.parallel import util
34 from IPython.parallel.controller.dependency import Dependency, dependent
35 from IPython.parallel.controller.dependency import Dependency, dependent
35 from IPython.utils.py3compat import string_types, iteritems, PY3
36 from IPython.utils.py3compat import string_types, iteritems, PY3
36
37
37 from . import map as Map
38 from . import map as Map
38 from .asyncresult import AsyncResult, AsyncMapResult
39 from .asyncresult import AsyncResult, AsyncMapResult
39 from .remotefunction import ParallelFunction, parallel, remote, getname
40 from .remotefunction import ParallelFunction, parallel, remote, getname
40
41
41 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
42 # Decorators
43 # Decorators
43 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
44
45
45 @decorator
46 @decorator
46 def save_ids(f, self, *args, **kwargs):
47 def save_ids(f, self, *args, **kwargs):
47 """Keep our history and outstanding attributes up to date after a method call."""
48 """Keep our history and outstanding attributes up to date after a method call."""
48 n_previous = len(self.client.history)
49 n_previous = len(self.client.history)
49 try:
50 try:
50 ret = f(self, *args, **kwargs)
51 ret = f(self, *args, **kwargs)
51 finally:
52 finally:
52 nmsgs = len(self.client.history) - n_previous
53 nmsgs = len(self.client.history) - n_previous
53 msg_ids = self.client.history[-nmsgs:]
54 msg_ids = self.client.history[-nmsgs:]
54 self.history.extend(msg_ids)
55 self.history.extend(msg_ids)
55 self.outstanding.update(msg_ids)
56 self.outstanding.update(msg_ids)
56 return ret
57 return ret
57
58
58 @decorator
59 @decorator
59 def sync_results(f, self, *args, **kwargs):
60 def sync_results(f, self, *args, **kwargs):
60 """sync relevant results from self.client to our results attribute."""
61 """sync relevant results from self.client to our results attribute."""
61 if self._in_sync_results:
62 if self._in_sync_results:
62 return f(self, *args, **kwargs)
63 return f(self, *args, **kwargs)
63 self._in_sync_results = True
64 self._in_sync_results = True
64 try:
65 try:
65 ret = f(self, *args, **kwargs)
66 ret = f(self, *args, **kwargs)
66 finally:
67 finally:
67 self._in_sync_results = False
68 self._in_sync_results = False
68 self._sync_results()
69 self._sync_results()
69 return ret
70 return ret
70
71
71 @decorator
72 @decorator
72 def spin_after(f, self, *args, **kwargs):
73 def spin_after(f, self, *args, **kwargs):
73 """call spin after the method."""
74 """call spin after the method."""
74 ret = f(self, *args, **kwargs)
75 ret = f(self, *args, **kwargs)
75 self.spin()
76 self.spin()
76 return ret
77 return ret
77
78
78 #-----------------------------------------------------------------------------
79 #-----------------------------------------------------------------------------
79 # Classes
80 # Classes
80 #-----------------------------------------------------------------------------
81 #-----------------------------------------------------------------------------
81
82
82 @skip_doctest
83 @skip_doctest
83 class View(HasTraits):
84 class View(HasTraits):
84 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
85 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
85
86
86 Don't use this class, use subclasses.
87 Don't use this class, use subclasses.
87
88
88 Methods
89 Methods
89 -------
90 -------
90
91
91 spin
92 spin
92 flushes incoming results and registration state changes
93 flushes incoming results and registration state changes
93 control methods spin, and requesting `ids` also ensures up to date
94 control methods spin, and requesting `ids` also ensures up to date
94
95
95 wait
96 wait
96 wait on one or more msg_ids
97 wait on one or more msg_ids
97
98
98 execution methods
99 execution methods
99 apply
100 apply
100 legacy: execute, run
101 legacy: execute, run
101
102
102 data movement
103 data movement
103 push, pull, scatter, gather
104 push, pull, scatter, gather
104
105
105 query methods
106 query methods
106 get_result, queue_status, purge_results, result_status
107 get_result, queue_status, purge_results, result_status
107
108
108 control methods
109 control methods
109 abort, shutdown
110 abort, shutdown
110
111
111 """
112 """
112 # flags
113 # flags
113 block=Bool(False)
114 block=Bool(False)
114 track=Bool(True)
115 track=Bool(True)
115 targets = Any()
116 targets = Any()
116
117
117 history=List()
118 history=List()
118 outstanding = Set()
119 outstanding = Set()
119 results = Dict()
120 results = Dict()
120 client = Instance('IPython.parallel.Client')
121 client = Instance('IPython.parallel.Client')
121
122
122 _socket = Instance('zmq.Socket')
123 _socket = Instance('zmq.Socket')
123 _flag_names = List(['targets', 'block', 'track'])
124 _flag_names = List(['targets', 'block', 'track'])
124 _in_sync_results = Bool(False)
125 _in_sync_results = Bool(False)
125 _targets = Any()
126 _targets = Any()
126 _idents = Any()
127 _idents = Any()
127
128
128 def __init__(self, client=None, socket=None, **flags):
129 def __init__(self, client=None, socket=None, **flags):
129 super(View, self).__init__(client=client, _socket=socket)
130 super(View, self).__init__(client=client, _socket=socket)
130 self.results = client.results
131 self.results = client.results
131 self.block = client.block
132 self.block = client.block
132
133
133 self.set_flags(**flags)
134 self.set_flags(**flags)
134
135
135 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
136 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
136
137
137 def __repr__(self):
138 def __repr__(self):
138 strtargets = str(self.targets)
139 strtargets = str(self.targets)
139 if len(strtargets) > 16:
140 if len(strtargets) > 16:
140 strtargets = strtargets[:12]+'...]'
141 strtargets = strtargets[:12]+'...]'
141 return "<%s %s>"%(self.__class__.__name__, strtargets)
142 return "<%s %s>"%(self.__class__.__name__, strtargets)
142
143
143 def __len__(self):
144 def __len__(self):
144 if isinstance(self.targets, list):
145 if isinstance(self.targets, list):
145 return len(self.targets)
146 return len(self.targets)
146 elif isinstance(self.targets, int):
147 elif isinstance(self.targets, int):
147 return 1
148 return 1
148 else:
149 else:
149 return len(self.client)
150 return len(self.client)
150
151
151 def set_flags(self, **kwargs):
152 def set_flags(self, **kwargs):
152 """set my attribute flags by keyword.
153 """set my attribute flags by keyword.
153
154
154 Views determine behavior with a few attributes (`block`, `track`, etc.).
155 Views determine behavior with a few attributes (`block`, `track`, etc.).
155 These attributes can be set all at once by name with this method.
156 These attributes can be set all at once by name with this method.
156
157
157 Parameters
158 Parameters
158 ----------
159 ----------
159
160
160 block : bool
161 block : bool
161 whether to wait for results
162 whether to wait for results
162 track : bool
163 track : bool
163 whether to create a MessageTracker to allow the user to
164 whether to create a MessageTracker to allow the user to
164 safely edit after arrays and buffers during non-copying
165 safely edit after arrays and buffers during non-copying
165 sends.
166 sends.
166 """
167 """
167 for name, value in iteritems(kwargs):
168 for name, value in iteritems(kwargs):
168 if name not in self._flag_names:
169 if name not in self._flag_names:
169 raise KeyError("Invalid name: %r"%name)
170 raise KeyError("Invalid name: %r"%name)
170 else:
171 else:
171 setattr(self, name, value)
172 setattr(self, name, value)
172
173
173 @contextmanager
174 @contextmanager
174 def temp_flags(self, **kwargs):
175 def temp_flags(self, **kwargs):
175 """temporarily set flags, for use in `with` statements.
176 """temporarily set flags, for use in `with` statements.
176
177
177 See set_flags for permanent setting of flags
178 See set_flags for permanent setting of flags
178
179
179 Examples
180 Examples
180 --------
181 --------
181
182
182 >>> view.track=False
183 >>> view.track=False
183 ...
184 ...
184 >>> with view.temp_flags(track=True):
185 >>> with view.temp_flags(track=True):
185 ... ar = view.apply(dostuff, my_big_array)
186 ... ar = view.apply(dostuff, my_big_array)
186 ... ar.tracker.wait() # wait for send to finish
187 ... ar.tracker.wait() # wait for send to finish
187 >>> view.track
188 >>> view.track
188 False
189 False
189
190
190 """
191 """
191 # preflight: save flags, and set temporaries
192 # preflight: save flags, and set temporaries
192 saved_flags = {}
193 saved_flags = {}
193 for f in self._flag_names:
194 for f in self._flag_names:
194 saved_flags[f] = getattr(self, f)
195 saved_flags[f] = getattr(self, f)
195 self.set_flags(**kwargs)
196 self.set_flags(**kwargs)
196 # yield to the with-statement block
197 # yield to the with-statement block
197 try:
198 try:
198 yield
199 yield
199 finally:
200 finally:
200 # postflight: restore saved flags
201 # postflight: restore saved flags
201 self.set_flags(**saved_flags)
202 self.set_flags(**saved_flags)
202
203
203
204
204 #----------------------------------------------------------------
205 #----------------------------------------------------------------
205 # apply
206 # apply
206 #----------------------------------------------------------------
207 #----------------------------------------------------------------
207
208
208 def _sync_results(self):
209 def _sync_results(self):
209 """to be called by @sync_results decorator
210 """to be called by @sync_results decorator
210
211
211 after submitting any tasks.
212 after submitting any tasks.
212 """
213 """
213 delta = self.outstanding.difference(self.client.outstanding)
214 delta = self.outstanding.difference(self.client.outstanding)
214 completed = self.outstanding.intersection(delta)
215 completed = self.outstanding.intersection(delta)
215 self.outstanding = self.outstanding.difference(completed)
216 self.outstanding = self.outstanding.difference(completed)
216
217
217 @sync_results
218 @sync_results
218 @save_ids
219 @save_ids
219 def _really_apply(self, f, args, kwargs, block=None, **options):
220 def _really_apply(self, f, args, kwargs, block=None, **options):
220 """wrapper for client.send_apply_request"""
221 """wrapper for client.send_apply_request"""
221 raise NotImplementedError("Implement in subclasses")
222 raise NotImplementedError("Implement in subclasses")
222
223
223 def apply(self, f, *args, **kwargs):
224 def apply(self, f, *args, **kwargs):
224 """calls f(*args, **kwargs) on remote engines, returning the result.
225 """calls f(*args, **kwargs) on remote engines, returning the result.
225
226
226 This method sets all apply flags via this View's attributes.
227 This method sets all apply flags via this View's attributes.
227
228
228 if self.block is False:
229 if self.block is False:
229 returns AsyncResult
230 returns AsyncResult
230 else:
231 else:
231 returns actual result of f(*args, **kwargs)
232 returns actual result of f(*args, **kwargs)
232 """
233 """
233 return self._really_apply(f, args, kwargs)
234 return self._really_apply(f, args, kwargs)
234
235
235 def apply_async(self, f, *args, **kwargs):
236 def apply_async(self, f, *args, **kwargs):
236 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
237 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
237
238
238 returns AsyncResult
239 returns AsyncResult
239 """
240 """
240 return self._really_apply(f, args, kwargs, block=False)
241 return self._really_apply(f, args, kwargs, block=False)
241
242
242 @spin_after
243 @spin_after
243 def apply_sync(self, f, *args, **kwargs):
244 def apply_sync(self, f, *args, **kwargs):
244 """calls f(*args, **kwargs) on remote engines in a blocking manner,
245 """calls f(*args, **kwargs) on remote engines in a blocking manner,
245 returning the result.
246 returning the result.
246
247
247 returns: actual result of f(*args, **kwargs)
248 returns: actual result of f(*args, **kwargs)
248 """
249 """
249 return self._really_apply(f, args, kwargs, block=True)
250 return self._really_apply(f, args, kwargs, block=True)
250
251
251 #----------------------------------------------------------------
252 #----------------------------------------------------------------
252 # wrappers for client and control methods
253 # wrappers for client and control methods
253 #----------------------------------------------------------------
254 #----------------------------------------------------------------
254 @sync_results
255 @sync_results
255 def spin(self):
256 def spin(self):
256 """spin the client, and sync"""
257 """spin the client, and sync"""
257 self.client.spin()
258 self.client.spin()
258
259
259 @sync_results
260 @sync_results
260 def wait(self, jobs=None, timeout=-1):
261 def wait(self, jobs=None, timeout=-1):
261 """waits on one or more `jobs`, for up to `timeout` seconds.
262 """waits on one or more `jobs`, for up to `timeout` seconds.
262
263
263 Parameters
264 Parameters
264 ----------
265 ----------
265
266
266 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
267 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
267 ints are indices to self.history
268 ints are indices to self.history
268 strs are msg_ids
269 strs are msg_ids
269 default: wait on all outstanding messages
270 default: wait on all outstanding messages
270 timeout : float
271 timeout : float
271 a time in seconds, after which to give up.
272 a time in seconds, after which to give up.
272 default is -1, which means no timeout
273 default is -1, which means no timeout
273
274
274 Returns
275 Returns
275 -------
276 -------
276
277
277 True : when all msg_ids are done
278 True : when all msg_ids are done
278 False : timeout reached, some msg_ids still outstanding
279 False : timeout reached, some msg_ids still outstanding
279 """
280 """
280 if jobs is None:
281 if jobs is None:
281 jobs = self.history
282 jobs = self.history
282 return self.client.wait(jobs, timeout)
283 return self.client.wait(jobs, timeout)
283
284
284 def abort(self, jobs=None, targets=None, block=None):
285 def abort(self, jobs=None, targets=None, block=None):
285 """Abort jobs on my engines.
286 """Abort jobs on my engines.
286
287
287 Parameters
288 Parameters
288 ----------
289 ----------
289
290
290 jobs : None, str, list of strs, optional
291 jobs : None, str, list of strs, optional
291 if None: abort all jobs.
292 if None: abort all jobs.
292 else: abort specific msg_id(s).
293 else: abort specific msg_id(s).
293 """
294 """
294 block = block if block is not None else self.block
295 block = block if block is not None else self.block
295 targets = targets if targets is not None else self.targets
296 targets = targets if targets is not None else self.targets
296 jobs = jobs if jobs is not None else list(self.outstanding)
297 jobs = jobs if jobs is not None else list(self.outstanding)
297
298
298 return self.client.abort(jobs=jobs, targets=targets, block=block)
299 return self.client.abort(jobs=jobs, targets=targets, block=block)
299
300
300 def queue_status(self, targets=None, verbose=False):
301 def queue_status(self, targets=None, verbose=False):
301 """Fetch the Queue status of my engines"""
302 """Fetch the Queue status of my engines"""
302 targets = targets if targets is not None else self.targets
303 targets = targets if targets is not None else self.targets
303 return self.client.queue_status(targets=targets, verbose=verbose)
304 return self.client.queue_status(targets=targets, verbose=verbose)
304
305
305 def purge_results(self, jobs=[], targets=[]):
306 def purge_results(self, jobs=[], targets=[]):
306 """Instruct the controller to forget specific results."""
307 """Instruct the controller to forget specific results."""
307 if targets is None or targets == 'all':
308 if targets is None or targets == 'all':
308 targets = self.targets
309 targets = self.targets
309 return self.client.purge_results(jobs=jobs, targets=targets)
310 return self.client.purge_results(jobs=jobs, targets=targets)
310
311
311 def shutdown(self, targets=None, restart=False, hub=False, block=None):
312 def shutdown(self, targets=None, restart=False, hub=False, block=None):
312 """Terminates one or more engine processes, optionally including the hub.
313 """Terminates one or more engine processes, optionally including the hub.
313 """
314 """
314 block = self.block if block is None else block
315 block = self.block if block is None else block
315 if targets is None or targets == 'all':
316 if targets is None or targets == 'all':
316 targets = self.targets
317 targets = self.targets
317 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
318 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
318
319
319 @spin_after
320 @spin_after
320 def get_result(self, indices_or_msg_ids=None):
321 def get_result(self, indices_or_msg_ids=None):
321 """return one or more results, specified by history index or msg_id.
322 """return one or more results, specified by history index or msg_id.
322
323
323 See client.get_result for details.
324 See client.get_result for details.
324
325
325 """
326 """
326
327
327 if indices_or_msg_ids is None:
328 if indices_or_msg_ids is None:
328 indices_or_msg_ids = -1
329 indices_or_msg_ids = -1
329 if isinstance(indices_or_msg_ids, int):
330 if isinstance(indices_or_msg_ids, int):
330 indices_or_msg_ids = self.history[indices_or_msg_ids]
331 indices_or_msg_ids = self.history[indices_or_msg_ids]
331 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
332 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
332 indices_or_msg_ids = list(indices_or_msg_ids)
333 indices_or_msg_ids = list(indices_or_msg_ids)
333 for i,index in enumerate(indices_or_msg_ids):
334 for i,index in enumerate(indices_or_msg_ids):
334 if isinstance(index, int):
335 if isinstance(index, int):
335 indices_or_msg_ids[i] = self.history[index]
336 indices_or_msg_ids[i] = self.history[index]
336 return self.client.get_result(indices_or_msg_ids)
337 return self.client.get_result(indices_or_msg_ids)
337
338
338 #-------------------------------------------------------------------
339 #-------------------------------------------------------------------
339 # Map
340 # Map
340 #-------------------------------------------------------------------
341 #-------------------------------------------------------------------
341
342
342 @sync_results
343 @sync_results
343 def map(self, f, *sequences, **kwargs):
344 def map(self, f, *sequences, **kwargs):
344 """override in subclasses"""
345 """override in subclasses"""
345 raise NotImplementedError
346 raise NotImplementedError
346
347
347 def map_async(self, f, *sequences, **kwargs):
348 def map_async(self, f, *sequences, **kwargs):
348 """Parallel version of builtin `map`, using this view's engines.
349 """Parallel version of builtin `map`, using this view's engines.
349
350
350 This is equivalent to map(...block=False)
351 This is equivalent to map(...block=False)
351
352
352 See `self.map` for details.
353 See `self.map` for details.
353 """
354 """
354 if 'block' in kwargs:
355 if 'block' in kwargs:
355 raise TypeError("map_async doesn't take a `block` keyword argument.")
356 raise TypeError("map_async doesn't take a `block` keyword argument.")
356 kwargs['block'] = False
357 kwargs['block'] = False
357 return self.map(f,*sequences,**kwargs)
358 return self.map(f,*sequences,**kwargs)
358
359
359 def map_sync(self, f, *sequences, **kwargs):
360 def map_sync(self, f, *sequences, **kwargs):
360 """Parallel version of builtin `map`, using this view's engines.
361 """Parallel version of builtin `map`, using this view's engines.
361
362
362 This is equivalent to map(...block=True)
363 This is equivalent to map(...block=True)
363
364
364 See `self.map` for details.
365 See `self.map` for details.
365 """
366 """
366 if 'block' in kwargs:
367 if 'block' in kwargs:
367 raise TypeError("map_sync doesn't take a `block` keyword argument.")
368 raise TypeError("map_sync doesn't take a `block` keyword argument.")
368 kwargs['block'] = True
369 kwargs['block'] = True
369 return self.map(f,*sequences,**kwargs)
370 return self.map(f,*sequences,**kwargs)
370
371
371 def imap(self, f, *sequences, **kwargs):
372 def imap(self, f, *sequences, **kwargs):
372 """Parallel version of `itertools.imap`.
373 """Parallel version of `itertools.imap`.
373
374
374 See `self.map` for details.
375 See `self.map` for details.
375
376
376 """
377 """
377
378
378 return iter(self.map_async(f,*sequences, **kwargs))
379 return iter(self.map_async(f,*sequences, **kwargs))
379
380
380 #-------------------------------------------------------------------
381 #-------------------------------------------------------------------
381 # Decorators
382 # Decorators
382 #-------------------------------------------------------------------
383 #-------------------------------------------------------------------
383
384
384 def remote(self, block=None, **flags):
385 def remote(self, block=None, **flags):
385 """Decorator for making a RemoteFunction"""
386 """Decorator for making a RemoteFunction"""
386 block = self.block if block is None else block
387 block = self.block if block is None else block
387 return remote(self, block=block, **flags)
388 return remote(self, block=block, **flags)
388
389
389 def parallel(self, dist='b', block=None, **flags):
390 def parallel(self, dist='b', block=None, **flags):
390 """Decorator for making a ParallelFunction"""
391 """Decorator for making a ParallelFunction"""
391 block = self.block if block is None else block
392 block = self.block if block is None else block
392 return parallel(self, dist=dist, block=block, **flags)
393 return parallel(self, dist=dist, block=block, **flags)
393
394
394 @skip_doctest
395 @skip_doctest
395 class DirectView(View):
396 class DirectView(View):
396 """Direct Multiplexer View of one or more engines.
397 """Direct Multiplexer View of one or more engines.
397
398
398 These are created via indexed access to a client:
399 These are created via indexed access to a client:
399
400
400 >>> dv_1 = client[1]
401 >>> dv_1 = client[1]
401 >>> dv_all = client[:]
402 >>> dv_all = client[:]
402 >>> dv_even = client[::2]
403 >>> dv_even = client[::2]
403 >>> dv_some = client[1:3]
404 >>> dv_some = client[1:3]
404
405
405 This object provides dictionary access to engine namespaces:
406 This object provides dictionary access to engine namespaces:
406
407
407 # push a=5:
408 # push a=5:
408 >>> dv['a'] = 5
409 >>> dv['a'] = 5
409 # pull 'foo':
410 # pull 'foo':
410 >>> db['foo']
411 >>> db['foo']
411
412
412 """
413 """
413
414
414 def __init__(self, client=None, socket=None, targets=None):
415 def __init__(self, client=None, socket=None, targets=None):
415 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
416 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
416
417
417 @property
418 @property
418 def importer(self):
419 def importer(self):
419 """sync_imports(local=True) as a property.
420 """sync_imports(local=True) as a property.
420
421
421 See sync_imports for details.
422 See sync_imports for details.
422
423
423 """
424 """
424 return self.sync_imports(True)
425 return self.sync_imports(True)
425
426
426 @contextmanager
427 @contextmanager
427 def sync_imports(self, local=True, quiet=False):
428 def sync_imports(self, local=True, quiet=False):
428 """Context Manager for performing simultaneous local and remote imports.
429 """Context Manager for performing simultaneous local and remote imports.
429
430
430 'import x as y' will *not* work. The 'as y' part will simply be ignored.
431 'import x as y' will *not* work. The 'as y' part will simply be ignored.
431
432
432 If `local=True`, then the package will also be imported locally.
433 If `local=True`, then the package will also be imported locally.
433
434
434 If `quiet=True`, no output will be produced when attempting remote
435 If `quiet=True`, no output will be produced when attempting remote
435 imports.
436 imports.
436
437
437 Note that remote-only (`local=False`) imports have not been implemented.
438 Note that remote-only (`local=False`) imports have not been implemented.
438
439
439 >>> with view.sync_imports():
440 >>> with view.sync_imports():
440 ... from numpy import recarray
441 ... from numpy import recarray
441 importing recarray from numpy on engine(s)
442 importing recarray from numpy on engine(s)
442
443
443 """
444 """
444 from IPython.utils.py3compat import builtin_mod
445 from IPython.utils.py3compat import builtin_mod
445 local_import = builtin_mod.__import__
446 local_import = builtin_mod.__import__
446 modules = set()
447 modules = set()
447 results = []
448 results = []
448 @util.interactive
449 @util.interactive
449 def remote_import(name, fromlist, level):
450 def remote_import(name, fromlist, level):
450 """the function to be passed to apply, that actually performs the import
451 """the function to be passed to apply, that actually performs the import
451 on the engine, and loads up the user namespace.
452 on the engine, and loads up the user namespace.
452 """
453 """
453 import sys
454 import sys
454 user_ns = globals()
455 user_ns = globals()
455 mod = __import__(name, fromlist=fromlist, level=level)
456 mod = __import__(name, fromlist=fromlist, level=level)
456 if fromlist:
457 if fromlist:
457 for key in fromlist:
458 for key in fromlist:
458 user_ns[key] = getattr(mod, key)
459 user_ns[key] = getattr(mod, key)
459 else:
460 else:
460 user_ns[name] = sys.modules[name]
461 user_ns[name] = sys.modules[name]
461
462
462 def view_import(name, globals={}, locals={}, fromlist=[], level=0):
463 def view_import(name, globals={}, locals={}, fromlist=[], level=0):
463 """the drop-in replacement for __import__, that optionally imports
464 """the drop-in replacement for __import__, that optionally imports
464 locally as well.
465 locally as well.
465 """
466 """
466 # don't override nested imports
467 # don't override nested imports
467 save_import = builtin_mod.__import__
468 save_import = builtin_mod.__import__
468 builtin_mod.__import__ = local_import
469 builtin_mod.__import__ = local_import
469
470
470 if imp.lock_held():
471 if imp.lock_held():
471 # this is a side-effect import, don't do it remotely, or even
472 # this is a side-effect import, don't do it remotely, or even
472 # ignore the local effects
473 # ignore the local effects
473 return local_import(name, globals, locals, fromlist, level)
474 return local_import(name, globals, locals, fromlist, level)
474
475
475 imp.acquire_lock()
476 imp.acquire_lock()
476 if local:
477 if local:
477 mod = local_import(name, globals, locals, fromlist, level)
478 mod = local_import(name, globals, locals, fromlist, level)
478 else:
479 else:
479 raise NotImplementedError("remote-only imports not yet implemented")
480 raise NotImplementedError("remote-only imports not yet implemented")
480 imp.release_lock()
481 imp.release_lock()
481
482
482 key = name+':'+','.join(fromlist or [])
483 key = name+':'+','.join(fromlist or [])
483 if level <= 0 and key not in modules:
484 if level <= 0 and key not in modules:
484 modules.add(key)
485 modules.add(key)
485 if not quiet:
486 if not quiet:
486 if fromlist:
487 if fromlist:
487 print("importing %s from %s on engine(s)"%(','.join(fromlist), name))
488 print("importing %s from %s on engine(s)"%(','.join(fromlist), name))
488 else:
489 else:
489 print("importing %s on engine(s)"%name)
490 print("importing %s on engine(s)"%name)
490 results.append(self.apply_async(remote_import, name, fromlist, level))
491 results.append(self.apply_async(remote_import, name, fromlist, level))
491 # restore override
492 # restore override
492 builtin_mod.__import__ = save_import
493 builtin_mod.__import__ = save_import
493
494
494 return mod
495 return mod
495
496
496 # override __import__
497 # override __import__
497 builtin_mod.__import__ = view_import
498 builtin_mod.__import__ = view_import
498 try:
499 try:
499 # enter the block
500 # enter the block
500 yield
501 yield
501 except ImportError:
502 except ImportError:
502 if local:
503 if local:
503 raise
504 raise
504 else:
505 else:
505 # ignore import errors if not doing local imports
506 # ignore import errors if not doing local imports
506 pass
507 pass
507 finally:
508 finally:
508 # always restore __import__
509 # always restore __import__
509 builtin_mod.__import__ = local_import
510 builtin_mod.__import__ = local_import
510
511
511 for r in results:
512 for r in results:
512 # raise possible remote ImportErrors here
513 # raise possible remote ImportErrors here
513 r.get()
514 r.get()
514
515
516 def use_dill(self):
517 """Expand serialization support with dill
518
519 adds support for closures, etc.
520
521 This calls IPython.utils.pickleutil.use_dill() here and on each engine.
522 """
523 pickleutil.use_dill()
524 return self.apply(pickleutil.use_dill)
525
515
526
516 @sync_results
527 @sync_results
517 @save_ids
528 @save_ids
518 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
529 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
519 """calls f(*args, **kwargs) on remote engines, returning the result.
530 """calls f(*args, **kwargs) on remote engines, returning the result.
520
531
521 This method sets all of `apply`'s flags via this View's attributes.
532 This method sets all of `apply`'s flags via this View's attributes.
522
533
523 Parameters
534 Parameters
524 ----------
535 ----------
525
536
526 f : callable
537 f : callable
527
538
528 args : list [default: empty]
539 args : list [default: empty]
529
540
530 kwargs : dict [default: empty]
541 kwargs : dict [default: empty]
531
542
532 targets : target list [default: self.targets]
543 targets : target list [default: self.targets]
533 where to run
544 where to run
534 block : bool [default: self.block]
545 block : bool [default: self.block]
535 whether to block
546 whether to block
536 track : bool [default: self.track]
547 track : bool [default: self.track]
537 whether to ask zmq to track the message, for safe non-copying sends
548 whether to ask zmq to track the message, for safe non-copying sends
538
549
539 Returns
550 Returns
540 -------
551 -------
541
552
542 if self.block is False:
553 if self.block is False:
543 returns AsyncResult
554 returns AsyncResult
544 else:
555 else:
545 returns actual result of f(*args, **kwargs) on the engine(s)
556 returns actual result of f(*args, **kwargs) on the engine(s)
546 This will be a list of self.targets is also a list (even length 1), or
557 This will be a list of self.targets is also a list (even length 1), or
547 the single result if self.targets is an integer engine id
558 the single result if self.targets is an integer engine id
548 """
559 """
549 args = [] if args is None else args
560 args = [] if args is None else args
550 kwargs = {} if kwargs is None else kwargs
561 kwargs = {} if kwargs is None else kwargs
551 block = self.block if block is None else block
562 block = self.block if block is None else block
552 track = self.track if track is None else track
563 track = self.track if track is None else track
553 targets = self.targets if targets is None else targets
564 targets = self.targets if targets is None else targets
554
565
555 _idents, _targets = self.client._build_targets(targets)
566 _idents, _targets = self.client._build_targets(targets)
556 msg_ids = []
567 msg_ids = []
557 trackers = []
568 trackers = []
558 for ident in _idents:
569 for ident in _idents:
559 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
570 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
560 ident=ident)
571 ident=ident)
561 if track:
572 if track:
562 trackers.append(msg['tracker'])
573 trackers.append(msg['tracker'])
563 msg_ids.append(msg['header']['msg_id'])
574 msg_ids.append(msg['header']['msg_id'])
564 if isinstance(targets, int):
575 if isinstance(targets, int):
565 msg_ids = msg_ids[0]
576 msg_ids = msg_ids[0]
566 tracker = None if track is False else zmq.MessageTracker(*trackers)
577 tracker = None if track is False else zmq.MessageTracker(*trackers)
567 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets, tracker=tracker)
578 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets, tracker=tracker)
568 if block:
579 if block:
569 try:
580 try:
570 return ar.get()
581 return ar.get()
571 except KeyboardInterrupt:
582 except KeyboardInterrupt:
572 pass
583 pass
573 return ar
584 return ar
574
585
575
586
576 @sync_results
587 @sync_results
577 def map(self, f, *sequences, **kwargs):
588 def map(self, f, *sequences, **kwargs):
578 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
589 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
579
590
580 Parallel version of builtin `map`, using this View's `targets`.
591 Parallel version of builtin `map`, using this View's `targets`.
581
592
582 There will be one task per target, so work will be chunked
593 There will be one task per target, so work will be chunked
583 if the sequences are longer than `targets`.
594 if the sequences are longer than `targets`.
584
595
585 Results can be iterated as they are ready, but will become available in chunks.
596 Results can be iterated as they are ready, but will become available in chunks.
586
597
587 Parameters
598 Parameters
588 ----------
599 ----------
589
600
590 f : callable
601 f : callable
591 function to be mapped
602 function to be mapped
592 *sequences: one or more sequences of matching length
603 *sequences: one or more sequences of matching length
593 the sequences to be distributed and passed to `f`
604 the sequences to be distributed and passed to `f`
594 block : bool
605 block : bool
595 whether to wait for the result or not [default self.block]
606 whether to wait for the result or not [default self.block]
596
607
597 Returns
608 Returns
598 -------
609 -------
599
610
600 if block=False:
611 if block=False:
601 AsyncMapResult
612 AsyncMapResult
602 An object like AsyncResult, but which reassembles the sequence of results
613 An object like AsyncResult, but which reassembles the sequence of results
603 into a single list. AsyncMapResults can be iterated through before all
614 into a single list. AsyncMapResults can be iterated through before all
604 results are complete.
615 results are complete.
605 else:
616 else:
606 list
617 list
607 the result of map(f,*sequences)
618 the result of map(f,*sequences)
608 """
619 """
609
620
610 block = kwargs.pop('block', self.block)
621 block = kwargs.pop('block', self.block)
611 for k in kwargs.keys():
622 for k in kwargs.keys():
612 if k not in ['block', 'track']:
623 if k not in ['block', 'track']:
613 raise TypeError("invalid keyword arg, %r"%k)
624 raise TypeError("invalid keyword arg, %r"%k)
614
625
615 assert len(sequences) > 0, "must have some sequences to map onto!"
626 assert len(sequences) > 0, "must have some sequences to map onto!"
616 pf = ParallelFunction(self, f, block=block, **kwargs)
627 pf = ParallelFunction(self, f, block=block, **kwargs)
617 return pf.map(*sequences)
628 return pf.map(*sequences)
618
629
619 @sync_results
630 @sync_results
620 @save_ids
631 @save_ids
621 def execute(self, code, silent=True, targets=None, block=None):
632 def execute(self, code, silent=True, targets=None, block=None):
622 """Executes `code` on `targets` in blocking or nonblocking manner.
633 """Executes `code` on `targets` in blocking or nonblocking manner.
623
634
624 ``execute`` is always `bound` (affects engine namespace)
635 ``execute`` is always `bound` (affects engine namespace)
625
636
626 Parameters
637 Parameters
627 ----------
638 ----------
628
639
629 code : str
640 code : str
630 the code string to be executed
641 the code string to be executed
631 block : bool
642 block : bool
632 whether or not to wait until done to return
643 whether or not to wait until done to return
633 default: self.block
644 default: self.block
634 """
645 """
635 block = self.block if block is None else block
646 block = self.block if block is None else block
636 targets = self.targets if targets is None else targets
647 targets = self.targets if targets is None else targets
637
648
638 _idents, _targets = self.client._build_targets(targets)
649 _idents, _targets = self.client._build_targets(targets)
639 msg_ids = []
650 msg_ids = []
640 trackers = []
651 trackers = []
641 for ident in _idents:
652 for ident in _idents:
642 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
653 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
643 msg_ids.append(msg['header']['msg_id'])
654 msg_ids.append(msg['header']['msg_id'])
644 if isinstance(targets, int):
655 if isinstance(targets, int):
645 msg_ids = msg_ids[0]
656 msg_ids = msg_ids[0]
646 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets)
657 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets)
647 if block:
658 if block:
648 try:
659 try:
649 ar.get()
660 ar.get()
650 except KeyboardInterrupt:
661 except KeyboardInterrupt:
651 pass
662 pass
652 return ar
663 return ar
653
664
654 def run(self, filename, targets=None, block=None):
665 def run(self, filename, targets=None, block=None):
655 """Execute contents of `filename` on my engine(s).
666 """Execute contents of `filename` on my engine(s).
656
667
657 This simply reads the contents of the file and calls `execute`.
668 This simply reads the contents of the file and calls `execute`.
658
669
659 Parameters
670 Parameters
660 ----------
671 ----------
661
672
662 filename : str
673 filename : str
663 The path to the file
674 The path to the file
664 targets : int/str/list of ints/strs
675 targets : int/str/list of ints/strs
665 the engines on which to execute
676 the engines on which to execute
666 default : all
677 default : all
667 block : bool
678 block : bool
668 whether or not to wait until done
679 whether or not to wait until done
669 default: self.block
680 default: self.block
670
681
671 """
682 """
672 with open(filename, 'r') as f:
683 with open(filename, 'r') as f:
673 # add newline in case of trailing indented whitespace
684 # add newline in case of trailing indented whitespace
674 # which will cause SyntaxError
685 # which will cause SyntaxError
675 code = f.read()+'\n'
686 code = f.read()+'\n'
676 return self.execute(code, block=block, targets=targets)
687 return self.execute(code, block=block, targets=targets)
677
688
678 def update(self, ns):
689 def update(self, ns):
679 """update remote namespace with dict `ns`
690 """update remote namespace with dict `ns`
680
691
681 See `push` for details.
692 See `push` for details.
682 """
693 """
683 return self.push(ns, block=self.block, track=self.track)
694 return self.push(ns, block=self.block, track=self.track)
684
695
685 def push(self, ns, targets=None, block=None, track=None):
696 def push(self, ns, targets=None, block=None, track=None):
686 """update remote namespace with dict `ns`
697 """update remote namespace with dict `ns`
687
698
688 Parameters
699 Parameters
689 ----------
700 ----------
690
701
691 ns : dict
702 ns : dict
692 dict of keys with which to update engine namespace(s)
703 dict of keys with which to update engine namespace(s)
693 block : bool [default : self.block]
704 block : bool [default : self.block]
694 whether to wait to be notified of engine receipt
705 whether to wait to be notified of engine receipt
695
706
696 """
707 """
697
708
698 block = block if block is not None else self.block
709 block = block if block is not None else self.block
699 track = track if track is not None else self.track
710 track = track if track is not None else self.track
700 targets = targets if targets is not None else self.targets
711 targets = targets if targets is not None else self.targets
701 # applier = self.apply_sync if block else self.apply_async
712 # applier = self.apply_sync if block else self.apply_async
702 if not isinstance(ns, dict):
713 if not isinstance(ns, dict):
703 raise TypeError("Must be a dict, not %s"%type(ns))
714 raise TypeError("Must be a dict, not %s"%type(ns))
704 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
715 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
705
716
706 def get(self, key_s):
717 def get(self, key_s):
707 """get object(s) by `key_s` from remote namespace
718 """get object(s) by `key_s` from remote namespace
708
719
709 see `pull` for details.
720 see `pull` for details.
710 """
721 """
711 # block = block if block is not None else self.block
722 # block = block if block is not None else self.block
712 return self.pull(key_s, block=True)
723 return self.pull(key_s, block=True)
713
724
714 def pull(self, names, targets=None, block=None):
725 def pull(self, names, targets=None, block=None):
715 """get object(s) by `name` from remote namespace
726 """get object(s) by `name` from remote namespace
716
727
717 will return one object if it is a key.
728 will return one object if it is a key.
718 can also take a list of keys, in which case it will return a list of objects.
729 can also take a list of keys, in which case it will return a list of objects.
719 """
730 """
720 block = block if block is not None else self.block
731 block = block if block is not None else self.block
721 targets = targets if targets is not None else self.targets
732 targets = targets if targets is not None else self.targets
722 applier = self.apply_sync if block else self.apply_async
733 applier = self.apply_sync if block else self.apply_async
723 if isinstance(names, string_types):
734 if isinstance(names, string_types):
724 pass
735 pass
725 elif isinstance(names, (list,tuple,set)):
736 elif isinstance(names, (list,tuple,set)):
726 for key in names:
737 for key in names:
727 if not isinstance(key, string_types):
738 if not isinstance(key, string_types):
728 raise TypeError("keys must be str, not type %r"%type(key))
739 raise TypeError("keys must be str, not type %r"%type(key))
729 else:
740 else:
730 raise TypeError("names must be strs, not %r"%names)
741 raise TypeError("names must be strs, not %r"%names)
731 return self._really_apply(util._pull, (names,), block=block, targets=targets)
742 return self._really_apply(util._pull, (names,), block=block, targets=targets)
732
743
733 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
744 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
734 """
745 """
735 Partition a Python sequence and send the partitions to a set of engines.
746 Partition a Python sequence and send the partitions to a set of engines.
736 """
747 """
737 block = block if block is not None else self.block
748 block = block if block is not None else self.block
738 track = track if track is not None else self.track
749 track = track if track is not None else self.track
739 targets = targets if targets is not None else self.targets
750 targets = targets if targets is not None else self.targets
740
751
741 # construct integer ID list:
752 # construct integer ID list:
742 targets = self.client._build_targets(targets)[1]
753 targets = self.client._build_targets(targets)[1]
743
754
744 mapObject = Map.dists[dist]()
755 mapObject = Map.dists[dist]()
745 nparts = len(targets)
756 nparts = len(targets)
746 msg_ids = []
757 msg_ids = []
747 trackers = []
758 trackers = []
748 for index, engineid in enumerate(targets):
759 for index, engineid in enumerate(targets):
749 partition = mapObject.getPartition(seq, index, nparts)
760 partition = mapObject.getPartition(seq, index, nparts)
750 if flatten and len(partition) == 1:
761 if flatten and len(partition) == 1:
751 ns = {key: partition[0]}
762 ns = {key: partition[0]}
752 else:
763 else:
753 ns = {key: partition}
764 ns = {key: partition}
754 r = self.push(ns, block=False, track=track, targets=engineid)
765 r = self.push(ns, block=False, track=track, targets=engineid)
755 msg_ids.extend(r.msg_ids)
766 msg_ids.extend(r.msg_ids)
756 if track:
767 if track:
757 trackers.append(r._tracker)
768 trackers.append(r._tracker)
758
769
759 if track:
770 if track:
760 tracker = zmq.MessageTracker(*trackers)
771 tracker = zmq.MessageTracker(*trackers)
761 else:
772 else:
762 tracker = None
773 tracker = None
763
774
764 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
775 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
765 if block:
776 if block:
766 r.wait()
777 r.wait()
767 else:
778 else:
768 return r
779 return r
769
780
770 @sync_results
781 @sync_results
771 @save_ids
782 @save_ids
772 def gather(self, key, dist='b', targets=None, block=None):
783 def gather(self, key, dist='b', targets=None, block=None):
773 """
784 """
774 Gather a partitioned sequence on a set of engines as a single local seq.
785 Gather a partitioned sequence on a set of engines as a single local seq.
775 """
786 """
776 block = block if block is not None else self.block
787 block = block if block is not None else self.block
777 targets = targets if targets is not None else self.targets
788 targets = targets if targets is not None else self.targets
778 mapObject = Map.dists[dist]()
789 mapObject = Map.dists[dist]()
779 msg_ids = []
790 msg_ids = []
780
791
781 # construct integer ID list:
792 # construct integer ID list:
782 targets = self.client._build_targets(targets)[1]
793 targets = self.client._build_targets(targets)[1]
783
794
784 for index, engineid in enumerate(targets):
795 for index, engineid in enumerate(targets):
785 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
796 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
786
797
787 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
798 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
788
799
789 if block:
800 if block:
790 try:
801 try:
791 return r.get()
802 return r.get()
792 except KeyboardInterrupt:
803 except KeyboardInterrupt:
793 pass
804 pass
794 return r
805 return r
795
806
796 def __getitem__(self, key):
807 def __getitem__(self, key):
797 return self.get(key)
808 return self.get(key)
798
809
799 def __setitem__(self,key, value):
810 def __setitem__(self,key, value):
800 self.update({key:value})
811 self.update({key:value})
801
812
802 def clear(self, targets=None, block=None):
813 def clear(self, targets=None, block=None):
803 """Clear the remote namespaces on my engines."""
814 """Clear the remote namespaces on my engines."""
804 block = block if block is not None else self.block
815 block = block if block is not None else self.block
805 targets = targets if targets is not None else self.targets
816 targets = targets if targets is not None else self.targets
806 return self.client.clear(targets=targets, block=block)
817 return self.client.clear(targets=targets, block=block)
807
818
808 #----------------------------------------
819 #----------------------------------------
809 # activate for %px, %autopx, etc. magics
820 # activate for %px, %autopx, etc. magics
810 #----------------------------------------
821 #----------------------------------------
811
822
812 def activate(self, suffix=''):
823 def activate(self, suffix=''):
813 """Activate IPython magics associated with this View
824 """Activate IPython magics associated with this View
814
825
815 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
826 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
816
827
817 Parameters
828 Parameters
818 ----------
829 ----------
819
830
820 suffix: str [default: '']
831 suffix: str [default: '']
821 The suffix, if any, for the magics. This allows you to have
832 The suffix, if any, for the magics. This allows you to have
822 multiple views associated with parallel magics at the same time.
833 multiple views associated with parallel magics at the same time.
823
834
824 e.g. ``rc[::2].activate(suffix='_even')`` will give you
835 e.g. ``rc[::2].activate(suffix='_even')`` will give you
825 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
836 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
826 on the even engines.
837 on the even engines.
827 """
838 """
828
839
829 from IPython.parallel.client.magics import ParallelMagics
840 from IPython.parallel.client.magics import ParallelMagics
830
841
831 try:
842 try:
832 # This is injected into __builtins__.
843 # This is injected into __builtins__.
833 ip = get_ipython()
844 ip = get_ipython()
834 except NameError:
845 except NameError:
835 print("The IPython parallel magics (%px, etc.) only work within IPython.")
846 print("The IPython parallel magics (%px, etc.) only work within IPython.")
836 return
847 return
837
848
838 M = ParallelMagics(ip, self, suffix)
849 M = ParallelMagics(ip, self, suffix)
839 ip.magics_manager.register(M)
850 ip.magics_manager.register(M)
840
851
841
852
842 @skip_doctest
853 @skip_doctest
843 class LoadBalancedView(View):
854 class LoadBalancedView(View):
844 """An load-balancing View that only executes via the Task scheduler.
855 """An load-balancing View that only executes via the Task scheduler.
845
856
846 Load-balanced views can be created with the client's `view` method:
857 Load-balanced views can be created with the client's `view` method:
847
858
848 >>> v = client.load_balanced_view()
859 >>> v = client.load_balanced_view()
849
860
850 or targets can be specified, to restrict the potential destinations:
861 or targets can be specified, to restrict the potential destinations:
851
862
852 >>> v = client.client.load_balanced_view([1,3])
863 >>> v = client.client.load_balanced_view([1,3])
853
864
854 which would restrict loadbalancing to between engines 1 and 3.
865 which would restrict loadbalancing to between engines 1 and 3.
855
866
856 """
867 """
857
868
858 follow=Any()
869 follow=Any()
859 after=Any()
870 after=Any()
860 timeout=CFloat()
871 timeout=CFloat()
861 retries = Integer(0)
872 retries = Integer(0)
862
873
863 _task_scheme = Any()
874 _task_scheme = Any()
864 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
875 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
865
876
866 def __init__(self, client=None, socket=None, **flags):
877 def __init__(self, client=None, socket=None, **flags):
867 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
878 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
868 self._task_scheme=client._task_scheme
879 self._task_scheme=client._task_scheme
869
880
870 def _validate_dependency(self, dep):
881 def _validate_dependency(self, dep):
871 """validate a dependency.
882 """validate a dependency.
872
883
873 For use in `set_flags`.
884 For use in `set_flags`.
874 """
885 """
875 if dep is None or isinstance(dep, string_types + (AsyncResult, Dependency)):
886 if dep is None or isinstance(dep, string_types + (AsyncResult, Dependency)):
876 return True
887 return True
877 elif isinstance(dep, (list,set, tuple)):
888 elif isinstance(dep, (list,set, tuple)):
878 for d in dep:
889 for d in dep:
879 if not isinstance(d, string_types + (AsyncResult,)):
890 if not isinstance(d, string_types + (AsyncResult,)):
880 return False
891 return False
881 elif isinstance(dep, dict):
892 elif isinstance(dep, dict):
882 if set(dep.keys()) != set(Dependency().as_dict().keys()):
893 if set(dep.keys()) != set(Dependency().as_dict().keys()):
883 return False
894 return False
884 if not isinstance(dep['msg_ids'], list):
895 if not isinstance(dep['msg_ids'], list):
885 return False
896 return False
886 for d in dep['msg_ids']:
897 for d in dep['msg_ids']:
887 if not isinstance(d, string_types):
898 if not isinstance(d, string_types):
888 return False
899 return False
889 else:
900 else:
890 return False
901 return False
891
902
892 return True
903 return True
893
904
894 def _render_dependency(self, dep):
905 def _render_dependency(self, dep):
895 """helper for building jsonable dependencies from various input forms."""
906 """helper for building jsonable dependencies from various input forms."""
896 if isinstance(dep, Dependency):
907 if isinstance(dep, Dependency):
897 return dep.as_dict()
908 return dep.as_dict()
898 elif isinstance(dep, AsyncResult):
909 elif isinstance(dep, AsyncResult):
899 return dep.msg_ids
910 return dep.msg_ids
900 elif dep is None:
911 elif dep is None:
901 return []
912 return []
902 else:
913 else:
903 # pass to Dependency constructor
914 # pass to Dependency constructor
904 return list(Dependency(dep))
915 return list(Dependency(dep))
905
916
906 def set_flags(self, **kwargs):
917 def set_flags(self, **kwargs):
907 """set my attribute flags by keyword.
918 """set my attribute flags by keyword.
908
919
909 A View is a wrapper for the Client's apply method, but with attributes
920 A View is a wrapper for the Client's apply method, but with attributes
910 that specify keyword arguments, those attributes can be set by keyword
921 that specify keyword arguments, those attributes can be set by keyword
911 argument with this method.
922 argument with this method.
912
923
913 Parameters
924 Parameters
914 ----------
925 ----------
915
926
916 block : bool
927 block : bool
917 whether to wait for results
928 whether to wait for results
918 track : bool
929 track : bool
919 whether to create a MessageTracker to allow the user to
930 whether to create a MessageTracker to allow the user to
920 safely edit after arrays and buffers during non-copying
931 safely edit after arrays and buffers during non-copying
921 sends.
932 sends.
922
933
923 after : Dependency or collection of msg_ids
934 after : Dependency or collection of msg_ids
924 Only for load-balanced execution (targets=None)
935 Only for load-balanced execution (targets=None)
925 Specify a list of msg_ids as a time-based dependency.
936 Specify a list of msg_ids as a time-based dependency.
926 This job will only be run *after* the dependencies
937 This job will only be run *after* the dependencies
927 have been met.
938 have been met.
928
939
929 follow : Dependency or collection of msg_ids
940 follow : Dependency or collection of msg_ids
930 Only for load-balanced execution (targets=None)
941 Only for load-balanced execution (targets=None)
931 Specify a list of msg_ids as a location-based dependency.
942 Specify a list of msg_ids as a location-based dependency.
932 This job will only be run on an engine where this dependency
943 This job will only be run on an engine where this dependency
933 is met.
944 is met.
934
945
935 timeout : float/int or None
946 timeout : float/int or None
936 Only for load-balanced execution (targets=None)
947 Only for load-balanced execution (targets=None)
937 Specify an amount of time (in seconds) for the scheduler to
948 Specify an amount of time (in seconds) for the scheduler to
938 wait for dependencies to be met before failing with a
949 wait for dependencies to be met before failing with a
939 DependencyTimeout.
950 DependencyTimeout.
940
951
941 retries : int
952 retries : int
942 Number of times a task will be retried on failure.
953 Number of times a task will be retried on failure.
943 """
954 """
944
955
945 super(LoadBalancedView, self).set_flags(**kwargs)
956 super(LoadBalancedView, self).set_flags(**kwargs)
946 for name in ('follow', 'after'):
957 for name in ('follow', 'after'):
947 if name in kwargs:
958 if name in kwargs:
948 value = kwargs[name]
959 value = kwargs[name]
949 if self._validate_dependency(value):
960 if self._validate_dependency(value):
950 setattr(self, name, value)
961 setattr(self, name, value)
951 else:
962 else:
952 raise ValueError("Invalid dependency: %r"%value)
963 raise ValueError("Invalid dependency: %r"%value)
953 if 'timeout' in kwargs:
964 if 'timeout' in kwargs:
954 t = kwargs['timeout']
965 t = kwargs['timeout']
955 if not isinstance(t, (int, float, type(None))):
966 if not isinstance(t, (int, float, type(None))):
956 if (not PY3) and (not isinstance(t, long)):
967 if (not PY3) and (not isinstance(t, long)):
957 raise TypeError("Invalid type for timeout: %r"%type(t))
968 raise TypeError("Invalid type for timeout: %r"%type(t))
958 if t is not None:
969 if t is not None:
959 if t < 0:
970 if t < 0:
960 raise ValueError("Invalid timeout: %s"%t)
971 raise ValueError("Invalid timeout: %s"%t)
961 self.timeout = t
972 self.timeout = t
962
973
963 @sync_results
974 @sync_results
964 @save_ids
975 @save_ids
965 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
976 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
966 after=None, follow=None, timeout=None,
977 after=None, follow=None, timeout=None,
967 targets=None, retries=None):
978 targets=None, retries=None):
968 """calls f(*args, **kwargs) on a remote engine, returning the result.
979 """calls f(*args, **kwargs) on a remote engine, returning the result.
969
980
970 This method temporarily sets all of `apply`'s flags for a single call.
981 This method temporarily sets all of `apply`'s flags for a single call.
971
982
972 Parameters
983 Parameters
973 ----------
984 ----------
974
985
975 f : callable
986 f : callable
976
987
977 args : list [default: empty]
988 args : list [default: empty]
978
989
979 kwargs : dict [default: empty]
990 kwargs : dict [default: empty]
980
991
981 block : bool [default: self.block]
992 block : bool [default: self.block]
982 whether to block
993 whether to block
983 track : bool [default: self.track]
994 track : bool [default: self.track]
984 whether to ask zmq to track the message, for safe non-copying sends
995 whether to ask zmq to track the message, for safe non-copying sends
985
996
986 !!!!!! TODO: THE REST HERE !!!!
997 !!!!!! TODO: THE REST HERE !!!!
987
998
988 Returns
999 Returns
989 -------
1000 -------
990
1001
991 if self.block is False:
1002 if self.block is False:
992 returns AsyncResult
1003 returns AsyncResult
993 else:
1004 else:
994 returns actual result of f(*args, **kwargs) on the engine(s)
1005 returns actual result of f(*args, **kwargs) on the engine(s)
995 This will be a list of self.targets is also a list (even length 1), or
1006 This will be a list of self.targets is also a list (even length 1), or
996 the single result if self.targets is an integer engine id
1007 the single result if self.targets is an integer engine id
997 """
1008 """
998
1009
999 # validate whether we can run
1010 # validate whether we can run
1000 if self._socket.closed:
1011 if self._socket.closed:
1001 msg = "Task farming is disabled"
1012 msg = "Task farming is disabled"
1002 if self._task_scheme == 'pure':
1013 if self._task_scheme == 'pure':
1003 msg += " because the pure ZMQ scheduler cannot handle"
1014 msg += " because the pure ZMQ scheduler cannot handle"
1004 msg += " disappearing engines."
1015 msg += " disappearing engines."
1005 raise RuntimeError(msg)
1016 raise RuntimeError(msg)
1006
1017
1007 if self._task_scheme == 'pure':
1018 if self._task_scheme == 'pure':
1008 # pure zmq scheme doesn't support extra features
1019 # pure zmq scheme doesn't support extra features
1009 msg = "Pure ZMQ scheduler doesn't support the following flags:"
1020 msg = "Pure ZMQ scheduler doesn't support the following flags:"
1010 "follow, after, retries, targets, timeout"
1021 "follow, after, retries, targets, timeout"
1011 if (follow or after or retries or targets or timeout):
1022 if (follow or after or retries or targets or timeout):
1012 # hard fail on Scheduler flags
1023 # hard fail on Scheduler flags
1013 raise RuntimeError(msg)
1024 raise RuntimeError(msg)
1014 if isinstance(f, dependent):
1025 if isinstance(f, dependent):
1015 # soft warn on functional dependencies
1026 # soft warn on functional dependencies
1016 warnings.warn(msg, RuntimeWarning)
1027 warnings.warn(msg, RuntimeWarning)
1017
1028
1018 # build args
1029 # build args
1019 args = [] if args is None else args
1030 args = [] if args is None else args
1020 kwargs = {} if kwargs is None else kwargs
1031 kwargs = {} if kwargs is None else kwargs
1021 block = self.block if block is None else block
1032 block = self.block if block is None else block
1022 track = self.track if track is None else track
1033 track = self.track if track is None else track
1023 after = self.after if after is None else after
1034 after = self.after if after is None else after
1024 retries = self.retries if retries is None else retries
1035 retries = self.retries if retries is None else retries
1025 follow = self.follow if follow is None else follow
1036 follow = self.follow if follow is None else follow
1026 timeout = self.timeout if timeout is None else timeout
1037 timeout = self.timeout if timeout is None else timeout
1027 targets = self.targets if targets is None else targets
1038 targets = self.targets if targets is None else targets
1028
1039
1029 if not isinstance(retries, int):
1040 if not isinstance(retries, int):
1030 raise TypeError('retries must be int, not %r'%type(retries))
1041 raise TypeError('retries must be int, not %r'%type(retries))
1031
1042
1032 if targets is None:
1043 if targets is None:
1033 idents = []
1044 idents = []
1034 else:
1045 else:
1035 idents = self.client._build_targets(targets)[0]
1046 idents = self.client._build_targets(targets)[0]
1036 # ensure *not* bytes
1047 # ensure *not* bytes
1037 idents = [ ident.decode() for ident in idents ]
1048 idents = [ ident.decode() for ident in idents ]
1038
1049
1039 after = self._render_dependency(after)
1050 after = self._render_dependency(after)
1040 follow = self._render_dependency(follow)
1051 follow = self._render_dependency(follow)
1041 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1052 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1042
1053
1043 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1054 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1044 metadata=metadata)
1055 metadata=metadata)
1045 tracker = None if track is False else msg['tracker']
1056 tracker = None if track is False else msg['tracker']
1046
1057
1047 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1058 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1048
1059
1049 if block:
1060 if block:
1050 try:
1061 try:
1051 return ar.get()
1062 return ar.get()
1052 except KeyboardInterrupt:
1063 except KeyboardInterrupt:
1053 pass
1064 pass
1054 return ar
1065 return ar
1055
1066
1056 @sync_results
1067 @sync_results
1057 @save_ids
1068 @save_ids
1058 def map(self, f, *sequences, **kwargs):
1069 def map(self, f, *sequences, **kwargs):
1059 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1070 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1060
1071
1061 Parallel version of builtin `map`, load-balanced by this View.
1072 Parallel version of builtin `map`, load-balanced by this View.
1062
1073
1063 `block`, and `chunksize` can be specified by keyword only.
1074 `block`, and `chunksize` can be specified by keyword only.
1064
1075
1065 Each `chunksize` elements will be a separate task, and will be
1076 Each `chunksize` elements will be a separate task, and will be
1066 load-balanced. This lets individual elements be available for iteration
1077 load-balanced. This lets individual elements be available for iteration
1067 as soon as they arrive.
1078 as soon as they arrive.
1068
1079
1069 Parameters
1080 Parameters
1070 ----------
1081 ----------
1071
1082
1072 f : callable
1083 f : callable
1073 function to be mapped
1084 function to be mapped
1074 *sequences: one or more sequences of matching length
1085 *sequences: one or more sequences of matching length
1075 the sequences to be distributed and passed to `f`
1086 the sequences to be distributed and passed to `f`
1076 block : bool [default self.block]
1087 block : bool [default self.block]
1077 whether to wait for the result or not
1088 whether to wait for the result or not
1078 track : bool
1089 track : bool
1079 whether to create a MessageTracker to allow the user to
1090 whether to create a MessageTracker to allow the user to
1080 safely edit after arrays and buffers during non-copying
1091 safely edit after arrays and buffers during non-copying
1081 sends.
1092 sends.
1082 chunksize : int [default 1]
1093 chunksize : int [default 1]
1083 how many elements should be in each task.
1094 how many elements should be in each task.
1084 ordered : bool [default True]
1095 ordered : bool [default True]
1085 Whether the results should be gathered as they arrive, or enforce
1096 Whether the results should be gathered as they arrive, or enforce
1086 the order of submission.
1097 the order of submission.
1087
1098
1088 Only applies when iterating through AsyncMapResult as results arrive.
1099 Only applies when iterating through AsyncMapResult as results arrive.
1089 Has no effect when block=True.
1100 Has no effect when block=True.
1090
1101
1091 Returns
1102 Returns
1092 -------
1103 -------
1093
1104
1094 if block=False:
1105 if block=False:
1095 AsyncMapResult
1106 AsyncMapResult
1096 An object like AsyncResult, but which reassembles the sequence of results
1107 An object like AsyncResult, but which reassembles the sequence of results
1097 into a single list. AsyncMapResults can be iterated through before all
1108 into a single list. AsyncMapResults can be iterated through before all
1098 results are complete.
1109 results are complete.
1099 else:
1110 else:
1100 the result of map(f,*sequences)
1111 the result of map(f,*sequences)
1101
1112
1102 """
1113 """
1103
1114
1104 # default
1115 # default
1105 block = kwargs.get('block', self.block)
1116 block = kwargs.get('block', self.block)
1106 chunksize = kwargs.get('chunksize', 1)
1117 chunksize = kwargs.get('chunksize', 1)
1107 ordered = kwargs.get('ordered', True)
1118 ordered = kwargs.get('ordered', True)
1108
1119
1109 keyset = set(kwargs.keys())
1120 keyset = set(kwargs.keys())
1110 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1121 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1111 if extra_keys:
1122 if extra_keys:
1112 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1123 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1113
1124
1114 assert len(sequences) > 0, "must have some sequences to map onto!"
1125 assert len(sequences) > 0, "must have some sequences to map onto!"
1115
1126
1116 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1127 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1117 return pf.map(*sequences)
1128 return pf.map(*sequences)
1118
1129
1119 __all__ = ['LoadBalancedView', 'DirectView']
1130 __all__ = ['LoadBalancedView', 'DirectView']
General Comments 0
You need to be logged in to leave comments. Login now