##// END OF EJS Templates
Merge pull request #8060 from yuvallanger/patch-1...
Min RK -
r20716:d729e736 merge
parent child Browse files
Show More
@@ -1,1125 +1,1125 b''
1 """Views of remote engines."""
1 """Views of remote engines."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import print_function
6 from __future__ import print_function
7
7
8 import imp
8 import imp
9 import sys
9 import sys
10 import warnings
10 import warnings
11 from contextlib import contextmanager
11 from contextlib import contextmanager
12 from types import ModuleType
12 from types import ModuleType
13
13
14 import zmq
14 import zmq
15
15
16 from IPython.testing.skipdoctest import skip_doctest
16 from IPython.testing.skipdoctest import skip_doctest
17 from IPython.utils import pickleutil
17 from IPython.utils import pickleutil
18 from IPython.utils.traitlets import (
18 from IPython.utils.traitlets import (
19 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
19 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
20 )
20 )
21 from IPython.external.decorator import decorator
21 from IPython.external.decorator import decorator
22
22
23 from IPython.parallel import util
23 from IPython.parallel import util
24 from IPython.parallel.controller.dependency import Dependency, dependent
24 from IPython.parallel.controller.dependency import Dependency, dependent
25 from IPython.utils.py3compat import string_types, iteritems, PY3
25 from IPython.utils.py3compat import string_types, iteritems, PY3
26
26
27 from . import map as Map
27 from . import map as Map
28 from .asyncresult import AsyncResult, AsyncMapResult
28 from .asyncresult import AsyncResult, AsyncMapResult
29 from .remotefunction import ParallelFunction, parallel, remote, getname
29 from .remotefunction import ParallelFunction, parallel, remote, getname
30
30
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32 # Decorators
32 # Decorators
33 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
34
34
35 @decorator
35 @decorator
36 def save_ids(f, self, *args, **kwargs):
36 def save_ids(f, self, *args, **kwargs):
37 """Keep our history and outstanding attributes up to date after a method call."""
37 """Keep our history and outstanding attributes up to date after a method call."""
38 n_previous = len(self.client.history)
38 n_previous = len(self.client.history)
39 try:
39 try:
40 ret = f(self, *args, **kwargs)
40 ret = f(self, *args, **kwargs)
41 finally:
41 finally:
42 nmsgs = len(self.client.history) - n_previous
42 nmsgs = len(self.client.history) - n_previous
43 msg_ids = self.client.history[-nmsgs:]
43 msg_ids = self.client.history[-nmsgs:]
44 self.history.extend(msg_ids)
44 self.history.extend(msg_ids)
45 self.outstanding.update(msg_ids)
45 self.outstanding.update(msg_ids)
46 return ret
46 return ret
47
47
48 @decorator
48 @decorator
49 def sync_results(f, self, *args, **kwargs):
49 def sync_results(f, self, *args, **kwargs):
50 """sync relevant results from self.client to our results attribute."""
50 """sync relevant results from self.client to our results attribute."""
51 if self._in_sync_results:
51 if self._in_sync_results:
52 return f(self, *args, **kwargs)
52 return f(self, *args, **kwargs)
53 self._in_sync_results = True
53 self._in_sync_results = True
54 try:
54 try:
55 ret = f(self, *args, **kwargs)
55 ret = f(self, *args, **kwargs)
56 finally:
56 finally:
57 self._in_sync_results = False
57 self._in_sync_results = False
58 self._sync_results()
58 self._sync_results()
59 return ret
59 return ret
60
60
61 @decorator
61 @decorator
62 def spin_after(f, self, *args, **kwargs):
62 def spin_after(f, self, *args, **kwargs):
63 """call spin after the method."""
63 """call spin after the method."""
64 ret = f(self, *args, **kwargs)
64 ret = f(self, *args, **kwargs)
65 self.spin()
65 self.spin()
66 return ret
66 return ret
67
67
68 #-----------------------------------------------------------------------------
68 #-----------------------------------------------------------------------------
69 # Classes
69 # Classes
70 #-----------------------------------------------------------------------------
70 #-----------------------------------------------------------------------------
71
71
72 @skip_doctest
72 @skip_doctest
73 class View(HasTraits):
73 class View(HasTraits):
74 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
74 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
75
75
76 Don't use this class, use subclasses.
76 Don't use this class, use subclasses.
77
77
78 Methods
78 Methods
79 -------
79 -------
80
80
81 spin
81 spin
82 flushes incoming results and registration state changes
82 flushes incoming results and registration state changes
83 control methods spin, and requesting `ids` also ensures up to date
83 control methods spin, and requesting `ids` also ensures up to date
84
84
85 wait
85 wait
86 wait on one or more msg_ids
86 wait on one or more msg_ids
87
87
88 execution methods
88 execution methods
89 apply
89 apply
90 legacy: execute, run
90 legacy: execute, run
91
91
92 data movement
92 data movement
93 push, pull, scatter, gather
93 push, pull, scatter, gather
94
94
95 query methods
95 query methods
96 get_result, queue_status, purge_results, result_status
96 get_result, queue_status, purge_results, result_status
97
97
98 control methods
98 control methods
99 abort, shutdown
99 abort, shutdown
100
100
101 """
101 """
102 # flags
102 # flags
103 block=Bool(False)
103 block=Bool(False)
104 track=Bool(True)
104 track=Bool(True)
105 targets = Any()
105 targets = Any()
106
106
107 history=List()
107 history=List()
108 outstanding = Set()
108 outstanding = Set()
109 results = Dict()
109 results = Dict()
110 client = Instance('IPython.parallel.Client')
110 client = Instance('IPython.parallel.Client')
111
111
112 _socket = Instance('zmq.Socket')
112 _socket = Instance('zmq.Socket')
113 _flag_names = List(['targets', 'block', 'track'])
113 _flag_names = List(['targets', 'block', 'track'])
114 _in_sync_results = Bool(False)
114 _in_sync_results = Bool(False)
115 _targets = Any()
115 _targets = Any()
116 _idents = Any()
116 _idents = Any()
117
117
118 def __init__(self, client=None, socket=None, **flags):
118 def __init__(self, client=None, socket=None, **flags):
119 super(View, self).__init__(client=client, _socket=socket)
119 super(View, self).__init__(client=client, _socket=socket)
120 self.results = client.results
120 self.results = client.results
121 self.block = client.block
121 self.block = client.block
122
122
123 self.set_flags(**flags)
123 self.set_flags(**flags)
124
124
125 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
125 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
126
126
127 def __repr__(self):
127 def __repr__(self):
128 strtargets = str(self.targets)
128 strtargets = str(self.targets)
129 if len(strtargets) > 16:
129 if len(strtargets) > 16:
130 strtargets = strtargets[:12]+'...]'
130 strtargets = strtargets[:12]+'...]'
131 return "<%s %s>"%(self.__class__.__name__, strtargets)
131 return "<%s %s>"%(self.__class__.__name__, strtargets)
132
132
133 def __len__(self):
133 def __len__(self):
134 if isinstance(self.targets, list):
134 if isinstance(self.targets, list):
135 return len(self.targets)
135 return len(self.targets)
136 elif isinstance(self.targets, int):
136 elif isinstance(self.targets, int):
137 return 1
137 return 1
138 else:
138 else:
139 return len(self.client)
139 return len(self.client)
140
140
141 def set_flags(self, **kwargs):
141 def set_flags(self, **kwargs):
142 """set my attribute flags by keyword.
142 """set my attribute flags by keyword.
143
143
144 Views determine behavior with a few attributes (`block`, `track`, etc.).
144 Views determine behavior with a few attributes (`block`, `track`, etc.).
145 These attributes can be set all at once by name with this method.
145 These attributes can be set all at once by name with this method.
146
146
147 Parameters
147 Parameters
148 ----------
148 ----------
149
149
150 block : bool
150 block : bool
151 whether to wait for results
151 whether to wait for results
152 track : bool
152 track : bool
153 whether to create a MessageTracker to allow the user to
153 whether to create a MessageTracker to allow the user to
154 safely edit after arrays and buffers during non-copying
154 safely edit after arrays and buffers during non-copying
155 sends.
155 sends.
156 """
156 """
157 for name, value in iteritems(kwargs):
157 for name, value in iteritems(kwargs):
158 if name not in self._flag_names:
158 if name not in self._flag_names:
159 raise KeyError("Invalid name: %r"%name)
159 raise KeyError("Invalid name: %r"%name)
160 else:
160 else:
161 setattr(self, name, value)
161 setattr(self, name, value)
162
162
163 @contextmanager
163 @contextmanager
164 def temp_flags(self, **kwargs):
164 def temp_flags(self, **kwargs):
165 """temporarily set flags, for use in `with` statements.
165 """temporarily set flags, for use in `with` statements.
166
166
167 See set_flags for permanent setting of flags
167 See set_flags for permanent setting of flags
168
168
169 Examples
169 Examples
170 --------
170 --------
171
171
172 >>> view.track=False
172 >>> view.track=False
173 ...
173 ...
174 >>> with view.temp_flags(track=True):
174 >>> with view.temp_flags(track=True):
175 ... ar = view.apply(dostuff, my_big_array)
175 ... ar = view.apply(dostuff, my_big_array)
176 ... ar.tracker.wait() # wait for send to finish
176 ... ar.tracker.wait() # wait for send to finish
177 >>> view.track
177 >>> view.track
178 False
178 False
179
179
180 """
180 """
181 # preflight: save flags, and set temporaries
181 # preflight: save flags, and set temporaries
182 saved_flags = {}
182 saved_flags = {}
183 for f in self._flag_names:
183 for f in self._flag_names:
184 saved_flags[f] = getattr(self, f)
184 saved_flags[f] = getattr(self, f)
185 self.set_flags(**kwargs)
185 self.set_flags(**kwargs)
186 # yield to the with-statement block
186 # yield to the with-statement block
187 try:
187 try:
188 yield
188 yield
189 finally:
189 finally:
190 # postflight: restore saved flags
190 # postflight: restore saved flags
191 self.set_flags(**saved_flags)
191 self.set_flags(**saved_flags)
192
192
193
193
194 #----------------------------------------------------------------
194 #----------------------------------------------------------------
195 # apply
195 # apply
196 #----------------------------------------------------------------
196 #----------------------------------------------------------------
197
197
198 def _sync_results(self):
198 def _sync_results(self):
199 """to be called by @sync_results decorator
199 """to be called by @sync_results decorator
200
200
201 after submitting any tasks.
201 after submitting any tasks.
202 """
202 """
203 delta = self.outstanding.difference(self.client.outstanding)
203 delta = self.outstanding.difference(self.client.outstanding)
204 completed = self.outstanding.intersection(delta)
204 completed = self.outstanding.intersection(delta)
205 self.outstanding = self.outstanding.difference(completed)
205 self.outstanding = self.outstanding.difference(completed)
206
206
207 @sync_results
207 @sync_results
208 @save_ids
208 @save_ids
209 def _really_apply(self, f, args, kwargs, block=None, **options):
209 def _really_apply(self, f, args, kwargs, block=None, **options):
210 """wrapper for client.send_apply_request"""
210 """wrapper for client.send_apply_request"""
211 raise NotImplementedError("Implement in subclasses")
211 raise NotImplementedError("Implement in subclasses")
212
212
213 def apply(self, f, *args, **kwargs):
213 def apply(self, f, *args, **kwargs):
214 """calls ``f(*args, **kwargs)`` on remote engines, returning the result.
214 """calls ``f(*args, **kwargs)`` on remote engines, returning the result.
215
215
216 This method sets all apply flags via this View's attributes.
216 This method sets all apply flags via this View's attributes.
217
217
218 Returns :class:`~IPython.parallel.client.asyncresult.AsyncResult`
218 Returns :class:`~IPython.parallel.client.asyncresult.AsyncResult`
219 instance if ``self.block`` is False, otherwise the return value of
219 instance if ``self.block`` is False, otherwise the return value of
220 ``f(*args, **kwargs)``.
220 ``f(*args, **kwargs)``.
221 """
221 """
222 return self._really_apply(f, args, kwargs)
222 return self._really_apply(f, args, kwargs)
223
223
224 def apply_async(self, f, *args, **kwargs):
224 def apply_async(self, f, *args, **kwargs):
225 """calls ``f(*args, **kwargs)`` on remote engines in a nonblocking manner.
225 """calls ``f(*args, **kwargs)`` on remote engines in a nonblocking manner.
226
226
227 Returns :class:`~IPython.parallel.client.asyncresult.AsyncResult` instance.
227 Returns :class:`~IPython.parallel.client.asyncresult.AsyncResult` instance.
228 """
228 """
229 return self._really_apply(f, args, kwargs, block=False)
229 return self._really_apply(f, args, kwargs, block=False)
230
230
231 @spin_after
231 @spin_after
232 def apply_sync(self, f, *args, **kwargs):
232 def apply_sync(self, f, *args, **kwargs):
233 """calls ``f(*args, **kwargs)`` on remote engines in a blocking manner,
233 """calls ``f(*args, **kwargs)`` on remote engines in a blocking manner,
234 returning the result.
234 returning the result.
235 """
235 """
236 return self._really_apply(f, args, kwargs, block=True)
236 return self._really_apply(f, args, kwargs, block=True)
237
237
238 #----------------------------------------------------------------
238 #----------------------------------------------------------------
239 # wrappers for client and control methods
239 # wrappers for client and control methods
240 #----------------------------------------------------------------
240 #----------------------------------------------------------------
241 @sync_results
241 @sync_results
242 def spin(self):
242 def spin(self):
243 """spin the client, and sync"""
243 """spin the client, and sync"""
244 self.client.spin()
244 self.client.spin()
245
245
246 @sync_results
246 @sync_results
247 def wait(self, jobs=None, timeout=-1):
247 def wait(self, jobs=None, timeout=-1):
248 """waits on one or more `jobs`, for up to `timeout` seconds.
248 """waits on one or more `jobs`, for up to `timeout` seconds.
249
249
250 Parameters
250 Parameters
251 ----------
251 ----------
252
252
253 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
253 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
254 ints are indices to self.history
254 ints are indices to self.history
255 strs are msg_ids
255 strs are msg_ids
256 default: wait on all outstanding messages
256 default: wait on all outstanding messages
257 timeout : float
257 timeout : float
258 a time in seconds, after which to give up.
258 a time in seconds, after which to give up.
259 default is -1, which means no timeout
259 default is -1, which means no timeout
260
260
261 Returns
261 Returns
262 -------
262 -------
263
263
264 True : when all msg_ids are done
264 True : when all msg_ids are done
265 False : timeout reached, some msg_ids still outstanding
265 False : timeout reached, some msg_ids still outstanding
266 """
266 """
267 if jobs is None:
267 if jobs is None:
268 jobs = self.history
268 jobs = self.history
269 return self.client.wait(jobs, timeout)
269 return self.client.wait(jobs, timeout)
270
270
271 def abort(self, jobs=None, targets=None, block=None):
271 def abort(self, jobs=None, targets=None, block=None):
272 """Abort jobs on my engines.
272 """Abort jobs on my engines.
273
273
274 Parameters
274 Parameters
275 ----------
275 ----------
276
276
277 jobs : None, str, list of strs, optional
277 jobs : None, str, list of strs, optional
278 if None: abort all jobs.
278 if None: abort all jobs.
279 else: abort specific msg_id(s).
279 else: abort specific msg_id(s).
280 """
280 """
281 block = block if block is not None else self.block
281 block = block if block is not None else self.block
282 targets = targets if targets is not None else self.targets
282 targets = targets if targets is not None else self.targets
283 jobs = jobs if jobs is not None else list(self.outstanding)
283 jobs = jobs if jobs is not None else list(self.outstanding)
284
284
285 return self.client.abort(jobs=jobs, targets=targets, block=block)
285 return self.client.abort(jobs=jobs, targets=targets, block=block)
286
286
287 def queue_status(self, targets=None, verbose=False):
287 def queue_status(self, targets=None, verbose=False):
288 """Fetch the Queue status of my engines"""
288 """Fetch the Queue status of my engines"""
289 targets = targets if targets is not None else self.targets
289 targets = targets if targets is not None else self.targets
290 return self.client.queue_status(targets=targets, verbose=verbose)
290 return self.client.queue_status(targets=targets, verbose=verbose)
291
291
292 def purge_results(self, jobs=[], targets=[]):
292 def purge_results(self, jobs=[], targets=[]):
293 """Instruct the controller to forget specific results."""
293 """Instruct the controller to forget specific results."""
294 if targets is None or targets == 'all':
294 if targets is None or targets == 'all':
295 targets = self.targets
295 targets = self.targets
296 return self.client.purge_results(jobs=jobs, targets=targets)
296 return self.client.purge_results(jobs=jobs, targets=targets)
297
297
298 def shutdown(self, targets=None, restart=False, hub=False, block=None):
298 def shutdown(self, targets=None, restart=False, hub=False, block=None):
299 """Terminates one or more engine processes, optionally including the hub.
299 """Terminates one or more engine processes, optionally including the hub.
300 """
300 """
301 block = self.block if block is None else block
301 block = self.block if block is None else block
302 if targets is None or targets == 'all':
302 if targets is None or targets == 'all':
303 targets = self.targets
303 targets = self.targets
304 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
304 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
305
305
306 @spin_after
306 @spin_after
307 def get_result(self, indices_or_msg_ids=None, block=None, owner=True):
307 def get_result(self, indices_or_msg_ids=None, block=None, owner=True):
308 """return one or more results, specified by history index or msg_id.
308 """return one or more results, specified by history index or msg_id.
309
309
310 See :meth:`IPython.parallel.client.client.Client.get_result` for details.
310 See :meth:`IPython.parallel.client.client.Client.get_result` for details.
311 """
311 """
312
312
313 if indices_or_msg_ids is None:
313 if indices_or_msg_ids is None:
314 indices_or_msg_ids = -1
314 indices_or_msg_ids = -1
315 if isinstance(indices_or_msg_ids, int):
315 if isinstance(indices_or_msg_ids, int):
316 indices_or_msg_ids = self.history[indices_or_msg_ids]
316 indices_or_msg_ids = self.history[indices_or_msg_ids]
317 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
317 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
318 indices_or_msg_ids = list(indices_or_msg_ids)
318 indices_or_msg_ids = list(indices_or_msg_ids)
319 for i,index in enumerate(indices_or_msg_ids):
319 for i,index in enumerate(indices_or_msg_ids):
320 if isinstance(index, int):
320 if isinstance(index, int):
321 indices_or_msg_ids[i] = self.history[index]
321 indices_or_msg_ids[i] = self.history[index]
322 return self.client.get_result(indices_or_msg_ids, block=block, owner=owner)
322 return self.client.get_result(indices_or_msg_ids, block=block, owner=owner)
323
323
324 #-------------------------------------------------------------------
324 #-------------------------------------------------------------------
325 # Map
325 # Map
326 #-------------------------------------------------------------------
326 #-------------------------------------------------------------------
327
327
328 @sync_results
328 @sync_results
329 def map(self, f, *sequences, **kwargs):
329 def map(self, f, *sequences, **kwargs):
330 """override in subclasses"""
330 """override in subclasses"""
331 raise NotImplementedError
331 raise NotImplementedError
332
332
333 def map_async(self, f, *sequences, **kwargs):
333 def map_async(self, f, *sequences, **kwargs):
334 """Parallel version of builtin :func:`python:map`, using this view's engines.
334 """Parallel version of builtin :func:`python:map`, using this view's engines.
335
335
336 This is equivalent to ``map(...block=False)``.
336 This is equivalent to ``map(...block=False)``.
337
337
338 See `self.map` for details.
338 See `self.map` for details.
339 """
339 """
340 if 'block' in kwargs:
340 if 'block' in kwargs:
341 raise TypeError("map_async doesn't take a `block` keyword argument.")
341 raise TypeError("map_async doesn't take a `block` keyword argument.")
342 kwargs['block'] = False
342 kwargs['block'] = False
343 return self.map(f,*sequences,**kwargs)
343 return self.map(f,*sequences,**kwargs)
344
344
345 def map_sync(self, f, *sequences, **kwargs):
345 def map_sync(self, f, *sequences, **kwargs):
346 """Parallel version of builtin :func:`python:map`, using this view's engines.
346 """Parallel version of builtin :func:`python:map`, using this view's engines.
347
347
348 This is equivalent to ``map(...block=True)``.
348 This is equivalent to ``map(...block=True)``.
349
349
350 See `self.map` for details.
350 See `self.map` for details.
351 """
351 """
352 if 'block' in kwargs:
352 if 'block' in kwargs:
353 raise TypeError("map_sync doesn't take a `block` keyword argument.")
353 raise TypeError("map_sync doesn't take a `block` keyword argument.")
354 kwargs['block'] = True
354 kwargs['block'] = True
355 return self.map(f,*sequences,**kwargs)
355 return self.map(f,*sequences,**kwargs)
356
356
357 def imap(self, f, *sequences, **kwargs):
357 def imap(self, f, *sequences, **kwargs):
358 """Parallel version of :func:`itertools.imap`.
358 """Parallel version of :func:`itertools.imap`.
359
359
360 See `self.map` for details.
360 See `self.map` for details.
361
361
362 """
362 """
363
363
364 return iter(self.map_async(f,*sequences, **kwargs))
364 return iter(self.map_async(f,*sequences, **kwargs))
365
365
366 #-------------------------------------------------------------------
366 #-------------------------------------------------------------------
367 # Decorators
367 # Decorators
368 #-------------------------------------------------------------------
368 #-------------------------------------------------------------------
369
369
370 def remote(self, block=None, **flags):
370 def remote(self, block=None, **flags):
371 """Decorator for making a RemoteFunction"""
371 """Decorator for making a RemoteFunction"""
372 block = self.block if block is None else block
372 block = self.block if block is None else block
373 return remote(self, block=block, **flags)
373 return remote(self, block=block, **flags)
374
374
375 def parallel(self, dist='b', block=None, **flags):
375 def parallel(self, dist='b', block=None, **flags):
376 """Decorator for making a ParallelFunction"""
376 """Decorator for making a ParallelFunction"""
377 block = self.block if block is None else block
377 block = self.block if block is None else block
378 return parallel(self, dist=dist, block=block, **flags)
378 return parallel(self, dist=dist, block=block, **flags)
379
379
380 @skip_doctest
380 @skip_doctest
381 class DirectView(View):
381 class DirectView(View):
382 """Direct Multiplexer View of one or more engines.
382 """Direct Multiplexer View of one or more engines.
383
383
384 These are created via indexed access to a client:
384 These are created via indexed access to a client:
385
385
386 >>> dv_1 = client[1]
386 >>> dv_1 = client[1]
387 >>> dv_all = client[:]
387 >>> dv_all = client[:]
388 >>> dv_even = client[::2]
388 >>> dv_even = client[::2]
389 >>> dv_some = client[1:3]
389 >>> dv_some = client[1:3]
390
390
391 This object provides dictionary access to engine namespaces:
391 This object provides dictionary access to engine namespaces:
392
392
393 # push a=5:
393 # push a=5:
394 >>> dv['a'] = 5
394 >>> dv['a'] = 5
395 # pull 'foo':
395 # pull 'foo':
396 >>> dv['foo']
396 >>> dv['foo']
397
397
398 """
398 """
399
399
400 def __init__(self, client=None, socket=None, targets=None):
400 def __init__(self, client=None, socket=None, targets=None):
401 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
401 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
402
402
403 @property
403 @property
404 def importer(self):
404 def importer(self):
405 """sync_imports(local=True) as a property.
405 """sync_imports(local=True) as a property.
406
406
407 See sync_imports for details.
407 See sync_imports for details.
408
408
409 """
409 """
410 return self.sync_imports(True)
410 return self.sync_imports(True)
411
411
412 @contextmanager
412 @contextmanager
413 def sync_imports(self, local=True, quiet=False):
413 def sync_imports(self, local=True, quiet=False):
414 """Context Manager for performing simultaneous local and remote imports.
414 """Context Manager for performing simultaneous local and remote imports.
415
415
416 'import x as y' will *not* work. The 'as y' part will simply be ignored.
416 'import x as y' will *not* work. The 'as y' part will simply be ignored.
417
417
418 If `local=True`, then the package will also be imported locally.
418 If `local=True`, then the package will also be imported locally.
419
419
420 If `quiet=True`, no output will be produced when attempting remote
420 If `quiet=True`, no output will be produced when attempting remote
421 imports.
421 imports.
422
422
423 Note that remote-only (`local=False`) imports have not been implemented.
423 Note that remote-only (`local=False`) imports have not been implemented.
424
424
425 >>> with view.sync_imports():
425 >>> with view.sync_imports():
426 ... from numpy import recarray
426 ... from numpy import recarray
427 importing recarray from numpy on engine(s)
427 importing recarray from numpy on engine(s)
428
428
429 """
429 """
430 from IPython.utils.py3compat import builtin_mod
430 from IPython.utils.py3compat import builtin_mod
431 local_import = builtin_mod.__import__
431 local_import = builtin_mod.__import__
432 modules = set()
432 modules = set()
433 results = []
433 results = []
434 @util.interactive
434 @util.interactive
435 def remote_import(name, fromlist, level):
435 def remote_import(name, fromlist, level):
436 """the function to be passed to apply, that actually performs the import
436 """the function to be passed to apply, that actually performs the import
437 on the engine, and loads up the user namespace.
437 on the engine, and loads up the user namespace.
438 """
438 """
439 import sys
439 import sys
440 user_ns = globals()
440 user_ns = globals()
441 mod = __import__(name, fromlist=fromlist, level=level)
441 mod = __import__(name, fromlist=fromlist, level=level)
442 if fromlist:
442 if fromlist:
443 for key in fromlist:
443 for key in fromlist:
444 user_ns[key] = getattr(mod, key)
444 user_ns[key] = getattr(mod, key)
445 else:
445 else:
446 user_ns[name] = sys.modules[name]
446 user_ns[name] = sys.modules[name]
447
447
448 def view_import(name, globals={}, locals={}, fromlist=[], level=0):
448 def view_import(name, globals={}, locals={}, fromlist=[], level=0):
449 """the drop-in replacement for __import__, that optionally imports
449 """the drop-in replacement for __import__, that optionally imports
450 locally as well.
450 locally as well.
451 """
451 """
452 # don't override nested imports
452 # don't override nested imports
453 save_import = builtin_mod.__import__
453 save_import = builtin_mod.__import__
454 builtin_mod.__import__ = local_import
454 builtin_mod.__import__ = local_import
455
455
456 if imp.lock_held():
456 if imp.lock_held():
457 # this is a side-effect import, don't do it remotely, or even
457 # this is a side-effect import, don't do it remotely, or even
458 # ignore the local effects
458 # ignore the local effects
459 return local_import(name, globals, locals, fromlist, level)
459 return local_import(name, globals, locals, fromlist, level)
460
460
461 imp.acquire_lock()
461 imp.acquire_lock()
462 if local:
462 if local:
463 mod = local_import(name, globals, locals, fromlist, level)
463 mod = local_import(name, globals, locals, fromlist, level)
464 else:
464 else:
465 raise NotImplementedError("remote-only imports not yet implemented")
465 raise NotImplementedError("remote-only imports not yet implemented")
466 imp.release_lock()
466 imp.release_lock()
467
467
468 key = name+':'+','.join(fromlist or [])
468 key = name+':'+','.join(fromlist or [])
469 if level <= 0 and key not in modules:
469 if level <= 0 and key not in modules:
470 modules.add(key)
470 modules.add(key)
471 if not quiet:
471 if not quiet:
472 if fromlist:
472 if fromlist:
473 print("importing %s from %s on engine(s)"%(','.join(fromlist), name))
473 print("importing %s from %s on engine(s)"%(','.join(fromlist), name))
474 else:
474 else:
475 print("importing %s on engine(s)"%name)
475 print("importing %s on engine(s)"%name)
476 results.append(self.apply_async(remote_import, name, fromlist, level))
476 results.append(self.apply_async(remote_import, name, fromlist, level))
477 # restore override
477 # restore override
478 builtin_mod.__import__ = save_import
478 builtin_mod.__import__ = save_import
479
479
480 return mod
480 return mod
481
481
482 # override __import__
482 # override __import__
483 builtin_mod.__import__ = view_import
483 builtin_mod.__import__ = view_import
484 try:
484 try:
485 # enter the block
485 # enter the block
486 yield
486 yield
487 except ImportError:
487 except ImportError:
488 if local:
488 if local:
489 raise
489 raise
490 else:
490 else:
491 # ignore import errors if not doing local imports
491 # ignore import errors if not doing local imports
492 pass
492 pass
493 finally:
493 finally:
494 # always restore __import__
494 # always restore __import__
495 builtin_mod.__import__ = local_import
495 builtin_mod.__import__ = local_import
496
496
497 for r in results:
497 for r in results:
498 # raise possible remote ImportErrors here
498 # raise possible remote ImportErrors here
499 r.get()
499 r.get()
500
500
501 def use_dill(self):
501 def use_dill(self):
502 """Expand serialization support with dill
502 """Expand serialization support with dill
503
503
504 adds support for closures, etc.
504 adds support for closures, etc.
505
505
506 This calls IPython.utils.pickleutil.use_dill() here and on each engine.
506 This calls IPython.utils.pickleutil.use_dill() here and on each engine.
507 """
507 """
508 pickleutil.use_dill()
508 pickleutil.use_dill()
509 return self.apply(pickleutil.use_dill)
509 return self.apply(pickleutil.use_dill)
510
510
511 def use_cloudpickle(self):
511 def use_cloudpickle(self):
512 """Expand serialization support with cloudpickle.
512 """Expand serialization support with cloudpickle.
513 """
513 """
514 pickleutil.use_cloudpickle()
514 pickleutil.use_cloudpickle()
515 return self.apply(pickleutil.use_cloudpickle)
515 return self.apply(pickleutil.use_cloudpickle)
516
516
517
517
518 @sync_results
518 @sync_results
519 @save_ids
519 @save_ids
520 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
520 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
521 """calls f(*args, **kwargs) on remote engines, returning the result.
521 """calls f(*args, **kwargs) on remote engines, returning the result.
522
522
523 This method sets all of `apply`'s flags via this View's attributes.
523 This method sets all of `apply`'s flags via this View's attributes.
524
524
525 Parameters
525 Parameters
526 ----------
526 ----------
527
527
528 f : callable
528 f : callable
529
529
530 args : list [default: empty]
530 args : list [default: empty]
531
531
532 kwargs : dict [default: empty]
532 kwargs : dict [default: empty]
533
533
534 targets : target list [default: self.targets]
534 targets : target list [default: self.targets]
535 where to run
535 where to run
536 block : bool [default: self.block]
536 block : bool [default: self.block]
537 whether to block
537 whether to block
538 track : bool [default: self.track]
538 track : bool [default: self.track]
539 whether to ask zmq to track the message, for safe non-copying sends
539 whether to ask zmq to track the message, for safe non-copying sends
540
540
541 Returns
541 Returns
542 -------
542 -------
543
543
544 if self.block is False:
544 if self.block is False:
545 returns AsyncResult
545 returns AsyncResult
546 else:
546 else:
547 returns actual result of f(*args, **kwargs) on the engine(s)
547 returns actual result of f(*args, **kwargs) on the engine(s)
548 This will be a list of self.targets is also a list (even length 1), or
548 This will be a list of self.targets is also a list (even length 1), or
549 the single result if self.targets is an integer engine id
549 the single result if self.targets is an integer engine id
550 """
550 """
551 args = [] if args is None else args
551 args = [] if args is None else args
552 kwargs = {} if kwargs is None else kwargs
552 kwargs = {} if kwargs is None else kwargs
553 block = self.block if block is None else block
553 block = self.block if block is None else block
554 track = self.track if track is None else track
554 track = self.track if track is None else track
555 targets = self.targets if targets is None else targets
555 targets = self.targets if targets is None else targets
556
556
557 _idents, _targets = self.client._build_targets(targets)
557 _idents, _targets = self.client._build_targets(targets)
558 msg_ids = []
558 msg_ids = []
559 trackers = []
559 trackers = []
560 for ident in _idents:
560 for ident in _idents:
561 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
561 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
562 ident=ident)
562 ident=ident)
563 if track:
563 if track:
564 trackers.append(msg['tracker'])
564 trackers.append(msg['tracker'])
565 msg_ids.append(msg['header']['msg_id'])
565 msg_ids.append(msg['header']['msg_id'])
566 if isinstance(targets, int):
566 if isinstance(targets, int):
567 msg_ids = msg_ids[0]
567 msg_ids = msg_ids[0]
568 tracker = None if track is False else zmq.MessageTracker(*trackers)
568 tracker = None if track is False else zmq.MessageTracker(*trackers)
569 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets,
569 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets,
570 tracker=tracker, owner=True,
570 tracker=tracker, owner=True,
571 )
571 )
572 if block:
572 if block:
573 try:
573 try:
574 return ar.get()
574 return ar.get()
575 except KeyboardInterrupt:
575 except KeyboardInterrupt:
576 pass
576 pass
577 return ar
577 return ar
578
578
579
579
580 @sync_results
580 @sync_results
581 def map(self, f, *sequences, **kwargs):
581 def map(self, f, *sequences, **kwargs):
582 """``view.map(f, *sequences, block=self.block)`` => list|AsyncMapResult
582 """``view.map(f, *sequences, block=self.block)`` => list|AsyncMapResult
583
583
584 Parallel version of builtin `map`, using this View's `targets`.
584 Parallel version of builtin `map`, using this View's `targets`.
585
585
586 There will be one task per target, so work will be chunked
586 There will be one task per target, so work will be chunked
587 if the sequences are longer than `targets`.
587 if the sequences are longer than `targets`.
588
588
589 Results can be iterated as they are ready, but will become available in chunks.
589 Results can be iterated as they are ready, but will become available in chunks.
590
590
591 Parameters
591 Parameters
592 ----------
592 ----------
593
593
594 f : callable
594 f : callable
595 function to be mapped
595 function to be mapped
596 *sequences: one or more sequences of matching length
596 *sequences: one or more sequences of matching length
597 the sequences to be distributed and passed to `f`
597 the sequences to be distributed and passed to `f`
598 block : bool
598 block : bool
599 whether to wait for the result or not [default self.block]
599 whether to wait for the result or not [default self.block]
600
600
601 Returns
601 Returns
602 -------
602 -------
603
603
604
604
605 If block=False
605 If block=False
606 An :class:`~IPython.parallel.client.asyncresult.AsyncMapResult` instance.
606 An :class:`~IPython.parallel.client.asyncresult.AsyncMapResult` instance.
607 An object like AsyncResult, but which reassembles the sequence of results
607 An object like AsyncResult, but which reassembles the sequence of results
608 into a single list. AsyncMapResults can be iterated through before all
608 into a single list. AsyncMapResults can be iterated through before all
609 results are complete.
609 results are complete.
610 else
610 else
611 A list, the result of ``map(f,*sequences)``
611 A list, the result of ``map(f,*sequences)``
612 """
612 """
613
613
614 block = kwargs.pop('block', self.block)
614 block = kwargs.pop('block', self.block)
615 for k in kwargs.keys():
615 for k in kwargs.keys():
616 if k not in ['block', 'track']:
616 if k not in ['block', 'track']:
617 raise TypeError("invalid keyword arg, %r"%k)
617 raise TypeError("invalid keyword arg, %r"%k)
618
618
619 assert len(sequences) > 0, "must have some sequences to map onto!"
619 assert len(sequences) > 0, "must have some sequences to map onto!"
620 pf = ParallelFunction(self, f, block=block, **kwargs)
620 pf = ParallelFunction(self, f, block=block, **kwargs)
621 return pf.map(*sequences)
621 return pf.map(*sequences)
622
622
623 @sync_results
623 @sync_results
624 @save_ids
624 @save_ids
625 def execute(self, code, silent=True, targets=None, block=None):
625 def execute(self, code, silent=True, targets=None, block=None):
626 """Executes `code` on `targets` in blocking or nonblocking manner.
626 """Executes `code` on `targets` in blocking or nonblocking manner.
627
627
628 ``execute`` is always `bound` (affects engine namespace)
628 ``execute`` is always `bound` (affects engine namespace)
629
629
630 Parameters
630 Parameters
631 ----------
631 ----------
632
632
633 code : str
633 code : str
634 the code string to be executed
634 the code string to be executed
635 block : bool
635 block : bool
636 whether or not to wait until done to return
636 whether or not to wait until done to return
637 default: self.block
637 default: self.block
638 """
638 """
639 block = self.block if block is None else block
639 block = self.block if block is None else block
640 targets = self.targets if targets is None else targets
640 targets = self.targets if targets is None else targets
641
641
642 _idents, _targets = self.client._build_targets(targets)
642 _idents, _targets = self.client._build_targets(targets)
643 msg_ids = []
643 msg_ids = []
644 trackers = []
644 trackers = []
645 for ident in _idents:
645 for ident in _idents:
646 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
646 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
647 msg_ids.append(msg['header']['msg_id'])
647 msg_ids.append(msg['header']['msg_id'])
648 if isinstance(targets, int):
648 if isinstance(targets, int):
649 msg_ids = msg_ids[0]
649 msg_ids = msg_ids[0]
650 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets, owner=True)
650 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets, owner=True)
651 if block:
651 if block:
652 try:
652 try:
653 ar.get()
653 ar.get()
654 except KeyboardInterrupt:
654 except KeyboardInterrupt:
655 pass
655 pass
656 return ar
656 return ar
657
657
658 def run(self, filename, targets=None, block=None):
658 def run(self, filename, targets=None, block=None):
659 """Execute contents of `filename` on my engine(s).
659 """Execute contents of `filename` on my engine(s).
660
660
661 This simply reads the contents of the file and calls `execute`.
661 This simply reads the contents of the file and calls `execute`.
662
662
663 Parameters
663 Parameters
664 ----------
664 ----------
665
665
666 filename : str
666 filename : str
667 The path to the file
667 The path to the file
668 targets : int/str/list of ints/strs
668 targets : int/str/list of ints/strs
669 the engines on which to execute
669 the engines on which to execute
670 default : all
670 default : all
671 block : bool
671 block : bool
672 whether or not to wait until done
672 whether or not to wait until done
673 default: self.block
673 default: self.block
674
674
675 """
675 """
676 with open(filename, 'r') as f:
676 with open(filename, 'r') as f:
677 # add newline in case of trailing indented whitespace
677 # add newline in case of trailing indented whitespace
678 # which will cause SyntaxError
678 # which will cause SyntaxError
679 code = f.read()+'\n'
679 code = f.read()+'\n'
680 return self.execute(code, block=block, targets=targets)
680 return self.execute(code, block=block, targets=targets)
681
681
682 def update(self, ns):
682 def update(self, ns):
683 """update remote namespace with dict `ns`
683 """update remote namespace with dict `ns`
684
684
685 See `push` for details.
685 See `push` for details.
686 """
686 """
687 return self.push(ns, block=self.block, track=self.track)
687 return self.push(ns, block=self.block, track=self.track)
688
688
689 def push(self, ns, targets=None, block=None, track=None):
689 def push(self, ns, targets=None, block=None, track=None):
690 """update remote namespace with dict `ns`
690 """update remote namespace with dict `ns`
691
691
692 Parameters
692 Parameters
693 ----------
693 ----------
694
694
695 ns : dict
695 ns : dict
696 dict of keys with which to update engine namespace(s)
696 dict of keys with which to update engine namespace(s)
697 block : bool [default : self.block]
697 block : bool [default : self.block]
698 whether to wait to be notified of engine receipt
698 whether to wait to be notified of engine receipt
699
699
700 """
700 """
701
701
702 block = block if block is not None else self.block
702 block = block if block is not None else self.block
703 track = track if track is not None else self.track
703 track = track if track is not None else self.track
704 targets = targets if targets is not None else self.targets
704 targets = targets if targets is not None else self.targets
705 # applier = self.apply_sync if block else self.apply_async
705 # applier = self.apply_sync if block else self.apply_async
706 if not isinstance(ns, dict):
706 if not isinstance(ns, dict):
707 raise TypeError("Must be a dict, not %s"%type(ns))
707 raise TypeError("Must be a dict, not %s"%type(ns))
708 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
708 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
709
709
710 def get(self, key_s):
710 def get(self, key_s):
711 """get object(s) by `key_s` from remote namespace
711 """get object(s) by `key_s` from remote namespace
712
712
713 see `pull` for details.
713 see `pull` for details.
714 """
714 """
715 # block = block if block is not None else self.block
715 # block = block if block is not None else self.block
716 return self.pull(key_s, block=True)
716 return self.pull(key_s, block=True)
717
717
718 def pull(self, names, targets=None, block=None):
718 def pull(self, names, targets=None, block=None):
719 """get object(s) by `name` from remote namespace
719 """get object(s) by `name` from remote namespace
720
720
721 will return one object if it is a key.
721 will return one object if it is a key.
722 can also take a list of keys, in which case it will return a list of objects.
722 can also take a list of keys, in which case it will return a list of objects.
723 """
723 """
724 block = block if block is not None else self.block
724 block = block if block is not None else self.block
725 targets = targets if targets is not None else self.targets
725 targets = targets if targets is not None else self.targets
726 applier = self.apply_sync if block else self.apply_async
726 applier = self.apply_sync if block else self.apply_async
727 if isinstance(names, string_types):
727 if isinstance(names, string_types):
728 pass
728 pass
729 elif isinstance(names, (list,tuple,set)):
729 elif isinstance(names, (list,tuple,set)):
730 for key in names:
730 for key in names:
731 if not isinstance(key, string_types):
731 if not isinstance(key, string_types):
732 raise TypeError("keys must be str, not type %r"%type(key))
732 raise TypeError("keys must be str, not type %r"%type(key))
733 else:
733 else:
734 raise TypeError("names must be strs, not %r"%names)
734 raise TypeError("names must be strs, not %r"%names)
735 return self._really_apply(util._pull, (names,), block=block, targets=targets)
735 return self._really_apply(util._pull, (names,), block=block, targets=targets)
736
736
737 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
737 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
738 """
738 """
739 Partition a Python sequence and send the partitions to a set of engines.
739 Partition a Python sequence and send the partitions to a set of engines.
740 """
740 """
741 block = block if block is not None else self.block
741 block = block if block is not None else self.block
742 track = track if track is not None else self.track
742 track = track if track is not None else self.track
743 targets = targets if targets is not None else self.targets
743 targets = targets if targets is not None else self.targets
744
744
745 # construct integer ID list:
745 # construct integer ID list:
746 targets = self.client._build_targets(targets)[1]
746 targets = self.client._build_targets(targets)[1]
747
747
748 mapObject = Map.dists[dist]()
748 mapObject = Map.dists[dist]()
749 nparts = len(targets)
749 nparts = len(targets)
750 msg_ids = []
750 msg_ids = []
751 trackers = []
751 trackers = []
752 for index, engineid in enumerate(targets):
752 for index, engineid in enumerate(targets):
753 partition = mapObject.getPartition(seq, index, nparts)
753 partition = mapObject.getPartition(seq, index, nparts)
754 if flatten and len(partition) == 1:
754 if flatten and len(partition) == 1:
755 ns = {key: partition[0]}
755 ns = {key: partition[0]}
756 else:
756 else:
757 ns = {key: partition}
757 ns = {key: partition}
758 r = self.push(ns, block=False, track=track, targets=engineid)
758 r = self.push(ns, block=False, track=track, targets=engineid)
759 msg_ids.extend(r.msg_ids)
759 msg_ids.extend(r.msg_ids)
760 if track:
760 if track:
761 trackers.append(r._tracker)
761 trackers.append(r._tracker)
762
762
763 if track:
763 if track:
764 tracker = zmq.MessageTracker(*trackers)
764 tracker = zmq.MessageTracker(*trackers)
765 else:
765 else:
766 tracker = None
766 tracker = None
767
767
768 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets,
768 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets,
769 tracker=tracker, owner=True,
769 tracker=tracker, owner=True,
770 )
770 )
771 if block:
771 if block:
772 r.wait()
772 r.wait()
773 else:
773 else:
774 return r
774 return r
775
775
776 @sync_results
776 @sync_results
777 @save_ids
777 @save_ids
778 def gather(self, key, dist='b', targets=None, block=None):
778 def gather(self, key, dist='b', targets=None, block=None):
779 """
779 """
780 Gather a partitioned sequence on a set of engines as a single local seq.
780 Gather a partitioned sequence on a set of engines as a single local seq.
781 """
781 """
782 block = block if block is not None else self.block
782 block = block if block is not None else self.block
783 targets = targets if targets is not None else self.targets
783 targets = targets if targets is not None else self.targets
784 mapObject = Map.dists[dist]()
784 mapObject = Map.dists[dist]()
785 msg_ids = []
785 msg_ids = []
786
786
787 # construct integer ID list:
787 # construct integer ID list:
788 targets = self.client._build_targets(targets)[1]
788 targets = self.client._build_targets(targets)[1]
789
789
790 for index, engineid in enumerate(targets):
790 for index, engineid in enumerate(targets):
791 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
791 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
792
792
793 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
793 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
794
794
795 if block:
795 if block:
796 try:
796 try:
797 return r.get()
797 return r.get()
798 except KeyboardInterrupt:
798 except KeyboardInterrupt:
799 pass
799 pass
800 return r
800 return r
801
801
802 def __getitem__(self, key):
802 def __getitem__(self, key):
803 return self.get(key)
803 return self.get(key)
804
804
805 def __setitem__(self,key, value):
805 def __setitem__(self,key, value):
806 self.update({key:value})
806 self.update({key:value})
807
807
808 def clear(self, targets=None, block=None):
808 def clear(self, targets=None, block=None):
809 """Clear the remote namespaces on my engines."""
809 """Clear the remote namespaces on my engines."""
810 block = block if block is not None else self.block
810 block = block if block is not None else self.block
811 targets = targets if targets is not None else self.targets
811 targets = targets if targets is not None else self.targets
812 return self.client.clear(targets=targets, block=block)
812 return self.client.clear(targets=targets, block=block)
813
813
814 #----------------------------------------
814 #----------------------------------------
815 # activate for %px, %autopx, etc. magics
815 # activate for %px, %autopx, etc. magics
816 #----------------------------------------
816 #----------------------------------------
817
817
818 def activate(self, suffix=''):
818 def activate(self, suffix=''):
819 """Activate IPython magics associated with this View
819 """Activate IPython magics associated with this View
820
820
821 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
821 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
822
822
823 Parameters
823 Parameters
824 ----------
824 ----------
825
825
826 suffix: str [default: '']
826 suffix: str [default: '']
827 The suffix, if any, for the magics. This allows you to have
827 The suffix, if any, for the magics. This allows you to have
828 multiple views associated with parallel magics at the same time.
828 multiple views associated with parallel magics at the same time.
829
829
830 e.g. ``rc[::2].activate(suffix='_even')`` will give you
830 e.g. ``rc[::2].activate(suffix='_even')`` will give you
831 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
831 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
832 on the even engines.
832 on the even engines.
833 """
833 """
834
834
835 from IPython.parallel.client.magics import ParallelMagics
835 from IPython.parallel.client.magics import ParallelMagics
836
836
837 try:
837 try:
838 # This is injected into __builtins__.
838 # This is injected into __builtins__.
839 ip = get_ipython()
839 ip = get_ipython()
840 except NameError:
840 except NameError:
841 print("The IPython parallel magics (%px, etc.) only work within IPython.")
841 print("The IPython parallel magics (%px, etc.) only work within IPython.")
842 return
842 return
843
843
844 M = ParallelMagics(ip, self, suffix)
844 M = ParallelMagics(ip, self, suffix)
845 ip.magics_manager.register(M)
845 ip.magics_manager.register(M)
846
846
847
847
848 @skip_doctest
848 @skip_doctest
849 class LoadBalancedView(View):
849 class LoadBalancedView(View):
850 """An load-balancing View that only executes via the Task scheduler.
850 """An load-balancing View that only executes via the Task scheduler.
851
851
852 Load-balanced views can be created with the client's `view` method:
852 Load-balanced views can be created with the client's `view` method:
853
853
854 >>> v = client.load_balanced_view()
854 >>> v = client.load_balanced_view()
855
855
856 or targets can be specified, to restrict the potential destinations:
856 or targets can be specified, to restrict the potential destinations:
857
857
858 >>> v = client.client.load_balanced_view([1,3])
858 >>> v = client.load_balanced_view([1,3])
859
859
860 which would restrict loadbalancing to between engines 1 and 3.
860 which would restrict loadbalancing to between engines 1 and 3.
861
861
862 """
862 """
863
863
864 follow=Any()
864 follow=Any()
865 after=Any()
865 after=Any()
866 timeout=CFloat()
866 timeout=CFloat()
867 retries = Integer(0)
867 retries = Integer(0)
868
868
869 _task_scheme = Any()
869 _task_scheme = Any()
870 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
870 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
871
871
872 def __init__(self, client=None, socket=None, **flags):
872 def __init__(self, client=None, socket=None, **flags):
873 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
873 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
874 self._task_scheme=client._task_scheme
874 self._task_scheme=client._task_scheme
875
875
876 def _validate_dependency(self, dep):
876 def _validate_dependency(self, dep):
877 """validate a dependency.
877 """validate a dependency.
878
878
879 For use in `set_flags`.
879 For use in `set_flags`.
880 """
880 """
881 if dep is None or isinstance(dep, string_types + (AsyncResult, Dependency)):
881 if dep is None or isinstance(dep, string_types + (AsyncResult, Dependency)):
882 return True
882 return True
883 elif isinstance(dep, (list,set, tuple)):
883 elif isinstance(dep, (list,set, tuple)):
884 for d in dep:
884 for d in dep:
885 if not isinstance(d, string_types + (AsyncResult,)):
885 if not isinstance(d, string_types + (AsyncResult,)):
886 return False
886 return False
887 elif isinstance(dep, dict):
887 elif isinstance(dep, dict):
888 if set(dep.keys()) != set(Dependency().as_dict().keys()):
888 if set(dep.keys()) != set(Dependency().as_dict().keys()):
889 return False
889 return False
890 if not isinstance(dep['msg_ids'], list):
890 if not isinstance(dep['msg_ids'], list):
891 return False
891 return False
892 for d in dep['msg_ids']:
892 for d in dep['msg_ids']:
893 if not isinstance(d, string_types):
893 if not isinstance(d, string_types):
894 return False
894 return False
895 else:
895 else:
896 return False
896 return False
897
897
898 return True
898 return True
899
899
900 def _render_dependency(self, dep):
900 def _render_dependency(self, dep):
901 """helper for building jsonable dependencies from various input forms."""
901 """helper for building jsonable dependencies from various input forms."""
902 if isinstance(dep, Dependency):
902 if isinstance(dep, Dependency):
903 return dep.as_dict()
903 return dep.as_dict()
904 elif isinstance(dep, AsyncResult):
904 elif isinstance(dep, AsyncResult):
905 return dep.msg_ids
905 return dep.msg_ids
906 elif dep is None:
906 elif dep is None:
907 return []
907 return []
908 else:
908 else:
909 # pass to Dependency constructor
909 # pass to Dependency constructor
910 return list(Dependency(dep))
910 return list(Dependency(dep))
911
911
912 def set_flags(self, **kwargs):
912 def set_flags(self, **kwargs):
913 """set my attribute flags by keyword.
913 """set my attribute flags by keyword.
914
914
915 A View is a wrapper for the Client's apply method, but with attributes
915 A View is a wrapper for the Client's apply method, but with attributes
916 that specify keyword arguments, those attributes can be set by keyword
916 that specify keyword arguments, those attributes can be set by keyword
917 argument with this method.
917 argument with this method.
918
918
919 Parameters
919 Parameters
920 ----------
920 ----------
921
921
922 block : bool
922 block : bool
923 whether to wait for results
923 whether to wait for results
924 track : bool
924 track : bool
925 whether to create a MessageTracker to allow the user to
925 whether to create a MessageTracker to allow the user to
926 safely edit after arrays and buffers during non-copying
926 safely edit after arrays and buffers during non-copying
927 sends.
927 sends.
928
928
929 after : Dependency or collection of msg_ids
929 after : Dependency or collection of msg_ids
930 Only for load-balanced execution (targets=None)
930 Only for load-balanced execution (targets=None)
931 Specify a list of msg_ids as a time-based dependency.
931 Specify a list of msg_ids as a time-based dependency.
932 This job will only be run *after* the dependencies
932 This job will only be run *after* the dependencies
933 have been met.
933 have been met.
934
934
935 follow : Dependency or collection of msg_ids
935 follow : Dependency or collection of msg_ids
936 Only for load-balanced execution (targets=None)
936 Only for load-balanced execution (targets=None)
937 Specify a list of msg_ids as a location-based dependency.
937 Specify a list of msg_ids as a location-based dependency.
938 This job will only be run on an engine where this dependency
938 This job will only be run on an engine where this dependency
939 is met.
939 is met.
940
940
941 timeout : float/int or None
941 timeout : float/int or None
942 Only for load-balanced execution (targets=None)
942 Only for load-balanced execution (targets=None)
943 Specify an amount of time (in seconds) for the scheduler to
943 Specify an amount of time (in seconds) for the scheduler to
944 wait for dependencies to be met before failing with a
944 wait for dependencies to be met before failing with a
945 DependencyTimeout.
945 DependencyTimeout.
946
946
947 retries : int
947 retries : int
948 Number of times a task will be retried on failure.
948 Number of times a task will be retried on failure.
949 """
949 """
950
950
951 super(LoadBalancedView, self).set_flags(**kwargs)
951 super(LoadBalancedView, self).set_flags(**kwargs)
952 for name in ('follow', 'after'):
952 for name in ('follow', 'after'):
953 if name in kwargs:
953 if name in kwargs:
954 value = kwargs[name]
954 value = kwargs[name]
955 if self._validate_dependency(value):
955 if self._validate_dependency(value):
956 setattr(self, name, value)
956 setattr(self, name, value)
957 else:
957 else:
958 raise ValueError("Invalid dependency: %r"%value)
958 raise ValueError("Invalid dependency: %r"%value)
959 if 'timeout' in kwargs:
959 if 'timeout' in kwargs:
960 t = kwargs['timeout']
960 t = kwargs['timeout']
961 if not isinstance(t, (int, float, type(None))):
961 if not isinstance(t, (int, float, type(None))):
962 if (not PY3) and (not isinstance(t, long)):
962 if (not PY3) and (not isinstance(t, long)):
963 raise TypeError("Invalid type for timeout: %r"%type(t))
963 raise TypeError("Invalid type for timeout: %r"%type(t))
964 if t is not None:
964 if t is not None:
965 if t < 0:
965 if t < 0:
966 raise ValueError("Invalid timeout: %s"%t)
966 raise ValueError("Invalid timeout: %s"%t)
967 self.timeout = t
967 self.timeout = t
968
968
969 @sync_results
969 @sync_results
970 @save_ids
970 @save_ids
971 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
971 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
972 after=None, follow=None, timeout=None,
972 after=None, follow=None, timeout=None,
973 targets=None, retries=None):
973 targets=None, retries=None):
974 """calls f(*args, **kwargs) on a remote engine, returning the result.
974 """calls f(*args, **kwargs) on a remote engine, returning the result.
975
975
976 This method temporarily sets all of `apply`'s flags for a single call.
976 This method temporarily sets all of `apply`'s flags for a single call.
977
977
978 Parameters
978 Parameters
979 ----------
979 ----------
980
980
981 f : callable
981 f : callable
982
982
983 args : list [default: empty]
983 args : list [default: empty]
984
984
985 kwargs : dict [default: empty]
985 kwargs : dict [default: empty]
986
986
987 block : bool [default: self.block]
987 block : bool [default: self.block]
988 whether to block
988 whether to block
989 track : bool [default: self.track]
989 track : bool [default: self.track]
990 whether to ask zmq to track the message, for safe non-copying sends
990 whether to ask zmq to track the message, for safe non-copying sends
991
991
992 !!!!!! TODO: THE REST HERE !!!!
992 !!!!!! TODO: THE REST HERE !!!!
993
993
994 Returns
994 Returns
995 -------
995 -------
996
996
997 if self.block is False:
997 if self.block is False:
998 returns AsyncResult
998 returns AsyncResult
999 else:
999 else:
1000 returns actual result of f(*args, **kwargs) on the engine(s)
1000 returns actual result of f(*args, **kwargs) on the engine(s)
1001 This will be a list of self.targets is also a list (even length 1), or
1001 This will be a list of self.targets is also a list (even length 1), or
1002 the single result if self.targets is an integer engine id
1002 the single result if self.targets is an integer engine id
1003 """
1003 """
1004
1004
1005 # validate whether we can run
1005 # validate whether we can run
1006 if self._socket.closed:
1006 if self._socket.closed:
1007 msg = "Task farming is disabled"
1007 msg = "Task farming is disabled"
1008 if self._task_scheme == 'pure':
1008 if self._task_scheme == 'pure':
1009 msg += " because the pure ZMQ scheduler cannot handle"
1009 msg += " because the pure ZMQ scheduler cannot handle"
1010 msg += " disappearing engines."
1010 msg += " disappearing engines."
1011 raise RuntimeError(msg)
1011 raise RuntimeError(msg)
1012
1012
1013 if self._task_scheme == 'pure':
1013 if self._task_scheme == 'pure':
1014 # pure zmq scheme doesn't support extra features
1014 # pure zmq scheme doesn't support extra features
1015 msg = "Pure ZMQ scheduler doesn't support the following flags:"
1015 msg = "Pure ZMQ scheduler doesn't support the following flags:"
1016 "follow, after, retries, targets, timeout"
1016 "follow, after, retries, targets, timeout"
1017 if (follow or after or retries or targets or timeout):
1017 if (follow or after or retries or targets or timeout):
1018 # hard fail on Scheduler flags
1018 # hard fail on Scheduler flags
1019 raise RuntimeError(msg)
1019 raise RuntimeError(msg)
1020 if isinstance(f, dependent):
1020 if isinstance(f, dependent):
1021 # soft warn on functional dependencies
1021 # soft warn on functional dependencies
1022 warnings.warn(msg, RuntimeWarning)
1022 warnings.warn(msg, RuntimeWarning)
1023
1023
1024 # build args
1024 # build args
1025 args = [] if args is None else args
1025 args = [] if args is None else args
1026 kwargs = {} if kwargs is None else kwargs
1026 kwargs = {} if kwargs is None else kwargs
1027 block = self.block if block is None else block
1027 block = self.block if block is None else block
1028 track = self.track if track is None else track
1028 track = self.track if track is None else track
1029 after = self.after if after is None else after
1029 after = self.after if after is None else after
1030 retries = self.retries if retries is None else retries
1030 retries = self.retries if retries is None else retries
1031 follow = self.follow if follow is None else follow
1031 follow = self.follow if follow is None else follow
1032 timeout = self.timeout if timeout is None else timeout
1032 timeout = self.timeout if timeout is None else timeout
1033 targets = self.targets if targets is None else targets
1033 targets = self.targets if targets is None else targets
1034
1034
1035 if not isinstance(retries, int):
1035 if not isinstance(retries, int):
1036 raise TypeError('retries must be int, not %r'%type(retries))
1036 raise TypeError('retries must be int, not %r'%type(retries))
1037
1037
1038 if targets is None:
1038 if targets is None:
1039 idents = []
1039 idents = []
1040 else:
1040 else:
1041 idents = self.client._build_targets(targets)[0]
1041 idents = self.client._build_targets(targets)[0]
1042 # ensure *not* bytes
1042 # ensure *not* bytes
1043 idents = [ ident.decode() for ident in idents ]
1043 idents = [ ident.decode() for ident in idents ]
1044
1044
1045 after = self._render_dependency(after)
1045 after = self._render_dependency(after)
1046 follow = self._render_dependency(follow)
1046 follow = self._render_dependency(follow)
1047 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1047 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1048
1048
1049 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1049 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1050 metadata=metadata)
1050 metadata=metadata)
1051 tracker = None if track is False else msg['tracker']
1051 tracker = None if track is False else msg['tracker']
1052
1052
1053 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f),
1053 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f),
1054 targets=None, tracker=tracker, owner=True,
1054 targets=None, tracker=tracker, owner=True,
1055 )
1055 )
1056 if block:
1056 if block:
1057 try:
1057 try:
1058 return ar.get()
1058 return ar.get()
1059 except KeyboardInterrupt:
1059 except KeyboardInterrupt:
1060 pass
1060 pass
1061 return ar
1061 return ar
1062
1062
1063 @sync_results
1063 @sync_results
1064 @save_ids
1064 @save_ids
1065 def map(self, f, *sequences, **kwargs):
1065 def map(self, f, *sequences, **kwargs):
1066 """``view.map(f, *sequences, block=self.block, chunksize=1, ordered=True)`` => list|AsyncMapResult
1066 """``view.map(f, *sequences, block=self.block, chunksize=1, ordered=True)`` => list|AsyncMapResult
1067
1067
1068 Parallel version of builtin `map`, load-balanced by this View.
1068 Parallel version of builtin `map`, load-balanced by this View.
1069
1069
1070 `block`, and `chunksize` can be specified by keyword only.
1070 `block`, and `chunksize` can be specified by keyword only.
1071
1071
1072 Each `chunksize` elements will be a separate task, and will be
1072 Each `chunksize` elements will be a separate task, and will be
1073 load-balanced. This lets individual elements be available for iteration
1073 load-balanced. This lets individual elements be available for iteration
1074 as soon as they arrive.
1074 as soon as they arrive.
1075
1075
1076 Parameters
1076 Parameters
1077 ----------
1077 ----------
1078
1078
1079 f : callable
1079 f : callable
1080 function to be mapped
1080 function to be mapped
1081 *sequences: one or more sequences of matching length
1081 *sequences: one or more sequences of matching length
1082 the sequences to be distributed and passed to `f`
1082 the sequences to be distributed and passed to `f`
1083 block : bool [default self.block]
1083 block : bool [default self.block]
1084 whether to wait for the result or not
1084 whether to wait for the result or not
1085 track : bool
1085 track : bool
1086 whether to create a MessageTracker to allow the user to
1086 whether to create a MessageTracker to allow the user to
1087 safely edit after arrays and buffers during non-copying
1087 safely edit after arrays and buffers during non-copying
1088 sends.
1088 sends.
1089 chunksize : int [default 1]
1089 chunksize : int [default 1]
1090 how many elements should be in each task.
1090 how many elements should be in each task.
1091 ordered : bool [default True]
1091 ordered : bool [default True]
1092 Whether the results should be gathered as they arrive, or enforce
1092 Whether the results should be gathered as they arrive, or enforce
1093 the order of submission.
1093 the order of submission.
1094
1094
1095 Only applies when iterating through AsyncMapResult as results arrive.
1095 Only applies when iterating through AsyncMapResult as results arrive.
1096 Has no effect when block=True.
1096 Has no effect when block=True.
1097
1097
1098 Returns
1098 Returns
1099 -------
1099 -------
1100
1100
1101 if block=False
1101 if block=False
1102 An :class:`~IPython.parallel.client.asyncresult.AsyncMapResult` instance.
1102 An :class:`~IPython.parallel.client.asyncresult.AsyncMapResult` instance.
1103 An object like AsyncResult, but which reassembles the sequence of results
1103 An object like AsyncResult, but which reassembles the sequence of results
1104 into a single list. AsyncMapResults can be iterated through before all
1104 into a single list. AsyncMapResults can be iterated through before all
1105 results are complete.
1105 results are complete.
1106 else
1106 else
1107 A list, the result of ``map(f,*sequences)``
1107 A list, the result of ``map(f,*sequences)``
1108 """
1108 """
1109
1109
1110 # default
1110 # default
1111 block = kwargs.get('block', self.block)
1111 block = kwargs.get('block', self.block)
1112 chunksize = kwargs.get('chunksize', 1)
1112 chunksize = kwargs.get('chunksize', 1)
1113 ordered = kwargs.get('ordered', True)
1113 ordered = kwargs.get('ordered', True)
1114
1114
1115 keyset = set(kwargs.keys())
1115 keyset = set(kwargs.keys())
1116 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1116 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1117 if extra_keys:
1117 if extra_keys:
1118 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1118 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1119
1119
1120 assert len(sequences) > 0, "must have some sequences to map onto!"
1120 assert len(sequences) > 0, "must have some sequences to map onto!"
1121
1121
1122 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1122 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1123 return pf.map(*sequences)
1123 return pf.map(*sequences)
1124
1124
1125 __all__ = ['LoadBalancedView', 'DirectView']
1125 __all__ = ['LoadBalancedView', 'DirectView']
General Comments 0
You need to be logged in to leave comments. Login now