##// END OF EJS Templates
add shutdown to Views
MinRK -
Show More
@@ -1,1028 +1,1036 b''
1 """Views of remote engines."""
1 """Views of remote engines."""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import imp
13 import imp
14 import sys
14 import sys
15 import warnings
15 import warnings
16 from contextlib import contextmanager
16 from contextlib import contextmanager
17 from types import ModuleType
17 from types import ModuleType
18
18
19 import zmq
19 import zmq
20
20
21 from IPython.testing import decorators as testdec
21 from IPython.testing import decorators as testdec
22 from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance, CFloat
22 from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance, CFloat
23
23
24 from IPython.external.decorator import decorator
24 from IPython.external.decorator import decorator
25
25
26 from . import map as Map
26 from . import map as Map
27 from . import util
27 from . import util
28 from .asyncresult import AsyncResult, AsyncMapResult
28 from .asyncresult import AsyncResult, AsyncMapResult
29 from .dependency import Dependency, dependent
29 from .dependency import Dependency, dependent
30 from .remotefunction import ParallelFunction, parallel, remote
30 from .remotefunction import ParallelFunction, parallel, remote
31
31
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33 # Decorators
33 # Decorators
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35
35
36 @decorator
36 @decorator
37 def save_ids(f, self, *args, **kwargs):
37 def save_ids(f, self, *args, **kwargs):
38 """Keep our history and outstanding attributes up to date after a method call."""
38 """Keep our history and outstanding attributes up to date after a method call."""
39 n_previous = len(self.client.history)
39 n_previous = len(self.client.history)
40 try:
40 try:
41 ret = f(self, *args, **kwargs)
41 ret = f(self, *args, **kwargs)
42 finally:
42 finally:
43 nmsgs = len(self.client.history) - n_previous
43 nmsgs = len(self.client.history) - n_previous
44 msg_ids = self.client.history[-nmsgs:]
44 msg_ids = self.client.history[-nmsgs:]
45 self.history.extend(msg_ids)
45 self.history.extend(msg_ids)
46 map(self.outstanding.add, msg_ids)
46 map(self.outstanding.add, msg_ids)
47 return ret
47 return ret
48
48
49 @decorator
49 @decorator
50 def sync_results(f, self, *args, **kwargs):
50 def sync_results(f, self, *args, **kwargs):
51 """sync relevant results from self.client to our results attribute."""
51 """sync relevant results from self.client to our results attribute."""
52 ret = f(self, *args, **kwargs)
52 ret = f(self, *args, **kwargs)
53 delta = self.outstanding.difference(self.client.outstanding)
53 delta = self.outstanding.difference(self.client.outstanding)
54 completed = self.outstanding.intersection(delta)
54 completed = self.outstanding.intersection(delta)
55 self.outstanding = self.outstanding.difference(completed)
55 self.outstanding = self.outstanding.difference(completed)
56 for msg_id in completed:
56 for msg_id in completed:
57 self.results[msg_id] = self.client.results[msg_id]
57 self.results[msg_id] = self.client.results[msg_id]
58 return ret
58 return ret
59
59
60 @decorator
60 @decorator
61 def spin_after(f, self, *args, **kwargs):
61 def spin_after(f, self, *args, **kwargs):
62 """call spin after the method."""
62 """call spin after the method."""
63 ret = f(self, *args, **kwargs)
63 ret = f(self, *args, **kwargs)
64 self.spin()
64 self.spin()
65 return ret
65 return ret
66
66
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68 # Classes
68 # Classes
69 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
70
70
71 class View(HasTraits):
71 class View(HasTraits):
72 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
72 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
73
73
74 Don't use this class, use subclasses.
74 Don't use this class, use subclasses.
75
75
76 Methods
76 Methods
77 -------
77 -------
78
78
79 spin
79 spin
80 flushes incoming results and registration state changes
80 flushes incoming results and registration state changes
81 control methods spin, and requesting `ids` also ensures up to date
81 control methods spin, and requesting `ids` also ensures up to date
82
82
83 wait
83 wait
84 wait on one or more msg_ids
84 wait on one or more msg_ids
85
85
86 execution methods
86 execution methods
87 apply
87 apply
88 legacy: execute, run
88 legacy: execute, run
89
89
90 data movement
90 data movement
91 push, pull, scatter, gather
91 push, pull, scatter, gather
92
92
93 query methods
93 query methods
94 get_result, queue_status, purge_results, result_status
94 get_result, queue_status, purge_results, result_status
95
95
96 control methods
96 control methods
97 abort, shutdown
97 abort, shutdown
98
98
99 """
99 """
100 # flags
100 # flags
101 block=Bool(False)
101 block=Bool(False)
102 track=Bool(True)
102 track=Bool(True)
103 targets = Any()
103 targets = Any()
104
104
105 history=List()
105 history=List()
106 outstanding = Set()
106 outstanding = Set()
107 results = Dict()
107 results = Dict()
108 client = Instance('IPython.parallel.client.Client')
108 client = Instance('IPython.parallel.client.Client')
109
109
110 _socket = Instance('zmq.Socket')
110 _socket = Instance('zmq.Socket')
111 _flag_names = List(['targets', 'block', 'track'])
111 _flag_names = List(['targets', 'block', 'track'])
112 _targets = Any()
112 _targets = Any()
113 _idents = Any()
113 _idents = Any()
114
114
115 def __init__(self, client=None, socket=None, **flags):
115 def __init__(self, client=None, socket=None, **flags):
116 super(View, self).__init__(client=client, _socket=socket)
116 super(View, self).__init__(client=client, _socket=socket)
117 self.block = client.block
117 self.block = client.block
118
118
119 self.set_flags(**flags)
119 self.set_flags(**flags)
120
120
121 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
121 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
122
122
123
123
124 def __repr__(self):
124 def __repr__(self):
125 strtargets = str(self.targets)
125 strtargets = str(self.targets)
126 if len(strtargets) > 16:
126 if len(strtargets) > 16:
127 strtargets = strtargets[:12]+'...]'
127 strtargets = strtargets[:12]+'...]'
128 return "<%s %s>"%(self.__class__.__name__, strtargets)
128 return "<%s %s>"%(self.__class__.__name__, strtargets)
129
129
130 def set_flags(self, **kwargs):
130 def set_flags(self, **kwargs):
131 """set my attribute flags by keyword.
131 """set my attribute flags by keyword.
132
132
133 Views determine behavior with a few attributes (`block`, `track`, etc.).
133 Views determine behavior with a few attributes (`block`, `track`, etc.).
134 These attributes can be set all at once by name with this method.
134 These attributes can be set all at once by name with this method.
135
135
136 Parameters
136 Parameters
137 ----------
137 ----------
138
138
139 block : bool
139 block : bool
140 whether to wait for results
140 whether to wait for results
141 track : bool
141 track : bool
142 whether to create a MessageTracker to allow the user to
142 whether to create a MessageTracker to allow the user to
143 safely edit after arrays and buffers during non-copying
143 safely edit after arrays and buffers during non-copying
144 sends.
144 sends.
145 """
145 """
146 for name, value in kwargs.iteritems():
146 for name, value in kwargs.iteritems():
147 if name not in self._flag_names:
147 if name not in self._flag_names:
148 raise KeyError("Invalid name: %r"%name)
148 raise KeyError("Invalid name: %r"%name)
149 else:
149 else:
150 setattr(self, name, value)
150 setattr(self, name, value)
151
151
152 @contextmanager
152 @contextmanager
153 def temp_flags(self, **kwargs):
153 def temp_flags(self, **kwargs):
154 """temporarily set flags, for use in `with` statements.
154 """temporarily set flags, for use in `with` statements.
155
155
156 See set_flags for permanent setting of flags
156 See set_flags for permanent setting of flags
157
157
158 Examples
158 Examples
159 --------
159 --------
160
160
161 >>> view.track=False
161 >>> view.track=False
162 ...
162 ...
163 >>> with view.temp_flags(track=True):
163 >>> with view.temp_flags(track=True):
164 ... ar = view.apply(dostuff, my_big_array)
164 ... ar = view.apply(dostuff, my_big_array)
165 ... ar.tracker.wait() # wait for send to finish
165 ... ar.tracker.wait() # wait for send to finish
166 >>> view.track
166 >>> view.track
167 False
167 False
168
168
169 """
169 """
170 # preflight: save flags, and set temporaries
170 # preflight: save flags, and set temporaries
171 saved_flags = {}
171 saved_flags = {}
172 for f in self._flag_names:
172 for f in self._flag_names:
173 saved_flags[f] = getattr(self, f)
173 saved_flags[f] = getattr(self, f)
174 self.set_flags(**kwargs)
174 self.set_flags(**kwargs)
175 # yield to the with-statement block
175 # yield to the with-statement block
176 try:
176 try:
177 yield
177 yield
178 finally:
178 finally:
179 # postflight: restore saved flags
179 # postflight: restore saved flags
180 self.set_flags(**saved_flags)
180 self.set_flags(**saved_flags)
181
181
182
182
183 #----------------------------------------------------------------
183 #----------------------------------------------------------------
184 # apply
184 # apply
185 #----------------------------------------------------------------
185 #----------------------------------------------------------------
186
186
187 @sync_results
187 @sync_results
188 @save_ids
188 @save_ids
189 def _really_apply(self, f, args, kwargs, block=None, **options):
189 def _really_apply(self, f, args, kwargs, block=None, **options):
190 """wrapper for client.send_apply_message"""
190 """wrapper for client.send_apply_message"""
191 raise NotImplementedError("Implement in subclasses")
191 raise NotImplementedError("Implement in subclasses")
192
192
193 def apply(self, f, *args, **kwargs):
193 def apply(self, f, *args, **kwargs):
194 """calls f(*args, **kwargs) on remote engines, returning the result.
194 """calls f(*args, **kwargs) on remote engines, returning the result.
195
195
196 This method sets all apply flags via this View's attributes.
196 This method sets all apply flags via this View's attributes.
197
197
198 if self.block is False:
198 if self.block is False:
199 returns AsyncResult
199 returns AsyncResult
200 else:
200 else:
201 returns actual result of f(*args, **kwargs)
201 returns actual result of f(*args, **kwargs)
202 """
202 """
203 return self._really_apply(f, args, kwargs)
203 return self._really_apply(f, args, kwargs)
204
204
205 def apply_async(self, f, *args, **kwargs):
205 def apply_async(self, f, *args, **kwargs):
206 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
206 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
207
207
208 returns AsyncResult
208 returns AsyncResult
209 """
209 """
210 return self._really_apply(f, args, kwargs, block=False)
210 return self._really_apply(f, args, kwargs, block=False)
211
211
212 @spin_after
212 @spin_after
213 def apply_sync(self, f, *args, **kwargs):
213 def apply_sync(self, f, *args, **kwargs):
214 """calls f(*args, **kwargs) on remote engines in a blocking manner,
214 """calls f(*args, **kwargs) on remote engines in a blocking manner,
215 returning the result.
215 returning the result.
216
216
217 returns: actual result of f(*args, **kwargs)
217 returns: actual result of f(*args, **kwargs)
218 """
218 """
219 return self._really_apply(f, args, kwargs, block=True)
219 return self._really_apply(f, args, kwargs, block=True)
220
220
221 #----------------------------------------------------------------
221 #----------------------------------------------------------------
222 # wrappers for client and control methods
222 # wrappers for client and control methods
223 #----------------------------------------------------------------
223 #----------------------------------------------------------------
224 @sync_results
224 @sync_results
225 def spin(self):
225 def spin(self):
226 """spin the client, and sync"""
226 """spin the client, and sync"""
227 self.client.spin()
227 self.client.spin()
228
228
229 @sync_results
229 @sync_results
230 def wait(self, jobs=None, timeout=-1):
230 def wait(self, jobs=None, timeout=-1):
231 """waits on one or more `jobs`, for up to `timeout` seconds.
231 """waits on one or more `jobs`, for up to `timeout` seconds.
232
232
233 Parameters
233 Parameters
234 ----------
234 ----------
235
235
236 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
236 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
237 ints are indices to self.history
237 ints are indices to self.history
238 strs are msg_ids
238 strs are msg_ids
239 default: wait on all outstanding messages
239 default: wait on all outstanding messages
240 timeout : float
240 timeout : float
241 a time in seconds, after which to give up.
241 a time in seconds, after which to give up.
242 default is -1, which means no timeout
242 default is -1, which means no timeout
243
243
244 Returns
244 Returns
245 -------
245 -------
246
246
247 True : when all msg_ids are done
247 True : when all msg_ids are done
248 False : timeout reached, some msg_ids still outstanding
248 False : timeout reached, some msg_ids still outstanding
249 """
249 """
250 if jobs is None:
250 if jobs is None:
251 jobs = self.history
251 jobs = self.history
252 return self.client.wait(jobs, timeout)
252 return self.client.wait(jobs, timeout)
253
253
254 def abort(self, jobs=None, targets=None, block=None):
254 def abort(self, jobs=None, targets=None, block=None):
255 """Abort jobs on my engines.
255 """Abort jobs on my engines.
256
256
257 Parameters
257 Parameters
258 ----------
258 ----------
259
259
260 jobs : None, str, list of strs, optional
260 jobs : None, str, list of strs, optional
261 if None: abort all jobs.
261 if None: abort all jobs.
262 else: abort specific msg_id(s).
262 else: abort specific msg_id(s).
263 """
263 """
264 block = block if block is not None else self.block
264 block = block if block is not None else self.block
265 targets = targets if targets is not None else self.targets
265 targets = targets if targets is not None else self.targets
266 return self.client.abort(jobs=jobs, targets=targets, block=block)
266 return self.client.abort(jobs=jobs, targets=targets, block=block)
267
267
268 def queue_status(self, targets=None, verbose=False):
268 def queue_status(self, targets=None, verbose=False):
269 """Fetch the Queue status of my engines"""
269 """Fetch the Queue status of my engines"""
270 targets = targets if targets is not None else self.targets
270 targets = targets if targets is not None else self.targets
271 return self.client.queue_status(targets=targets, verbose=verbose)
271 return self.client.queue_status(targets=targets, verbose=verbose)
272
272
273 def purge_results(self, jobs=[], targets=[]):
273 def purge_results(self, jobs=[], targets=[]):
274 """Instruct the controller to forget specific results."""
274 """Instruct the controller to forget specific results."""
275 if targets is None or targets == 'all':
275 if targets is None or targets == 'all':
276 targets = self.targets
276 targets = self.targets
277 return self.client.purge_results(jobs=jobs, targets=targets)
277 return self.client.purge_results(jobs=jobs, targets=targets)
278
278
279 def shutdown(self, targets=None, restart=False, hub=False, block=None):
280 """Terminates one or more engine processes, optionally including the hub.
281 """
282 block = self.block if block is None else block
283 if targets is None or targets == 'all':
284 targets = self.targets
285 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
286
279 @spin_after
287 @spin_after
280 def get_result(self, indices_or_msg_ids=None):
288 def get_result(self, indices_or_msg_ids=None):
281 """return one or more results, specified by history index or msg_id.
289 """return one or more results, specified by history index or msg_id.
282
290
283 See client.get_result for details.
291 See client.get_result for details.
284
292
285 """
293 """
286
294
287 if indices_or_msg_ids is None:
295 if indices_or_msg_ids is None:
288 indices_or_msg_ids = -1
296 indices_or_msg_ids = -1
289 if isinstance(indices_or_msg_ids, int):
297 if isinstance(indices_or_msg_ids, int):
290 indices_or_msg_ids = self.history[indices_or_msg_ids]
298 indices_or_msg_ids = self.history[indices_or_msg_ids]
291 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
299 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
292 indices_or_msg_ids = list(indices_or_msg_ids)
300 indices_or_msg_ids = list(indices_or_msg_ids)
293 for i,index in enumerate(indices_or_msg_ids):
301 for i,index in enumerate(indices_or_msg_ids):
294 if isinstance(index, int):
302 if isinstance(index, int):
295 indices_or_msg_ids[i] = self.history[index]
303 indices_or_msg_ids[i] = self.history[index]
296 return self.client.get_result(indices_or_msg_ids)
304 return self.client.get_result(indices_or_msg_ids)
297
305
298 #-------------------------------------------------------------------
306 #-------------------------------------------------------------------
299 # Map
307 # Map
300 #-------------------------------------------------------------------
308 #-------------------------------------------------------------------
301
309
302 def map(self, f, *sequences, **kwargs):
310 def map(self, f, *sequences, **kwargs):
303 """override in subclasses"""
311 """override in subclasses"""
304 raise NotImplementedError
312 raise NotImplementedError
305
313
306 def map_async(self, f, *sequences, **kwargs):
314 def map_async(self, f, *sequences, **kwargs):
307 """Parallel version of builtin `map`, using this view's engines.
315 """Parallel version of builtin `map`, using this view's engines.
308
316
309 This is equivalent to map(...block=False)
317 This is equivalent to map(...block=False)
310
318
311 See `self.map` for details.
319 See `self.map` for details.
312 """
320 """
313 if 'block' in kwargs:
321 if 'block' in kwargs:
314 raise TypeError("map_async doesn't take a `block` keyword argument.")
322 raise TypeError("map_async doesn't take a `block` keyword argument.")
315 kwargs['block'] = False
323 kwargs['block'] = False
316 return self.map(f,*sequences,**kwargs)
324 return self.map(f,*sequences,**kwargs)
317
325
318 def map_sync(self, f, *sequences, **kwargs):
326 def map_sync(self, f, *sequences, **kwargs):
319 """Parallel version of builtin `map`, using this view's engines.
327 """Parallel version of builtin `map`, using this view's engines.
320
328
321 This is equivalent to map(...block=True)
329 This is equivalent to map(...block=True)
322
330
323 See `self.map` for details.
331 See `self.map` for details.
324 """
332 """
325 if 'block' in kwargs:
333 if 'block' in kwargs:
326 raise TypeError("map_sync doesn't take a `block` keyword argument.")
334 raise TypeError("map_sync doesn't take a `block` keyword argument.")
327 kwargs['block'] = True
335 kwargs['block'] = True
328 return self.map(f,*sequences,**kwargs)
336 return self.map(f,*sequences,**kwargs)
329
337
330 def imap(self, f, *sequences, **kwargs):
338 def imap(self, f, *sequences, **kwargs):
331 """Parallel version of `itertools.imap`.
339 """Parallel version of `itertools.imap`.
332
340
333 See `self.map` for details.
341 See `self.map` for details.
334
342
335 """
343 """
336
344
337 return iter(self.map_async(f,*sequences, **kwargs))
345 return iter(self.map_async(f,*sequences, **kwargs))
338
346
339 #-------------------------------------------------------------------
347 #-------------------------------------------------------------------
340 # Decorators
348 # Decorators
341 #-------------------------------------------------------------------
349 #-------------------------------------------------------------------
342
350
343 def remote(self, block=True, **flags):
351 def remote(self, block=True, **flags):
344 """Decorator for making a RemoteFunction"""
352 """Decorator for making a RemoteFunction"""
345 block = self.block if block is None else block
353 block = self.block if block is None else block
346 return remote(self, block=block, **flags)
354 return remote(self, block=block, **flags)
347
355
348 def parallel(self, dist='b', block=None, **flags):
356 def parallel(self, dist='b', block=None, **flags):
349 """Decorator for making a ParallelFunction"""
357 """Decorator for making a ParallelFunction"""
350 block = self.block if block is None else block
358 block = self.block if block is None else block
351 return parallel(self, dist=dist, block=block, **flags)
359 return parallel(self, dist=dist, block=block, **flags)
352
360
353 @testdec.skip_doctest
361 @testdec.skip_doctest
354 class DirectView(View):
362 class DirectView(View):
355 """Direct Multiplexer View of one or more engines.
363 """Direct Multiplexer View of one or more engines.
356
364
357 These are created via indexed access to a client:
365 These are created via indexed access to a client:
358
366
359 >>> dv_1 = client[1]
367 >>> dv_1 = client[1]
360 >>> dv_all = client[:]
368 >>> dv_all = client[:]
361 >>> dv_even = client[::2]
369 >>> dv_even = client[::2]
362 >>> dv_some = client[1:3]
370 >>> dv_some = client[1:3]
363
371
364 This object provides dictionary access to engine namespaces:
372 This object provides dictionary access to engine namespaces:
365
373
366 # push a=5:
374 # push a=5:
367 >>> dv['a'] = 5
375 >>> dv['a'] = 5
368 # pull 'foo':
376 # pull 'foo':
369 >>> db['foo']
377 >>> db['foo']
370
378
371 """
379 """
372
380
373 def __init__(self, client=None, socket=None, targets=None):
381 def __init__(self, client=None, socket=None, targets=None):
374 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
382 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
375
383
376 @property
384 @property
377 def importer(self):
385 def importer(self):
378 """sync_imports(local=True) as a property.
386 """sync_imports(local=True) as a property.
379
387
380 See sync_imports for details.
388 See sync_imports for details.
381
389
382 In [10]: with v.importer:
390 In [10]: with v.importer:
383 ....: import numpy
391 ....: import numpy
384 ....:
392 ....:
385 importing numpy on engine(s)
393 importing numpy on engine(s)
386
394
387 """
395 """
388 return self.sync_imports(True)
396 return self.sync_imports(True)
389
397
390 @contextmanager
398 @contextmanager
391 def sync_imports(self, local=True):
399 def sync_imports(self, local=True):
392 """Context Manager for performing simultaneous local and remote imports.
400 """Context Manager for performing simultaneous local and remote imports.
393
401
394 'import x as y' will *not* work. The 'as y' part will simply be ignored.
402 'import x as y' will *not* work. The 'as y' part will simply be ignored.
395
403
396 >>> with view.sync_imports():
404 >>> with view.sync_imports():
397 ... from numpy import recarray
405 ... from numpy import recarray
398 importing recarray from numpy on engine(s)
406 importing recarray from numpy on engine(s)
399
407
400 """
408 """
401 import __builtin__
409 import __builtin__
402 local_import = __builtin__.__import__
410 local_import = __builtin__.__import__
403 modules = set()
411 modules = set()
404 results = []
412 results = []
405 @util.interactive
413 @util.interactive
406 def remote_import(name, fromlist, level):
414 def remote_import(name, fromlist, level):
407 """the function to be passed to apply, that actually performs the import
415 """the function to be passed to apply, that actually performs the import
408 on the engine, and loads up the user namespace.
416 on the engine, and loads up the user namespace.
409 """
417 """
410 import sys
418 import sys
411 user_ns = globals()
419 user_ns = globals()
412 mod = __import__(name, fromlist=fromlist, level=level)
420 mod = __import__(name, fromlist=fromlist, level=level)
413 if fromlist:
421 if fromlist:
414 for key in fromlist:
422 for key in fromlist:
415 user_ns[key] = getattr(mod, key)
423 user_ns[key] = getattr(mod, key)
416 else:
424 else:
417 user_ns[name] = sys.modules[name]
425 user_ns[name] = sys.modules[name]
418
426
419 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
427 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
420 """the drop-in replacement for __import__, that optionally imports
428 """the drop-in replacement for __import__, that optionally imports
421 locally as well.
429 locally as well.
422 """
430 """
423 # don't override nested imports
431 # don't override nested imports
424 save_import = __builtin__.__import__
432 save_import = __builtin__.__import__
425 __builtin__.__import__ = local_import
433 __builtin__.__import__ = local_import
426
434
427 if imp.lock_held():
435 if imp.lock_held():
428 # this is a side-effect import, don't do it remotely, or even
436 # this is a side-effect import, don't do it remotely, or even
429 # ignore the local effects
437 # ignore the local effects
430 return local_import(name, globals, locals, fromlist, level)
438 return local_import(name, globals, locals, fromlist, level)
431
439
432 imp.acquire_lock()
440 imp.acquire_lock()
433 if local:
441 if local:
434 mod = local_import(name, globals, locals, fromlist, level)
442 mod = local_import(name, globals, locals, fromlist, level)
435 else:
443 else:
436 raise NotImplementedError("remote-only imports not yet implemented")
444 raise NotImplementedError("remote-only imports not yet implemented")
437 imp.release_lock()
445 imp.release_lock()
438
446
439 key = name+':'+','.join(fromlist or [])
447 key = name+':'+','.join(fromlist or [])
440 if level == -1 and key not in modules:
448 if level == -1 and key not in modules:
441 modules.add(key)
449 modules.add(key)
442 if fromlist:
450 if fromlist:
443 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
451 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
444 else:
452 else:
445 print "importing %s on engine(s)"%name
453 print "importing %s on engine(s)"%name
446 results.append(self.apply_async(remote_import, name, fromlist, level))
454 results.append(self.apply_async(remote_import, name, fromlist, level))
447 # restore override
455 # restore override
448 __builtin__.__import__ = save_import
456 __builtin__.__import__ = save_import
449
457
450 return mod
458 return mod
451
459
452 # override __import__
460 # override __import__
453 __builtin__.__import__ = view_import
461 __builtin__.__import__ = view_import
454 try:
462 try:
455 # enter the block
463 # enter the block
456 yield
464 yield
457 except ImportError:
465 except ImportError:
458 if not local:
466 if not local:
459 # ignore import errors if not doing local imports
467 # ignore import errors if not doing local imports
460 pass
468 pass
461 finally:
469 finally:
462 # always restore __import__
470 # always restore __import__
463 __builtin__.__import__ = local_import
471 __builtin__.__import__ = local_import
464
472
465 for r in results:
473 for r in results:
466 # raise possible remote ImportErrors here
474 # raise possible remote ImportErrors here
467 r.get()
475 r.get()
468
476
469
477
470 @sync_results
478 @sync_results
471 @save_ids
479 @save_ids
472 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
480 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
473 """calls f(*args, **kwargs) on remote engines, returning the result.
481 """calls f(*args, **kwargs) on remote engines, returning the result.
474
482
475 This method sets all of `apply`'s flags via this View's attributes.
483 This method sets all of `apply`'s flags via this View's attributes.
476
484
477 Parameters
485 Parameters
478 ----------
486 ----------
479
487
480 f : callable
488 f : callable
481
489
482 args : list [default: empty]
490 args : list [default: empty]
483
491
484 kwargs : dict [default: empty]
492 kwargs : dict [default: empty]
485
493
486 targets : target list [default: self.targets]
494 targets : target list [default: self.targets]
487 where to run
495 where to run
488 block : bool [default: self.block]
496 block : bool [default: self.block]
489 whether to block
497 whether to block
490 track : bool [default: self.track]
498 track : bool [default: self.track]
491 whether to ask zmq to track the message, for safe non-copying sends
499 whether to ask zmq to track the message, for safe non-copying sends
492
500
493 Returns
501 Returns
494 -------
502 -------
495
503
496 if self.block is False:
504 if self.block is False:
497 returns AsyncResult
505 returns AsyncResult
498 else:
506 else:
499 returns actual result of f(*args, **kwargs) on the engine(s)
507 returns actual result of f(*args, **kwargs) on the engine(s)
500 This will be a list of self.targets is also a list (even length 1), or
508 This will be a list of self.targets is also a list (even length 1), or
501 the single result if self.targets is an integer engine id
509 the single result if self.targets is an integer engine id
502 """
510 """
503 args = [] if args is None else args
511 args = [] if args is None else args
504 kwargs = {} if kwargs is None else kwargs
512 kwargs = {} if kwargs is None else kwargs
505 block = self.block if block is None else block
513 block = self.block if block is None else block
506 track = self.track if track is None else track
514 track = self.track if track is None else track
507 targets = self.targets if targets is None else targets
515 targets = self.targets if targets is None else targets
508
516
509 _idents = self.client._build_targets(targets)[0]
517 _idents = self.client._build_targets(targets)[0]
510 msg_ids = []
518 msg_ids = []
511 trackers = []
519 trackers = []
512 for ident in _idents:
520 for ident in _idents:
513 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
521 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
514 ident=ident)
522 ident=ident)
515 if track:
523 if track:
516 trackers.append(msg['tracker'])
524 trackers.append(msg['tracker'])
517 msg_ids.append(msg['msg_id'])
525 msg_ids.append(msg['msg_id'])
518 tracker = None if track is False else zmq.MessageTracker(*trackers)
526 tracker = None if track is False else zmq.MessageTracker(*trackers)
519 ar = AsyncResult(self.client, msg_ids, fname=f.__name__, targets=targets, tracker=tracker)
527 ar = AsyncResult(self.client, msg_ids, fname=f.__name__, targets=targets, tracker=tracker)
520 if block:
528 if block:
521 try:
529 try:
522 return ar.get()
530 return ar.get()
523 except KeyboardInterrupt:
531 except KeyboardInterrupt:
524 pass
532 pass
525 return ar
533 return ar
526
534
527 @spin_after
535 @spin_after
528 def map(self, f, *sequences, **kwargs):
536 def map(self, f, *sequences, **kwargs):
529 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
537 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
530
538
531 Parallel version of builtin `map`, using this View's `targets`.
539 Parallel version of builtin `map`, using this View's `targets`.
532
540
533 There will be one task per target, so work will be chunked
541 There will be one task per target, so work will be chunked
534 if the sequences are longer than `targets`.
542 if the sequences are longer than `targets`.
535
543
536 Results can be iterated as they are ready, but will become available in chunks.
544 Results can be iterated as they are ready, but will become available in chunks.
537
545
538 Parameters
546 Parameters
539 ----------
547 ----------
540
548
541 f : callable
549 f : callable
542 function to be mapped
550 function to be mapped
543 *sequences: one or more sequences of matching length
551 *sequences: one or more sequences of matching length
544 the sequences to be distributed and passed to `f`
552 the sequences to be distributed and passed to `f`
545 block : bool
553 block : bool
546 whether to wait for the result or not [default self.block]
554 whether to wait for the result or not [default self.block]
547
555
548 Returns
556 Returns
549 -------
557 -------
550
558
551 if block=False:
559 if block=False:
552 AsyncMapResult
560 AsyncMapResult
553 An object like AsyncResult, but which reassembles the sequence of results
561 An object like AsyncResult, but which reassembles the sequence of results
554 into a single list. AsyncMapResults can be iterated through before all
562 into a single list. AsyncMapResults can be iterated through before all
555 results are complete.
563 results are complete.
556 else:
564 else:
557 list
565 list
558 the result of map(f,*sequences)
566 the result of map(f,*sequences)
559 """
567 """
560
568
561 block = kwargs.pop('block', self.block)
569 block = kwargs.pop('block', self.block)
562 for k in kwargs.keys():
570 for k in kwargs.keys():
563 if k not in ['block', 'track']:
571 if k not in ['block', 'track']:
564 raise TypeError("invalid keyword arg, %r"%k)
572 raise TypeError("invalid keyword arg, %r"%k)
565
573
566 assert len(sequences) > 0, "must have some sequences to map onto!"
574 assert len(sequences) > 0, "must have some sequences to map onto!"
567 pf = ParallelFunction(self, f, block=block, **kwargs)
575 pf = ParallelFunction(self, f, block=block, **kwargs)
568 return pf.map(*sequences)
576 return pf.map(*sequences)
569
577
570 def execute(self, code, targets=None, block=None):
578 def execute(self, code, targets=None, block=None):
571 """Executes `code` on `targets` in blocking or nonblocking manner.
579 """Executes `code` on `targets` in blocking or nonblocking manner.
572
580
573 ``execute`` is always `bound` (affects engine namespace)
581 ``execute`` is always `bound` (affects engine namespace)
574
582
575 Parameters
583 Parameters
576 ----------
584 ----------
577
585
578 code : str
586 code : str
579 the code string to be executed
587 the code string to be executed
580 block : bool
588 block : bool
581 whether or not to wait until done to return
589 whether or not to wait until done to return
582 default: self.block
590 default: self.block
583 """
591 """
584 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
592 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
585
593
586 def run(self, filename, targets=None, block=None):
594 def run(self, filename, targets=None, block=None):
587 """Execute contents of `filename` on my engine(s).
595 """Execute contents of `filename` on my engine(s).
588
596
589 This simply reads the contents of the file and calls `execute`.
597 This simply reads the contents of the file and calls `execute`.
590
598
591 Parameters
599 Parameters
592 ----------
600 ----------
593
601
594 filename : str
602 filename : str
595 The path to the file
603 The path to the file
596 targets : int/str/list of ints/strs
604 targets : int/str/list of ints/strs
597 the engines on which to execute
605 the engines on which to execute
598 default : all
606 default : all
599 block : bool
607 block : bool
600 whether or not to wait until done
608 whether or not to wait until done
601 default: self.block
609 default: self.block
602
610
603 """
611 """
604 with open(filename, 'r') as f:
612 with open(filename, 'r') as f:
605 # add newline in case of trailing indented whitespace
613 # add newline in case of trailing indented whitespace
606 # which will cause SyntaxError
614 # which will cause SyntaxError
607 code = f.read()+'\n'
615 code = f.read()+'\n'
608 return self.execute(code, block=block, targets=targets)
616 return self.execute(code, block=block, targets=targets)
609
617
610 def update(self, ns):
618 def update(self, ns):
611 """update remote namespace with dict `ns`
619 """update remote namespace with dict `ns`
612
620
613 See `push` for details.
621 See `push` for details.
614 """
622 """
615 return self.push(ns, block=self.block, track=self.track)
623 return self.push(ns, block=self.block, track=self.track)
616
624
617 def push(self, ns, targets=None, block=None, track=None):
625 def push(self, ns, targets=None, block=None, track=None):
618 """update remote namespace with dict `ns`
626 """update remote namespace with dict `ns`
619
627
620 Parameters
628 Parameters
621 ----------
629 ----------
622
630
623 ns : dict
631 ns : dict
624 dict of keys with which to update engine namespace(s)
632 dict of keys with which to update engine namespace(s)
625 block : bool [default : self.block]
633 block : bool [default : self.block]
626 whether to wait to be notified of engine receipt
634 whether to wait to be notified of engine receipt
627
635
628 """
636 """
629
637
630 block = block if block is not None else self.block
638 block = block if block is not None else self.block
631 track = track if track is not None else self.track
639 track = track if track is not None else self.track
632 targets = targets if targets is not None else self.targets
640 targets = targets if targets is not None else self.targets
633 # applier = self.apply_sync if block else self.apply_async
641 # applier = self.apply_sync if block else self.apply_async
634 if not isinstance(ns, dict):
642 if not isinstance(ns, dict):
635 raise TypeError("Must be a dict, not %s"%type(ns))
643 raise TypeError("Must be a dict, not %s"%type(ns))
636 return self._really_apply(util._push, (ns,), block=block, track=track, targets=targets)
644 return self._really_apply(util._push, (ns,), block=block, track=track, targets=targets)
637
645
638 def get(self, key_s):
646 def get(self, key_s):
639 """get object(s) by `key_s` from remote namespace
647 """get object(s) by `key_s` from remote namespace
640
648
641 see `pull` for details.
649 see `pull` for details.
642 """
650 """
643 # block = block if block is not None else self.block
651 # block = block if block is not None else self.block
644 return self.pull(key_s, block=True)
652 return self.pull(key_s, block=True)
645
653
646 def pull(self, names, targets=None, block=True):
654 def pull(self, names, targets=None, block=True):
647 """get object(s) by `name` from remote namespace
655 """get object(s) by `name` from remote namespace
648
656
649 will return one object if it is a key.
657 will return one object if it is a key.
650 can also take a list of keys, in which case it will return a list of objects.
658 can also take a list of keys, in which case it will return a list of objects.
651 """
659 """
652 block = block if block is not None else self.block
660 block = block if block is not None else self.block
653 targets = targets if targets is not None else self.targets
661 targets = targets if targets is not None else self.targets
654 applier = self.apply_sync if block else self.apply_async
662 applier = self.apply_sync if block else self.apply_async
655 if isinstance(names, basestring):
663 if isinstance(names, basestring):
656 pass
664 pass
657 elif isinstance(names, (list,tuple,set)):
665 elif isinstance(names, (list,tuple,set)):
658 for key in names:
666 for key in names:
659 if not isinstance(key, basestring):
667 if not isinstance(key, basestring):
660 raise TypeError("keys must be str, not type %r"%type(key))
668 raise TypeError("keys must be str, not type %r"%type(key))
661 else:
669 else:
662 raise TypeError("names must be strs, not %r"%names)
670 raise TypeError("names must be strs, not %r"%names)
663 return self._really_apply(util._pull, (names,), block=block, targets=targets)
671 return self._really_apply(util._pull, (names,), block=block, targets=targets)
664
672
665 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
673 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
666 """
674 """
667 Partition a Python sequence and send the partitions to a set of engines.
675 Partition a Python sequence and send the partitions to a set of engines.
668 """
676 """
669 block = block if block is not None else self.block
677 block = block if block is not None else self.block
670 track = track if track is not None else self.track
678 track = track if track is not None else self.track
671 targets = targets if targets is not None else self.targets
679 targets = targets if targets is not None else self.targets
672
680
673 mapObject = Map.dists[dist]()
681 mapObject = Map.dists[dist]()
674 nparts = len(targets)
682 nparts = len(targets)
675 msg_ids = []
683 msg_ids = []
676 trackers = []
684 trackers = []
677 for index, engineid in enumerate(targets):
685 for index, engineid in enumerate(targets):
678 partition = mapObject.getPartition(seq, index, nparts)
686 partition = mapObject.getPartition(seq, index, nparts)
679 if flatten and len(partition) == 1:
687 if flatten and len(partition) == 1:
680 ns = {key: partition[0]}
688 ns = {key: partition[0]}
681 else:
689 else:
682 ns = {key: partition}
690 ns = {key: partition}
683 r = self.push(ns, block=False, track=track, targets=engineid)
691 r = self.push(ns, block=False, track=track, targets=engineid)
684 msg_ids.extend(r.msg_ids)
692 msg_ids.extend(r.msg_ids)
685 if track:
693 if track:
686 trackers.append(r._tracker)
694 trackers.append(r._tracker)
687
695
688 if track:
696 if track:
689 tracker = zmq.MessageTracker(*trackers)
697 tracker = zmq.MessageTracker(*trackers)
690 else:
698 else:
691 tracker = None
699 tracker = None
692
700
693 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
701 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
694 if block:
702 if block:
695 r.wait()
703 r.wait()
696 else:
704 else:
697 return r
705 return r
698
706
699 @sync_results
707 @sync_results
700 @save_ids
708 @save_ids
701 def gather(self, key, dist='b', targets=None, block=None):
709 def gather(self, key, dist='b', targets=None, block=None):
702 """
710 """
703 Gather a partitioned sequence on a set of engines as a single local seq.
711 Gather a partitioned sequence on a set of engines as a single local seq.
704 """
712 """
705 block = block if block is not None else self.block
713 block = block if block is not None else self.block
706 targets = targets if targets is not None else self.targets
714 targets = targets if targets is not None else self.targets
707 mapObject = Map.dists[dist]()
715 mapObject = Map.dists[dist]()
708 msg_ids = []
716 msg_ids = []
709
717
710 for index, engineid in enumerate(targets):
718 for index, engineid in enumerate(targets):
711 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
719 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
712
720
713 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
721 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
714
722
715 if block:
723 if block:
716 try:
724 try:
717 return r.get()
725 return r.get()
718 except KeyboardInterrupt:
726 except KeyboardInterrupt:
719 pass
727 pass
720 return r
728 return r
721
729
722 def __getitem__(self, key):
730 def __getitem__(self, key):
723 return self.get(key)
731 return self.get(key)
724
732
725 def __setitem__(self,key, value):
733 def __setitem__(self,key, value):
726 self.update({key:value})
734 self.update({key:value})
727
735
728 def clear(self, targets=None, block=False):
736 def clear(self, targets=None, block=False):
729 """Clear the remote namespaces on my engines."""
737 """Clear the remote namespaces on my engines."""
730 block = block if block is not None else self.block
738 block = block if block is not None else self.block
731 targets = targets if targets is not None else self.targets
739 targets = targets if targets is not None else self.targets
732 return self.client.clear(targets=targets, block=block)
740 return self.client.clear(targets=targets, block=block)
733
741
734 def kill(self, targets=None, block=True):
742 def kill(self, targets=None, block=True):
735 """Kill my engines."""
743 """Kill my engines."""
736 block = block if block is not None else self.block
744 block = block if block is not None else self.block
737 targets = targets if targets is not None else self.targets
745 targets = targets if targets is not None else self.targets
738 return self.client.kill(targets=targets, block=block)
746 return self.client.kill(targets=targets, block=block)
739
747
740 #----------------------------------------
748 #----------------------------------------
741 # activate for %px,%autopx magics
749 # activate for %px,%autopx magics
742 #----------------------------------------
750 #----------------------------------------
743 def activate(self):
751 def activate(self):
744 """Make this `View` active for parallel magic commands.
752 """Make this `View` active for parallel magic commands.
745
753
746 IPython has a magic command syntax to work with `MultiEngineClient` objects.
754 IPython has a magic command syntax to work with `MultiEngineClient` objects.
747 In a given IPython session there is a single active one. While
755 In a given IPython session there is a single active one. While
748 there can be many `Views` created and used by the user,
756 there can be many `Views` created and used by the user,
749 there is only one active one. The active `View` is used whenever
757 there is only one active one. The active `View` is used whenever
750 the magic commands %px and %autopx are used.
758 the magic commands %px and %autopx are used.
751
759
752 The activate() method is called on a given `View` to make it
760 The activate() method is called on a given `View` to make it
753 active. Once this has been done, the magic commands can be used.
761 active. Once this has been done, the magic commands can be used.
754 """
762 """
755
763
756 try:
764 try:
757 # This is injected into __builtins__.
765 # This is injected into __builtins__.
758 ip = get_ipython()
766 ip = get_ipython()
759 except NameError:
767 except NameError:
760 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
768 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
761 else:
769 else:
762 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
770 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
763 if pmagic is not None:
771 if pmagic is not None:
764 pmagic.active_multiengine_client = self
772 pmagic.active_multiengine_client = self
765 else:
773 else:
766 print "You must first load the parallelmagic extension " \
774 print "You must first load the parallelmagic extension " \
767 "by doing '%load_ext parallelmagic'"
775 "by doing '%load_ext parallelmagic'"
768
776
769
777
770 @testdec.skip_doctest
778 @testdec.skip_doctest
771 class LoadBalancedView(View):
779 class LoadBalancedView(View):
772 """An load-balancing View that only executes via the Task scheduler.
780 """An load-balancing View that only executes via the Task scheduler.
773
781
774 Load-balanced views can be created with the client's `view` method:
782 Load-balanced views can be created with the client's `view` method:
775
783
776 >>> v = client.load_balanced_view()
784 >>> v = client.load_balanced_view()
777
785
778 or targets can be specified, to restrict the potential destinations:
786 or targets can be specified, to restrict the potential destinations:
779
787
780 >>> v = client.client.load_balanced_view(([1,3])
788 >>> v = client.client.load_balanced_view(([1,3])
781
789
782 which would restrict loadbalancing to between engines 1 and 3.
790 which would restrict loadbalancing to between engines 1 and 3.
783
791
784 """
792 """
785
793
786 follow=Any()
794 follow=Any()
787 after=Any()
795 after=Any()
788 timeout=CFloat()
796 timeout=CFloat()
789
797
790 _task_scheme = Any()
798 _task_scheme = Any()
791 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout'])
799 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout'])
792
800
793 def __init__(self, client=None, socket=None, **flags):
801 def __init__(self, client=None, socket=None, **flags):
794 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
802 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
795 self._task_scheme=client._task_scheme
803 self._task_scheme=client._task_scheme
796
804
797 def _validate_dependency(self, dep):
805 def _validate_dependency(self, dep):
798 """validate a dependency.
806 """validate a dependency.
799
807
800 For use in `set_flags`.
808 For use in `set_flags`.
801 """
809 """
802 if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
810 if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
803 return True
811 return True
804 elif isinstance(dep, (list,set, tuple)):
812 elif isinstance(dep, (list,set, tuple)):
805 for d in dep:
813 for d in dep:
806 if not isinstance(d, (str, AsyncResult)):
814 if not isinstance(d, (str, AsyncResult)):
807 return False
815 return False
808 elif isinstance(dep, dict):
816 elif isinstance(dep, dict):
809 if set(dep.keys()) != set(Dependency().as_dict().keys()):
817 if set(dep.keys()) != set(Dependency().as_dict().keys()):
810 return False
818 return False
811 if not isinstance(dep['msg_ids'], list):
819 if not isinstance(dep['msg_ids'], list):
812 return False
820 return False
813 for d in dep['msg_ids']:
821 for d in dep['msg_ids']:
814 if not isinstance(d, str):
822 if not isinstance(d, str):
815 return False
823 return False
816 else:
824 else:
817 return False
825 return False
818
826
819 return True
827 return True
820
828
821 def _render_dependency(self, dep):
829 def _render_dependency(self, dep):
822 """helper for building jsonable dependencies from various input forms."""
830 """helper for building jsonable dependencies from various input forms."""
823 if isinstance(dep, Dependency):
831 if isinstance(dep, Dependency):
824 return dep.as_dict()
832 return dep.as_dict()
825 elif isinstance(dep, AsyncResult):
833 elif isinstance(dep, AsyncResult):
826 return dep.msg_ids
834 return dep.msg_ids
827 elif dep is None:
835 elif dep is None:
828 return []
836 return []
829 else:
837 else:
830 # pass to Dependency constructor
838 # pass to Dependency constructor
831 return list(Dependency(dep))
839 return list(Dependency(dep))
832
840
833 def set_flags(self, **kwargs):
841 def set_flags(self, **kwargs):
834 """set my attribute flags by keyword.
842 """set my attribute flags by keyword.
835
843
836 A View is a wrapper for the Client's apply method, but with attributes
844 A View is a wrapper for the Client's apply method, but with attributes
837 that specify keyword arguments, those attributes can be set by keyword
845 that specify keyword arguments, those attributes can be set by keyword
838 argument with this method.
846 argument with this method.
839
847
840 Parameters
848 Parameters
841 ----------
849 ----------
842
850
843 block : bool
851 block : bool
844 whether to wait for results
852 whether to wait for results
845 track : bool
853 track : bool
846 whether to create a MessageTracker to allow the user to
854 whether to create a MessageTracker to allow the user to
847 safely edit after arrays and buffers during non-copying
855 safely edit after arrays and buffers during non-copying
848 sends.
856 sends.
849 #
857 #
850 after : Dependency or collection of msg_ids
858 after : Dependency or collection of msg_ids
851 Only for load-balanced execution (targets=None)
859 Only for load-balanced execution (targets=None)
852 Specify a list of msg_ids as a time-based dependency.
860 Specify a list of msg_ids as a time-based dependency.
853 This job will only be run *after* the dependencies
861 This job will only be run *after* the dependencies
854 have been met.
862 have been met.
855
863
856 follow : Dependency or collection of msg_ids
864 follow : Dependency or collection of msg_ids
857 Only for load-balanced execution (targets=None)
865 Only for load-balanced execution (targets=None)
858 Specify a list of msg_ids as a location-based dependency.
866 Specify a list of msg_ids as a location-based dependency.
859 This job will only be run on an engine where this dependency
867 This job will only be run on an engine where this dependency
860 is met.
868 is met.
861
869
862 timeout : float/int or None
870 timeout : float/int or None
863 Only for load-balanced execution (targets=None)
871 Only for load-balanced execution (targets=None)
864 Specify an amount of time (in seconds) for the scheduler to
872 Specify an amount of time (in seconds) for the scheduler to
865 wait for dependencies to be met before failing with a
873 wait for dependencies to be met before failing with a
866 DependencyTimeout.
874 DependencyTimeout.
867 """
875 """
868
876
869 super(LoadBalancedView, self).set_flags(**kwargs)
877 super(LoadBalancedView, self).set_flags(**kwargs)
870 for name in ('follow', 'after'):
878 for name in ('follow', 'after'):
871 if name in kwargs:
879 if name in kwargs:
872 value = kwargs[name]
880 value = kwargs[name]
873 if self._validate_dependency(value):
881 if self._validate_dependency(value):
874 setattr(self, name, value)
882 setattr(self, name, value)
875 else:
883 else:
876 raise ValueError("Invalid dependency: %r"%value)
884 raise ValueError("Invalid dependency: %r"%value)
877 if 'timeout' in kwargs:
885 if 'timeout' in kwargs:
878 t = kwargs['timeout']
886 t = kwargs['timeout']
879 if not isinstance(t, (int, long, float, type(None))):
887 if not isinstance(t, (int, long, float, type(None))):
880 raise TypeError("Invalid type for timeout: %r"%type(t))
888 raise TypeError("Invalid type for timeout: %r"%type(t))
881 if t is not None:
889 if t is not None:
882 if t < 0:
890 if t < 0:
883 raise ValueError("Invalid timeout: %s"%t)
891 raise ValueError("Invalid timeout: %s"%t)
884 self.timeout = t
892 self.timeout = t
885
893
886 @sync_results
894 @sync_results
887 @save_ids
895 @save_ids
888 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
896 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
889 after=None, follow=None, timeout=None,
897 after=None, follow=None, timeout=None,
890 targets=None):
898 targets=None):
891 """calls f(*args, **kwargs) on a remote engine, returning the result.
899 """calls f(*args, **kwargs) on a remote engine, returning the result.
892
900
893 This method temporarily sets all of `apply`'s flags for a single call.
901 This method temporarily sets all of `apply`'s flags for a single call.
894
902
895 Parameters
903 Parameters
896 ----------
904 ----------
897
905
898 f : callable
906 f : callable
899
907
900 args : list [default: empty]
908 args : list [default: empty]
901
909
902 kwargs : dict [default: empty]
910 kwargs : dict [default: empty]
903
911
904 block : bool [default: self.block]
912 block : bool [default: self.block]
905 whether to block
913 whether to block
906 track : bool [default: self.track]
914 track : bool [default: self.track]
907 whether to ask zmq to track the message, for safe non-copying sends
915 whether to ask zmq to track the message, for safe non-copying sends
908
916
909 !!!!!! TODO: THE REST HERE !!!!
917 !!!!!! TODO: THE REST HERE !!!!
910
918
911 Returns
919 Returns
912 -------
920 -------
913
921
914 if self.block is False:
922 if self.block is False:
915 returns AsyncResult
923 returns AsyncResult
916 else:
924 else:
917 returns actual result of f(*args, **kwargs) on the engine(s)
925 returns actual result of f(*args, **kwargs) on the engine(s)
918 This will be a list of self.targets is also a list (even length 1), or
926 This will be a list of self.targets is also a list (even length 1), or
919 the single result if self.targets is an integer engine id
927 the single result if self.targets is an integer engine id
920 """
928 """
921
929
922 # validate whether we can run
930 # validate whether we can run
923 if self._socket.closed:
931 if self._socket.closed:
924 msg = "Task farming is disabled"
932 msg = "Task farming is disabled"
925 if self._task_scheme == 'pure':
933 if self._task_scheme == 'pure':
926 msg += " because the pure ZMQ scheduler cannot handle"
934 msg += " because the pure ZMQ scheduler cannot handle"
927 msg += " disappearing engines."
935 msg += " disappearing engines."
928 raise RuntimeError(msg)
936 raise RuntimeError(msg)
929
937
930 if self._task_scheme == 'pure':
938 if self._task_scheme == 'pure':
931 # pure zmq scheme doesn't support dependencies
939 # pure zmq scheme doesn't support dependencies
932 msg = "Pure ZMQ scheduler doesn't support dependencies"
940 msg = "Pure ZMQ scheduler doesn't support dependencies"
933 if (follow or after):
941 if (follow or after):
934 # hard fail on DAG dependencies
942 # hard fail on DAG dependencies
935 raise RuntimeError(msg)
943 raise RuntimeError(msg)
936 if isinstance(f, dependent):
944 if isinstance(f, dependent):
937 # soft warn on functional dependencies
945 # soft warn on functional dependencies
938 warnings.warn(msg, RuntimeWarning)
946 warnings.warn(msg, RuntimeWarning)
939
947
940 # build args
948 # build args
941 args = [] if args is None else args
949 args = [] if args is None else args
942 kwargs = {} if kwargs is None else kwargs
950 kwargs = {} if kwargs is None else kwargs
943 block = self.block if block is None else block
951 block = self.block if block is None else block
944 track = self.track if track is None else track
952 track = self.track if track is None else track
945 after = self.after if after is None else after
953 after = self.after if after is None else after
946 follow = self.follow if follow is None else follow
954 follow = self.follow if follow is None else follow
947 timeout = self.timeout if timeout is None else timeout
955 timeout = self.timeout if timeout is None else timeout
948 targets = self.targets if targets is None else targets
956 targets = self.targets if targets is None else targets
949
957
950 if targets is None:
958 if targets is None:
951 idents = []
959 idents = []
952 else:
960 else:
953 idents = self.client._build_targets(targets)[0]
961 idents = self.client._build_targets(targets)[0]
954
962
955 after = self._render_dependency(after)
963 after = self._render_dependency(after)
956 follow = self._render_dependency(follow)
964 follow = self._render_dependency(follow)
957 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
965 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
958
966
959 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
967 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
960 subheader=subheader)
968 subheader=subheader)
961 tracker = None if track is False else msg['tracker']
969 tracker = None if track is False else msg['tracker']
962
970
963 ar = AsyncResult(self.client, msg['msg_id'], fname=f.__name__, targets=None, tracker=tracker)
971 ar = AsyncResult(self.client, msg['msg_id'], fname=f.__name__, targets=None, tracker=tracker)
964
972
965 if block:
973 if block:
966 try:
974 try:
967 return ar.get()
975 return ar.get()
968 except KeyboardInterrupt:
976 except KeyboardInterrupt:
969 pass
977 pass
970 return ar
978 return ar
971
979
972 @spin_after
980 @spin_after
973 @save_ids
981 @save_ids
974 def map(self, f, *sequences, **kwargs):
982 def map(self, f, *sequences, **kwargs):
975 """view.map(f, *sequences, block=self.block, chunksize=1) => list|AsyncMapResult
983 """view.map(f, *sequences, block=self.block, chunksize=1) => list|AsyncMapResult
976
984
977 Parallel version of builtin `map`, load-balanced by this View.
985 Parallel version of builtin `map`, load-balanced by this View.
978
986
979 `block`, and `chunksize` can be specified by keyword only.
987 `block`, and `chunksize` can be specified by keyword only.
980
988
981 Each `chunksize` elements will be a separate task, and will be
989 Each `chunksize` elements will be a separate task, and will be
982 load-balanced. This lets individual elements be available for iteration
990 load-balanced. This lets individual elements be available for iteration
983 as soon as they arrive.
991 as soon as they arrive.
984
992
985 Parameters
993 Parameters
986 ----------
994 ----------
987
995
988 f : callable
996 f : callable
989 function to be mapped
997 function to be mapped
990 *sequences: one or more sequences of matching length
998 *sequences: one or more sequences of matching length
991 the sequences to be distributed and passed to `f`
999 the sequences to be distributed and passed to `f`
992 block : bool
1000 block : bool
993 whether to wait for the result or not [default self.block]
1001 whether to wait for the result or not [default self.block]
994 track : bool
1002 track : bool
995 whether to create a MessageTracker to allow the user to
1003 whether to create a MessageTracker to allow the user to
996 safely edit after arrays and buffers during non-copying
1004 safely edit after arrays and buffers during non-copying
997 sends.
1005 sends.
998 chunksize : int
1006 chunksize : int
999 how many elements should be in each task [default 1]
1007 how many elements should be in each task [default 1]
1000
1008
1001 Returns
1009 Returns
1002 -------
1010 -------
1003
1011
1004 if block=False:
1012 if block=False:
1005 AsyncMapResult
1013 AsyncMapResult
1006 An object like AsyncResult, but which reassembles the sequence of results
1014 An object like AsyncResult, but which reassembles the sequence of results
1007 into a single list. AsyncMapResults can be iterated through before all
1015 into a single list. AsyncMapResults can be iterated through before all
1008 results are complete.
1016 results are complete.
1009 else:
1017 else:
1010 the result of map(f,*sequences)
1018 the result of map(f,*sequences)
1011
1019
1012 """
1020 """
1013
1021
1014 # default
1022 # default
1015 block = kwargs.get('block', self.block)
1023 block = kwargs.get('block', self.block)
1016 chunksize = kwargs.get('chunksize', 1)
1024 chunksize = kwargs.get('chunksize', 1)
1017
1025
1018 keyset = set(kwargs.keys())
1026 keyset = set(kwargs.keys())
1019 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1027 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1020 if extra_keys:
1028 if extra_keys:
1021 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1029 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1022
1030
1023 assert len(sequences) > 0, "must have some sequences to map onto!"
1031 assert len(sequences) > 0, "must have some sequences to map onto!"
1024
1032
1025 pf = ParallelFunction(self, f, block=block, chunksize=chunksize)
1033 pf = ParallelFunction(self, f, block=block, chunksize=chunksize)
1026 return pf.map(*sequences)
1034 return pf.map(*sequences)
1027
1035
1028 __all__ = ['LoadBalancedView', 'DirectView'] No newline at end of file
1036 __all__ = ['LoadBalancedView', 'DirectView']
General Comments 0
You need to be logged in to leave comments. Login now