##// END OF EJS Templates
Merge pull request #4106 from minrk/clear-block...
Thomas Kluyver -
r12305:bf1bbcb9 merge
parent child Browse files
Show More
@@ -1,1116 +1,1116 b''
1 """Views of remote engines.
1 """Views of remote engines.
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import imp
18 import imp
19 import sys
19 import sys
20 import warnings
20 import warnings
21 from contextlib import contextmanager
21 from contextlib import contextmanager
22 from types import ModuleType
22 from types import ModuleType
23
23
24 import zmq
24 import zmq
25
25
26 from IPython.testing.skipdoctest import skip_doctest
26 from IPython.testing.skipdoctest import skip_doctest
27 from IPython.utils.traitlets import (
27 from IPython.utils.traitlets import (
28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
29 )
29 )
30 from IPython.external.decorator import decorator
30 from IPython.external.decorator import decorator
31
31
32 from IPython.parallel import util
32 from IPython.parallel import util
33 from IPython.parallel.controller.dependency import Dependency, dependent
33 from IPython.parallel.controller.dependency import Dependency, dependent
34
34
35 from . import map as Map
35 from . import map as Map
36 from .asyncresult import AsyncResult, AsyncMapResult
36 from .asyncresult import AsyncResult, AsyncMapResult
37 from .remotefunction import ParallelFunction, parallel, remote, getname
37 from .remotefunction import ParallelFunction, parallel, remote, getname
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Decorators
40 # Decorators
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43 @decorator
43 @decorator
44 def save_ids(f, self, *args, **kwargs):
44 def save_ids(f, self, *args, **kwargs):
45 """Keep our history and outstanding attributes up to date after a method call."""
45 """Keep our history and outstanding attributes up to date after a method call."""
46 n_previous = len(self.client.history)
46 n_previous = len(self.client.history)
47 try:
47 try:
48 ret = f(self, *args, **kwargs)
48 ret = f(self, *args, **kwargs)
49 finally:
49 finally:
50 nmsgs = len(self.client.history) - n_previous
50 nmsgs = len(self.client.history) - n_previous
51 msg_ids = self.client.history[-nmsgs:]
51 msg_ids = self.client.history[-nmsgs:]
52 self.history.extend(msg_ids)
52 self.history.extend(msg_ids)
53 map(self.outstanding.add, msg_ids)
53 map(self.outstanding.add, msg_ids)
54 return ret
54 return ret
55
55
56 @decorator
56 @decorator
57 def sync_results(f, self, *args, **kwargs):
57 def sync_results(f, self, *args, **kwargs):
58 """sync relevant results from self.client to our results attribute."""
58 """sync relevant results from self.client to our results attribute."""
59 if self._in_sync_results:
59 if self._in_sync_results:
60 return f(self, *args, **kwargs)
60 return f(self, *args, **kwargs)
61 self._in_sync_results = True
61 self._in_sync_results = True
62 try:
62 try:
63 ret = f(self, *args, **kwargs)
63 ret = f(self, *args, **kwargs)
64 finally:
64 finally:
65 self._in_sync_results = False
65 self._in_sync_results = False
66 self._sync_results()
66 self._sync_results()
67 return ret
67 return ret
68
68
69 @decorator
69 @decorator
70 def spin_after(f, self, *args, **kwargs):
70 def spin_after(f, self, *args, **kwargs):
71 """call spin after the method."""
71 """call spin after the method."""
72 ret = f(self, *args, **kwargs)
72 ret = f(self, *args, **kwargs)
73 self.spin()
73 self.spin()
74 return ret
74 return ret
75
75
76 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
77 # Classes
77 # Classes
78 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
79
79
80 @skip_doctest
80 @skip_doctest
81 class View(HasTraits):
81 class View(HasTraits):
82 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
82 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
83
83
84 Don't use this class, use subclasses.
84 Don't use this class, use subclasses.
85
85
86 Methods
86 Methods
87 -------
87 -------
88
88
89 spin
89 spin
90 flushes incoming results and registration state changes
90 flushes incoming results and registration state changes
91 control methods spin, and requesting `ids` also ensures up to date
91 control methods spin, and requesting `ids` also ensures up to date
92
92
93 wait
93 wait
94 wait on one or more msg_ids
94 wait on one or more msg_ids
95
95
96 execution methods
96 execution methods
97 apply
97 apply
98 legacy: execute, run
98 legacy: execute, run
99
99
100 data movement
100 data movement
101 push, pull, scatter, gather
101 push, pull, scatter, gather
102
102
103 query methods
103 query methods
104 get_result, queue_status, purge_results, result_status
104 get_result, queue_status, purge_results, result_status
105
105
106 control methods
106 control methods
107 abort, shutdown
107 abort, shutdown
108
108
109 """
109 """
110 # flags
110 # flags
111 block=Bool(False)
111 block=Bool(False)
112 track=Bool(True)
112 track=Bool(True)
113 targets = Any()
113 targets = Any()
114
114
115 history=List()
115 history=List()
116 outstanding = Set()
116 outstanding = Set()
117 results = Dict()
117 results = Dict()
118 client = Instance('IPython.parallel.Client')
118 client = Instance('IPython.parallel.Client')
119
119
120 _socket = Instance('zmq.Socket')
120 _socket = Instance('zmq.Socket')
121 _flag_names = List(['targets', 'block', 'track'])
121 _flag_names = List(['targets', 'block', 'track'])
122 _in_sync_results = Bool(False)
122 _in_sync_results = Bool(False)
123 _targets = Any()
123 _targets = Any()
124 _idents = Any()
124 _idents = Any()
125
125
126 def __init__(self, client=None, socket=None, **flags):
126 def __init__(self, client=None, socket=None, **flags):
127 super(View, self).__init__(client=client, _socket=socket)
127 super(View, self).__init__(client=client, _socket=socket)
128 self.results = client.results
128 self.results = client.results
129 self.block = client.block
129 self.block = client.block
130
130
131 self.set_flags(**flags)
131 self.set_flags(**flags)
132
132
133 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
133 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
134
134
135 def __repr__(self):
135 def __repr__(self):
136 strtargets = str(self.targets)
136 strtargets = str(self.targets)
137 if len(strtargets) > 16:
137 if len(strtargets) > 16:
138 strtargets = strtargets[:12]+'...]'
138 strtargets = strtargets[:12]+'...]'
139 return "<%s %s>"%(self.__class__.__name__, strtargets)
139 return "<%s %s>"%(self.__class__.__name__, strtargets)
140
140
141 def __len__(self):
141 def __len__(self):
142 if isinstance(self.targets, list):
142 if isinstance(self.targets, list):
143 return len(self.targets)
143 return len(self.targets)
144 elif isinstance(self.targets, int):
144 elif isinstance(self.targets, int):
145 return 1
145 return 1
146 else:
146 else:
147 return len(self.client)
147 return len(self.client)
148
148
149 def set_flags(self, **kwargs):
149 def set_flags(self, **kwargs):
150 """set my attribute flags by keyword.
150 """set my attribute flags by keyword.
151
151
152 Views determine behavior with a few attributes (`block`, `track`, etc.).
152 Views determine behavior with a few attributes (`block`, `track`, etc.).
153 These attributes can be set all at once by name with this method.
153 These attributes can be set all at once by name with this method.
154
154
155 Parameters
155 Parameters
156 ----------
156 ----------
157
157
158 block : bool
158 block : bool
159 whether to wait for results
159 whether to wait for results
160 track : bool
160 track : bool
161 whether to create a MessageTracker to allow the user to
161 whether to create a MessageTracker to allow the user to
162 safely edit after arrays and buffers during non-copying
162 safely edit after arrays and buffers during non-copying
163 sends.
163 sends.
164 """
164 """
165 for name, value in kwargs.iteritems():
165 for name, value in kwargs.iteritems():
166 if name not in self._flag_names:
166 if name not in self._flag_names:
167 raise KeyError("Invalid name: %r"%name)
167 raise KeyError("Invalid name: %r"%name)
168 else:
168 else:
169 setattr(self, name, value)
169 setattr(self, name, value)
170
170
171 @contextmanager
171 @contextmanager
172 def temp_flags(self, **kwargs):
172 def temp_flags(self, **kwargs):
173 """temporarily set flags, for use in `with` statements.
173 """temporarily set flags, for use in `with` statements.
174
174
175 See set_flags for permanent setting of flags
175 See set_flags for permanent setting of flags
176
176
177 Examples
177 Examples
178 --------
178 --------
179
179
180 >>> view.track=False
180 >>> view.track=False
181 ...
181 ...
182 >>> with view.temp_flags(track=True):
182 >>> with view.temp_flags(track=True):
183 ... ar = view.apply(dostuff, my_big_array)
183 ... ar = view.apply(dostuff, my_big_array)
184 ... ar.tracker.wait() # wait for send to finish
184 ... ar.tracker.wait() # wait for send to finish
185 >>> view.track
185 >>> view.track
186 False
186 False
187
187
188 """
188 """
189 # preflight: save flags, and set temporaries
189 # preflight: save flags, and set temporaries
190 saved_flags = {}
190 saved_flags = {}
191 for f in self._flag_names:
191 for f in self._flag_names:
192 saved_flags[f] = getattr(self, f)
192 saved_flags[f] = getattr(self, f)
193 self.set_flags(**kwargs)
193 self.set_flags(**kwargs)
194 # yield to the with-statement block
194 # yield to the with-statement block
195 try:
195 try:
196 yield
196 yield
197 finally:
197 finally:
198 # postflight: restore saved flags
198 # postflight: restore saved flags
199 self.set_flags(**saved_flags)
199 self.set_flags(**saved_flags)
200
200
201
201
202 #----------------------------------------------------------------
202 #----------------------------------------------------------------
203 # apply
203 # apply
204 #----------------------------------------------------------------
204 #----------------------------------------------------------------
205
205
206 def _sync_results(self):
206 def _sync_results(self):
207 """to be called by @sync_results decorator
207 """to be called by @sync_results decorator
208
208
209 after submitting any tasks.
209 after submitting any tasks.
210 """
210 """
211 delta = self.outstanding.difference(self.client.outstanding)
211 delta = self.outstanding.difference(self.client.outstanding)
212 completed = self.outstanding.intersection(delta)
212 completed = self.outstanding.intersection(delta)
213 self.outstanding = self.outstanding.difference(completed)
213 self.outstanding = self.outstanding.difference(completed)
214
214
215 @sync_results
215 @sync_results
216 @save_ids
216 @save_ids
217 def _really_apply(self, f, args, kwargs, block=None, **options):
217 def _really_apply(self, f, args, kwargs, block=None, **options):
218 """wrapper for client.send_apply_request"""
218 """wrapper for client.send_apply_request"""
219 raise NotImplementedError("Implement in subclasses")
219 raise NotImplementedError("Implement in subclasses")
220
220
221 def apply(self, f, *args, **kwargs):
221 def apply(self, f, *args, **kwargs):
222 """calls f(*args, **kwargs) on remote engines, returning the result.
222 """calls f(*args, **kwargs) on remote engines, returning the result.
223
223
224 This method sets all apply flags via this View's attributes.
224 This method sets all apply flags via this View's attributes.
225
225
226 if self.block is False:
226 if self.block is False:
227 returns AsyncResult
227 returns AsyncResult
228 else:
228 else:
229 returns actual result of f(*args, **kwargs)
229 returns actual result of f(*args, **kwargs)
230 """
230 """
231 return self._really_apply(f, args, kwargs)
231 return self._really_apply(f, args, kwargs)
232
232
233 def apply_async(self, f, *args, **kwargs):
233 def apply_async(self, f, *args, **kwargs):
234 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
234 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
235
235
236 returns AsyncResult
236 returns AsyncResult
237 """
237 """
238 return self._really_apply(f, args, kwargs, block=False)
238 return self._really_apply(f, args, kwargs, block=False)
239
239
240 @spin_after
240 @spin_after
241 def apply_sync(self, f, *args, **kwargs):
241 def apply_sync(self, f, *args, **kwargs):
242 """calls f(*args, **kwargs) on remote engines in a blocking manner,
242 """calls f(*args, **kwargs) on remote engines in a blocking manner,
243 returning the result.
243 returning the result.
244
244
245 returns: actual result of f(*args, **kwargs)
245 returns: actual result of f(*args, **kwargs)
246 """
246 """
247 return self._really_apply(f, args, kwargs, block=True)
247 return self._really_apply(f, args, kwargs, block=True)
248
248
249 #----------------------------------------------------------------
249 #----------------------------------------------------------------
250 # wrappers for client and control methods
250 # wrappers for client and control methods
251 #----------------------------------------------------------------
251 #----------------------------------------------------------------
252 @sync_results
252 @sync_results
253 def spin(self):
253 def spin(self):
254 """spin the client, and sync"""
254 """spin the client, and sync"""
255 self.client.spin()
255 self.client.spin()
256
256
257 @sync_results
257 @sync_results
258 def wait(self, jobs=None, timeout=-1):
258 def wait(self, jobs=None, timeout=-1):
259 """waits on one or more `jobs`, for up to `timeout` seconds.
259 """waits on one or more `jobs`, for up to `timeout` seconds.
260
260
261 Parameters
261 Parameters
262 ----------
262 ----------
263
263
264 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
264 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
265 ints are indices to self.history
265 ints are indices to self.history
266 strs are msg_ids
266 strs are msg_ids
267 default: wait on all outstanding messages
267 default: wait on all outstanding messages
268 timeout : float
268 timeout : float
269 a time in seconds, after which to give up.
269 a time in seconds, after which to give up.
270 default is -1, which means no timeout
270 default is -1, which means no timeout
271
271
272 Returns
272 Returns
273 -------
273 -------
274
274
275 True : when all msg_ids are done
275 True : when all msg_ids are done
276 False : timeout reached, some msg_ids still outstanding
276 False : timeout reached, some msg_ids still outstanding
277 """
277 """
278 if jobs is None:
278 if jobs is None:
279 jobs = self.history
279 jobs = self.history
280 return self.client.wait(jobs, timeout)
280 return self.client.wait(jobs, timeout)
281
281
282 def abort(self, jobs=None, targets=None, block=None):
282 def abort(self, jobs=None, targets=None, block=None):
283 """Abort jobs on my engines.
283 """Abort jobs on my engines.
284
284
285 Parameters
285 Parameters
286 ----------
286 ----------
287
287
288 jobs : None, str, list of strs, optional
288 jobs : None, str, list of strs, optional
289 if None: abort all jobs.
289 if None: abort all jobs.
290 else: abort specific msg_id(s).
290 else: abort specific msg_id(s).
291 """
291 """
292 block = block if block is not None else self.block
292 block = block if block is not None else self.block
293 targets = targets if targets is not None else self.targets
293 targets = targets if targets is not None else self.targets
294 jobs = jobs if jobs is not None else list(self.outstanding)
294 jobs = jobs if jobs is not None else list(self.outstanding)
295
295
296 return self.client.abort(jobs=jobs, targets=targets, block=block)
296 return self.client.abort(jobs=jobs, targets=targets, block=block)
297
297
298 def queue_status(self, targets=None, verbose=False):
298 def queue_status(self, targets=None, verbose=False):
299 """Fetch the Queue status of my engines"""
299 """Fetch the Queue status of my engines"""
300 targets = targets if targets is not None else self.targets
300 targets = targets if targets is not None else self.targets
301 return self.client.queue_status(targets=targets, verbose=verbose)
301 return self.client.queue_status(targets=targets, verbose=verbose)
302
302
303 def purge_results(self, jobs=[], targets=[]):
303 def purge_results(self, jobs=[], targets=[]):
304 """Instruct the controller to forget specific results."""
304 """Instruct the controller to forget specific results."""
305 if targets is None or targets == 'all':
305 if targets is None or targets == 'all':
306 targets = self.targets
306 targets = self.targets
307 return self.client.purge_results(jobs=jobs, targets=targets)
307 return self.client.purge_results(jobs=jobs, targets=targets)
308
308
309 def shutdown(self, targets=None, restart=False, hub=False, block=None):
309 def shutdown(self, targets=None, restart=False, hub=False, block=None):
310 """Terminates one or more engine processes, optionally including the hub.
310 """Terminates one or more engine processes, optionally including the hub.
311 """
311 """
312 block = self.block if block is None else block
312 block = self.block if block is None else block
313 if targets is None or targets == 'all':
313 if targets is None or targets == 'all':
314 targets = self.targets
314 targets = self.targets
315 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
315 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
316
316
317 @spin_after
317 @spin_after
318 def get_result(self, indices_or_msg_ids=None):
318 def get_result(self, indices_or_msg_ids=None):
319 """return one or more results, specified by history index or msg_id.
319 """return one or more results, specified by history index or msg_id.
320
320
321 See client.get_result for details.
321 See client.get_result for details.
322
322
323 """
323 """
324
324
325 if indices_or_msg_ids is None:
325 if indices_or_msg_ids is None:
326 indices_or_msg_ids = -1
326 indices_or_msg_ids = -1
327 if isinstance(indices_or_msg_ids, int):
327 if isinstance(indices_or_msg_ids, int):
328 indices_or_msg_ids = self.history[indices_or_msg_ids]
328 indices_or_msg_ids = self.history[indices_or_msg_ids]
329 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
329 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
330 indices_or_msg_ids = list(indices_or_msg_ids)
330 indices_or_msg_ids = list(indices_or_msg_ids)
331 for i,index in enumerate(indices_or_msg_ids):
331 for i,index in enumerate(indices_or_msg_ids):
332 if isinstance(index, int):
332 if isinstance(index, int):
333 indices_or_msg_ids[i] = self.history[index]
333 indices_or_msg_ids[i] = self.history[index]
334 return self.client.get_result(indices_or_msg_ids)
334 return self.client.get_result(indices_or_msg_ids)
335
335
336 #-------------------------------------------------------------------
336 #-------------------------------------------------------------------
337 # Map
337 # Map
338 #-------------------------------------------------------------------
338 #-------------------------------------------------------------------
339
339
340 @sync_results
340 @sync_results
341 def map(self, f, *sequences, **kwargs):
341 def map(self, f, *sequences, **kwargs):
342 """override in subclasses"""
342 """override in subclasses"""
343 raise NotImplementedError
343 raise NotImplementedError
344
344
345 def map_async(self, f, *sequences, **kwargs):
345 def map_async(self, f, *sequences, **kwargs):
346 """Parallel version of builtin `map`, using this view's engines.
346 """Parallel version of builtin `map`, using this view's engines.
347
347
348 This is equivalent to map(...block=False)
348 This is equivalent to map(...block=False)
349
349
350 See `self.map` for details.
350 See `self.map` for details.
351 """
351 """
352 if 'block' in kwargs:
352 if 'block' in kwargs:
353 raise TypeError("map_async doesn't take a `block` keyword argument.")
353 raise TypeError("map_async doesn't take a `block` keyword argument.")
354 kwargs['block'] = False
354 kwargs['block'] = False
355 return self.map(f,*sequences,**kwargs)
355 return self.map(f,*sequences,**kwargs)
356
356
357 def map_sync(self, f, *sequences, **kwargs):
357 def map_sync(self, f, *sequences, **kwargs):
358 """Parallel version of builtin `map`, using this view's engines.
358 """Parallel version of builtin `map`, using this view's engines.
359
359
360 This is equivalent to map(...block=True)
360 This is equivalent to map(...block=True)
361
361
362 See `self.map` for details.
362 See `self.map` for details.
363 """
363 """
364 if 'block' in kwargs:
364 if 'block' in kwargs:
365 raise TypeError("map_sync doesn't take a `block` keyword argument.")
365 raise TypeError("map_sync doesn't take a `block` keyword argument.")
366 kwargs['block'] = True
366 kwargs['block'] = True
367 return self.map(f,*sequences,**kwargs)
367 return self.map(f,*sequences,**kwargs)
368
368
369 def imap(self, f, *sequences, **kwargs):
369 def imap(self, f, *sequences, **kwargs):
370 """Parallel version of `itertools.imap`.
370 """Parallel version of `itertools.imap`.
371
371
372 See `self.map` for details.
372 See `self.map` for details.
373
373
374 """
374 """
375
375
376 return iter(self.map_async(f,*sequences, **kwargs))
376 return iter(self.map_async(f,*sequences, **kwargs))
377
377
378 #-------------------------------------------------------------------
378 #-------------------------------------------------------------------
379 # Decorators
379 # Decorators
380 #-------------------------------------------------------------------
380 #-------------------------------------------------------------------
381
381
382 def remote(self, block=True, **flags):
382 def remote(self, block=None, **flags):
383 """Decorator for making a RemoteFunction"""
383 """Decorator for making a RemoteFunction"""
384 block = self.block if block is None else block
384 block = self.block if block is None else block
385 return remote(self, block=block, **flags)
385 return remote(self, block=block, **flags)
386
386
387 def parallel(self, dist='b', block=None, **flags):
387 def parallel(self, dist='b', block=None, **flags):
388 """Decorator for making a ParallelFunction"""
388 """Decorator for making a ParallelFunction"""
389 block = self.block if block is None else block
389 block = self.block if block is None else block
390 return parallel(self, dist=dist, block=block, **flags)
390 return parallel(self, dist=dist, block=block, **flags)
391
391
392 @skip_doctest
392 @skip_doctest
393 class DirectView(View):
393 class DirectView(View):
394 """Direct Multiplexer View of one or more engines.
394 """Direct Multiplexer View of one or more engines.
395
395
396 These are created via indexed access to a client:
396 These are created via indexed access to a client:
397
397
398 >>> dv_1 = client[1]
398 >>> dv_1 = client[1]
399 >>> dv_all = client[:]
399 >>> dv_all = client[:]
400 >>> dv_even = client[::2]
400 >>> dv_even = client[::2]
401 >>> dv_some = client[1:3]
401 >>> dv_some = client[1:3]
402
402
403 This object provides dictionary access to engine namespaces:
403 This object provides dictionary access to engine namespaces:
404
404
405 # push a=5:
405 # push a=5:
406 >>> dv['a'] = 5
406 >>> dv['a'] = 5
407 # pull 'foo':
407 # pull 'foo':
408 >>> db['foo']
408 >>> db['foo']
409
409
410 """
410 """
411
411
412 def __init__(self, client=None, socket=None, targets=None):
412 def __init__(self, client=None, socket=None, targets=None):
413 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
413 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
414
414
415 @property
415 @property
416 def importer(self):
416 def importer(self):
417 """sync_imports(local=True) as a property.
417 """sync_imports(local=True) as a property.
418
418
419 See sync_imports for details.
419 See sync_imports for details.
420
420
421 """
421 """
422 return self.sync_imports(True)
422 return self.sync_imports(True)
423
423
424 @contextmanager
424 @contextmanager
425 def sync_imports(self, local=True, quiet=False):
425 def sync_imports(self, local=True, quiet=False):
426 """Context Manager for performing simultaneous local and remote imports.
426 """Context Manager for performing simultaneous local and remote imports.
427
427
428 'import x as y' will *not* work. The 'as y' part will simply be ignored.
428 'import x as y' will *not* work. The 'as y' part will simply be ignored.
429
429
430 If `local=True`, then the package will also be imported locally.
430 If `local=True`, then the package will also be imported locally.
431
431
432 If `quiet=True`, no output will be produced when attempting remote
432 If `quiet=True`, no output will be produced when attempting remote
433 imports.
433 imports.
434
434
435 Note that remote-only (`local=False`) imports have not been implemented.
435 Note that remote-only (`local=False`) imports have not been implemented.
436
436
437 >>> with view.sync_imports():
437 >>> with view.sync_imports():
438 ... from numpy import recarray
438 ... from numpy import recarray
439 importing recarray from numpy on engine(s)
439 importing recarray from numpy on engine(s)
440
440
441 """
441 """
442 import __builtin__
442 import __builtin__
443 local_import = __builtin__.__import__
443 local_import = __builtin__.__import__
444 modules = set()
444 modules = set()
445 results = []
445 results = []
446 @util.interactive
446 @util.interactive
447 def remote_import(name, fromlist, level):
447 def remote_import(name, fromlist, level):
448 """the function to be passed to apply, that actually performs the import
448 """the function to be passed to apply, that actually performs the import
449 on the engine, and loads up the user namespace.
449 on the engine, and loads up the user namespace.
450 """
450 """
451 import sys
451 import sys
452 user_ns = globals()
452 user_ns = globals()
453 mod = __import__(name, fromlist=fromlist, level=level)
453 mod = __import__(name, fromlist=fromlist, level=level)
454 if fromlist:
454 if fromlist:
455 for key in fromlist:
455 for key in fromlist:
456 user_ns[key] = getattr(mod, key)
456 user_ns[key] = getattr(mod, key)
457 else:
457 else:
458 user_ns[name] = sys.modules[name]
458 user_ns[name] = sys.modules[name]
459
459
460 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
460 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
461 """the drop-in replacement for __import__, that optionally imports
461 """the drop-in replacement for __import__, that optionally imports
462 locally as well.
462 locally as well.
463 """
463 """
464 # don't override nested imports
464 # don't override nested imports
465 save_import = __builtin__.__import__
465 save_import = __builtin__.__import__
466 __builtin__.__import__ = local_import
466 __builtin__.__import__ = local_import
467
467
468 if imp.lock_held():
468 if imp.lock_held():
469 # this is a side-effect import, don't do it remotely, or even
469 # this is a side-effect import, don't do it remotely, or even
470 # ignore the local effects
470 # ignore the local effects
471 return local_import(name, globals, locals, fromlist, level)
471 return local_import(name, globals, locals, fromlist, level)
472
472
473 imp.acquire_lock()
473 imp.acquire_lock()
474 if local:
474 if local:
475 mod = local_import(name, globals, locals, fromlist, level)
475 mod = local_import(name, globals, locals, fromlist, level)
476 else:
476 else:
477 raise NotImplementedError("remote-only imports not yet implemented")
477 raise NotImplementedError("remote-only imports not yet implemented")
478 imp.release_lock()
478 imp.release_lock()
479
479
480 key = name+':'+','.join(fromlist or [])
480 key = name+':'+','.join(fromlist or [])
481 if level == -1 and key not in modules:
481 if level == -1 and key not in modules:
482 modules.add(key)
482 modules.add(key)
483 if not quiet:
483 if not quiet:
484 if fromlist:
484 if fromlist:
485 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
485 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
486 else:
486 else:
487 print "importing %s on engine(s)"%name
487 print "importing %s on engine(s)"%name
488 results.append(self.apply_async(remote_import, name, fromlist, level))
488 results.append(self.apply_async(remote_import, name, fromlist, level))
489 # restore override
489 # restore override
490 __builtin__.__import__ = save_import
490 __builtin__.__import__ = save_import
491
491
492 return mod
492 return mod
493
493
494 # override __import__
494 # override __import__
495 __builtin__.__import__ = view_import
495 __builtin__.__import__ = view_import
496 try:
496 try:
497 # enter the block
497 # enter the block
498 yield
498 yield
499 except ImportError:
499 except ImportError:
500 if local:
500 if local:
501 raise
501 raise
502 else:
502 else:
503 # ignore import errors if not doing local imports
503 # ignore import errors if not doing local imports
504 pass
504 pass
505 finally:
505 finally:
506 # always restore __import__
506 # always restore __import__
507 __builtin__.__import__ = local_import
507 __builtin__.__import__ = local_import
508
508
509 for r in results:
509 for r in results:
510 # raise possible remote ImportErrors here
510 # raise possible remote ImportErrors here
511 r.get()
511 r.get()
512
512
513
513
514 @sync_results
514 @sync_results
515 @save_ids
515 @save_ids
516 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
516 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
517 """calls f(*args, **kwargs) on remote engines, returning the result.
517 """calls f(*args, **kwargs) on remote engines, returning the result.
518
518
519 This method sets all of `apply`'s flags via this View's attributes.
519 This method sets all of `apply`'s flags via this View's attributes.
520
520
521 Parameters
521 Parameters
522 ----------
522 ----------
523
523
524 f : callable
524 f : callable
525
525
526 args : list [default: empty]
526 args : list [default: empty]
527
527
528 kwargs : dict [default: empty]
528 kwargs : dict [default: empty]
529
529
530 targets : target list [default: self.targets]
530 targets : target list [default: self.targets]
531 where to run
531 where to run
532 block : bool [default: self.block]
532 block : bool [default: self.block]
533 whether to block
533 whether to block
534 track : bool [default: self.track]
534 track : bool [default: self.track]
535 whether to ask zmq to track the message, for safe non-copying sends
535 whether to ask zmq to track the message, for safe non-copying sends
536
536
537 Returns
537 Returns
538 -------
538 -------
539
539
540 if self.block is False:
540 if self.block is False:
541 returns AsyncResult
541 returns AsyncResult
542 else:
542 else:
543 returns actual result of f(*args, **kwargs) on the engine(s)
543 returns actual result of f(*args, **kwargs) on the engine(s)
544 This will be a list of self.targets is also a list (even length 1), or
544 This will be a list of self.targets is also a list (even length 1), or
545 the single result if self.targets is an integer engine id
545 the single result if self.targets is an integer engine id
546 """
546 """
547 args = [] if args is None else args
547 args = [] if args is None else args
548 kwargs = {} if kwargs is None else kwargs
548 kwargs = {} if kwargs is None else kwargs
549 block = self.block if block is None else block
549 block = self.block if block is None else block
550 track = self.track if track is None else track
550 track = self.track if track is None else track
551 targets = self.targets if targets is None else targets
551 targets = self.targets if targets is None else targets
552
552
553 _idents, _targets = self.client._build_targets(targets)
553 _idents, _targets = self.client._build_targets(targets)
554 msg_ids = []
554 msg_ids = []
555 trackers = []
555 trackers = []
556 for ident in _idents:
556 for ident in _idents:
557 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
557 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
558 ident=ident)
558 ident=ident)
559 if track:
559 if track:
560 trackers.append(msg['tracker'])
560 trackers.append(msg['tracker'])
561 msg_ids.append(msg['header']['msg_id'])
561 msg_ids.append(msg['header']['msg_id'])
562 if isinstance(targets, int):
562 if isinstance(targets, int):
563 msg_ids = msg_ids[0]
563 msg_ids = msg_ids[0]
564 tracker = None if track is False else zmq.MessageTracker(*trackers)
564 tracker = None if track is False else zmq.MessageTracker(*trackers)
565 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets, tracker=tracker)
565 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets, tracker=tracker)
566 if block:
566 if block:
567 try:
567 try:
568 return ar.get()
568 return ar.get()
569 except KeyboardInterrupt:
569 except KeyboardInterrupt:
570 pass
570 pass
571 return ar
571 return ar
572
572
573
573
574 @sync_results
574 @sync_results
575 def map(self, f, *sequences, **kwargs):
575 def map(self, f, *sequences, **kwargs):
576 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
576 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
577
577
578 Parallel version of builtin `map`, using this View's `targets`.
578 Parallel version of builtin `map`, using this View's `targets`.
579
579
580 There will be one task per target, so work will be chunked
580 There will be one task per target, so work will be chunked
581 if the sequences are longer than `targets`.
581 if the sequences are longer than `targets`.
582
582
583 Results can be iterated as they are ready, but will become available in chunks.
583 Results can be iterated as they are ready, but will become available in chunks.
584
584
585 Parameters
585 Parameters
586 ----------
586 ----------
587
587
588 f : callable
588 f : callable
589 function to be mapped
589 function to be mapped
590 *sequences: one or more sequences of matching length
590 *sequences: one or more sequences of matching length
591 the sequences to be distributed and passed to `f`
591 the sequences to be distributed and passed to `f`
592 block : bool
592 block : bool
593 whether to wait for the result or not [default self.block]
593 whether to wait for the result or not [default self.block]
594
594
595 Returns
595 Returns
596 -------
596 -------
597
597
598 if block=False:
598 if block=False:
599 AsyncMapResult
599 AsyncMapResult
600 An object like AsyncResult, but which reassembles the sequence of results
600 An object like AsyncResult, but which reassembles the sequence of results
601 into a single list. AsyncMapResults can be iterated through before all
601 into a single list. AsyncMapResults can be iterated through before all
602 results are complete.
602 results are complete.
603 else:
603 else:
604 list
604 list
605 the result of map(f,*sequences)
605 the result of map(f,*sequences)
606 """
606 """
607
607
608 block = kwargs.pop('block', self.block)
608 block = kwargs.pop('block', self.block)
609 for k in kwargs.keys():
609 for k in kwargs.keys():
610 if k not in ['block', 'track']:
610 if k not in ['block', 'track']:
611 raise TypeError("invalid keyword arg, %r"%k)
611 raise TypeError("invalid keyword arg, %r"%k)
612
612
613 assert len(sequences) > 0, "must have some sequences to map onto!"
613 assert len(sequences) > 0, "must have some sequences to map onto!"
614 pf = ParallelFunction(self, f, block=block, **kwargs)
614 pf = ParallelFunction(self, f, block=block, **kwargs)
615 return pf.map(*sequences)
615 return pf.map(*sequences)
616
616
617 @sync_results
617 @sync_results
618 @save_ids
618 @save_ids
619 def execute(self, code, silent=True, targets=None, block=None):
619 def execute(self, code, silent=True, targets=None, block=None):
620 """Executes `code` on `targets` in blocking or nonblocking manner.
620 """Executes `code` on `targets` in blocking or nonblocking manner.
621
621
622 ``execute`` is always `bound` (affects engine namespace)
622 ``execute`` is always `bound` (affects engine namespace)
623
623
624 Parameters
624 Parameters
625 ----------
625 ----------
626
626
627 code : str
627 code : str
628 the code string to be executed
628 the code string to be executed
629 block : bool
629 block : bool
630 whether or not to wait until done to return
630 whether or not to wait until done to return
631 default: self.block
631 default: self.block
632 """
632 """
633 block = self.block if block is None else block
633 block = self.block if block is None else block
634 targets = self.targets if targets is None else targets
634 targets = self.targets if targets is None else targets
635
635
636 _idents, _targets = self.client._build_targets(targets)
636 _idents, _targets = self.client._build_targets(targets)
637 msg_ids = []
637 msg_ids = []
638 trackers = []
638 trackers = []
639 for ident in _idents:
639 for ident in _idents:
640 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
640 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
641 msg_ids.append(msg['header']['msg_id'])
641 msg_ids.append(msg['header']['msg_id'])
642 if isinstance(targets, int):
642 if isinstance(targets, int):
643 msg_ids = msg_ids[0]
643 msg_ids = msg_ids[0]
644 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets)
644 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets)
645 if block:
645 if block:
646 try:
646 try:
647 ar.get()
647 ar.get()
648 except KeyboardInterrupt:
648 except KeyboardInterrupt:
649 pass
649 pass
650 return ar
650 return ar
651
651
652 def run(self, filename, targets=None, block=None):
652 def run(self, filename, targets=None, block=None):
653 """Execute contents of `filename` on my engine(s).
653 """Execute contents of `filename` on my engine(s).
654
654
655 This simply reads the contents of the file and calls `execute`.
655 This simply reads the contents of the file and calls `execute`.
656
656
657 Parameters
657 Parameters
658 ----------
658 ----------
659
659
660 filename : str
660 filename : str
661 The path to the file
661 The path to the file
662 targets : int/str/list of ints/strs
662 targets : int/str/list of ints/strs
663 the engines on which to execute
663 the engines on which to execute
664 default : all
664 default : all
665 block : bool
665 block : bool
666 whether or not to wait until done
666 whether or not to wait until done
667 default: self.block
667 default: self.block
668
668
669 """
669 """
670 with open(filename, 'r') as f:
670 with open(filename, 'r') as f:
671 # add newline in case of trailing indented whitespace
671 # add newline in case of trailing indented whitespace
672 # which will cause SyntaxError
672 # which will cause SyntaxError
673 code = f.read()+'\n'
673 code = f.read()+'\n'
674 return self.execute(code, block=block, targets=targets)
674 return self.execute(code, block=block, targets=targets)
675
675
676 def update(self, ns):
676 def update(self, ns):
677 """update remote namespace with dict `ns`
677 """update remote namespace with dict `ns`
678
678
679 See `push` for details.
679 See `push` for details.
680 """
680 """
681 return self.push(ns, block=self.block, track=self.track)
681 return self.push(ns, block=self.block, track=self.track)
682
682
683 def push(self, ns, targets=None, block=None, track=None):
683 def push(self, ns, targets=None, block=None, track=None):
684 """update remote namespace with dict `ns`
684 """update remote namespace with dict `ns`
685
685
686 Parameters
686 Parameters
687 ----------
687 ----------
688
688
689 ns : dict
689 ns : dict
690 dict of keys with which to update engine namespace(s)
690 dict of keys with which to update engine namespace(s)
691 block : bool [default : self.block]
691 block : bool [default : self.block]
692 whether to wait to be notified of engine receipt
692 whether to wait to be notified of engine receipt
693
693
694 """
694 """
695
695
696 block = block if block is not None else self.block
696 block = block if block is not None else self.block
697 track = track if track is not None else self.track
697 track = track if track is not None else self.track
698 targets = targets if targets is not None else self.targets
698 targets = targets if targets is not None else self.targets
699 # applier = self.apply_sync if block else self.apply_async
699 # applier = self.apply_sync if block else self.apply_async
700 if not isinstance(ns, dict):
700 if not isinstance(ns, dict):
701 raise TypeError("Must be a dict, not %s"%type(ns))
701 raise TypeError("Must be a dict, not %s"%type(ns))
702 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
702 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
703
703
704 def get(self, key_s):
704 def get(self, key_s):
705 """get object(s) by `key_s` from remote namespace
705 """get object(s) by `key_s` from remote namespace
706
706
707 see `pull` for details.
707 see `pull` for details.
708 """
708 """
709 # block = block if block is not None else self.block
709 # block = block if block is not None else self.block
710 return self.pull(key_s, block=True)
710 return self.pull(key_s, block=True)
711
711
712 def pull(self, names, targets=None, block=None):
712 def pull(self, names, targets=None, block=None):
713 """get object(s) by `name` from remote namespace
713 """get object(s) by `name` from remote namespace
714
714
715 will return one object if it is a key.
715 will return one object if it is a key.
716 can also take a list of keys, in which case it will return a list of objects.
716 can also take a list of keys, in which case it will return a list of objects.
717 """
717 """
718 block = block if block is not None else self.block
718 block = block if block is not None else self.block
719 targets = targets if targets is not None else self.targets
719 targets = targets if targets is not None else self.targets
720 applier = self.apply_sync if block else self.apply_async
720 applier = self.apply_sync if block else self.apply_async
721 if isinstance(names, basestring):
721 if isinstance(names, basestring):
722 pass
722 pass
723 elif isinstance(names, (list,tuple,set)):
723 elif isinstance(names, (list,tuple,set)):
724 for key in names:
724 for key in names:
725 if not isinstance(key, basestring):
725 if not isinstance(key, basestring):
726 raise TypeError("keys must be str, not type %r"%type(key))
726 raise TypeError("keys must be str, not type %r"%type(key))
727 else:
727 else:
728 raise TypeError("names must be strs, not %r"%names)
728 raise TypeError("names must be strs, not %r"%names)
729 return self._really_apply(util._pull, (names,), block=block, targets=targets)
729 return self._really_apply(util._pull, (names,), block=block, targets=targets)
730
730
731 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
731 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
732 """
732 """
733 Partition a Python sequence and send the partitions to a set of engines.
733 Partition a Python sequence and send the partitions to a set of engines.
734 """
734 """
735 block = block if block is not None else self.block
735 block = block if block is not None else self.block
736 track = track if track is not None else self.track
736 track = track if track is not None else self.track
737 targets = targets if targets is not None else self.targets
737 targets = targets if targets is not None else self.targets
738
738
739 # construct integer ID list:
739 # construct integer ID list:
740 targets = self.client._build_targets(targets)[1]
740 targets = self.client._build_targets(targets)[1]
741
741
742 mapObject = Map.dists[dist]()
742 mapObject = Map.dists[dist]()
743 nparts = len(targets)
743 nparts = len(targets)
744 msg_ids = []
744 msg_ids = []
745 trackers = []
745 trackers = []
746 for index, engineid in enumerate(targets):
746 for index, engineid in enumerate(targets):
747 partition = mapObject.getPartition(seq, index, nparts)
747 partition = mapObject.getPartition(seq, index, nparts)
748 if flatten and len(partition) == 1:
748 if flatten and len(partition) == 1:
749 ns = {key: partition[0]}
749 ns = {key: partition[0]}
750 else:
750 else:
751 ns = {key: partition}
751 ns = {key: partition}
752 r = self.push(ns, block=False, track=track, targets=engineid)
752 r = self.push(ns, block=False, track=track, targets=engineid)
753 msg_ids.extend(r.msg_ids)
753 msg_ids.extend(r.msg_ids)
754 if track:
754 if track:
755 trackers.append(r._tracker)
755 trackers.append(r._tracker)
756
756
757 if track:
757 if track:
758 tracker = zmq.MessageTracker(*trackers)
758 tracker = zmq.MessageTracker(*trackers)
759 else:
759 else:
760 tracker = None
760 tracker = None
761
761
762 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
762 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
763 if block:
763 if block:
764 r.wait()
764 r.wait()
765 else:
765 else:
766 return r
766 return r
767
767
768 @sync_results
768 @sync_results
769 @save_ids
769 @save_ids
770 def gather(self, key, dist='b', targets=None, block=None):
770 def gather(self, key, dist='b', targets=None, block=None):
771 """
771 """
772 Gather a partitioned sequence on a set of engines as a single local seq.
772 Gather a partitioned sequence on a set of engines as a single local seq.
773 """
773 """
774 block = block if block is not None else self.block
774 block = block if block is not None else self.block
775 targets = targets if targets is not None else self.targets
775 targets = targets if targets is not None else self.targets
776 mapObject = Map.dists[dist]()
776 mapObject = Map.dists[dist]()
777 msg_ids = []
777 msg_ids = []
778
778
779 # construct integer ID list:
779 # construct integer ID list:
780 targets = self.client._build_targets(targets)[1]
780 targets = self.client._build_targets(targets)[1]
781
781
782 for index, engineid in enumerate(targets):
782 for index, engineid in enumerate(targets):
783 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
783 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
784
784
785 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
785 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
786
786
787 if block:
787 if block:
788 try:
788 try:
789 return r.get()
789 return r.get()
790 except KeyboardInterrupt:
790 except KeyboardInterrupt:
791 pass
791 pass
792 return r
792 return r
793
793
794 def __getitem__(self, key):
794 def __getitem__(self, key):
795 return self.get(key)
795 return self.get(key)
796
796
797 def __setitem__(self,key, value):
797 def __setitem__(self,key, value):
798 self.update({key:value})
798 self.update({key:value})
799
799
800 def clear(self, targets=None, block=False):
800 def clear(self, targets=None, block=None):
801 """Clear the remote namespaces on my engines."""
801 """Clear the remote namespaces on my engines."""
802 block = block if block is not None else self.block
802 block = block if block is not None else self.block
803 targets = targets if targets is not None else self.targets
803 targets = targets if targets is not None else self.targets
804 return self.client.clear(targets=targets, block=block)
804 return self.client.clear(targets=targets, block=block)
805
805
806 #----------------------------------------
806 #----------------------------------------
807 # activate for %px, %autopx, etc. magics
807 # activate for %px, %autopx, etc. magics
808 #----------------------------------------
808 #----------------------------------------
809
809
810 def activate(self, suffix=''):
810 def activate(self, suffix=''):
811 """Activate IPython magics associated with this View
811 """Activate IPython magics associated with this View
812
812
813 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
813 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
814
814
815 Parameters
815 Parameters
816 ----------
816 ----------
817
817
818 suffix: str [default: '']
818 suffix: str [default: '']
819 The suffix, if any, for the magics. This allows you to have
819 The suffix, if any, for the magics. This allows you to have
820 multiple views associated with parallel magics at the same time.
820 multiple views associated with parallel magics at the same time.
821
821
822 e.g. ``rc[::2].activate(suffix='_even')`` will give you
822 e.g. ``rc[::2].activate(suffix='_even')`` will give you
823 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
823 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
824 on the even engines.
824 on the even engines.
825 """
825 """
826
826
827 from IPython.parallel.client.magics import ParallelMagics
827 from IPython.parallel.client.magics import ParallelMagics
828
828
829 try:
829 try:
830 # This is injected into __builtins__.
830 # This is injected into __builtins__.
831 ip = get_ipython()
831 ip = get_ipython()
832 except NameError:
832 except NameError:
833 print "The IPython parallel magics (%px, etc.) only work within IPython."
833 print "The IPython parallel magics (%px, etc.) only work within IPython."
834 return
834 return
835
835
836 M = ParallelMagics(ip, self, suffix)
836 M = ParallelMagics(ip, self, suffix)
837 ip.magics_manager.register(M)
837 ip.magics_manager.register(M)
838
838
839
839
840 @skip_doctest
840 @skip_doctest
841 class LoadBalancedView(View):
841 class LoadBalancedView(View):
842 """An load-balancing View that only executes via the Task scheduler.
842 """An load-balancing View that only executes via the Task scheduler.
843
843
844 Load-balanced views can be created with the client's `view` method:
844 Load-balanced views can be created with the client's `view` method:
845
845
846 >>> v = client.load_balanced_view()
846 >>> v = client.load_balanced_view()
847
847
848 or targets can be specified, to restrict the potential destinations:
848 or targets can be specified, to restrict the potential destinations:
849
849
850 >>> v = client.client.load_balanced_view([1,3])
850 >>> v = client.client.load_balanced_view([1,3])
851
851
852 which would restrict loadbalancing to between engines 1 and 3.
852 which would restrict loadbalancing to between engines 1 and 3.
853
853
854 """
854 """
855
855
856 follow=Any()
856 follow=Any()
857 after=Any()
857 after=Any()
858 timeout=CFloat()
858 timeout=CFloat()
859 retries = Integer(0)
859 retries = Integer(0)
860
860
861 _task_scheme = Any()
861 _task_scheme = Any()
862 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
862 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
863
863
864 def __init__(self, client=None, socket=None, **flags):
864 def __init__(self, client=None, socket=None, **flags):
865 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
865 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
866 self._task_scheme=client._task_scheme
866 self._task_scheme=client._task_scheme
867
867
868 def _validate_dependency(self, dep):
868 def _validate_dependency(self, dep):
869 """validate a dependency.
869 """validate a dependency.
870
870
871 For use in `set_flags`.
871 For use in `set_flags`.
872 """
872 """
873 if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
873 if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
874 return True
874 return True
875 elif isinstance(dep, (list,set, tuple)):
875 elif isinstance(dep, (list,set, tuple)):
876 for d in dep:
876 for d in dep:
877 if not isinstance(d, (basestring, AsyncResult)):
877 if not isinstance(d, (basestring, AsyncResult)):
878 return False
878 return False
879 elif isinstance(dep, dict):
879 elif isinstance(dep, dict):
880 if set(dep.keys()) != set(Dependency().as_dict().keys()):
880 if set(dep.keys()) != set(Dependency().as_dict().keys()):
881 return False
881 return False
882 if not isinstance(dep['msg_ids'], list):
882 if not isinstance(dep['msg_ids'], list):
883 return False
883 return False
884 for d in dep['msg_ids']:
884 for d in dep['msg_ids']:
885 if not isinstance(d, basestring):
885 if not isinstance(d, basestring):
886 return False
886 return False
887 else:
887 else:
888 return False
888 return False
889
889
890 return True
890 return True
891
891
892 def _render_dependency(self, dep):
892 def _render_dependency(self, dep):
893 """helper for building jsonable dependencies from various input forms."""
893 """helper for building jsonable dependencies from various input forms."""
894 if isinstance(dep, Dependency):
894 if isinstance(dep, Dependency):
895 return dep.as_dict()
895 return dep.as_dict()
896 elif isinstance(dep, AsyncResult):
896 elif isinstance(dep, AsyncResult):
897 return dep.msg_ids
897 return dep.msg_ids
898 elif dep is None:
898 elif dep is None:
899 return []
899 return []
900 else:
900 else:
901 # pass to Dependency constructor
901 # pass to Dependency constructor
902 return list(Dependency(dep))
902 return list(Dependency(dep))
903
903
904 def set_flags(self, **kwargs):
904 def set_flags(self, **kwargs):
905 """set my attribute flags by keyword.
905 """set my attribute flags by keyword.
906
906
907 A View is a wrapper for the Client's apply method, but with attributes
907 A View is a wrapper for the Client's apply method, but with attributes
908 that specify keyword arguments, those attributes can be set by keyword
908 that specify keyword arguments, those attributes can be set by keyword
909 argument with this method.
909 argument with this method.
910
910
911 Parameters
911 Parameters
912 ----------
912 ----------
913
913
914 block : bool
914 block : bool
915 whether to wait for results
915 whether to wait for results
916 track : bool
916 track : bool
917 whether to create a MessageTracker to allow the user to
917 whether to create a MessageTracker to allow the user to
918 safely edit after arrays and buffers during non-copying
918 safely edit after arrays and buffers during non-copying
919 sends.
919 sends.
920
920
921 after : Dependency or collection of msg_ids
921 after : Dependency or collection of msg_ids
922 Only for load-balanced execution (targets=None)
922 Only for load-balanced execution (targets=None)
923 Specify a list of msg_ids as a time-based dependency.
923 Specify a list of msg_ids as a time-based dependency.
924 This job will only be run *after* the dependencies
924 This job will only be run *after* the dependencies
925 have been met.
925 have been met.
926
926
927 follow : Dependency or collection of msg_ids
927 follow : Dependency or collection of msg_ids
928 Only for load-balanced execution (targets=None)
928 Only for load-balanced execution (targets=None)
929 Specify a list of msg_ids as a location-based dependency.
929 Specify a list of msg_ids as a location-based dependency.
930 This job will only be run on an engine where this dependency
930 This job will only be run on an engine where this dependency
931 is met.
931 is met.
932
932
933 timeout : float/int or None
933 timeout : float/int or None
934 Only for load-balanced execution (targets=None)
934 Only for load-balanced execution (targets=None)
935 Specify an amount of time (in seconds) for the scheduler to
935 Specify an amount of time (in seconds) for the scheduler to
936 wait for dependencies to be met before failing with a
936 wait for dependencies to be met before failing with a
937 DependencyTimeout.
937 DependencyTimeout.
938
938
939 retries : int
939 retries : int
940 Number of times a task will be retried on failure.
940 Number of times a task will be retried on failure.
941 """
941 """
942
942
943 super(LoadBalancedView, self).set_flags(**kwargs)
943 super(LoadBalancedView, self).set_flags(**kwargs)
944 for name in ('follow', 'after'):
944 for name in ('follow', 'after'):
945 if name in kwargs:
945 if name in kwargs:
946 value = kwargs[name]
946 value = kwargs[name]
947 if self._validate_dependency(value):
947 if self._validate_dependency(value):
948 setattr(self, name, value)
948 setattr(self, name, value)
949 else:
949 else:
950 raise ValueError("Invalid dependency: %r"%value)
950 raise ValueError("Invalid dependency: %r"%value)
951 if 'timeout' in kwargs:
951 if 'timeout' in kwargs:
952 t = kwargs['timeout']
952 t = kwargs['timeout']
953 if not isinstance(t, (int, long, float, type(None))):
953 if not isinstance(t, (int, long, float, type(None))):
954 raise TypeError("Invalid type for timeout: %r"%type(t))
954 raise TypeError("Invalid type for timeout: %r"%type(t))
955 if t is not None:
955 if t is not None:
956 if t < 0:
956 if t < 0:
957 raise ValueError("Invalid timeout: %s"%t)
957 raise ValueError("Invalid timeout: %s"%t)
958 self.timeout = t
958 self.timeout = t
959
959
960 @sync_results
960 @sync_results
961 @save_ids
961 @save_ids
962 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
962 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
963 after=None, follow=None, timeout=None,
963 after=None, follow=None, timeout=None,
964 targets=None, retries=None):
964 targets=None, retries=None):
965 """calls f(*args, **kwargs) on a remote engine, returning the result.
965 """calls f(*args, **kwargs) on a remote engine, returning the result.
966
966
967 This method temporarily sets all of `apply`'s flags for a single call.
967 This method temporarily sets all of `apply`'s flags for a single call.
968
968
969 Parameters
969 Parameters
970 ----------
970 ----------
971
971
972 f : callable
972 f : callable
973
973
974 args : list [default: empty]
974 args : list [default: empty]
975
975
976 kwargs : dict [default: empty]
976 kwargs : dict [default: empty]
977
977
978 block : bool [default: self.block]
978 block : bool [default: self.block]
979 whether to block
979 whether to block
980 track : bool [default: self.track]
980 track : bool [default: self.track]
981 whether to ask zmq to track the message, for safe non-copying sends
981 whether to ask zmq to track the message, for safe non-copying sends
982
982
983 !!!!!! TODO: THE REST HERE !!!!
983 !!!!!! TODO: THE REST HERE !!!!
984
984
985 Returns
985 Returns
986 -------
986 -------
987
987
988 if self.block is False:
988 if self.block is False:
989 returns AsyncResult
989 returns AsyncResult
990 else:
990 else:
991 returns actual result of f(*args, **kwargs) on the engine(s)
991 returns actual result of f(*args, **kwargs) on the engine(s)
992 This will be a list of self.targets is also a list (even length 1), or
992 This will be a list of self.targets is also a list (even length 1), or
993 the single result if self.targets is an integer engine id
993 the single result if self.targets is an integer engine id
994 """
994 """
995
995
996 # validate whether we can run
996 # validate whether we can run
997 if self._socket.closed:
997 if self._socket.closed:
998 msg = "Task farming is disabled"
998 msg = "Task farming is disabled"
999 if self._task_scheme == 'pure':
999 if self._task_scheme == 'pure':
1000 msg += " because the pure ZMQ scheduler cannot handle"
1000 msg += " because the pure ZMQ scheduler cannot handle"
1001 msg += " disappearing engines."
1001 msg += " disappearing engines."
1002 raise RuntimeError(msg)
1002 raise RuntimeError(msg)
1003
1003
1004 if self._task_scheme == 'pure':
1004 if self._task_scheme == 'pure':
1005 # pure zmq scheme doesn't support extra features
1005 # pure zmq scheme doesn't support extra features
1006 msg = "Pure ZMQ scheduler doesn't support the following flags:"
1006 msg = "Pure ZMQ scheduler doesn't support the following flags:"
1007 "follow, after, retries, targets, timeout"
1007 "follow, after, retries, targets, timeout"
1008 if (follow or after or retries or targets or timeout):
1008 if (follow or after or retries or targets or timeout):
1009 # hard fail on Scheduler flags
1009 # hard fail on Scheduler flags
1010 raise RuntimeError(msg)
1010 raise RuntimeError(msg)
1011 if isinstance(f, dependent):
1011 if isinstance(f, dependent):
1012 # soft warn on functional dependencies
1012 # soft warn on functional dependencies
1013 warnings.warn(msg, RuntimeWarning)
1013 warnings.warn(msg, RuntimeWarning)
1014
1014
1015 # build args
1015 # build args
1016 args = [] if args is None else args
1016 args = [] if args is None else args
1017 kwargs = {} if kwargs is None else kwargs
1017 kwargs = {} if kwargs is None else kwargs
1018 block = self.block if block is None else block
1018 block = self.block if block is None else block
1019 track = self.track if track is None else track
1019 track = self.track if track is None else track
1020 after = self.after if after is None else after
1020 after = self.after if after is None else after
1021 retries = self.retries if retries is None else retries
1021 retries = self.retries if retries is None else retries
1022 follow = self.follow if follow is None else follow
1022 follow = self.follow if follow is None else follow
1023 timeout = self.timeout if timeout is None else timeout
1023 timeout = self.timeout if timeout is None else timeout
1024 targets = self.targets if targets is None else targets
1024 targets = self.targets if targets is None else targets
1025
1025
1026 if not isinstance(retries, int):
1026 if not isinstance(retries, int):
1027 raise TypeError('retries must be int, not %r'%type(retries))
1027 raise TypeError('retries must be int, not %r'%type(retries))
1028
1028
1029 if targets is None:
1029 if targets is None:
1030 idents = []
1030 idents = []
1031 else:
1031 else:
1032 idents = self.client._build_targets(targets)[0]
1032 idents = self.client._build_targets(targets)[0]
1033 # ensure *not* bytes
1033 # ensure *not* bytes
1034 idents = [ ident.decode() for ident in idents ]
1034 idents = [ ident.decode() for ident in idents ]
1035
1035
1036 after = self._render_dependency(after)
1036 after = self._render_dependency(after)
1037 follow = self._render_dependency(follow)
1037 follow = self._render_dependency(follow)
1038 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1038 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1039
1039
1040 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1040 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1041 metadata=metadata)
1041 metadata=metadata)
1042 tracker = None if track is False else msg['tracker']
1042 tracker = None if track is False else msg['tracker']
1043
1043
1044 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1044 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1045
1045
1046 if block:
1046 if block:
1047 try:
1047 try:
1048 return ar.get()
1048 return ar.get()
1049 except KeyboardInterrupt:
1049 except KeyboardInterrupt:
1050 pass
1050 pass
1051 return ar
1051 return ar
1052
1052
1053 @sync_results
1053 @sync_results
1054 @save_ids
1054 @save_ids
1055 def map(self, f, *sequences, **kwargs):
1055 def map(self, f, *sequences, **kwargs):
1056 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1056 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1057
1057
1058 Parallel version of builtin `map`, load-balanced by this View.
1058 Parallel version of builtin `map`, load-balanced by this View.
1059
1059
1060 `block`, and `chunksize` can be specified by keyword only.
1060 `block`, and `chunksize` can be specified by keyword only.
1061
1061
1062 Each `chunksize` elements will be a separate task, and will be
1062 Each `chunksize` elements will be a separate task, and will be
1063 load-balanced. This lets individual elements be available for iteration
1063 load-balanced. This lets individual elements be available for iteration
1064 as soon as they arrive.
1064 as soon as they arrive.
1065
1065
1066 Parameters
1066 Parameters
1067 ----------
1067 ----------
1068
1068
1069 f : callable
1069 f : callable
1070 function to be mapped
1070 function to be mapped
1071 *sequences: one or more sequences of matching length
1071 *sequences: one or more sequences of matching length
1072 the sequences to be distributed and passed to `f`
1072 the sequences to be distributed and passed to `f`
1073 block : bool [default self.block]
1073 block : bool [default self.block]
1074 whether to wait for the result or not
1074 whether to wait for the result or not
1075 track : bool
1075 track : bool
1076 whether to create a MessageTracker to allow the user to
1076 whether to create a MessageTracker to allow the user to
1077 safely edit after arrays and buffers during non-copying
1077 safely edit after arrays and buffers during non-copying
1078 sends.
1078 sends.
1079 chunksize : int [default 1]
1079 chunksize : int [default 1]
1080 how many elements should be in each task.
1080 how many elements should be in each task.
1081 ordered : bool [default True]
1081 ordered : bool [default True]
1082 Whether the results should be gathered as they arrive, or enforce
1082 Whether the results should be gathered as they arrive, or enforce
1083 the order of submission.
1083 the order of submission.
1084
1084
1085 Only applies when iterating through AsyncMapResult as results arrive.
1085 Only applies when iterating through AsyncMapResult as results arrive.
1086 Has no effect when block=True.
1086 Has no effect when block=True.
1087
1087
1088 Returns
1088 Returns
1089 -------
1089 -------
1090
1090
1091 if block=False:
1091 if block=False:
1092 AsyncMapResult
1092 AsyncMapResult
1093 An object like AsyncResult, but which reassembles the sequence of results
1093 An object like AsyncResult, but which reassembles the sequence of results
1094 into a single list. AsyncMapResults can be iterated through before all
1094 into a single list. AsyncMapResults can be iterated through before all
1095 results are complete.
1095 results are complete.
1096 else:
1096 else:
1097 the result of map(f,*sequences)
1097 the result of map(f,*sequences)
1098
1098
1099 """
1099 """
1100
1100
1101 # default
1101 # default
1102 block = kwargs.get('block', self.block)
1102 block = kwargs.get('block', self.block)
1103 chunksize = kwargs.get('chunksize', 1)
1103 chunksize = kwargs.get('chunksize', 1)
1104 ordered = kwargs.get('ordered', True)
1104 ordered = kwargs.get('ordered', True)
1105
1105
1106 keyset = set(kwargs.keys())
1106 keyset = set(kwargs.keys())
1107 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1107 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1108 if extra_keys:
1108 if extra_keys:
1109 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1109 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1110
1110
1111 assert len(sequences) > 0, "must have some sequences to map onto!"
1111 assert len(sequences) > 0, "must have some sequences to map onto!"
1112
1112
1113 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1113 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1114 return pf.map(*sequences)
1114 return pf.map(*sequences)
1115
1115
1116 __all__ = ['LoadBalancedView', 'DirectView']
1116 __all__ = ['LoadBalancedView', 'DirectView']
General Comments 0
You need to be logged in to leave comments. Login now