##// END OF EJS Templates
remove non-functional View.kill method...
MinRK -
Show More
@@ -1,1103 +1,1097 b''
1 """Views of remote engines.
1 """Views of remote engines.
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import imp
18 import imp
19 import sys
19 import sys
20 import warnings
20 import warnings
21 from contextlib import contextmanager
21 from contextlib import contextmanager
22 from types import ModuleType
22 from types import ModuleType
23
23
24 import zmq
24 import zmq
25
25
26 from IPython.testing.skipdoctest import skip_doctest
26 from IPython.testing.skipdoctest import skip_doctest
27 from IPython.utils.traitlets import (
27 from IPython.utils.traitlets import (
28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
29 )
29 )
30 from IPython.external.decorator import decorator
30 from IPython.external.decorator import decorator
31
31
32 from IPython.parallel import util
32 from IPython.parallel import util
33 from IPython.parallel.controller.dependency import Dependency, dependent
33 from IPython.parallel.controller.dependency import Dependency, dependent
34
34
35 from . import map as Map
35 from . import map as Map
36 from .asyncresult import AsyncResult, AsyncMapResult
36 from .asyncresult import AsyncResult, AsyncMapResult
37 from .remotefunction import ParallelFunction, parallel, remote, getname
37 from .remotefunction import ParallelFunction, parallel, remote, getname
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Decorators
40 # Decorators
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43 @decorator
43 @decorator
44 def save_ids(f, self, *args, **kwargs):
44 def save_ids(f, self, *args, **kwargs):
45 """Keep our history and outstanding attributes up to date after a method call."""
45 """Keep our history and outstanding attributes up to date after a method call."""
46 n_previous = len(self.client.history)
46 n_previous = len(self.client.history)
47 try:
47 try:
48 ret = f(self, *args, **kwargs)
48 ret = f(self, *args, **kwargs)
49 finally:
49 finally:
50 nmsgs = len(self.client.history) - n_previous
50 nmsgs = len(self.client.history) - n_previous
51 msg_ids = self.client.history[-nmsgs:]
51 msg_ids = self.client.history[-nmsgs:]
52 self.history.extend(msg_ids)
52 self.history.extend(msg_ids)
53 map(self.outstanding.add, msg_ids)
53 map(self.outstanding.add, msg_ids)
54 return ret
54 return ret
55
55
56 @decorator
56 @decorator
57 def sync_results(f, self, *args, **kwargs):
57 def sync_results(f, self, *args, **kwargs):
58 """sync relevant results from self.client to our results attribute."""
58 """sync relevant results from self.client to our results attribute."""
59 ret = f(self, *args, **kwargs)
59 ret = f(self, *args, **kwargs)
60 delta = self.outstanding.difference(self.client.outstanding)
60 delta = self.outstanding.difference(self.client.outstanding)
61 completed = self.outstanding.intersection(delta)
61 completed = self.outstanding.intersection(delta)
62 self.outstanding = self.outstanding.difference(completed)
62 self.outstanding = self.outstanding.difference(completed)
63 return ret
63 return ret
64
64
65 @decorator
65 @decorator
66 def spin_after(f, self, *args, **kwargs):
66 def spin_after(f, self, *args, **kwargs):
67 """call spin after the method."""
67 """call spin after the method."""
68 ret = f(self, *args, **kwargs)
68 ret = f(self, *args, **kwargs)
69 self.spin()
69 self.spin()
70 return ret
70 return ret
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # Classes
73 # Classes
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76 @skip_doctest
76 @skip_doctest
77 class View(HasTraits):
77 class View(HasTraits):
78 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
78 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
79
79
80 Don't use this class, use subclasses.
80 Don't use this class, use subclasses.
81
81
82 Methods
82 Methods
83 -------
83 -------
84
84
85 spin
85 spin
86 flushes incoming results and registration state changes
86 flushes incoming results and registration state changes
87 control methods spin, and requesting `ids` also ensures up to date
87 control methods spin, and requesting `ids` also ensures up to date
88
88
89 wait
89 wait
90 wait on one or more msg_ids
90 wait on one or more msg_ids
91
91
92 execution methods
92 execution methods
93 apply
93 apply
94 legacy: execute, run
94 legacy: execute, run
95
95
96 data movement
96 data movement
97 push, pull, scatter, gather
97 push, pull, scatter, gather
98
98
99 query methods
99 query methods
100 get_result, queue_status, purge_results, result_status
100 get_result, queue_status, purge_results, result_status
101
101
102 control methods
102 control methods
103 abort, shutdown
103 abort, shutdown
104
104
105 """
105 """
106 # flags
106 # flags
107 block=Bool(False)
107 block=Bool(False)
108 track=Bool(True)
108 track=Bool(True)
109 targets = Any()
109 targets = Any()
110
110
111 history=List()
111 history=List()
112 outstanding = Set()
112 outstanding = Set()
113 results = Dict()
113 results = Dict()
114 client = Instance('IPython.parallel.Client')
114 client = Instance('IPython.parallel.Client')
115
115
116 _socket = Instance('zmq.Socket')
116 _socket = Instance('zmq.Socket')
117 _flag_names = List(['targets', 'block', 'track'])
117 _flag_names = List(['targets', 'block', 'track'])
118 _targets = Any()
118 _targets = Any()
119 _idents = Any()
119 _idents = Any()
120
120
121 def __init__(self, client=None, socket=None, **flags):
121 def __init__(self, client=None, socket=None, **flags):
122 super(View, self).__init__(client=client, _socket=socket)
122 super(View, self).__init__(client=client, _socket=socket)
123 self.results = client.results
123 self.results = client.results
124 self.block = client.block
124 self.block = client.block
125
125
126 self.set_flags(**flags)
126 self.set_flags(**flags)
127
127
128 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
128 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
129
129
130 def __repr__(self):
130 def __repr__(self):
131 strtargets = str(self.targets)
131 strtargets = str(self.targets)
132 if len(strtargets) > 16:
132 if len(strtargets) > 16:
133 strtargets = strtargets[:12]+'...]'
133 strtargets = strtargets[:12]+'...]'
134 return "<%s %s>"%(self.__class__.__name__, strtargets)
134 return "<%s %s>"%(self.__class__.__name__, strtargets)
135
135
136 def __len__(self):
136 def __len__(self):
137 if isinstance(self.targets, list):
137 if isinstance(self.targets, list):
138 return len(self.targets)
138 return len(self.targets)
139 elif isinstance(self.targets, int):
139 elif isinstance(self.targets, int):
140 return 1
140 return 1
141 else:
141 else:
142 return len(self.client)
142 return len(self.client)
143
143
144 def set_flags(self, **kwargs):
144 def set_flags(self, **kwargs):
145 """set my attribute flags by keyword.
145 """set my attribute flags by keyword.
146
146
147 Views determine behavior with a few attributes (`block`, `track`, etc.).
147 Views determine behavior with a few attributes (`block`, `track`, etc.).
148 These attributes can be set all at once by name with this method.
148 These attributes can be set all at once by name with this method.
149
149
150 Parameters
150 Parameters
151 ----------
151 ----------
152
152
153 block : bool
153 block : bool
154 whether to wait for results
154 whether to wait for results
155 track : bool
155 track : bool
156 whether to create a MessageTracker to allow the user to
156 whether to create a MessageTracker to allow the user to
157 safely edit after arrays and buffers during non-copying
157 safely edit after arrays and buffers during non-copying
158 sends.
158 sends.
159 """
159 """
160 for name, value in kwargs.iteritems():
160 for name, value in kwargs.iteritems():
161 if name not in self._flag_names:
161 if name not in self._flag_names:
162 raise KeyError("Invalid name: %r"%name)
162 raise KeyError("Invalid name: %r"%name)
163 else:
163 else:
164 setattr(self, name, value)
164 setattr(self, name, value)
165
165
166 @contextmanager
166 @contextmanager
167 def temp_flags(self, **kwargs):
167 def temp_flags(self, **kwargs):
168 """temporarily set flags, for use in `with` statements.
168 """temporarily set flags, for use in `with` statements.
169
169
170 See set_flags for permanent setting of flags
170 See set_flags for permanent setting of flags
171
171
172 Examples
172 Examples
173 --------
173 --------
174
174
175 >>> view.track=False
175 >>> view.track=False
176 ...
176 ...
177 >>> with view.temp_flags(track=True):
177 >>> with view.temp_flags(track=True):
178 ... ar = view.apply(dostuff, my_big_array)
178 ... ar = view.apply(dostuff, my_big_array)
179 ... ar.tracker.wait() # wait for send to finish
179 ... ar.tracker.wait() # wait for send to finish
180 >>> view.track
180 >>> view.track
181 False
181 False
182
182
183 """
183 """
184 # preflight: save flags, and set temporaries
184 # preflight: save flags, and set temporaries
185 saved_flags = {}
185 saved_flags = {}
186 for f in self._flag_names:
186 for f in self._flag_names:
187 saved_flags[f] = getattr(self, f)
187 saved_flags[f] = getattr(self, f)
188 self.set_flags(**kwargs)
188 self.set_flags(**kwargs)
189 # yield to the with-statement block
189 # yield to the with-statement block
190 try:
190 try:
191 yield
191 yield
192 finally:
192 finally:
193 # postflight: restore saved flags
193 # postflight: restore saved flags
194 self.set_flags(**saved_flags)
194 self.set_flags(**saved_flags)
195
195
196
196
197 #----------------------------------------------------------------
197 #----------------------------------------------------------------
198 # apply
198 # apply
199 #----------------------------------------------------------------
199 #----------------------------------------------------------------
200
200
201 @sync_results
201 @sync_results
202 @save_ids
202 @save_ids
203 def _really_apply(self, f, args, kwargs, block=None, **options):
203 def _really_apply(self, f, args, kwargs, block=None, **options):
204 """wrapper for client.send_apply_request"""
204 """wrapper for client.send_apply_request"""
205 raise NotImplementedError("Implement in subclasses")
205 raise NotImplementedError("Implement in subclasses")
206
206
207 def apply(self, f, *args, **kwargs):
207 def apply(self, f, *args, **kwargs):
208 """calls f(*args, **kwargs) on remote engines, returning the result.
208 """calls f(*args, **kwargs) on remote engines, returning the result.
209
209
210 This method sets all apply flags via this View's attributes.
210 This method sets all apply flags via this View's attributes.
211
211
212 if self.block is False:
212 if self.block is False:
213 returns AsyncResult
213 returns AsyncResult
214 else:
214 else:
215 returns actual result of f(*args, **kwargs)
215 returns actual result of f(*args, **kwargs)
216 """
216 """
217 return self._really_apply(f, args, kwargs)
217 return self._really_apply(f, args, kwargs)
218
218
219 def apply_async(self, f, *args, **kwargs):
219 def apply_async(self, f, *args, **kwargs):
220 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
220 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
221
221
222 returns AsyncResult
222 returns AsyncResult
223 """
223 """
224 return self._really_apply(f, args, kwargs, block=False)
224 return self._really_apply(f, args, kwargs, block=False)
225
225
226 @spin_after
226 @spin_after
227 def apply_sync(self, f, *args, **kwargs):
227 def apply_sync(self, f, *args, **kwargs):
228 """calls f(*args, **kwargs) on remote engines in a blocking manner,
228 """calls f(*args, **kwargs) on remote engines in a blocking manner,
229 returning the result.
229 returning the result.
230
230
231 returns: actual result of f(*args, **kwargs)
231 returns: actual result of f(*args, **kwargs)
232 """
232 """
233 return self._really_apply(f, args, kwargs, block=True)
233 return self._really_apply(f, args, kwargs, block=True)
234
234
235 #----------------------------------------------------------------
235 #----------------------------------------------------------------
236 # wrappers for client and control methods
236 # wrappers for client and control methods
237 #----------------------------------------------------------------
237 #----------------------------------------------------------------
238 @sync_results
238 @sync_results
239 def spin(self):
239 def spin(self):
240 """spin the client, and sync"""
240 """spin the client, and sync"""
241 self.client.spin()
241 self.client.spin()
242
242
243 @sync_results
243 @sync_results
244 def wait(self, jobs=None, timeout=-1):
244 def wait(self, jobs=None, timeout=-1):
245 """waits on one or more `jobs`, for up to `timeout` seconds.
245 """waits on one or more `jobs`, for up to `timeout` seconds.
246
246
247 Parameters
247 Parameters
248 ----------
248 ----------
249
249
250 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
250 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
251 ints are indices to self.history
251 ints are indices to self.history
252 strs are msg_ids
252 strs are msg_ids
253 default: wait on all outstanding messages
253 default: wait on all outstanding messages
254 timeout : float
254 timeout : float
255 a time in seconds, after which to give up.
255 a time in seconds, after which to give up.
256 default is -1, which means no timeout
256 default is -1, which means no timeout
257
257
258 Returns
258 Returns
259 -------
259 -------
260
260
261 True : when all msg_ids are done
261 True : when all msg_ids are done
262 False : timeout reached, some msg_ids still outstanding
262 False : timeout reached, some msg_ids still outstanding
263 """
263 """
264 if jobs is None:
264 if jobs is None:
265 jobs = self.history
265 jobs = self.history
266 return self.client.wait(jobs, timeout)
266 return self.client.wait(jobs, timeout)
267
267
268 def abort(self, jobs=None, targets=None, block=None):
268 def abort(self, jobs=None, targets=None, block=None):
269 """Abort jobs on my engines.
269 """Abort jobs on my engines.
270
270
271 Parameters
271 Parameters
272 ----------
272 ----------
273
273
274 jobs : None, str, list of strs, optional
274 jobs : None, str, list of strs, optional
275 if None: abort all jobs.
275 if None: abort all jobs.
276 else: abort specific msg_id(s).
276 else: abort specific msg_id(s).
277 """
277 """
278 block = block if block is not None else self.block
278 block = block if block is not None else self.block
279 targets = targets if targets is not None else self.targets
279 targets = targets if targets is not None else self.targets
280 jobs = jobs if jobs is not None else list(self.outstanding)
280 jobs = jobs if jobs is not None else list(self.outstanding)
281
281
282 return self.client.abort(jobs=jobs, targets=targets, block=block)
282 return self.client.abort(jobs=jobs, targets=targets, block=block)
283
283
284 def queue_status(self, targets=None, verbose=False):
284 def queue_status(self, targets=None, verbose=False):
285 """Fetch the Queue status of my engines"""
285 """Fetch the Queue status of my engines"""
286 targets = targets if targets is not None else self.targets
286 targets = targets if targets is not None else self.targets
287 return self.client.queue_status(targets=targets, verbose=verbose)
287 return self.client.queue_status(targets=targets, verbose=verbose)
288
288
289 def purge_results(self, jobs=[], targets=[]):
289 def purge_results(self, jobs=[], targets=[]):
290 """Instruct the controller to forget specific results."""
290 """Instruct the controller to forget specific results."""
291 if targets is None or targets == 'all':
291 if targets is None or targets == 'all':
292 targets = self.targets
292 targets = self.targets
293 return self.client.purge_results(jobs=jobs, targets=targets)
293 return self.client.purge_results(jobs=jobs, targets=targets)
294
294
295 def shutdown(self, targets=None, restart=False, hub=False, block=None):
295 def shutdown(self, targets=None, restart=False, hub=False, block=None):
296 """Terminates one or more engine processes, optionally including the hub.
296 """Terminates one or more engine processes, optionally including the hub.
297 """
297 """
298 block = self.block if block is None else block
298 block = self.block if block is None else block
299 if targets is None or targets == 'all':
299 if targets is None or targets == 'all':
300 targets = self.targets
300 targets = self.targets
301 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
301 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
302
302
303 @spin_after
303 @spin_after
304 def get_result(self, indices_or_msg_ids=None):
304 def get_result(self, indices_or_msg_ids=None):
305 """return one or more results, specified by history index or msg_id.
305 """return one or more results, specified by history index or msg_id.
306
306
307 See client.get_result for details.
307 See client.get_result for details.
308
308
309 """
309 """
310
310
311 if indices_or_msg_ids is None:
311 if indices_or_msg_ids is None:
312 indices_or_msg_ids = -1
312 indices_or_msg_ids = -1
313 if isinstance(indices_or_msg_ids, int):
313 if isinstance(indices_or_msg_ids, int):
314 indices_or_msg_ids = self.history[indices_or_msg_ids]
314 indices_or_msg_ids = self.history[indices_or_msg_ids]
315 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
315 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
316 indices_or_msg_ids = list(indices_or_msg_ids)
316 indices_or_msg_ids = list(indices_or_msg_ids)
317 for i,index in enumerate(indices_or_msg_ids):
317 for i,index in enumerate(indices_or_msg_ids):
318 if isinstance(index, int):
318 if isinstance(index, int):
319 indices_or_msg_ids[i] = self.history[index]
319 indices_or_msg_ids[i] = self.history[index]
320 return self.client.get_result(indices_or_msg_ids)
320 return self.client.get_result(indices_or_msg_ids)
321
321
322 #-------------------------------------------------------------------
322 #-------------------------------------------------------------------
323 # Map
323 # Map
324 #-------------------------------------------------------------------
324 #-------------------------------------------------------------------
325
325
326 def map(self, f, *sequences, **kwargs):
326 def map(self, f, *sequences, **kwargs):
327 """override in subclasses"""
327 """override in subclasses"""
328 raise NotImplementedError
328 raise NotImplementedError
329
329
330 def map_async(self, f, *sequences, **kwargs):
330 def map_async(self, f, *sequences, **kwargs):
331 """Parallel version of builtin `map`, using this view's engines.
331 """Parallel version of builtin `map`, using this view's engines.
332
332
333 This is equivalent to map(...block=False)
333 This is equivalent to map(...block=False)
334
334
335 See `self.map` for details.
335 See `self.map` for details.
336 """
336 """
337 if 'block' in kwargs:
337 if 'block' in kwargs:
338 raise TypeError("map_async doesn't take a `block` keyword argument.")
338 raise TypeError("map_async doesn't take a `block` keyword argument.")
339 kwargs['block'] = False
339 kwargs['block'] = False
340 return self.map(f,*sequences,**kwargs)
340 return self.map(f,*sequences,**kwargs)
341
341
342 def map_sync(self, f, *sequences, **kwargs):
342 def map_sync(self, f, *sequences, **kwargs):
343 """Parallel version of builtin `map`, using this view's engines.
343 """Parallel version of builtin `map`, using this view's engines.
344
344
345 This is equivalent to map(...block=True)
345 This is equivalent to map(...block=True)
346
346
347 See `self.map` for details.
347 See `self.map` for details.
348 """
348 """
349 if 'block' in kwargs:
349 if 'block' in kwargs:
350 raise TypeError("map_sync doesn't take a `block` keyword argument.")
350 raise TypeError("map_sync doesn't take a `block` keyword argument.")
351 kwargs['block'] = True
351 kwargs['block'] = True
352 return self.map(f,*sequences,**kwargs)
352 return self.map(f,*sequences,**kwargs)
353
353
354 def imap(self, f, *sequences, **kwargs):
354 def imap(self, f, *sequences, **kwargs):
355 """Parallel version of `itertools.imap`.
355 """Parallel version of `itertools.imap`.
356
356
357 See `self.map` for details.
357 See `self.map` for details.
358
358
359 """
359 """
360
360
361 return iter(self.map_async(f,*sequences, **kwargs))
361 return iter(self.map_async(f,*sequences, **kwargs))
362
362
363 #-------------------------------------------------------------------
363 #-------------------------------------------------------------------
364 # Decorators
364 # Decorators
365 #-------------------------------------------------------------------
365 #-------------------------------------------------------------------
366
366
367 def remote(self, block=True, **flags):
367 def remote(self, block=True, **flags):
368 """Decorator for making a RemoteFunction"""
368 """Decorator for making a RemoteFunction"""
369 block = self.block if block is None else block
369 block = self.block if block is None else block
370 return remote(self, block=block, **flags)
370 return remote(self, block=block, **flags)
371
371
372 def parallel(self, dist='b', block=None, **flags):
372 def parallel(self, dist='b', block=None, **flags):
373 """Decorator for making a ParallelFunction"""
373 """Decorator for making a ParallelFunction"""
374 block = self.block if block is None else block
374 block = self.block if block is None else block
375 return parallel(self, dist=dist, block=block, **flags)
375 return parallel(self, dist=dist, block=block, **flags)
376
376
377 @skip_doctest
377 @skip_doctest
378 class DirectView(View):
378 class DirectView(View):
379 """Direct Multiplexer View of one or more engines.
379 """Direct Multiplexer View of one or more engines.
380
380
381 These are created via indexed access to a client:
381 These are created via indexed access to a client:
382
382
383 >>> dv_1 = client[1]
383 >>> dv_1 = client[1]
384 >>> dv_all = client[:]
384 >>> dv_all = client[:]
385 >>> dv_even = client[::2]
385 >>> dv_even = client[::2]
386 >>> dv_some = client[1:3]
386 >>> dv_some = client[1:3]
387
387
388 This object provides dictionary access to engine namespaces:
388 This object provides dictionary access to engine namespaces:
389
389
390 # push a=5:
390 # push a=5:
391 >>> dv['a'] = 5
391 >>> dv['a'] = 5
392 # pull 'foo':
392 # pull 'foo':
393 >>> db['foo']
393 >>> db['foo']
394
394
395 """
395 """
396
396
397 def __init__(self, client=None, socket=None, targets=None):
397 def __init__(self, client=None, socket=None, targets=None):
398 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
398 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
399
399
400 @property
400 @property
401 def importer(self):
401 def importer(self):
402 """sync_imports(local=True) as a property.
402 """sync_imports(local=True) as a property.
403
403
404 See sync_imports for details.
404 See sync_imports for details.
405
405
406 """
406 """
407 return self.sync_imports(True)
407 return self.sync_imports(True)
408
408
409 @contextmanager
409 @contextmanager
410 def sync_imports(self, local=True, quiet=False):
410 def sync_imports(self, local=True, quiet=False):
411 """Context Manager for performing simultaneous local and remote imports.
411 """Context Manager for performing simultaneous local and remote imports.
412
412
413 'import x as y' will *not* work. The 'as y' part will simply be ignored.
413 'import x as y' will *not* work. The 'as y' part will simply be ignored.
414
414
415 If `local=True`, then the package will also be imported locally.
415 If `local=True`, then the package will also be imported locally.
416
416
417 If `quiet=True`, no output will be produced when attempting remote
417 If `quiet=True`, no output will be produced when attempting remote
418 imports.
418 imports.
419
419
420 Note that remote-only (`local=False`) imports have not been implemented.
420 Note that remote-only (`local=False`) imports have not been implemented.
421
421
422 >>> with view.sync_imports():
422 >>> with view.sync_imports():
423 ... from numpy import recarray
423 ... from numpy import recarray
424 importing recarray from numpy on engine(s)
424 importing recarray from numpy on engine(s)
425
425
426 """
426 """
427 import __builtin__
427 import __builtin__
428 local_import = __builtin__.__import__
428 local_import = __builtin__.__import__
429 modules = set()
429 modules = set()
430 results = []
430 results = []
431 @util.interactive
431 @util.interactive
432 def remote_import(name, fromlist, level):
432 def remote_import(name, fromlist, level):
433 """the function to be passed to apply, that actually performs the import
433 """the function to be passed to apply, that actually performs the import
434 on the engine, and loads up the user namespace.
434 on the engine, and loads up the user namespace.
435 """
435 """
436 import sys
436 import sys
437 user_ns = globals()
437 user_ns = globals()
438 mod = __import__(name, fromlist=fromlist, level=level)
438 mod = __import__(name, fromlist=fromlist, level=level)
439 if fromlist:
439 if fromlist:
440 for key in fromlist:
440 for key in fromlist:
441 user_ns[key] = getattr(mod, key)
441 user_ns[key] = getattr(mod, key)
442 else:
442 else:
443 user_ns[name] = sys.modules[name]
443 user_ns[name] = sys.modules[name]
444
444
445 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
445 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
446 """the drop-in replacement for __import__, that optionally imports
446 """the drop-in replacement for __import__, that optionally imports
447 locally as well.
447 locally as well.
448 """
448 """
449 # don't override nested imports
449 # don't override nested imports
450 save_import = __builtin__.__import__
450 save_import = __builtin__.__import__
451 __builtin__.__import__ = local_import
451 __builtin__.__import__ = local_import
452
452
453 if imp.lock_held():
453 if imp.lock_held():
454 # this is a side-effect import, don't do it remotely, or even
454 # this is a side-effect import, don't do it remotely, or even
455 # ignore the local effects
455 # ignore the local effects
456 return local_import(name, globals, locals, fromlist, level)
456 return local_import(name, globals, locals, fromlist, level)
457
457
458 imp.acquire_lock()
458 imp.acquire_lock()
459 if local:
459 if local:
460 mod = local_import(name, globals, locals, fromlist, level)
460 mod = local_import(name, globals, locals, fromlist, level)
461 else:
461 else:
462 raise NotImplementedError("remote-only imports not yet implemented")
462 raise NotImplementedError("remote-only imports not yet implemented")
463 imp.release_lock()
463 imp.release_lock()
464
464
465 key = name+':'+','.join(fromlist or [])
465 key = name+':'+','.join(fromlist or [])
466 if level == -1 and key not in modules:
466 if level == -1 and key not in modules:
467 modules.add(key)
467 modules.add(key)
468 if not quiet:
468 if not quiet:
469 if fromlist:
469 if fromlist:
470 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
470 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
471 else:
471 else:
472 print "importing %s on engine(s)"%name
472 print "importing %s on engine(s)"%name
473 results.append(self.apply_async(remote_import, name, fromlist, level))
473 results.append(self.apply_async(remote_import, name, fromlist, level))
474 # restore override
474 # restore override
475 __builtin__.__import__ = save_import
475 __builtin__.__import__ = save_import
476
476
477 return mod
477 return mod
478
478
479 # override __import__
479 # override __import__
480 __builtin__.__import__ = view_import
480 __builtin__.__import__ = view_import
481 try:
481 try:
482 # enter the block
482 # enter the block
483 yield
483 yield
484 except ImportError:
484 except ImportError:
485 if local:
485 if local:
486 raise
486 raise
487 else:
487 else:
488 # ignore import errors if not doing local imports
488 # ignore import errors if not doing local imports
489 pass
489 pass
490 finally:
490 finally:
491 # always restore __import__
491 # always restore __import__
492 __builtin__.__import__ = local_import
492 __builtin__.__import__ = local_import
493
493
494 for r in results:
494 for r in results:
495 # raise possible remote ImportErrors here
495 # raise possible remote ImportErrors here
496 r.get()
496 r.get()
497
497
498
498
499 @sync_results
499 @sync_results
500 @save_ids
500 @save_ids
501 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
501 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
502 """calls f(*args, **kwargs) on remote engines, returning the result.
502 """calls f(*args, **kwargs) on remote engines, returning the result.
503
503
504 This method sets all of `apply`'s flags via this View's attributes.
504 This method sets all of `apply`'s flags via this View's attributes.
505
505
506 Parameters
506 Parameters
507 ----------
507 ----------
508
508
509 f : callable
509 f : callable
510
510
511 args : list [default: empty]
511 args : list [default: empty]
512
512
513 kwargs : dict [default: empty]
513 kwargs : dict [default: empty]
514
514
515 targets : target list [default: self.targets]
515 targets : target list [default: self.targets]
516 where to run
516 where to run
517 block : bool [default: self.block]
517 block : bool [default: self.block]
518 whether to block
518 whether to block
519 track : bool [default: self.track]
519 track : bool [default: self.track]
520 whether to ask zmq to track the message, for safe non-copying sends
520 whether to ask zmq to track the message, for safe non-copying sends
521
521
522 Returns
522 Returns
523 -------
523 -------
524
524
525 if self.block is False:
525 if self.block is False:
526 returns AsyncResult
526 returns AsyncResult
527 else:
527 else:
528 returns actual result of f(*args, **kwargs) on the engine(s)
528 returns actual result of f(*args, **kwargs) on the engine(s)
529 This will be a list of self.targets is also a list (even length 1), or
529 This will be a list of self.targets is also a list (even length 1), or
530 the single result if self.targets is an integer engine id
530 the single result if self.targets is an integer engine id
531 """
531 """
532 args = [] if args is None else args
532 args = [] if args is None else args
533 kwargs = {} if kwargs is None else kwargs
533 kwargs = {} if kwargs is None else kwargs
534 block = self.block if block is None else block
534 block = self.block if block is None else block
535 track = self.track if track is None else track
535 track = self.track if track is None else track
536 targets = self.targets if targets is None else targets
536 targets = self.targets if targets is None else targets
537
537
538 _idents = self.client._build_targets(targets)[0]
538 _idents = self.client._build_targets(targets)[0]
539 msg_ids = []
539 msg_ids = []
540 trackers = []
540 trackers = []
541 for ident in _idents:
541 for ident in _idents:
542 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
542 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
543 ident=ident)
543 ident=ident)
544 if track:
544 if track:
545 trackers.append(msg['tracker'])
545 trackers.append(msg['tracker'])
546 msg_ids.append(msg['header']['msg_id'])
546 msg_ids.append(msg['header']['msg_id'])
547 tracker = None if track is False else zmq.MessageTracker(*trackers)
547 tracker = None if track is False else zmq.MessageTracker(*trackers)
548 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=targets, tracker=tracker)
548 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=targets, tracker=tracker)
549 if block:
549 if block:
550 try:
550 try:
551 return ar.get()
551 return ar.get()
552 except KeyboardInterrupt:
552 except KeyboardInterrupt:
553 pass
553 pass
554 return ar
554 return ar
555
555
556
556
557 @spin_after
557 @spin_after
558 def map(self, f, *sequences, **kwargs):
558 def map(self, f, *sequences, **kwargs):
559 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
559 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
560
560
561 Parallel version of builtin `map`, using this View's `targets`.
561 Parallel version of builtin `map`, using this View's `targets`.
562
562
563 There will be one task per target, so work will be chunked
563 There will be one task per target, so work will be chunked
564 if the sequences are longer than `targets`.
564 if the sequences are longer than `targets`.
565
565
566 Results can be iterated as they are ready, but will become available in chunks.
566 Results can be iterated as they are ready, but will become available in chunks.
567
567
568 Parameters
568 Parameters
569 ----------
569 ----------
570
570
571 f : callable
571 f : callable
572 function to be mapped
572 function to be mapped
573 *sequences: one or more sequences of matching length
573 *sequences: one or more sequences of matching length
574 the sequences to be distributed and passed to `f`
574 the sequences to be distributed and passed to `f`
575 block : bool
575 block : bool
576 whether to wait for the result or not [default self.block]
576 whether to wait for the result or not [default self.block]
577
577
578 Returns
578 Returns
579 -------
579 -------
580
580
581 if block=False:
581 if block=False:
582 AsyncMapResult
582 AsyncMapResult
583 An object like AsyncResult, but which reassembles the sequence of results
583 An object like AsyncResult, but which reassembles the sequence of results
584 into a single list. AsyncMapResults can be iterated through before all
584 into a single list. AsyncMapResults can be iterated through before all
585 results are complete.
585 results are complete.
586 else:
586 else:
587 list
587 list
588 the result of map(f,*sequences)
588 the result of map(f,*sequences)
589 """
589 """
590
590
591 block = kwargs.pop('block', self.block)
591 block = kwargs.pop('block', self.block)
592 for k in kwargs.keys():
592 for k in kwargs.keys():
593 if k not in ['block', 'track']:
593 if k not in ['block', 'track']:
594 raise TypeError("invalid keyword arg, %r"%k)
594 raise TypeError("invalid keyword arg, %r"%k)
595
595
596 assert len(sequences) > 0, "must have some sequences to map onto!"
596 assert len(sequences) > 0, "must have some sequences to map onto!"
597 pf = ParallelFunction(self, f, block=block, **kwargs)
597 pf = ParallelFunction(self, f, block=block, **kwargs)
598 return pf.map(*sequences)
598 return pf.map(*sequences)
599
599
600 @sync_results
600 @sync_results
601 @save_ids
601 @save_ids
602 def execute(self, code, silent=True, targets=None, block=None):
602 def execute(self, code, silent=True, targets=None, block=None):
603 """Executes `code` on `targets` in blocking or nonblocking manner.
603 """Executes `code` on `targets` in blocking or nonblocking manner.
604
604
605 ``execute`` is always `bound` (affects engine namespace)
605 ``execute`` is always `bound` (affects engine namespace)
606
606
607 Parameters
607 Parameters
608 ----------
608 ----------
609
609
610 code : str
610 code : str
611 the code string to be executed
611 the code string to be executed
612 block : bool
612 block : bool
613 whether or not to wait until done to return
613 whether or not to wait until done to return
614 default: self.block
614 default: self.block
615 """
615 """
616 block = self.block if block is None else block
616 block = self.block if block is None else block
617 targets = self.targets if targets is None else targets
617 targets = self.targets if targets is None else targets
618
618
619 _idents = self.client._build_targets(targets)[0]
619 _idents = self.client._build_targets(targets)[0]
620 msg_ids = []
620 msg_ids = []
621 trackers = []
621 trackers = []
622 for ident in _idents:
622 for ident in _idents:
623 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
623 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
624 msg_ids.append(msg['header']['msg_id'])
624 msg_ids.append(msg['header']['msg_id'])
625 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets)
625 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets)
626 if block:
626 if block:
627 try:
627 try:
628 ar.get()
628 ar.get()
629 except KeyboardInterrupt:
629 except KeyboardInterrupt:
630 pass
630 pass
631 return ar
631 return ar
632
632
633 def run(self, filename, targets=None, block=None):
633 def run(self, filename, targets=None, block=None):
634 """Execute contents of `filename` on my engine(s).
634 """Execute contents of `filename` on my engine(s).
635
635
636 This simply reads the contents of the file and calls `execute`.
636 This simply reads the contents of the file and calls `execute`.
637
637
638 Parameters
638 Parameters
639 ----------
639 ----------
640
640
641 filename : str
641 filename : str
642 The path to the file
642 The path to the file
643 targets : int/str/list of ints/strs
643 targets : int/str/list of ints/strs
644 the engines on which to execute
644 the engines on which to execute
645 default : all
645 default : all
646 block : bool
646 block : bool
647 whether or not to wait until done
647 whether or not to wait until done
648 default: self.block
648 default: self.block
649
649
650 """
650 """
651 with open(filename, 'r') as f:
651 with open(filename, 'r') as f:
652 # add newline in case of trailing indented whitespace
652 # add newline in case of trailing indented whitespace
653 # which will cause SyntaxError
653 # which will cause SyntaxError
654 code = f.read()+'\n'
654 code = f.read()+'\n'
655 return self.execute(code, block=block, targets=targets)
655 return self.execute(code, block=block, targets=targets)
656
656
657 def update(self, ns):
657 def update(self, ns):
658 """update remote namespace with dict `ns`
658 """update remote namespace with dict `ns`
659
659
660 See `push` for details.
660 See `push` for details.
661 """
661 """
662 return self.push(ns, block=self.block, track=self.track)
662 return self.push(ns, block=self.block, track=self.track)
663
663
664 def push(self, ns, targets=None, block=None, track=None):
664 def push(self, ns, targets=None, block=None, track=None):
665 """update remote namespace with dict `ns`
665 """update remote namespace with dict `ns`
666
666
667 Parameters
667 Parameters
668 ----------
668 ----------
669
669
670 ns : dict
670 ns : dict
671 dict of keys with which to update engine namespace(s)
671 dict of keys with which to update engine namespace(s)
672 block : bool [default : self.block]
672 block : bool [default : self.block]
673 whether to wait to be notified of engine receipt
673 whether to wait to be notified of engine receipt
674
674
675 """
675 """
676
676
677 block = block if block is not None else self.block
677 block = block if block is not None else self.block
678 track = track if track is not None else self.track
678 track = track if track is not None else self.track
679 targets = targets if targets is not None else self.targets
679 targets = targets if targets is not None else self.targets
680 # applier = self.apply_sync if block else self.apply_async
680 # applier = self.apply_sync if block else self.apply_async
681 if not isinstance(ns, dict):
681 if not isinstance(ns, dict):
682 raise TypeError("Must be a dict, not %s"%type(ns))
682 raise TypeError("Must be a dict, not %s"%type(ns))
683 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
683 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
684
684
685 def get(self, key_s):
685 def get(self, key_s):
686 """get object(s) by `key_s` from remote namespace
686 """get object(s) by `key_s` from remote namespace
687
687
688 see `pull` for details.
688 see `pull` for details.
689 """
689 """
690 # block = block if block is not None else self.block
690 # block = block if block is not None else self.block
691 return self.pull(key_s, block=True)
691 return self.pull(key_s, block=True)
692
692
693 def pull(self, names, targets=None, block=None):
693 def pull(self, names, targets=None, block=None):
694 """get object(s) by `name` from remote namespace
694 """get object(s) by `name` from remote namespace
695
695
696 will return one object if it is a key.
696 will return one object if it is a key.
697 can also take a list of keys, in which case it will return a list of objects.
697 can also take a list of keys, in which case it will return a list of objects.
698 """
698 """
699 block = block if block is not None else self.block
699 block = block if block is not None else self.block
700 targets = targets if targets is not None else self.targets
700 targets = targets if targets is not None else self.targets
701 applier = self.apply_sync if block else self.apply_async
701 applier = self.apply_sync if block else self.apply_async
702 if isinstance(names, basestring):
702 if isinstance(names, basestring):
703 pass
703 pass
704 elif isinstance(names, (list,tuple,set)):
704 elif isinstance(names, (list,tuple,set)):
705 for key in names:
705 for key in names:
706 if not isinstance(key, basestring):
706 if not isinstance(key, basestring):
707 raise TypeError("keys must be str, not type %r"%type(key))
707 raise TypeError("keys must be str, not type %r"%type(key))
708 else:
708 else:
709 raise TypeError("names must be strs, not %r"%names)
709 raise TypeError("names must be strs, not %r"%names)
710 return self._really_apply(util._pull, (names,), block=block, targets=targets)
710 return self._really_apply(util._pull, (names,), block=block, targets=targets)
711
711
712 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
712 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
713 """
713 """
714 Partition a Python sequence and send the partitions to a set of engines.
714 Partition a Python sequence and send the partitions to a set of engines.
715 """
715 """
716 block = block if block is not None else self.block
716 block = block if block is not None else self.block
717 track = track if track is not None else self.track
717 track = track if track is not None else self.track
718 targets = targets if targets is not None else self.targets
718 targets = targets if targets is not None else self.targets
719
719
720 # construct integer ID list:
720 # construct integer ID list:
721 targets = self.client._build_targets(targets)[1]
721 targets = self.client._build_targets(targets)[1]
722
722
723 mapObject = Map.dists[dist]()
723 mapObject = Map.dists[dist]()
724 nparts = len(targets)
724 nparts = len(targets)
725 msg_ids = []
725 msg_ids = []
726 trackers = []
726 trackers = []
727 for index, engineid in enumerate(targets):
727 for index, engineid in enumerate(targets):
728 partition = mapObject.getPartition(seq, index, nparts)
728 partition = mapObject.getPartition(seq, index, nparts)
729 if flatten and len(partition) == 1:
729 if flatten and len(partition) == 1:
730 ns = {key: partition[0]}
730 ns = {key: partition[0]}
731 else:
731 else:
732 ns = {key: partition}
732 ns = {key: partition}
733 r = self.push(ns, block=False, track=track, targets=engineid)
733 r = self.push(ns, block=False, track=track, targets=engineid)
734 msg_ids.extend(r.msg_ids)
734 msg_ids.extend(r.msg_ids)
735 if track:
735 if track:
736 trackers.append(r._tracker)
736 trackers.append(r._tracker)
737
737
738 if track:
738 if track:
739 tracker = zmq.MessageTracker(*trackers)
739 tracker = zmq.MessageTracker(*trackers)
740 else:
740 else:
741 tracker = None
741 tracker = None
742
742
743 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
743 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
744 if block:
744 if block:
745 r.wait()
745 r.wait()
746 else:
746 else:
747 return r
747 return r
748
748
749 @sync_results
749 @sync_results
750 @save_ids
750 @save_ids
751 def gather(self, key, dist='b', targets=None, block=None):
751 def gather(self, key, dist='b', targets=None, block=None):
752 """
752 """
753 Gather a partitioned sequence on a set of engines as a single local seq.
753 Gather a partitioned sequence on a set of engines as a single local seq.
754 """
754 """
755 block = block if block is not None else self.block
755 block = block if block is not None else self.block
756 targets = targets if targets is not None else self.targets
756 targets = targets if targets is not None else self.targets
757 mapObject = Map.dists[dist]()
757 mapObject = Map.dists[dist]()
758 msg_ids = []
758 msg_ids = []
759
759
760 # construct integer ID list:
760 # construct integer ID list:
761 targets = self.client._build_targets(targets)[1]
761 targets = self.client._build_targets(targets)[1]
762
762
763 for index, engineid in enumerate(targets):
763 for index, engineid in enumerate(targets):
764 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
764 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
765
765
766 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
766 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
767
767
768 if block:
768 if block:
769 try:
769 try:
770 return r.get()
770 return r.get()
771 except KeyboardInterrupt:
771 except KeyboardInterrupt:
772 pass
772 pass
773 return r
773 return r
774
774
775 def __getitem__(self, key):
775 def __getitem__(self, key):
776 return self.get(key)
776 return self.get(key)
777
777
778 def __setitem__(self,key, value):
778 def __setitem__(self,key, value):
779 self.update({key:value})
779 self.update({key:value})
780
780
781 def clear(self, targets=None, block=False):
781 def clear(self, targets=None, block=False):
782 """Clear the remote namespaces on my engines."""
782 """Clear the remote namespaces on my engines."""
783 block = block if block is not None else self.block
783 block = block if block is not None else self.block
784 targets = targets if targets is not None else self.targets
784 targets = targets if targets is not None else self.targets
785 return self.client.clear(targets=targets, block=block)
785 return self.client.clear(targets=targets, block=block)
786
786
787 def kill(self, targets=None, block=True):
788 """Kill my engines."""
789 block = block if block is not None else self.block
790 targets = targets if targets is not None else self.targets
791 return self.client.kill(targets=targets, block=block)
792
793 #----------------------------------------
787 #----------------------------------------
794 # activate for %px, %autopx, etc. magics
788 # activate for %px, %autopx, etc. magics
795 #----------------------------------------
789 #----------------------------------------
796
790
797 def activate(self, suffix=''):
791 def activate(self, suffix=''):
798 """Activate IPython magics associated with this View
792 """Activate IPython magics associated with this View
799
793
800 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
794 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
801
795
802 Parameters
796 Parameters
803 ----------
797 ----------
804
798
805 suffix: str [default: '']
799 suffix: str [default: '']
806 The suffix, if any, for the magics. This allows you to have
800 The suffix, if any, for the magics. This allows you to have
807 multiple views associated with parallel magics at the same time.
801 multiple views associated with parallel magics at the same time.
808
802
809 e.g. ``rc[::2].activate(suffix='_even')`` will give you
803 e.g. ``rc[::2].activate(suffix='_even')`` will give you
810 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
804 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
811 on the even engines.
805 on the even engines.
812 """
806 """
813
807
814 from IPython.parallel.client.magics import ParallelMagics
808 from IPython.parallel.client.magics import ParallelMagics
815
809
816 try:
810 try:
817 # This is injected into __builtins__.
811 # This is injected into __builtins__.
818 ip = get_ipython()
812 ip = get_ipython()
819 except NameError:
813 except NameError:
820 print "The IPython parallel magics (%px, etc.) only work within IPython."
814 print "The IPython parallel magics (%px, etc.) only work within IPython."
821 return
815 return
822
816
823 M = ParallelMagics(ip, self, suffix)
817 M = ParallelMagics(ip, self, suffix)
824 ip.magics_manager.register(M)
818 ip.magics_manager.register(M)
825
819
826
820
827 @skip_doctest
821 @skip_doctest
828 class LoadBalancedView(View):
822 class LoadBalancedView(View):
829 """An load-balancing View that only executes via the Task scheduler.
823 """An load-balancing View that only executes via the Task scheduler.
830
824
831 Load-balanced views can be created with the client's `view` method:
825 Load-balanced views can be created with the client's `view` method:
832
826
833 >>> v = client.load_balanced_view()
827 >>> v = client.load_balanced_view()
834
828
835 or targets can be specified, to restrict the potential destinations:
829 or targets can be specified, to restrict the potential destinations:
836
830
837 >>> v = client.client.load_balanced_view([1,3])
831 >>> v = client.client.load_balanced_view([1,3])
838
832
839 which would restrict loadbalancing to between engines 1 and 3.
833 which would restrict loadbalancing to between engines 1 and 3.
840
834
841 """
835 """
842
836
843 follow=Any()
837 follow=Any()
844 after=Any()
838 after=Any()
845 timeout=CFloat()
839 timeout=CFloat()
846 retries = Integer(0)
840 retries = Integer(0)
847
841
848 _task_scheme = Any()
842 _task_scheme = Any()
849 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
843 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
850
844
851 def __init__(self, client=None, socket=None, **flags):
845 def __init__(self, client=None, socket=None, **flags):
852 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
846 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
853 self._task_scheme=client._task_scheme
847 self._task_scheme=client._task_scheme
854
848
855 def _validate_dependency(self, dep):
849 def _validate_dependency(self, dep):
856 """validate a dependency.
850 """validate a dependency.
857
851
858 For use in `set_flags`.
852 For use in `set_flags`.
859 """
853 """
860 if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
854 if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
861 return True
855 return True
862 elif isinstance(dep, (list,set, tuple)):
856 elif isinstance(dep, (list,set, tuple)):
863 for d in dep:
857 for d in dep:
864 if not isinstance(d, (basestring, AsyncResult)):
858 if not isinstance(d, (basestring, AsyncResult)):
865 return False
859 return False
866 elif isinstance(dep, dict):
860 elif isinstance(dep, dict):
867 if set(dep.keys()) != set(Dependency().as_dict().keys()):
861 if set(dep.keys()) != set(Dependency().as_dict().keys()):
868 return False
862 return False
869 if not isinstance(dep['msg_ids'], list):
863 if not isinstance(dep['msg_ids'], list):
870 return False
864 return False
871 for d in dep['msg_ids']:
865 for d in dep['msg_ids']:
872 if not isinstance(d, basestring):
866 if not isinstance(d, basestring):
873 return False
867 return False
874 else:
868 else:
875 return False
869 return False
876
870
877 return True
871 return True
878
872
879 def _render_dependency(self, dep):
873 def _render_dependency(self, dep):
880 """helper for building jsonable dependencies from various input forms."""
874 """helper for building jsonable dependencies from various input forms."""
881 if isinstance(dep, Dependency):
875 if isinstance(dep, Dependency):
882 return dep.as_dict()
876 return dep.as_dict()
883 elif isinstance(dep, AsyncResult):
877 elif isinstance(dep, AsyncResult):
884 return dep.msg_ids
878 return dep.msg_ids
885 elif dep is None:
879 elif dep is None:
886 return []
880 return []
887 else:
881 else:
888 # pass to Dependency constructor
882 # pass to Dependency constructor
889 return list(Dependency(dep))
883 return list(Dependency(dep))
890
884
891 def set_flags(self, **kwargs):
885 def set_flags(self, **kwargs):
892 """set my attribute flags by keyword.
886 """set my attribute flags by keyword.
893
887
894 A View is a wrapper for the Client's apply method, but with attributes
888 A View is a wrapper for the Client's apply method, but with attributes
895 that specify keyword arguments, those attributes can be set by keyword
889 that specify keyword arguments, those attributes can be set by keyword
896 argument with this method.
890 argument with this method.
897
891
898 Parameters
892 Parameters
899 ----------
893 ----------
900
894
901 block : bool
895 block : bool
902 whether to wait for results
896 whether to wait for results
903 track : bool
897 track : bool
904 whether to create a MessageTracker to allow the user to
898 whether to create a MessageTracker to allow the user to
905 safely edit after arrays and buffers during non-copying
899 safely edit after arrays and buffers during non-copying
906 sends.
900 sends.
907
901
908 after : Dependency or collection of msg_ids
902 after : Dependency or collection of msg_ids
909 Only for load-balanced execution (targets=None)
903 Only for load-balanced execution (targets=None)
910 Specify a list of msg_ids as a time-based dependency.
904 Specify a list of msg_ids as a time-based dependency.
911 This job will only be run *after* the dependencies
905 This job will only be run *after* the dependencies
912 have been met.
906 have been met.
913
907
914 follow : Dependency or collection of msg_ids
908 follow : Dependency or collection of msg_ids
915 Only for load-balanced execution (targets=None)
909 Only for load-balanced execution (targets=None)
916 Specify a list of msg_ids as a location-based dependency.
910 Specify a list of msg_ids as a location-based dependency.
917 This job will only be run on an engine where this dependency
911 This job will only be run on an engine where this dependency
918 is met.
912 is met.
919
913
920 timeout : float/int or None
914 timeout : float/int or None
921 Only for load-balanced execution (targets=None)
915 Only for load-balanced execution (targets=None)
922 Specify an amount of time (in seconds) for the scheduler to
916 Specify an amount of time (in seconds) for the scheduler to
923 wait for dependencies to be met before failing with a
917 wait for dependencies to be met before failing with a
924 DependencyTimeout.
918 DependencyTimeout.
925
919
926 retries : int
920 retries : int
927 Number of times a task will be retried on failure.
921 Number of times a task will be retried on failure.
928 """
922 """
929
923
930 super(LoadBalancedView, self).set_flags(**kwargs)
924 super(LoadBalancedView, self).set_flags(**kwargs)
931 for name in ('follow', 'after'):
925 for name in ('follow', 'after'):
932 if name in kwargs:
926 if name in kwargs:
933 value = kwargs[name]
927 value = kwargs[name]
934 if self._validate_dependency(value):
928 if self._validate_dependency(value):
935 setattr(self, name, value)
929 setattr(self, name, value)
936 else:
930 else:
937 raise ValueError("Invalid dependency: %r"%value)
931 raise ValueError("Invalid dependency: %r"%value)
938 if 'timeout' in kwargs:
932 if 'timeout' in kwargs:
939 t = kwargs['timeout']
933 t = kwargs['timeout']
940 if not isinstance(t, (int, long, float, type(None))):
934 if not isinstance(t, (int, long, float, type(None))):
941 raise TypeError("Invalid type for timeout: %r"%type(t))
935 raise TypeError("Invalid type for timeout: %r"%type(t))
942 if t is not None:
936 if t is not None:
943 if t < 0:
937 if t < 0:
944 raise ValueError("Invalid timeout: %s"%t)
938 raise ValueError("Invalid timeout: %s"%t)
945 self.timeout = t
939 self.timeout = t
946
940
947 @sync_results
941 @sync_results
948 @save_ids
942 @save_ids
949 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
943 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
950 after=None, follow=None, timeout=None,
944 after=None, follow=None, timeout=None,
951 targets=None, retries=None):
945 targets=None, retries=None):
952 """calls f(*args, **kwargs) on a remote engine, returning the result.
946 """calls f(*args, **kwargs) on a remote engine, returning the result.
953
947
954 This method temporarily sets all of `apply`'s flags for a single call.
948 This method temporarily sets all of `apply`'s flags for a single call.
955
949
956 Parameters
950 Parameters
957 ----------
951 ----------
958
952
959 f : callable
953 f : callable
960
954
961 args : list [default: empty]
955 args : list [default: empty]
962
956
963 kwargs : dict [default: empty]
957 kwargs : dict [default: empty]
964
958
965 block : bool [default: self.block]
959 block : bool [default: self.block]
966 whether to block
960 whether to block
967 track : bool [default: self.track]
961 track : bool [default: self.track]
968 whether to ask zmq to track the message, for safe non-copying sends
962 whether to ask zmq to track the message, for safe non-copying sends
969
963
970 !!!!!! TODO: THE REST HERE !!!!
964 !!!!!! TODO: THE REST HERE !!!!
971
965
972 Returns
966 Returns
973 -------
967 -------
974
968
975 if self.block is False:
969 if self.block is False:
976 returns AsyncResult
970 returns AsyncResult
977 else:
971 else:
978 returns actual result of f(*args, **kwargs) on the engine(s)
972 returns actual result of f(*args, **kwargs) on the engine(s)
979 This will be a list of self.targets is also a list (even length 1), or
973 This will be a list of self.targets is also a list (even length 1), or
980 the single result if self.targets is an integer engine id
974 the single result if self.targets is an integer engine id
981 """
975 """
982
976
983 # validate whether we can run
977 # validate whether we can run
984 if self._socket.closed:
978 if self._socket.closed:
985 msg = "Task farming is disabled"
979 msg = "Task farming is disabled"
986 if self._task_scheme == 'pure':
980 if self._task_scheme == 'pure':
987 msg += " because the pure ZMQ scheduler cannot handle"
981 msg += " because the pure ZMQ scheduler cannot handle"
988 msg += " disappearing engines."
982 msg += " disappearing engines."
989 raise RuntimeError(msg)
983 raise RuntimeError(msg)
990
984
991 if self._task_scheme == 'pure':
985 if self._task_scheme == 'pure':
992 # pure zmq scheme doesn't support extra features
986 # pure zmq scheme doesn't support extra features
993 msg = "Pure ZMQ scheduler doesn't support the following flags:"
987 msg = "Pure ZMQ scheduler doesn't support the following flags:"
994 "follow, after, retries, targets, timeout"
988 "follow, after, retries, targets, timeout"
995 if (follow or after or retries or targets or timeout):
989 if (follow or after or retries or targets or timeout):
996 # hard fail on Scheduler flags
990 # hard fail on Scheduler flags
997 raise RuntimeError(msg)
991 raise RuntimeError(msg)
998 if isinstance(f, dependent):
992 if isinstance(f, dependent):
999 # soft warn on functional dependencies
993 # soft warn on functional dependencies
1000 warnings.warn(msg, RuntimeWarning)
994 warnings.warn(msg, RuntimeWarning)
1001
995
1002 # build args
996 # build args
1003 args = [] if args is None else args
997 args = [] if args is None else args
1004 kwargs = {} if kwargs is None else kwargs
998 kwargs = {} if kwargs is None else kwargs
1005 block = self.block if block is None else block
999 block = self.block if block is None else block
1006 track = self.track if track is None else track
1000 track = self.track if track is None else track
1007 after = self.after if after is None else after
1001 after = self.after if after is None else after
1008 retries = self.retries if retries is None else retries
1002 retries = self.retries if retries is None else retries
1009 follow = self.follow if follow is None else follow
1003 follow = self.follow if follow is None else follow
1010 timeout = self.timeout if timeout is None else timeout
1004 timeout = self.timeout if timeout is None else timeout
1011 targets = self.targets if targets is None else targets
1005 targets = self.targets if targets is None else targets
1012
1006
1013 if not isinstance(retries, int):
1007 if not isinstance(retries, int):
1014 raise TypeError('retries must be int, not %r'%type(retries))
1008 raise TypeError('retries must be int, not %r'%type(retries))
1015
1009
1016 if targets is None:
1010 if targets is None:
1017 idents = []
1011 idents = []
1018 else:
1012 else:
1019 idents = self.client._build_targets(targets)[0]
1013 idents = self.client._build_targets(targets)[0]
1020 # ensure *not* bytes
1014 # ensure *not* bytes
1021 idents = [ ident.decode() for ident in idents ]
1015 idents = [ ident.decode() for ident in idents ]
1022
1016
1023 after = self._render_dependency(after)
1017 after = self._render_dependency(after)
1024 follow = self._render_dependency(follow)
1018 follow = self._render_dependency(follow)
1025 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1019 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1026
1020
1027 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1021 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1028 metadata=metadata)
1022 metadata=metadata)
1029 tracker = None if track is False else msg['tracker']
1023 tracker = None if track is False else msg['tracker']
1030
1024
1031 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1025 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1032
1026
1033 if block:
1027 if block:
1034 try:
1028 try:
1035 return ar.get()
1029 return ar.get()
1036 except KeyboardInterrupt:
1030 except KeyboardInterrupt:
1037 pass
1031 pass
1038 return ar
1032 return ar
1039
1033
1040 @spin_after
1034 @spin_after
1041 @save_ids
1035 @save_ids
1042 def map(self, f, *sequences, **kwargs):
1036 def map(self, f, *sequences, **kwargs):
1043 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1037 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1044
1038
1045 Parallel version of builtin `map`, load-balanced by this View.
1039 Parallel version of builtin `map`, load-balanced by this View.
1046
1040
1047 `block`, and `chunksize` can be specified by keyword only.
1041 `block`, and `chunksize` can be specified by keyword only.
1048
1042
1049 Each `chunksize` elements will be a separate task, and will be
1043 Each `chunksize` elements will be a separate task, and will be
1050 load-balanced. This lets individual elements be available for iteration
1044 load-balanced. This lets individual elements be available for iteration
1051 as soon as they arrive.
1045 as soon as they arrive.
1052
1046
1053 Parameters
1047 Parameters
1054 ----------
1048 ----------
1055
1049
1056 f : callable
1050 f : callable
1057 function to be mapped
1051 function to be mapped
1058 *sequences: one or more sequences of matching length
1052 *sequences: one or more sequences of matching length
1059 the sequences to be distributed and passed to `f`
1053 the sequences to be distributed and passed to `f`
1060 block : bool [default self.block]
1054 block : bool [default self.block]
1061 whether to wait for the result or not
1055 whether to wait for the result or not
1062 track : bool
1056 track : bool
1063 whether to create a MessageTracker to allow the user to
1057 whether to create a MessageTracker to allow the user to
1064 safely edit after arrays and buffers during non-copying
1058 safely edit after arrays and buffers during non-copying
1065 sends.
1059 sends.
1066 chunksize : int [default 1]
1060 chunksize : int [default 1]
1067 how many elements should be in each task.
1061 how many elements should be in each task.
1068 ordered : bool [default True]
1062 ordered : bool [default True]
1069 Whether the results should be gathered as they arrive, or enforce
1063 Whether the results should be gathered as they arrive, or enforce
1070 the order of submission.
1064 the order of submission.
1071
1065
1072 Only applies when iterating through AsyncMapResult as results arrive.
1066 Only applies when iterating through AsyncMapResult as results arrive.
1073 Has no effect when block=True.
1067 Has no effect when block=True.
1074
1068
1075 Returns
1069 Returns
1076 -------
1070 -------
1077
1071
1078 if block=False:
1072 if block=False:
1079 AsyncMapResult
1073 AsyncMapResult
1080 An object like AsyncResult, but which reassembles the sequence of results
1074 An object like AsyncResult, but which reassembles the sequence of results
1081 into a single list. AsyncMapResults can be iterated through before all
1075 into a single list. AsyncMapResults can be iterated through before all
1082 results are complete.
1076 results are complete.
1083 else:
1077 else:
1084 the result of map(f,*sequences)
1078 the result of map(f,*sequences)
1085
1079
1086 """
1080 """
1087
1081
1088 # default
1082 # default
1089 block = kwargs.get('block', self.block)
1083 block = kwargs.get('block', self.block)
1090 chunksize = kwargs.get('chunksize', 1)
1084 chunksize = kwargs.get('chunksize', 1)
1091 ordered = kwargs.get('ordered', True)
1085 ordered = kwargs.get('ordered', True)
1092
1086
1093 keyset = set(kwargs.keys())
1087 keyset = set(kwargs.keys())
1094 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1088 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1095 if extra_keys:
1089 if extra_keys:
1096 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1090 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1097
1091
1098 assert len(sequences) > 0, "must have some sequences to map onto!"
1092 assert len(sequences) > 0, "must have some sequences to map onto!"
1099
1093
1100 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1094 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1101 return pf.map(*sequences)
1095 return pf.map(*sequences)
1102
1096
1103 __all__ = ['LoadBalancedView', 'DirectView']
1097 __all__ = ['LoadBalancedView', 'DirectView']
General Comments 0
You need to be logged in to leave comments. Login now