##// END OF EJS Templates
View no longer has separate results dict...
MinRK -
Show More
@@ -1,1104 +1,1103 b''
1 """Views of remote engines.
1 """Views of remote engines.
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import imp
18 import imp
19 import sys
19 import sys
20 import warnings
20 import warnings
21 from contextlib import contextmanager
21 from contextlib import contextmanager
22 from types import ModuleType
22 from types import ModuleType
23
23
24 import zmq
24 import zmq
25
25
26 from IPython.testing.skipdoctest import skip_doctest
26 from IPython.testing.skipdoctest import skip_doctest
27 from IPython.utils.traitlets import (
27 from IPython.utils.traitlets import (
28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
29 )
29 )
30 from IPython.external.decorator import decorator
30 from IPython.external.decorator import decorator
31
31
32 from IPython.parallel import util
32 from IPython.parallel import util
33 from IPython.parallel.controller.dependency import Dependency, dependent
33 from IPython.parallel.controller.dependency import Dependency, dependent
34
34
35 from . import map as Map
35 from . import map as Map
36 from .asyncresult import AsyncResult, AsyncMapResult
36 from .asyncresult import AsyncResult, AsyncMapResult
37 from .remotefunction import ParallelFunction, parallel, remote, getname
37 from .remotefunction import ParallelFunction, parallel, remote, getname
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Decorators
40 # Decorators
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43 @decorator
43 @decorator
44 def save_ids(f, self, *args, **kwargs):
44 def save_ids(f, self, *args, **kwargs):
45 """Keep our history and outstanding attributes up to date after a method call."""
45 """Keep our history and outstanding attributes up to date after a method call."""
46 n_previous = len(self.client.history)
46 n_previous = len(self.client.history)
47 try:
47 try:
48 ret = f(self, *args, **kwargs)
48 ret = f(self, *args, **kwargs)
49 finally:
49 finally:
50 nmsgs = len(self.client.history) - n_previous
50 nmsgs = len(self.client.history) - n_previous
51 msg_ids = self.client.history[-nmsgs:]
51 msg_ids = self.client.history[-nmsgs:]
52 self.history.extend(msg_ids)
52 self.history.extend(msg_ids)
53 map(self.outstanding.add, msg_ids)
53 map(self.outstanding.add, msg_ids)
54 return ret
54 return ret
55
55
56 @decorator
56 @decorator
57 def sync_results(f, self, *args, **kwargs):
57 def sync_results(f, self, *args, **kwargs):
58 """sync relevant results from self.client to our results attribute."""
58 """sync relevant results from self.client to our results attribute."""
59 ret = f(self, *args, **kwargs)
59 ret = f(self, *args, **kwargs)
60 delta = self.outstanding.difference(self.client.outstanding)
60 delta = self.outstanding.difference(self.client.outstanding)
61 completed = self.outstanding.intersection(delta)
61 completed = self.outstanding.intersection(delta)
62 self.outstanding = self.outstanding.difference(completed)
62 self.outstanding = self.outstanding.difference(completed)
63 for msg_id in completed:
64 self.results[msg_id] = self.client.results[msg_id]
65 return ret
63 return ret
66
64
67 @decorator
65 @decorator
68 def spin_after(f, self, *args, **kwargs):
66 def spin_after(f, self, *args, **kwargs):
69 """call spin after the method."""
67 """call spin after the method."""
70 ret = f(self, *args, **kwargs)
68 ret = f(self, *args, **kwargs)
71 self.spin()
69 self.spin()
72 return ret
70 return ret
73
71
74 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
75 # Classes
73 # Classes
76 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
77
75
78 @skip_doctest
76 @skip_doctest
79 class View(HasTraits):
77 class View(HasTraits):
80 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
78 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
81
79
82 Don't use this class, use subclasses.
80 Don't use this class, use subclasses.
83
81
84 Methods
82 Methods
85 -------
83 -------
86
84
87 spin
85 spin
88 flushes incoming results and registration state changes
86 flushes incoming results and registration state changes
89 control methods spin, and requesting `ids` also ensures up to date
87 control methods spin, and requesting `ids` also ensures up to date
90
88
91 wait
89 wait
92 wait on one or more msg_ids
90 wait on one or more msg_ids
93
91
94 execution methods
92 execution methods
95 apply
93 apply
96 legacy: execute, run
94 legacy: execute, run
97
95
98 data movement
96 data movement
99 push, pull, scatter, gather
97 push, pull, scatter, gather
100
98
101 query methods
99 query methods
102 get_result, queue_status, purge_results, result_status
100 get_result, queue_status, purge_results, result_status
103
101
104 control methods
102 control methods
105 abort, shutdown
103 abort, shutdown
106
104
107 """
105 """
108 # flags
106 # flags
109 block=Bool(False)
107 block=Bool(False)
110 track=Bool(True)
108 track=Bool(True)
111 targets = Any()
109 targets = Any()
112
110
113 history=List()
111 history=List()
114 outstanding = Set()
112 outstanding = Set()
115 results = Dict()
113 results = Dict()
116 client = Instance('IPython.parallel.Client')
114 client = Instance('IPython.parallel.Client')
117
115
118 _socket = Instance('zmq.Socket')
116 _socket = Instance('zmq.Socket')
119 _flag_names = List(['targets', 'block', 'track'])
117 _flag_names = List(['targets', 'block', 'track'])
120 _targets = Any()
118 _targets = Any()
121 _idents = Any()
119 _idents = Any()
122
120
123 def __init__(self, client=None, socket=None, **flags):
121 def __init__(self, client=None, socket=None, **flags):
124 super(View, self).__init__(client=client, _socket=socket)
122 super(View, self).__init__(client=client, _socket=socket)
123 self.results = client.results
125 self.block = client.block
124 self.block = client.block
126
125
127 self.set_flags(**flags)
126 self.set_flags(**flags)
128
127
129 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
128 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
130
129
131 def __repr__(self):
130 def __repr__(self):
132 strtargets = str(self.targets)
131 strtargets = str(self.targets)
133 if len(strtargets) > 16:
132 if len(strtargets) > 16:
134 strtargets = strtargets[:12]+'...]'
133 strtargets = strtargets[:12]+'...]'
135 return "<%s %s>"%(self.__class__.__name__, strtargets)
134 return "<%s %s>"%(self.__class__.__name__, strtargets)
136
135
137 def __len__(self):
136 def __len__(self):
138 if isinstance(self.targets, list):
137 if isinstance(self.targets, list):
139 return len(self.targets)
138 return len(self.targets)
140 elif isinstance(self.targets, int):
139 elif isinstance(self.targets, int):
141 return 1
140 return 1
142 else:
141 else:
143 return len(self.client)
142 return len(self.client)
144
143
145 def set_flags(self, **kwargs):
144 def set_flags(self, **kwargs):
146 """set my attribute flags by keyword.
145 """set my attribute flags by keyword.
147
146
148 Views determine behavior with a few attributes (`block`, `track`, etc.).
147 Views determine behavior with a few attributes (`block`, `track`, etc.).
149 These attributes can be set all at once by name with this method.
148 These attributes can be set all at once by name with this method.
150
149
151 Parameters
150 Parameters
152 ----------
151 ----------
153
152
154 block : bool
153 block : bool
155 whether to wait for results
154 whether to wait for results
156 track : bool
155 track : bool
157 whether to create a MessageTracker to allow the user to
156 whether to create a MessageTracker to allow the user to
158 safely edit after arrays and buffers during non-copying
157 safely edit after arrays and buffers during non-copying
159 sends.
158 sends.
160 """
159 """
161 for name, value in kwargs.iteritems():
160 for name, value in kwargs.iteritems():
162 if name not in self._flag_names:
161 if name not in self._flag_names:
163 raise KeyError("Invalid name: %r"%name)
162 raise KeyError("Invalid name: %r"%name)
164 else:
163 else:
165 setattr(self, name, value)
164 setattr(self, name, value)
166
165
167 @contextmanager
166 @contextmanager
168 def temp_flags(self, **kwargs):
167 def temp_flags(self, **kwargs):
169 """temporarily set flags, for use in `with` statements.
168 """temporarily set flags, for use in `with` statements.
170
169
171 See set_flags for permanent setting of flags
170 See set_flags for permanent setting of flags
172
171
173 Examples
172 Examples
174 --------
173 --------
175
174
176 >>> view.track=False
175 >>> view.track=False
177 ...
176 ...
178 >>> with view.temp_flags(track=True):
177 >>> with view.temp_flags(track=True):
179 ... ar = view.apply(dostuff, my_big_array)
178 ... ar = view.apply(dostuff, my_big_array)
180 ... ar.tracker.wait() # wait for send to finish
179 ... ar.tracker.wait() # wait for send to finish
181 >>> view.track
180 >>> view.track
182 False
181 False
183
182
184 """
183 """
185 # preflight: save flags, and set temporaries
184 # preflight: save flags, and set temporaries
186 saved_flags = {}
185 saved_flags = {}
187 for f in self._flag_names:
186 for f in self._flag_names:
188 saved_flags[f] = getattr(self, f)
187 saved_flags[f] = getattr(self, f)
189 self.set_flags(**kwargs)
188 self.set_flags(**kwargs)
190 # yield to the with-statement block
189 # yield to the with-statement block
191 try:
190 try:
192 yield
191 yield
193 finally:
192 finally:
194 # postflight: restore saved flags
193 # postflight: restore saved flags
195 self.set_flags(**saved_flags)
194 self.set_flags(**saved_flags)
196
195
197
196
198 #----------------------------------------------------------------
197 #----------------------------------------------------------------
199 # apply
198 # apply
200 #----------------------------------------------------------------
199 #----------------------------------------------------------------
201
200
202 @sync_results
201 @sync_results
203 @save_ids
202 @save_ids
204 def _really_apply(self, f, args, kwargs, block=None, **options):
203 def _really_apply(self, f, args, kwargs, block=None, **options):
205 """wrapper for client.send_apply_request"""
204 """wrapper for client.send_apply_request"""
206 raise NotImplementedError("Implement in subclasses")
205 raise NotImplementedError("Implement in subclasses")
207
206
208 def apply(self, f, *args, **kwargs):
207 def apply(self, f, *args, **kwargs):
209 """calls f(*args, **kwargs) on remote engines, returning the result.
208 """calls f(*args, **kwargs) on remote engines, returning the result.
210
209
211 This method sets all apply flags via this View's attributes.
210 This method sets all apply flags via this View's attributes.
212
211
213 if self.block is False:
212 if self.block is False:
214 returns AsyncResult
213 returns AsyncResult
215 else:
214 else:
216 returns actual result of f(*args, **kwargs)
215 returns actual result of f(*args, **kwargs)
217 """
216 """
218 return self._really_apply(f, args, kwargs)
217 return self._really_apply(f, args, kwargs)
219
218
220 def apply_async(self, f, *args, **kwargs):
219 def apply_async(self, f, *args, **kwargs):
221 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
220 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
222
221
223 returns AsyncResult
222 returns AsyncResult
224 """
223 """
225 return self._really_apply(f, args, kwargs, block=False)
224 return self._really_apply(f, args, kwargs, block=False)
226
225
227 @spin_after
226 @spin_after
228 def apply_sync(self, f, *args, **kwargs):
227 def apply_sync(self, f, *args, **kwargs):
229 """calls f(*args, **kwargs) on remote engines in a blocking manner,
228 """calls f(*args, **kwargs) on remote engines in a blocking manner,
230 returning the result.
229 returning the result.
231
230
232 returns: actual result of f(*args, **kwargs)
231 returns: actual result of f(*args, **kwargs)
233 """
232 """
234 return self._really_apply(f, args, kwargs, block=True)
233 return self._really_apply(f, args, kwargs, block=True)
235
234
236 #----------------------------------------------------------------
235 #----------------------------------------------------------------
237 # wrappers for client and control methods
236 # wrappers for client and control methods
238 #----------------------------------------------------------------
237 #----------------------------------------------------------------
239 @sync_results
238 @sync_results
240 def spin(self):
239 def spin(self):
241 """spin the client, and sync"""
240 """spin the client, and sync"""
242 self.client.spin()
241 self.client.spin()
243
242
244 @sync_results
243 @sync_results
245 def wait(self, jobs=None, timeout=-1):
244 def wait(self, jobs=None, timeout=-1):
246 """waits on one or more `jobs`, for up to `timeout` seconds.
245 """waits on one or more `jobs`, for up to `timeout` seconds.
247
246
248 Parameters
247 Parameters
249 ----------
248 ----------
250
249
251 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
250 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
252 ints are indices to self.history
251 ints are indices to self.history
253 strs are msg_ids
252 strs are msg_ids
254 default: wait on all outstanding messages
253 default: wait on all outstanding messages
255 timeout : float
254 timeout : float
256 a time in seconds, after which to give up.
255 a time in seconds, after which to give up.
257 default is -1, which means no timeout
256 default is -1, which means no timeout
258
257
259 Returns
258 Returns
260 -------
259 -------
261
260
262 True : when all msg_ids are done
261 True : when all msg_ids are done
263 False : timeout reached, some msg_ids still outstanding
262 False : timeout reached, some msg_ids still outstanding
264 """
263 """
265 if jobs is None:
264 if jobs is None:
266 jobs = self.history
265 jobs = self.history
267 return self.client.wait(jobs, timeout)
266 return self.client.wait(jobs, timeout)
268
267
269 def abort(self, jobs=None, targets=None, block=None):
268 def abort(self, jobs=None, targets=None, block=None):
270 """Abort jobs on my engines.
269 """Abort jobs on my engines.
271
270
272 Parameters
271 Parameters
273 ----------
272 ----------
274
273
275 jobs : None, str, list of strs, optional
274 jobs : None, str, list of strs, optional
276 if None: abort all jobs.
275 if None: abort all jobs.
277 else: abort specific msg_id(s).
276 else: abort specific msg_id(s).
278 """
277 """
279 block = block if block is not None else self.block
278 block = block if block is not None else self.block
280 targets = targets if targets is not None else self.targets
279 targets = targets if targets is not None else self.targets
281 jobs = jobs if jobs is not None else list(self.outstanding)
280 jobs = jobs if jobs is not None else list(self.outstanding)
282
281
283 return self.client.abort(jobs=jobs, targets=targets, block=block)
282 return self.client.abort(jobs=jobs, targets=targets, block=block)
284
283
285 def queue_status(self, targets=None, verbose=False):
284 def queue_status(self, targets=None, verbose=False):
286 """Fetch the Queue status of my engines"""
285 """Fetch the Queue status of my engines"""
287 targets = targets if targets is not None else self.targets
286 targets = targets if targets is not None else self.targets
288 return self.client.queue_status(targets=targets, verbose=verbose)
287 return self.client.queue_status(targets=targets, verbose=verbose)
289
288
290 def purge_results(self, jobs=[], targets=[]):
289 def purge_results(self, jobs=[], targets=[]):
291 """Instruct the controller to forget specific results."""
290 """Instruct the controller to forget specific results."""
292 if targets is None or targets == 'all':
291 if targets is None or targets == 'all':
293 targets = self.targets
292 targets = self.targets
294 return self.client.purge_results(jobs=jobs, targets=targets)
293 return self.client.purge_results(jobs=jobs, targets=targets)
295
294
296 def shutdown(self, targets=None, restart=False, hub=False, block=None):
295 def shutdown(self, targets=None, restart=False, hub=False, block=None):
297 """Terminates one or more engine processes, optionally including the hub.
296 """Terminates one or more engine processes, optionally including the hub.
298 """
297 """
299 block = self.block if block is None else block
298 block = self.block if block is None else block
300 if targets is None or targets == 'all':
299 if targets is None or targets == 'all':
301 targets = self.targets
300 targets = self.targets
302 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
301 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
303
302
304 @spin_after
303 @spin_after
305 def get_result(self, indices_or_msg_ids=None):
304 def get_result(self, indices_or_msg_ids=None):
306 """return one or more results, specified by history index or msg_id.
305 """return one or more results, specified by history index or msg_id.
307
306
308 See client.get_result for details.
307 See client.get_result for details.
309
308
310 """
309 """
311
310
312 if indices_or_msg_ids is None:
311 if indices_or_msg_ids is None:
313 indices_or_msg_ids = -1
312 indices_or_msg_ids = -1
314 if isinstance(indices_or_msg_ids, int):
313 if isinstance(indices_or_msg_ids, int):
315 indices_or_msg_ids = self.history[indices_or_msg_ids]
314 indices_or_msg_ids = self.history[indices_or_msg_ids]
316 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
315 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
317 indices_or_msg_ids = list(indices_or_msg_ids)
316 indices_or_msg_ids = list(indices_or_msg_ids)
318 for i,index in enumerate(indices_or_msg_ids):
317 for i,index in enumerate(indices_or_msg_ids):
319 if isinstance(index, int):
318 if isinstance(index, int):
320 indices_or_msg_ids[i] = self.history[index]
319 indices_or_msg_ids[i] = self.history[index]
321 return self.client.get_result(indices_or_msg_ids)
320 return self.client.get_result(indices_or_msg_ids)
322
321
323 #-------------------------------------------------------------------
322 #-------------------------------------------------------------------
324 # Map
323 # Map
325 #-------------------------------------------------------------------
324 #-------------------------------------------------------------------
326
325
327 def map(self, f, *sequences, **kwargs):
326 def map(self, f, *sequences, **kwargs):
328 """override in subclasses"""
327 """override in subclasses"""
329 raise NotImplementedError
328 raise NotImplementedError
330
329
331 def map_async(self, f, *sequences, **kwargs):
330 def map_async(self, f, *sequences, **kwargs):
332 """Parallel version of builtin `map`, using this view's engines.
331 """Parallel version of builtin `map`, using this view's engines.
333
332
334 This is equivalent to map(...block=False)
333 This is equivalent to map(...block=False)
335
334
336 See `self.map` for details.
335 See `self.map` for details.
337 """
336 """
338 if 'block' in kwargs:
337 if 'block' in kwargs:
339 raise TypeError("map_async doesn't take a `block` keyword argument.")
338 raise TypeError("map_async doesn't take a `block` keyword argument.")
340 kwargs['block'] = False
339 kwargs['block'] = False
341 return self.map(f,*sequences,**kwargs)
340 return self.map(f,*sequences,**kwargs)
342
341
343 def map_sync(self, f, *sequences, **kwargs):
342 def map_sync(self, f, *sequences, **kwargs):
344 """Parallel version of builtin `map`, using this view's engines.
343 """Parallel version of builtin `map`, using this view's engines.
345
344
346 This is equivalent to map(...block=True)
345 This is equivalent to map(...block=True)
347
346
348 See `self.map` for details.
347 See `self.map` for details.
349 """
348 """
350 if 'block' in kwargs:
349 if 'block' in kwargs:
351 raise TypeError("map_sync doesn't take a `block` keyword argument.")
350 raise TypeError("map_sync doesn't take a `block` keyword argument.")
352 kwargs['block'] = True
351 kwargs['block'] = True
353 return self.map(f,*sequences,**kwargs)
352 return self.map(f,*sequences,**kwargs)
354
353
355 def imap(self, f, *sequences, **kwargs):
354 def imap(self, f, *sequences, **kwargs):
356 """Parallel version of `itertools.imap`.
355 """Parallel version of `itertools.imap`.
357
356
358 See `self.map` for details.
357 See `self.map` for details.
359
358
360 """
359 """
361
360
362 return iter(self.map_async(f,*sequences, **kwargs))
361 return iter(self.map_async(f,*sequences, **kwargs))
363
362
364 #-------------------------------------------------------------------
363 #-------------------------------------------------------------------
365 # Decorators
364 # Decorators
366 #-------------------------------------------------------------------
365 #-------------------------------------------------------------------
367
366
368 def remote(self, block=True, **flags):
367 def remote(self, block=True, **flags):
369 """Decorator for making a RemoteFunction"""
368 """Decorator for making a RemoteFunction"""
370 block = self.block if block is None else block
369 block = self.block if block is None else block
371 return remote(self, block=block, **flags)
370 return remote(self, block=block, **flags)
372
371
373 def parallel(self, dist='b', block=None, **flags):
372 def parallel(self, dist='b', block=None, **flags):
374 """Decorator for making a ParallelFunction"""
373 """Decorator for making a ParallelFunction"""
375 block = self.block if block is None else block
374 block = self.block if block is None else block
376 return parallel(self, dist=dist, block=block, **flags)
375 return parallel(self, dist=dist, block=block, **flags)
377
376
378 @skip_doctest
377 @skip_doctest
379 class DirectView(View):
378 class DirectView(View):
380 """Direct Multiplexer View of one or more engines.
379 """Direct Multiplexer View of one or more engines.
381
380
382 These are created via indexed access to a client:
381 These are created via indexed access to a client:
383
382
384 >>> dv_1 = client[1]
383 >>> dv_1 = client[1]
385 >>> dv_all = client[:]
384 >>> dv_all = client[:]
386 >>> dv_even = client[::2]
385 >>> dv_even = client[::2]
387 >>> dv_some = client[1:3]
386 >>> dv_some = client[1:3]
388
387
389 This object provides dictionary access to engine namespaces:
388 This object provides dictionary access to engine namespaces:
390
389
391 # push a=5:
390 # push a=5:
392 >>> dv['a'] = 5
391 >>> dv['a'] = 5
393 # pull 'foo':
392 # pull 'foo':
394 >>> db['foo']
393 >>> db['foo']
395
394
396 """
395 """
397
396
398 def __init__(self, client=None, socket=None, targets=None):
397 def __init__(self, client=None, socket=None, targets=None):
399 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
398 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
400
399
401 @property
400 @property
402 def importer(self):
401 def importer(self):
403 """sync_imports(local=True) as a property.
402 """sync_imports(local=True) as a property.
404
403
405 See sync_imports for details.
404 See sync_imports for details.
406
405
407 """
406 """
408 return self.sync_imports(True)
407 return self.sync_imports(True)
409
408
410 @contextmanager
409 @contextmanager
411 def sync_imports(self, local=True, quiet=False):
410 def sync_imports(self, local=True, quiet=False):
412 """Context Manager for performing simultaneous local and remote imports.
411 """Context Manager for performing simultaneous local and remote imports.
413
412
414 'import x as y' will *not* work. The 'as y' part will simply be ignored.
413 'import x as y' will *not* work. The 'as y' part will simply be ignored.
415
414
416 If `local=True`, then the package will also be imported locally.
415 If `local=True`, then the package will also be imported locally.
417
416
418 If `quiet=True`, no output will be produced when attempting remote
417 If `quiet=True`, no output will be produced when attempting remote
419 imports.
418 imports.
420
419
421 Note that remote-only (`local=False`) imports have not been implemented.
420 Note that remote-only (`local=False`) imports have not been implemented.
422
421
423 >>> with view.sync_imports():
422 >>> with view.sync_imports():
424 ... from numpy import recarray
423 ... from numpy import recarray
425 importing recarray from numpy on engine(s)
424 importing recarray from numpy on engine(s)
426
425
427 """
426 """
428 import __builtin__
427 import __builtin__
429 local_import = __builtin__.__import__
428 local_import = __builtin__.__import__
430 modules = set()
429 modules = set()
431 results = []
430 results = []
432 @util.interactive
431 @util.interactive
433 def remote_import(name, fromlist, level):
432 def remote_import(name, fromlist, level):
434 """the function to be passed to apply, that actually performs the import
433 """the function to be passed to apply, that actually performs the import
435 on the engine, and loads up the user namespace.
434 on the engine, and loads up the user namespace.
436 """
435 """
437 import sys
436 import sys
438 user_ns = globals()
437 user_ns = globals()
439 mod = __import__(name, fromlist=fromlist, level=level)
438 mod = __import__(name, fromlist=fromlist, level=level)
440 if fromlist:
439 if fromlist:
441 for key in fromlist:
440 for key in fromlist:
442 user_ns[key] = getattr(mod, key)
441 user_ns[key] = getattr(mod, key)
443 else:
442 else:
444 user_ns[name] = sys.modules[name]
443 user_ns[name] = sys.modules[name]
445
444
446 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
445 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
447 """the drop-in replacement for __import__, that optionally imports
446 """the drop-in replacement for __import__, that optionally imports
448 locally as well.
447 locally as well.
449 """
448 """
450 # don't override nested imports
449 # don't override nested imports
451 save_import = __builtin__.__import__
450 save_import = __builtin__.__import__
452 __builtin__.__import__ = local_import
451 __builtin__.__import__ = local_import
453
452
454 if imp.lock_held():
453 if imp.lock_held():
455 # this is a side-effect import, don't do it remotely, or even
454 # this is a side-effect import, don't do it remotely, or even
456 # ignore the local effects
455 # ignore the local effects
457 return local_import(name, globals, locals, fromlist, level)
456 return local_import(name, globals, locals, fromlist, level)
458
457
459 imp.acquire_lock()
458 imp.acquire_lock()
460 if local:
459 if local:
461 mod = local_import(name, globals, locals, fromlist, level)
460 mod = local_import(name, globals, locals, fromlist, level)
462 else:
461 else:
463 raise NotImplementedError("remote-only imports not yet implemented")
462 raise NotImplementedError("remote-only imports not yet implemented")
464 imp.release_lock()
463 imp.release_lock()
465
464
466 key = name+':'+','.join(fromlist or [])
465 key = name+':'+','.join(fromlist or [])
467 if level == -1 and key not in modules:
466 if level == -1 and key not in modules:
468 modules.add(key)
467 modules.add(key)
469 if not quiet:
468 if not quiet:
470 if fromlist:
469 if fromlist:
471 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
470 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
472 else:
471 else:
473 print "importing %s on engine(s)"%name
472 print "importing %s on engine(s)"%name
474 results.append(self.apply_async(remote_import, name, fromlist, level))
473 results.append(self.apply_async(remote_import, name, fromlist, level))
475 # restore override
474 # restore override
476 __builtin__.__import__ = save_import
475 __builtin__.__import__ = save_import
477
476
478 return mod
477 return mod
479
478
480 # override __import__
479 # override __import__
481 __builtin__.__import__ = view_import
480 __builtin__.__import__ = view_import
482 try:
481 try:
483 # enter the block
482 # enter the block
484 yield
483 yield
485 except ImportError:
484 except ImportError:
486 if local:
485 if local:
487 raise
486 raise
488 else:
487 else:
489 # ignore import errors if not doing local imports
488 # ignore import errors if not doing local imports
490 pass
489 pass
491 finally:
490 finally:
492 # always restore __import__
491 # always restore __import__
493 __builtin__.__import__ = local_import
492 __builtin__.__import__ = local_import
494
493
495 for r in results:
494 for r in results:
496 # raise possible remote ImportErrors here
495 # raise possible remote ImportErrors here
497 r.get()
496 r.get()
498
497
499
498
500 @sync_results
499 @sync_results
501 @save_ids
500 @save_ids
502 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
501 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
503 """calls f(*args, **kwargs) on remote engines, returning the result.
502 """calls f(*args, **kwargs) on remote engines, returning the result.
504
503
505 This method sets all of `apply`'s flags via this View's attributes.
504 This method sets all of `apply`'s flags via this View's attributes.
506
505
507 Parameters
506 Parameters
508 ----------
507 ----------
509
508
510 f : callable
509 f : callable
511
510
512 args : list [default: empty]
511 args : list [default: empty]
513
512
514 kwargs : dict [default: empty]
513 kwargs : dict [default: empty]
515
514
516 targets : target list [default: self.targets]
515 targets : target list [default: self.targets]
517 where to run
516 where to run
518 block : bool [default: self.block]
517 block : bool [default: self.block]
519 whether to block
518 whether to block
520 track : bool [default: self.track]
519 track : bool [default: self.track]
521 whether to ask zmq to track the message, for safe non-copying sends
520 whether to ask zmq to track the message, for safe non-copying sends
522
521
523 Returns
522 Returns
524 -------
523 -------
525
524
526 if self.block is False:
525 if self.block is False:
527 returns AsyncResult
526 returns AsyncResult
528 else:
527 else:
529 returns actual result of f(*args, **kwargs) on the engine(s)
528 returns actual result of f(*args, **kwargs) on the engine(s)
530 This will be a list of self.targets is also a list (even length 1), or
529 This will be a list of self.targets is also a list (even length 1), or
531 the single result if self.targets is an integer engine id
530 the single result if self.targets is an integer engine id
532 """
531 """
533 args = [] if args is None else args
532 args = [] if args is None else args
534 kwargs = {} if kwargs is None else kwargs
533 kwargs = {} if kwargs is None else kwargs
535 block = self.block if block is None else block
534 block = self.block if block is None else block
536 track = self.track if track is None else track
535 track = self.track if track is None else track
537 targets = self.targets if targets is None else targets
536 targets = self.targets if targets is None else targets
538
537
539 _idents = self.client._build_targets(targets)[0]
538 _idents = self.client._build_targets(targets)[0]
540 msg_ids = []
539 msg_ids = []
541 trackers = []
540 trackers = []
542 for ident in _idents:
541 for ident in _idents:
543 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
542 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
544 ident=ident)
543 ident=ident)
545 if track:
544 if track:
546 trackers.append(msg['tracker'])
545 trackers.append(msg['tracker'])
547 msg_ids.append(msg['header']['msg_id'])
546 msg_ids.append(msg['header']['msg_id'])
548 tracker = None if track is False else zmq.MessageTracker(*trackers)
547 tracker = None if track is False else zmq.MessageTracker(*trackers)
549 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=targets, tracker=tracker)
548 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=targets, tracker=tracker)
550 if block:
549 if block:
551 try:
550 try:
552 return ar.get()
551 return ar.get()
553 except KeyboardInterrupt:
552 except KeyboardInterrupt:
554 pass
553 pass
555 return ar
554 return ar
556
555
557
556
558 @spin_after
557 @spin_after
559 def map(self, f, *sequences, **kwargs):
558 def map(self, f, *sequences, **kwargs):
560 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
559 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
561
560
562 Parallel version of builtin `map`, using this View's `targets`.
561 Parallel version of builtin `map`, using this View's `targets`.
563
562
564 There will be one task per target, so work will be chunked
563 There will be one task per target, so work will be chunked
565 if the sequences are longer than `targets`.
564 if the sequences are longer than `targets`.
566
565
567 Results can be iterated as they are ready, but will become available in chunks.
566 Results can be iterated as they are ready, but will become available in chunks.
568
567
569 Parameters
568 Parameters
570 ----------
569 ----------
571
570
572 f : callable
571 f : callable
573 function to be mapped
572 function to be mapped
574 *sequences: one or more sequences of matching length
573 *sequences: one or more sequences of matching length
575 the sequences to be distributed and passed to `f`
574 the sequences to be distributed and passed to `f`
576 block : bool
575 block : bool
577 whether to wait for the result or not [default self.block]
576 whether to wait for the result or not [default self.block]
578
577
579 Returns
578 Returns
580 -------
579 -------
581
580
582 if block=False:
581 if block=False:
583 AsyncMapResult
582 AsyncMapResult
584 An object like AsyncResult, but which reassembles the sequence of results
583 An object like AsyncResult, but which reassembles the sequence of results
585 into a single list. AsyncMapResults can be iterated through before all
584 into a single list. AsyncMapResults can be iterated through before all
586 results are complete.
585 results are complete.
587 else:
586 else:
588 list
587 list
589 the result of map(f,*sequences)
588 the result of map(f,*sequences)
590 """
589 """
591
590
592 block = kwargs.pop('block', self.block)
591 block = kwargs.pop('block', self.block)
593 for k in kwargs.keys():
592 for k in kwargs.keys():
594 if k not in ['block', 'track']:
593 if k not in ['block', 'track']:
595 raise TypeError("invalid keyword arg, %r"%k)
594 raise TypeError("invalid keyword arg, %r"%k)
596
595
597 assert len(sequences) > 0, "must have some sequences to map onto!"
596 assert len(sequences) > 0, "must have some sequences to map onto!"
598 pf = ParallelFunction(self, f, block=block, **kwargs)
597 pf = ParallelFunction(self, f, block=block, **kwargs)
599 return pf.map(*sequences)
598 return pf.map(*sequences)
600
599
601 @sync_results
600 @sync_results
602 @save_ids
601 @save_ids
603 def execute(self, code, silent=True, targets=None, block=None):
602 def execute(self, code, silent=True, targets=None, block=None):
604 """Executes `code` on `targets` in blocking or nonblocking manner.
603 """Executes `code` on `targets` in blocking or nonblocking manner.
605
604
606 ``execute`` is always `bound` (affects engine namespace)
605 ``execute`` is always `bound` (affects engine namespace)
607
606
608 Parameters
607 Parameters
609 ----------
608 ----------
610
609
611 code : str
610 code : str
612 the code string to be executed
611 the code string to be executed
613 block : bool
612 block : bool
614 whether or not to wait until done to return
613 whether or not to wait until done to return
615 default: self.block
614 default: self.block
616 """
615 """
617 block = self.block if block is None else block
616 block = self.block if block is None else block
618 targets = self.targets if targets is None else targets
617 targets = self.targets if targets is None else targets
619
618
620 _idents = self.client._build_targets(targets)[0]
619 _idents = self.client._build_targets(targets)[0]
621 msg_ids = []
620 msg_ids = []
622 trackers = []
621 trackers = []
623 for ident in _idents:
622 for ident in _idents:
624 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
623 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
625 msg_ids.append(msg['header']['msg_id'])
624 msg_ids.append(msg['header']['msg_id'])
626 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets)
625 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets)
627 if block:
626 if block:
628 try:
627 try:
629 ar.get()
628 ar.get()
630 except KeyboardInterrupt:
629 except KeyboardInterrupt:
631 pass
630 pass
632 return ar
631 return ar
633
632
634 def run(self, filename, targets=None, block=None):
633 def run(self, filename, targets=None, block=None):
635 """Execute contents of `filename` on my engine(s).
634 """Execute contents of `filename` on my engine(s).
636
635
637 This simply reads the contents of the file and calls `execute`.
636 This simply reads the contents of the file and calls `execute`.
638
637
639 Parameters
638 Parameters
640 ----------
639 ----------
641
640
642 filename : str
641 filename : str
643 The path to the file
642 The path to the file
644 targets : int/str/list of ints/strs
643 targets : int/str/list of ints/strs
645 the engines on which to execute
644 the engines on which to execute
646 default : all
645 default : all
647 block : bool
646 block : bool
648 whether or not to wait until done
647 whether or not to wait until done
649 default: self.block
648 default: self.block
650
649
651 """
650 """
652 with open(filename, 'r') as f:
651 with open(filename, 'r') as f:
653 # add newline in case of trailing indented whitespace
652 # add newline in case of trailing indented whitespace
654 # which will cause SyntaxError
653 # which will cause SyntaxError
655 code = f.read()+'\n'
654 code = f.read()+'\n'
656 return self.execute(code, block=block, targets=targets)
655 return self.execute(code, block=block, targets=targets)
657
656
658 def update(self, ns):
657 def update(self, ns):
659 """update remote namespace with dict `ns`
658 """update remote namespace with dict `ns`
660
659
661 See `push` for details.
660 See `push` for details.
662 """
661 """
663 return self.push(ns, block=self.block, track=self.track)
662 return self.push(ns, block=self.block, track=self.track)
664
663
665 def push(self, ns, targets=None, block=None, track=None):
664 def push(self, ns, targets=None, block=None, track=None):
666 """update remote namespace with dict `ns`
665 """update remote namespace with dict `ns`
667
666
668 Parameters
667 Parameters
669 ----------
668 ----------
670
669
671 ns : dict
670 ns : dict
672 dict of keys with which to update engine namespace(s)
671 dict of keys with which to update engine namespace(s)
673 block : bool [default : self.block]
672 block : bool [default : self.block]
674 whether to wait to be notified of engine receipt
673 whether to wait to be notified of engine receipt
675
674
676 """
675 """
677
676
678 block = block if block is not None else self.block
677 block = block if block is not None else self.block
679 track = track if track is not None else self.track
678 track = track if track is not None else self.track
680 targets = targets if targets is not None else self.targets
679 targets = targets if targets is not None else self.targets
681 # applier = self.apply_sync if block else self.apply_async
680 # applier = self.apply_sync if block else self.apply_async
682 if not isinstance(ns, dict):
681 if not isinstance(ns, dict):
683 raise TypeError("Must be a dict, not %s"%type(ns))
682 raise TypeError("Must be a dict, not %s"%type(ns))
684 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
683 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
685
684
686 def get(self, key_s):
685 def get(self, key_s):
687 """get object(s) by `key_s` from remote namespace
686 """get object(s) by `key_s` from remote namespace
688
687
689 see `pull` for details.
688 see `pull` for details.
690 """
689 """
691 # block = block if block is not None else self.block
690 # block = block if block is not None else self.block
692 return self.pull(key_s, block=True)
691 return self.pull(key_s, block=True)
693
692
694 def pull(self, names, targets=None, block=None):
693 def pull(self, names, targets=None, block=None):
695 """get object(s) by `name` from remote namespace
694 """get object(s) by `name` from remote namespace
696
695
697 will return one object if it is a key.
696 will return one object if it is a key.
698 can also take a list of keys, in which case it will return a list of objects.
697 can also take a list of keys, in which case it will return a list of objects.
699 """
698 """
700 block = block if block is not None else self.block
699 block = block if block is not None else self.block
701 targets = targets if targets is not None else self.targets
700 targets = targets if targets is not None else self.targets
702 applier = self.apply_sync if block else self.apply_async
701 applier = self.apply_sync if block else self.apply_async
703 if isinstance(names, basestring):
702 if isinstance(names, basestring):
704 pass
703 pass
705 elif isinstance(names, (list,tuple,set)):
704 elif isinstance(names, (list,tuple,set)):
706 for key in names:
705 for key in names:
707 if not isinstance(key, basestring):
706 if not isinstance(key, basestring):
708 raise TypeError("keys must be str, not type %r"%type(key))
707 raise TypeError("keys must be str, not type %r"%type(key))
709 else:
708 else:
710 raise TypeError("names must be strs, not %r"%names)
709 raise TypeError("names must be strs, not %r"%names)
711 return self._really_apply(util._pull, (names,), block=block, targets=targets)
710 return self._really_apply(util._pull, (names,), block=block, targets=targets)
712
711
713 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
712 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
714 """
713 """
715 Partition a Python sequence and send the partitions to a set of engines.
714 Partition a Python sequence and send the partitions to a set of engines.
716 """
715 """
717 block = block if block is not None else self.block
716 block = block if block is not None else self.block
718 track = track if track is not None else self.track
717 track = track if track is not None else self.track
719 targets = targets if targets is not None else self.targets
718 targets = targets if targets is not None else self.targets
720
719
721 # construct integer ID list:
720 # construct integer ID list:
722 targets = self.client._build_targets(targets)[1]
721 targets = self.client._build_targets(targets)[1]
723
722
724 mapObject = Map.dists[dist]()
723 mapObject = Map.dists[dist]()
725 nparts = len(targets)
724 nparts = len(targets)
726 msg_ids = []
725 msg_ids = []
727 trackers = []
726 trackers = []
728 for index, engineid in enumerate(targets):
727 for index, engineid in enumerate(targets):
729 partition = mapObject.getPartition(seq, index, nparts)
728 partition = mapObject.getPartition(seq, index, nparts)
730 if flatten and len(partition) == 1:
729 if flatten and len(partition) == 1:
731 ns = {key: partition[0]}
730 ns = {key: partition[0]}
732 else:
731 else:
733 ns = {key: partition}
732 ns = {key: partition}
734 r = self.push(ns, block=False, track=track, targets=engineid)
733 r = self.push(ns, block=False, track=track, targets=engineid)
735 msg_ids.extend(r.msg_ids)
734 msg_ids.extend(r.msg_ids)
736 if track:
735 if track:
737 trackers.append(r._tracker)
736 trackers.append(r._tracker)
738
737
739 if track:
738 if track:
740 tracker = zmq.MessageTracker(*trackers)
739 tracker = zmq.MessageTracker(*trackers)
741 else:
740 else:
742 tracker = None
741 tracker = None
743
742
744 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
743 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
745 if block:
744 if block:
746 r.wait()
745 r.wait()
747 else:
746 else:
748 return r
747 return r
749
748
750 @sync_results
749 @sync_results
751 @save_ids
750 @save_ids
752 def gather(self, key, dist='b', targets=None, block=None):
751 def gather(self, key, dist='b', targets=None, block=None):
753 """
752 """
754 Gather a partitioned sequence on a set of engines as a single local seq.
753 Gather a partitioned sequence on a set of engines as a single local seq.
755 """
754 """
756 block = block if block is not None else self.block
755 block = block if block is not None else self.block
757 targets = targets if targets is not None else self.targets
756 targets = targets if targets is not None else self.targets
758 mapObject = Map.dists[dist]()
757 mapObject = Map.dists[dist]()
759 msg_ids = []
758 msg_ids = []
760
759
761 # construct integer ID list:
760 # construct integer ID list:
762 targets = self.client._build_targets(targets)[1]
761 targets = self.client._build_targets(targets)[1]
763
762
764 for index, engineid in enumerate(targets):
763 for index, engineid in enumerate(targets):
765 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
764 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
766
765
767 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
766 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
768
767
769 if block:
768 if block:
770 try:
769 try:
771 return r.get()
770 return r.get()
772 except KeyboardInterrupt:
771 except KeyboardInterrupt:
773 pass
772 pass
774 return r
773 return r
775
774
776 def __getitem__(self, key):
775 def __getitem__(self, key):
777 return self.get(key)
776 return self.get(key)
778
777
779 def __setitem__(self,key, value):
778 def __setitem__(self,key, value):
780 self.update({key:value})
779 self.update({key:value})
781
780
782 def clear(self, targets=None, block=False):
781 def clear(self, targets=None, block=False):
783 """Clear the remote namespaces on my engines."""
782 """Clear the remote namespaces on my engines."""
784 block = block if block is not None else self.block
783 block = block if block is not None else self.block
785 targets = targets if targets is not None else self.targets
784 targets = targets if targets is not None else self.targets
786 return self.client.clear(targets=targets, block=block)
785 return self.client.clear(targets=targets, block=block)
787
786
788 def kill(self, targets=None, block=True):
787 def kill(self, targets=None, block=True):
789 """Kill my engines."""
788 """Kill my engines."""
790 block = block if block is not None else self.block
789 block = block if block is not None else self.block
791 targets = targets if targets is not None else self.targets
790 targets = targets if targets is not None else self.targets
792 return self.client.kill(targets=targets, block=block)
791 return self.client.kill(targets=targets, block=block)
793
792
794 #----------------------------------------
793 #----------------------------------------
795 # activate for %px, %autopx, etc. magics
794 # activate for %px, %autopx, etc. magics
796 #----------------------------------------
795 #----------------------------------------
797
796
798 def activate(self, suffix=''):
797 def activate(self, suffix=''):
799 """Activate IPython magics associated with this View
798 """Activate IPython magics associated with this View
800
799
801 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
800 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
802
801
803 Parameters
802 Parameters
804 ----------
803 ----------
805
804
806 suffix: str [default: '']
805 suffix: str [default: '']
807 The suffix, if any, for the magics. This allows you to have
806 The suffix, if any, for the magics. This allows you to have
808 multiple views associated with parallel magics at the same time.
807 multiple views associated with parallel magics at the same time.
809
808
810 e.g. ``rc[::2].activate(suffix='_even')`` will give you
809 e.g. ``rc[::2].activate(suffix='_even')`` will give you
811 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
810 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
812 on the even engines.
811 on the even engines.
813 """
812 """
814
813
815 from IPython.parallel.client.magics import ParallelMagics
814 from IPython.parallel.client.magics import ParallelMagics
816
815
817 try:
816 try:
818 # This is injected into __builtins__.
817 # This is injected into __builtins__.
819 ip = get_ipython()
818 ip = get_ipython()
820 except NameError:
819 except NameError:
821 print "The IPython parallel magics (%px, etc.) only work within IPython."
820 print "The IPython parallel magics (%px, etc.) only work within IPython."
822 return
821 return
823
822
824 M = ParallelMagics(ip, self, suffix)
823 M = ParallelMagics(ip, self, suffix)
825 ip.magics_manager.register(M)
824 ip.magics_manager.register(M)
826
825
827
826
828 @skip_doctest
827 @skip_doctest
829 class LoadBalancedView(View):
828 class LoadBalancedView(View):
830 """An load-balancing View that only executes via the Task scheduler.
829 """An load-balancing View that only executes via the Task scheduler.
831
830
832 Load-balanced views can be created with the client's `view` method:
831 Load-balanced views can be created with the client's `view` method:
833
832
834 >>> v = client.load_balanced_view()
833 >>> v = client.load_balanced_view()
835
834
836 or targets can be specified, to restrict the potential destinations:
835 or targets can be specified, to restrict the potential destinations:
837
836
838 >>> v = client.client.load_balanced_view([1,3])
837 >>> v = client.client.load_balanced_view([1,3])
839
838
840 which would restrict loadbalancing to between engines 1 and 3.
839 which would restrict loadbalancing to between engines 1 and 3.
841
840
842 """
841 """
843
842
844 follow=Any()
843 follow=Any()
845 after=Any()
844 after=Any()
846 timeout=CFloat()
845 timeout=CFloat()
847 retries = Integer(0)
846 retries = Integer(0)
848
847
849 _task_scheme = Any()
848 _task_scheme = Any()
850 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
849 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
851
850
852 def __init__(self, client=None, socket=None, **flags):
851 def __init__(self, client=None, socket=None, **flags):
853 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
852 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
854 self._task_scheme=client._task_scheme
853 self._task_scheme=client._task_scheme
855
854
856 def _validate_dependency(self, dep):
855 def _validate_dependency(self, dep):
857 """validate a dependency.
856 """validate a dependency.
858
857
859 For use in `set_flags`.
858 For use in `set_flags`.
860 """
859 """
861 if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
860 if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
862 return True
861 return True
863 elif isinstance(dep, (list,set, tuple)):
862 elif isinstance(dep, (list,set, tuple)):
864 for d in dep:
863 for d in dep:
865 if not isinstance(d, (basestring, AsyncResult)):
864 if not isinstance(d, (basestring, AsyncResult)):
866 return False
865 return False
867 elif isinstance(dep, dict):
866 elif isinstance(dep, dict):
868 if set(dep.keys()) != set(Dependency().as_dict().keys()):
867 if set(dep.keys()) != set(Dependency().as_dict().keys()):
869 return False
868 return False
870 if not isinstance(dep['msg_ids'], list):
869 if not isinstance(dep['msg_ids'], list):
871 return False
870 return False
872 for d in dep['msg_ids']:
871 for d in dep['msg_ids']:
873 if not isinstance(d, basestring):
872 if not isinstance(d, basestring):
874 return False
873 return False
875 else:
874 else:
876 return False
875 return False
877
876
878 return True
877 return True
879
878
880 def _render_dependency(self, dep):
879 def _render_dependency(self, dep):
881 """helper for building jsonable dependencies from various input forms."""
880 """helper for building jsonable dependencies from various input forms."""
882 if isinstance(dep, Dependency):
881 if isinstance(dep, Dependency):
883 return dep.as_dict()
882 return dep.as_dict()
884 elif isinstance(dep, AsyncResult):
883 elif isinstance(dep, AsyncResult):
885 return dep.msg_ids
884 return dep.msg_ids
886 elif dep is None:
885 elif dep is None:
887 return []
886 return []
888 else:
887 else:
889 # pass to Dependency constructor
888 # pass to Dependency constructor
890 return list(Dependency(dep))
889 return list(Dependency(dep))
891
890
892 def set_flags(self, **kwargs):
891 def set_flags(self, **kwargs):
893 """set my attribute flags by keyword.
892 """set my attribute flags by keyword.
894
893
895 A View is a wrapper for the Client's apply method, but with attributes
894 A View is a wrapper for the Client's apply method, but with attributes
896 that specify keyword arguments, those attributes can be set by keyword
895 that specify keyword arguments, those attributes can be set by keyword
897 argument with this method.
896 argument with this method.
898
897
899 Parameters
898 Parameters
900 ----------
899 ----------
901
900
902 block : bool
901 block : bool
903 whether to wait for results
902 whether to wait for results
904 track : bool
903 track : bool
905 whether to create a MessageTracker to allow the user to
904 whether to create a MessageTracker to allow the user to
906 safely edit after arrays and buffers during non-copying
905 safely edit after arrays and buffers during non-copying
907 sends.
906 sends.
908
907
909 after : Dependency or collection of msg_ids
908 after : Dependency or collection of msg_ids
910 Only for load-balanced execution (targets=None)
909 Only for load-balanced execution (targets=None)
911 Specify a list of msg_ids as a time-based dependency.
910 Specify a list of msg_ids as a time-based dependency.
912 This job will only be run *after* the dependencies
911 This job will only be run *after* the dependencies
913 have been met.
912 have been met.
914
913
915 follow : Dependency or collection of msg_ids
914 follow : Dependency or collection of msg_ids
916 Only for load-balanced execution (targets=None)
915 Only for load-balanced execution (targets=None)
917 Specify a list of msg_ids as a location-based dependency.
916 Specify a list of msg_ids as a location-based dependency.
918 This job will only be run on an engine where this dependency
917 This job will only be run on an engine where this dependency
919 is met.
918 is met.
920
919
921 timeout : float/int or None
920 timeout : float/int or None
922 Only for load-balanced execution (targets=None)
921 Only for load-balanced execution (targets=None)
923 Specify an amount of time (in seconds) for the scheduler to
922 Specify an amount of time (in seconds) for the scheduler to
924 wait for dependencies to be met before failing with a
923 wait for dependencies to be met before failing with a
925 DependencyTimeout.
924 DependencyTimeout.
926
925
927 retries : int
926 retries : int
928 Number of times a task will be retried on failure.
927 Number of times a task will be retried on failure.
929 """
928 """
930
929
931 super(LoadBalancedView, self).set_flags(**kwargs)
930 super(LoadBalancedView, self).set_flags(**kwargs)
932 for name in ('follow', 'after'):
931 for name in ('follow', 'after'):
933 if name in kwargs:
932 if name in kwargs:
934 value = kwargs[name]
933 value = kwargs[name]
935 if self._validate_dependency(value):
934 if self._validate_dependency(value):
936 setattr(self, name, value)
935 setattr(self, name, value)
937 else:
936 else:
938 raise ValueError("Invalid dependency: %r"%value)
937 raise ValueError("Invalid dependency: %r"%value)
939 if 'timeout' in kwargs:
938 if 'timeout' in kwargs:
940 t = kwargs['timeout']
939 t = kwargs['timeout']
941 if not isinstance(t, (int, long, float, type(None))):
940 if not isinstance(t, (int, long, float, type(None))):
942 raise TypeError("Invalid type for timeout: %r"%type(t))
941 raise TypeError("Invalid type for timeout: %r"%type(t))
943 if t is not None:
942 if t is not None:
944 if t < 0:
943 if t < 0:
945 raise ValueError("Invalid timeout: %s"%t)
944 raise ValueError("Invalid timeout: %s"%t)
946 self.timeout = t
945 self.timeout = t
947
946
948 @sync_results
947 @sync_results
949 @save_ids
948 @save_ids
950 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
949 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
951 after=None, follow=None, timeout=None,
950 after=None, follow=None, timeout=None,
952 targets=None, retries=None):
951 targets=None, retries=None):
953 """calls f(*args, **kwargs) on a remote engine, returning the result.
952 """calls f(*args, **kwargs) on a remote engine, returning the result.
954
953
955 This method temporarily sets all of `apply`'s flags for a single call.
954 This method temporarily sets all of `apply`'s flags for a single call.
956
955
957 Parameters
956 Parameters
958 ----------
957 ----------
959
958
960 f : callable
959 f : callable
961
960
962 args : list [default: empty]
961 args : list [default: empty]
963
962
964 kwargs : dict [default: empty]
963 kwargs : dict [default: empty]
965
964
966 block : bool [default: self.block]
965 block : bool [default: self.block]
967 whether to block
966 whether to block
968 track : bool [default: self.track]
967 track : bool [default: self.track]
969 whether to ask zmq to track the message, for safe non-copying sends
968 whether to ask zmq to track the message, for safe non-copying sends
970
969
971 !!!!!! TODO: THE REST HERE !!!!
970 !!!!!! TODO: THE REST HERE !!!!
972
971
973 Returns
972 Returns
974 -------
973 -------
975
974
976 if self.block is False:
975 if self.block is False:
977 returns AsyncResult
976 returns AsyncResult
978 else:
977 else:
979 returns actual result of f(*args, **kwargs) on the engine(s)
978 returns actual result of f(*args, **kwargs) on the engine(s)
980 This will be a list of self.targets is also a list (even length 1), or
979 This will be a list of self.targets is also a list (even length 1), or
981 the single result if self.targets is an integer engine id
980 the single result if self.targets is an integer engine id
982 """
981 """
983
982
984 # validate whether we can run
983 # validate whether we can run
985 if self._socket.closed:
984 if self._socket.closed:
986 msg = "Task farming is disabled"
985 msg = "Task farming is disabled"
987 if self._task_scheme == 'pure':
986 if self._task_scheme == 'pure':
988 msg += " because the pure ZMQ scheduler cannot handle"
987 msg += " because the pure ZMQ scheduler cannot handle"
989 msg += " disappearing engines."
988 msg += " disappearing engines."
990 raise RuntimeError(msg)
989 raise RuntimeError(msg)
991
990
992 if self._task_scheme == 'pure':
991 if self._task_scheme == 'pure':
993 # pure zmq scheme doesn't support extra features
992 # pure zmq scheme doesn't support extra features
994 msg = "Pure ZMQ scheduler doesn't support the following flags:"
993 msg = "Pure ZMQ scheduler doesn't support the following flags:"
995 "follow, after, retries, targets, timeout"
994 "follow, after, retries, targets, timeout"
996 if (follow or after or retries or targets or timeout):
995 if (follow or after or retries or targets or timeout):
997 # hard fail on Scheduler flags
996 # hard fail on Scheduler flags
998 raise RuntimeError(msg)
997 raise RuntimeError(msg)
999 if isinstance(f, dependent):
998 if isinstance(f, dependent):
1000 # soft warn on functional dependencies
999 # soft warn on functional dependencies
1001 warnings.warn(msg, RuntimeWarning)
1000 warnings.warn(msg, RuntimeWarning)
1002
1001
1003 # build args
1002 # build args
1004 args = [] if args is None else args
1003 args = [] if args is None else args
1005 kwargs = {} if kwargs is None else kwargs
1004 kwargs = {} if kwargs is None else kwargs
1006 block = self.block if block is None else block
1005 block = self.block if block is None else block
1007 track = self.track if track is None else track
1006 track = self.track if track is None else track
1008 after = self.after if after is None else after
1007 after = self.after if after is None else after
1009 retries = self.retries if retries is None else retries
1008 retries = self.retries if retries is None else retries
1010 follow = self.follow if follow is None else follow
1009 follow = self.follow if follow is None else follow
1011 timeout = self.timeout if timeout is None else timeout
1010 timeout = self.timeout if timeout is None else timeout
1012 targets = self.targets if targets is None else targets
1011 targets = self.targets if targets is None else targets
1013
1012
1014 if not isinstance(retries, int):
1013 if not isinstance(retries, int):
1015 raise TypeError('retries must be int, not %r'%type(retries))
1014 raise TypeError('retries must be int, not %r'%type(retries))
1016
1015
1017 if targets is None:
1016 if targets is None:
1018 idents = []
1017 idents = []
1019 else:
1018 else:
1020 idents = self.client._build_targets(targets)[0]
1019 idents = self.client._build_targets(targets)[0]
1021 # ensure *not* bytes
1020 # ensure *not* bytes
1022 idents = [ ident.decode() for ident in idents ]
1021 idents = [ ident.decode() for ident in idents ]
1023
1022
1024 after = self._render_dependency(after)
1023 after = self._render_dependency(after)
1025 follow = self._render_dependency(follow)
1024 follow = self._render_dependency(follow)
1026 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1025 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1027
1026
1028 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1027 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1029 subheader=subheader)
1028 subheader=subheader)
1030 tracker = None if track is False else msg['tracker']
1029 tracker = None if track is False else msg['tracker']
1031
1030
1032 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1031 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1033
1032
1034 if block:
1033 if block:
1035 try:
1034 try:
1036 return ar.get()
1035 return ar.get()
1037 except KeyboardInterrupt:
1036 except KeyboardInterrupt:
1038 pass
1037 pass
1039 return ar
1038 return ar
1040
1039
1041 @spin_after
1040 @spin_after
1042 @save_ids
1041 @save_ids
1043 def map(self, f, *sequences, **kwargs):
1042 def map(self, f, *sequences, **kwargs):
1044 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1043 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1045
1044
1046 Parallel version of builtin `map`, load-balanced by this View.
1045 Parallel version of builtin `map`, load-balanced by this View.
1047
1046
1048 `block`, and `chunksize` can be specified by keyword only.
1047 `block`, and `chunksize` can be specified by keyword only.
1049
1048
1050 Each `chunksize` elements will be a separate task, and will be
1049 Each `chunksize` elements will be a separate task, and will be
1051 load-balanced. This lets individual elements be available for iteration
1050 load-balanced. This lets individual elements be available for iteration
1052 as soon as they arrive.
1051 as soon as they arrive.
1053
1052
1054 Parameters
1053 Parameters
1055 ----------
1054 ----------
1056
1055
1057 f : callable
1056 f : callable
1058 function to be mapped
1057 function to be mapped
1059 *sequences: one or more sequences of matching length
1058 *sequences: one or more sequences of matching length
1060 the sequences to be distributed and passed to `f`
1059 the sequences to be distributed and passed to `f`
1061 block : bool [default self.block]
1060 block : bool [default self.block]
1062 whether to wait for the result or not
1061 whether to wait for the result or not
1063 track : bool
1062 track : bool
1064 whether to create a MessageTracker to allow the user to
1063 whether to create a MessageTracker to allow the user to
1065 safely edit after arrays and buffers during non-copying
1064 safely edit after arrays and buffers during non-copying
1066 sends.
1065 sends.
1067 chunksize : int [default 1]
1066 chunksize : int [default 1]
1068 how many elements should be in each task.
1067 how many elements should be in each task.
1069 ordered : bool [default True]
1068 ordered : bool [default True]
1070 Whether the results should be gathered as they arrive, or enforce
1069 Whether the results should be gathered as they arrive, or enforce
1071 the order of submission.
1070 the order of submission.
1072
1071
1073 Only applies when iterating through AsyncMapResult as results arrive.
1072 Only applies when iterating through AsyncMapResult as results arrive.
1074 Has no effect when block=True.
1073 Has no effect when block=True.
1075
1074
1076 Returns
1075 Returns
1077 -------
1076 -------
1078
1077
1079 if block=False:
1078 if block=False:
1080 AsyncMapResult
1079 AsyncMapResult
1081 An object like AsyncResult, but which reassembles the sequence of results
1080 An object like AsyncResult, but which reassembles the sequence of results
1082 into a single list. AsyncMapResults can be iterated through before all
1081 into a single list. AsyncMapResults can be iterated through before all
1083 results are complete.
1082 results are complete.
1084 else:
1083 else:
1085 the result of map(f,*sequences)
1084 the result of map(f,*sequences)
1086
1085
1087 """
1086 """
1088
1087
1089 # default
1088 # default
1090 block = kwargs.get('block', self.block)
1089 block = kwargs.get('block', self.block)
1091 chunksize = kwargs.get('chunksize', 1)
1090 chunksize = kwargs.get('chunksize', 1)
1092 ordered = kwargs.get('ordered', True)
1091 ordered = kwargs.get('ordered', True)
1093
1092
1094 keyset = set(kwargs.keys())
1093 keyset = set(kwargs.keys())
1095 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1094 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1096 if extra_keys:
1095 if extra_keys:
1097 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1096 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1098
1097
1099 assert len(sequences) > 0, "must have some sequences to map onto!"
1098 assert len(sequences) > 0, "must have some sequences to map onto!"
1100
1099
1101 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1100 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1102 return pf.map(*sequences)
1101 return pf.map(*sequences)
1103
1102
1104 __all__ = ['LoadBalancedView', 'DirectView']
1103 __all__ = ['LoadBalancedView', 'DirectView']
General Comments 0
You need to be logged in to leave comments. Login now