##// END OF EJS Templates
fix scatter/gather with targets='all'...
MinRK -
Show More
@@ -1,1069 +1,1075 b''
1 """Views of remote engines.
1 """Views of remote engines.
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import imp
18 import imp
19 import sys
19 import sys
20 import warnings
20 import warnings
21 from contextlib import contextmanager
21 from contextlib import contextmanager
22 from types import ModuleType
22 from types import ModuleType
23
23
24 import zmq
24 import zmq
25
25
26 from IPython.testing.skipdoctest import skip_doctest
26 from IPython.testing.skipdoctest import skip_doctest
27 from IPython.utils.traitlets import (
27 from IPython.utils.traitlets import (
28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
29 )
29 )
30 from IPython.external.decorator import decorator
30 from IPython.external.decorator import decorator
31
31
32 from IPython.parallel import util
32 from IPython.parallel import util
33 from IPython.parallel.controller.dependency import Dependency, dependent
33 from IPython.parallel.controller.dependency import Dependency, dependent
34
34
35 from . import map as Map
35 from . import map as Map
36 from .asyncresult import AsyncResult, AsyncMapResult
36 from .asyncresult import AsyncResult, AsyncMapResult
37 from .remotefunction import ParallelFunction, parallel, remote, getname
37 from .remotefunction import ParallelFunction, parallel, remote, getname
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Decorators
40 # Decorators
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43 @decorator
43 @decorator
44 def save_ids(f, self, *args, **kwargs):
44 def save_ids(f, self, *args, **kwargs):
45 """Keep our history and outstanding attributes up to date after a method call."""
45 """Keep our history and outstanding attributes up to date after a method call."""
46 n_previous = len(self.client.history)
46 n_previous = len(self.client.history)
47 try:
47 try:
48 ret = f(self, *args, **kwargs)
48 ret = f(self, *args, **kwargs)
49 finally:
49 finally:
50 nmsgs = len(self.client.history) - n_previous
50 nmsgs = len(self.client.history) - n_previous
51 msg_ids = self.client.history[-nmsgs:]
51 msg_ids = self.client.history[-nmsgs:]
52 self.history.extend(msg_ids)
52 self.history.extend(msg_ids)
53 map(self.outstanding.add, msg_ids)
53 map(self.outstanding.add, msg_ids)
54 return ret
54 return ret
55
55
56 @decorator
56 @decorator
57 def sync_results(f, self, *args, **kwargs):
57 def sync_results(f, self, *args, **kwargs):
58 """sync relevant results from self.client to our results attribute."""
58 """sync relevant results from self.client to our results attribute."""
59 ret = f(self, *args, **kwargs)
59 ret = f(self, *args, **kwargs)
60 delta = self.outstanding.difference(self.client.outstanding)
60 delta = self.outstanding.difference(self.client.outstanding)
61 completed = self.outstanding.intersection(delta)
61 completed = self.outstanding.intersection(delta)
62 self.outstanding = self.outstanding.difference(completed)
62 self.outstanding = self.outstanding.difference(completed)
63 for msg_id in completed:
63 for msg_id in completed:
64 self.results[msg_id] = self.client.results[msg_id]
64 self.results[msg_id] = self.client.results[msg_id]
65 return ret
65 return ret
66
66
67 @decorator
67 @decorator
68 def spin_after(f, self, *args, **kwargs):
68 def spin_after(f, self, *args, **kwargs):
69 """call spin after the method."""
69 """call spin after the method."""
70 ret = f(self, *args, **kwargs)
70 ret = f(self, *args, **kwargs)
71 self.spin()
71 self.spin()
72 return ret
72 return ret
73
73
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75 # Classes
75 # Classes
76 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
77
77
78 @skip_doctest
78 @skip_doctest
79 class View(HasTraits):
79 class View(HasTraits):
80 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
80 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
81
81
82 Don't use this class, use subclasses.
82 Don't use this class, use subclasses.
83
83
84 Methods
84 Methods
85 -------
85 -------
86
86
87 spin
87 spin
88 flushes incoming results and registration state changes
88 flushes incoming results and registration state changes
89 control methods spin, and requesting `ids` also ensures up to date
89 control methods spin, and requesting `ids` also ensures up to date
90
90
91 wait
91 wait
92 wait on one or more msg_ids
92 wait on one or more msg_ids
93
93
94 execution methods
94 execution methods
95 apply
95 apply
96 legacy: execute, run
96 legacy: execute, run
97
97
98 data movement
98 data movement
99 push, pull, scatter, gather
99 push, pull, scatter, gather
100
100
101 query methods
101 query methods
102 get_result, queue_status, purge_results, result_status
102 get_result, queue_status, purge_results, result_status
103
103
104 control methods
104 control methods
105 abort, shutdown
105 abort, shutdown
106
106
107 """
107 """
108 # flags
108 # flags
109 block=Bool(False)
109 block=Bool(False)
110 track=Bool(True)
110 track=Bool(True)
111 targets = Any()
111 targets = Any()
112
112
113 history=List()
113 history=List()
114 outstanding = Set()
114 outstanding = Set()
115 results = Dict()
115 results = Dict()
116 client = Instance('IPython.parallel.Client')
116 client = Instance('IPython.parallel.Client')
117
117
118 _socket = Instance('zmq.Socket')
118 _socket = Instance('zmq.Socket')
119 _flag_names = List(['targets', 'block', 'track'])
119 _flag_names = List(['targets', 'block', 'track'])
120 _targets = Any()
120 _targets = Any()
121 _idents = Any()
121 _idents = Any()
122
122
123 def __init__(self, client=None, socket=None, **flags):
123 def __init__(self, client=None, socket=None, **flags):
124 super(View, self).__init__(client=client, _socket=socket)
124 super(View, self).__init__(client=client, _socket=socket)
125 self.block = client.block
125 self.block = client.block
126
126
127 self.set_flags(**flags)
127 self.set_flags(**flags)
128
128
129 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
129 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
130
130
131
131
132 def __repr__(self):
132 def __repr__(self):
133 strtargets = str(self.targets)
133 strtargets = str(self.targets)
134 if len(strtargets) > 16:
134 if len(strtargets) > 16:
135 strtargets = strtargets[:12]+'...]'
135 strtargets = strtargets[:12]+'...]'
136 return "<%s %s>"%(self.__class__.__name__, strtargets)
136 return "<%s %s>"%(self.__class__.__name__, strtargets)
137
137
138 def set_flags(self, **kwargs):
138 def set_flags(self, **kwargs):
139 """set my attribute flags by keyword.
139 """set my attribute flags by keyword.
140
140
141 Views determine behavior with a few attributes (`block`, `track`, etc.).
141 Views determine behavior with a few attributes (`block`, `track`, etc.).
142 These attributes can be set all at once by name with this method.
142 These attributes can be set all at once by name with this method.
143
143
144 Parameters
144 Parameters
145 ----------
145 ----------
146
146
147 block : bool
147 block : bool
148 whether to wait for results
148 whether to wait for results
149 track : bool
149 track : bool
150 whether to create a MessageTracker to allow the user to
150 whether to create a MessageTracker to allow the user to
151 safely edit after arrays and buffers during non-copying
151 safely edit after arrays and buffers during non-copying
152 sends.
152 sends.
153 """
153 """
154 for name, value in kwargs.iteritems():
154 for name, value in kwargs.iteritems():
155 if name not in self._flag_names:
155 if name not in self._flag_names:
156 raise KeyError("Invalid name: %r"%name)
156 raise KeyError("Invalid name: %r"%name)
157 else:
157 else:
158 setattr(self, name, value)
158 setattr(self, name, value)
159
159
160 @contextmanager
160 @contextmanager
161 def temp_flags(self, **kwargs):
161 def temp_flags(self, **kwargs):
162 """temporarily set flags, for use in `with` statements.
162 """temporarily set flags, for use in `with` statements.
163
163
164 See set_flags for permanent setting of flags
164 See set_flags for permanent setting of flags
165
165
166 Examples
166 Examples
167 --------
167 --------
168
168
169 >>> view.track=False
169 >>> view.track=False
170 ...
170 ...
171 >>> with view.temp_flags(track=True):
171 >>> with view.temp_flags(track=True):
172 ... ar = view.apply(dostuff, my_big_array)
172 ... ar = view.apply(dostuff, my_big_array)
173 ... ar.tracker.wait() # wait for send to finish
173 ... ar.tracker.wait() # wait for send to finish
174 >>> view.track
174 >>> view.track
175 False
175 False
176
176
177 """
177 """
178 # preflight: save flags, and set temporaries
178 # preflight: save flags, and set temporaries
179 saved_flags = {}
179 saved_flags = {}
180 for f in self._flag_names:
180 for f in self._flag_names:
181 saved_flags[f] = getattr(self, f)
181 saved_flags[f] = getattr(self, f)
182 self.set_flags(**kwargs)
182 self.set_flags(**kwargs)
183 # yield to the with-statement block
183 # yield to the with-statement block
184 try:
184 try:
185 yield
185 yield
186 finally:
186 finally:
187 # postflight: restore saved flags
187 # postflight: restore saved flags
188 self.set_flags(**saved_flags)
188 self.set_flags(**saved_flags)
189
189
190
190
191 #----------------------------------------------------------------
191 #----------------------------------------------------------------
192 # apply
192 # apply
193 #----------------------------------------------------------------
193 #----------------------------------------------------------------
194
194
195 @sync_results
195 @sync_results
196 @save_ids
196 @save_ids
197 def _really_apply(self, f, args, kwargs, block=None, **options):
197 def _really_apply(self, f, args, kwargs, block=None, **options):
198 """wrapper for client.send_apply_message"""
198 """wrapper for client.send_apply_message"""
199 raise NotImplementedError("Implement in subclasses")
199 raise NotImplementedError("Implement in subclasses")
200
200
201 def apply(self, f, *args, **kwargs):
201 def apply(self, f, *args, **kwargs):
202 """calls f(*args, **kwargs) on remote engines, returning the result.
202 """calls f(*args, **kwargs) on remote engines, returning the result.
203
203
204 This method sets all apply flags via this View's attributes.
204 This method sets all apply flags via this View's attributes.
205
205
206 if self.block is False:
206 if self.block is False:
207 returns AsyncResult
207 returns AsyncResult
208 else:
208 else:
209 returns actual result of f(*args, **kwargs)
209 returns actual result of f(*args, **kwargs)
210 """
210 """
211 return self._really_apply(f, args, kwargs)
211 return self._really_apply(f, args, kwargs)
212
212
213 def apply_async(self, f, *args, **kwargs):
213 def apply_async(self, f, *args, **kwargs):
214 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
214 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
215
215
216 returns AsyncResult
216 returns AsyncResult
217 """
217 """
218 return self._really_apply(f, args, kwargs, block=False)
218 return self._really_apply(f, args, kwargs, block=False)
219
219
220 @spin_after
220 @spin_after
221 def apply_sync(self, f, *args, **kwargs):
221 def apply_sync(self, f, *args, **kwargs):
222 """calls f(*args, **kwargs) on remote engines in a blocking manner,
222 """calls f(*args, **kwargs) on remote engines in a blocking manner,
223 returning the result.
223 returning the result.
224
224
225 returns: actual result of f(*args, **kwargs)
225 returns: actual result of f(*args, **kwargs)
226 """
226 """
227 return self._really_apply(f, args, kwargs, block=True)
227 return self._really_apply(f, args, kwargs, block=True)
228
228
229 #----------------------------------------------------------------
229 #----------------------------------------------------------------
230 # wrappers for client and control methods
230 # wrappers for client and control methods
231 #----------------------------------------------------------------
231 #----------------------------------------------------------------
232 @sync_results
232 @sync_results
233 def spin(self):
233 def spin(self):
234 """spin the client, and sync"""
234 """spin the client, and sync"""
235 self.client.spin()
235 self.client.spin()
236
236
237 @sync_results
237 @sync_results
238 def wait(self, jobs=None, timeout=-1):
238 def wait(self, jobs=None, timeout=-1):
239 """waits on one or more `jobs`, for up to `timeout` seconds.
239 """waits on one or more `jobs`, for up to `timeout` seconds.
240
240
241 Parameters
241 Parameters
242 ----------
242 ----------
243
243
244 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
244 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
245 ints are indices to self.history
245 ints are indices to self.history
246 strs are msg_ids
246 strs are msg_ids
247 default: wait on all outstanding messages
247 default: wait on all outstanding messages
248 timeout : float
248 timeout : float
249 a time in seconds, after which to give up.
249 a time in seconds, after which to give up.
250 default is -1, which means no timeout
250 default is -1, which means no timeout
251
251
252 Returns
252 Returns
253 -------
253 -------
254
254
255 True : when all msg_ids are done
255 True : when all msg_ids are done
256 False : timeout reached, some msg_ids still outstanding
256 False : timeout reached, some msg_ids still outstanding
257 """
257 """
258 if jobs is None:
258 if jobs is None:
259 jobs = self.history
259 jobs = self.history
260 return self.client.wait(jobs, timeout)
260 return self.client.wait(jobs, timeout)
261
261
262 def abort(self, jobs=None, targets=None, block=None):
262 def abort(self, jobs=None, targets=None, block=None):
263 """Abort jobs on my engines.
263 """Abort jobs on my engines.
264
264
265 Parameters
265 Parameters
266 ----------
266 ----------
267
267
268 jobs : None, str, list of strs, optional
268 jobs : None, str, list of strs, optional
269 if None: abort all jobs.
269 if None: abort all jobs.
270 else: abort specific msg_id(s).
270 else: abort specific msg_id(s).
271 """
271 """
272 block = block if block is not None else self.block
272 block = block if block is not None else self.block
273 targets = targets if targets is not None else self.targets
273 targets = targets if targets is not None else self.targets
274 jobs = jobs if jobs is not None else list(self.outstanding)
274 jobs = jobs if jobs is not None else list(self.outstanding)
275
275
276 return self.client.abort(jobs=jobs, targets=targets, block=block)
276 return self.client.abort(jobs=jobs, targets=targets, block=block)
277
277
278 def queue_status(self, targets=None, verbose=False):
278 def queue_status(self, targets=None, verbose=False):
279 """Fetch the Queue status of my engines"""
279 """Fetch the Queue status of my engines"""
280 targets = targets if targets is not None else self.targets
280 targets = targets if targets is not None else self.targets
281 return self.client.queue_status(targets=targets, verbose=verbose)
281 return self.client.queue_status(targets=targets, verbose=verbose)
282
282
283 def purge_results(self, jobs=[], targets=[]):
283 def purge_results(self, jobs=[], targets=[]):
284 """Instruct the controller to forget specific results."""
284 """Instruct the controller to forget specific results."""
285 if targets is None or targets == 'all':
285 if targets is None or targets == 'all':
286 targets = self.targets
286 targets = self.targets
287 return self.client.purge_results(jobs=jobs, targets=targets)
287 return self.client.purge_results(jobs=jobs, targets=targets)
288
288
289 def shutdown(self, targets=None, restart=False, hub=False, block=None):
289 def shutdown(self, targets=None, restart=False, hub=False, block=None):
290 """Terminates one or more engine processes, optionally including the hub.
290 """Terminates one or more engine processes, optionally including the hub.
291 """
291 """
292 block = self.block if block is None else block
292 block = self.block if block is None else block
293 if targets is None or targets == 'all':
293 if targets is None or targets == 'all':
294 targets = self.targets
294 targets = self.targets
295 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
295 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
296
296
297 @spin_after
297 @spin_after
298 def get_result(self, indices_or_msg_ids=None):
298 def get_result(self, indices_or_msg_ids=None):
299 """return one or more results, specified by history index or msg_id.
299 """return one or more results, specified by history index or msg_id.
300
300
301 See client.get_result for details.
301 See client.get_result for details.
302
302
303 """
303 """
304
304
305 if indices_or_msg_ids is None:
305 if indices_or_msg_ids is None:
306 indices_or_msg_ids = -1
306 indices_or_msg_ids = -1
307 if isinstance(indices_or_msg_ids, int):
307 if isinstance(indices_or_msg_ids, int):
308 indices_or_msg_ids = self.history[indices_or_msg_ids]
308 indices_or_msg_ids = self.history[indices_or_msg_ids]
309 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
309 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
310 indices_or_msg_ids = list(indices_or_msg_ids)
310 indices_or_msg_ids = list(indices_or_msg_ids)
311 for i,index in enumerate(indices_or_msg_ids):
311 for i,index in enumerate(indices_or_msg_ids):
312 if isinstance(index, int):
312 if isinstance(index, int):
313 indices_or_msg_ids[i] = self.history[index]
313 indices_or_msg_ids[i] = self.history[index]
314 return self.client.get_result(indices_or_msg_ids)
314 return self.client.get_result(indices_or_msg_ids)
315
315
316 #-------------------------------------------------------------------
316 #-------------------------------------------------------------------
317 # Map
317 # Map
318 #-------------------------------------------------------------------
318 #-------------------------------------------------------------------
319
319
320 def map(self, f, *sequences, **kwargs):
320 def map(self, f, *sequences, **kwargs):
321 """override in subclasses"""
321 """override in subclasses"""
322 raise NotImplementedError
322 raise NotImplementedError
323
323
324 def map_async(self, f, *sequences, **kwargs):
324 def map_async(self, f, *sequences, **kwargs):
325 """Parallel version of builtin `map`, using this view's engines.
325 """Parallel version of builtin `map`, using this view's engines.
326
326
327 This is equivalent to map(...block=False)
327 This is equivalent to map(...block=False)
328
328
329 See `self.map` for details.
329 See `self.map` for details.
330 """
330 """
331 if 'block' in kwargs:
331 if 'block' in kwargs:
332 raise TypeError("map_async doesn't take a `block` keyword argument.")
332 raise TypeError("map_async doesn't take a `block` keyword argument.")
333 kwargs['block'] = False
333 kwargs['block'] = False
334 return self.map(f,*sequences,**kwargs)
334 return self.map(f,*sequences,**kwargs)
335
335
336 def map_sync(self, f, *sequences, **kwargs):
336 def map_sync(self, f, *sequences, **kwargs):
337 """Parallel version of builtin `map`, using this view's engines.
337 """Parallel version of builtin `map`, using this view's engines.
338
338
339 This is equivalent to map(...block=True)
339 This is equivalent to map(...block=True)
340
340
341 See `self.map` for details.
341 See `self.map` for details.
342 """
342 """
343 if 'block' in kwargs:
343 if 'block' in kwargs:
344 raise TypeError("map_sync doesn't take a `block` keyword argument.")
344 raise TypeError("map_sync doesn't take a `block` keyword argument.")
345 kwargs['block'] = True
345 kwargs['block'] = True
346 return self.map(f,*sequences,**kwargs)
346 return self.map(f,*sequences,**kwargs)
347
347
348 def imap(self, f, *sequences, **kwargs):
348 def imap(self, f, *sequences, **kwargs):
349 """Parallel version of `itertools.imap`.
349 """Parallel version of `itertools.imap`.
350
350
351 See `self.map` for details.
351 See `self.map` for details.
352
352
353 """
353 """
354
354
355 return iter(self.map_async(f,*sequences, **kwargs))
355 return iter(self.map_async(f,*sequences, **kwargs))
356
356
357 #-------------------------------------------------------------------
357 #-------------------------------------------------------------------
358 # Decorators
358 # Decorators
359 #-------------------------------------------------------------------
359 #-------------------------------------------------------------------
360
360
361 def remote(self, block=True, **flags):
361 def remote(self, block=True, **flags):
362 """Decorator for making a RemoteFunction"""
362 """Decorator for making a RemoteFunction"""
363 block = self.block if block is None else block
363 block = self.block if block is None else block
364 return remote(self, block=block, **flags)
364 return remote(self, block=block, **flags)
365
365
366 def parallel(self, dist='b', block=None, **flags):
366 def parallel(self, dist='b', block=None, **flags):
367 """Decorator for making a ParallelFunction"""
367 """Decorator for making a ParallelFunction"""
368 block = self.block if block is None else block
368 block = self.block if block is None else block
369 return parallel(self, dist=dist, block=block, **flags)
369 return parallel(self, dist=dist, block=block, **flags)
370
370
371 @skip_doctest
371 @skip_doctest
372 class DirectView(View):
372 class DirectView(View):
373 """Direct Multiplexer View of one or more engines.
373 """Direct Multiplexer View of one or more engines.
374
374
375 These are created via indexed access to a client:
375 These are created via indexed access to a client:
376
376
377 >>> dv_1 = client[1]
377 >>> dv_1 = client[1]
378 >>> dv_all = client[:]
378 >>> dv_all = client[:]
379 >>> dv_even = client[::2]
379 >>> dv_even = client[::2]
380 >>> dv_some = client[1:3]
380 >>> dv_some = client[1:3]
381
381
382 This object provides dictionary access to engine namespaces:
382 This object provides dictionary access to engine namespaces:
383
383
384 # push a=5:
384 # push a=5:
385 >>> dv['a'] = 5
385 >>> dv['a'] = 5
386 # pull 'foo':
386 # pull 'foo':
387 >>> db['foo']
387 >>> db['foo']
388
388
389 """
389 """
390
390
391 def __init__(self, client=None, socket=None, targets=None):
391 def __init__(self, client=None, socket=None, targets=None):
392 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
392 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
393
393
394 @property
394 @property
395 def importer(self):
395 def importer(self):
396 """sync_imports(local=True) as a property.
396 """sync_imports(local=True) as a property.
397
397
398 See sync_imports for details.
398 See sync_imports for details.
399
399
400 """
400 """
401 return self.sync_imports(True)
401 return self.sync_imports(True)
402
402
403 @contextmanager
403 @contextmanager
404 def sync_imports(self, local=True, quiet=False):
404 def sync_imports(self, local=True, quiet=False):
405 """Context Manager for performing simultaneous local and remote imports.
405 """Context Manager for performing simultaneous local and remote imports.
406
406
407 'import x as y' will *not* work. The 'as y' part will simply be ignored.
407 'import x as y' will *not* work. The 'as y' part will simply be ignored.
408
408
409 If `local=True`, then the package will also be imported locally.
409 If `local=True`, then the package will also be imported locally.
410
410
411 If `quiet=True`, no output will be produced when attempting remote
411 If `quiet=True`, no output will be produced when attempting remote
412 imports.
412 imports.
413
413
414 Note that remote-only (`local=False`) imports have not been implemented.
414 Note that remote-only (`local=False`) imports have not been implemented.
415
415
416 >>> with view.sync_imports():
416 >>> with view.sync_imports():
417 ... from numpy import recarray
417 ... from numpy import recarray
418 importing recarray from numpy on engine(s)
418 importing recarray from numpy on engine(s)
419
419
420 """
420 """
421 import __builtin__
421 import __builtin__
422 local_import = __builtin__.__import__
422 local_import = __builtin__.__import__
423 modules = set()
423 modules = set()
424 results = []
424 results = []
425 @util.interactive
425 @util.interactive
426 def remote_import(name, fromlist, level):
426 def remote_import(name, fromlist, level):
427 """the function to be passed to apply, that actually performs the import
427 """the function to be passed to apply, that actually performs the import
428 on the engine, and loads up the user namespace.
428 on the engine, and loads up the user namespace.
429 """
429 """
430 import sys
430 import sys
431 user_ns = globals()
431 user_ns = globals()
432 mod = __import__(name, fromlist=fromlist, level=level)
432 mod = __import__(name, fromlist=fromlist, level=level)
433 if fromlist:
433 if fromlist:
434 for key in fromlist:
434 for key in fromlist:
435 user_ns[key] = getattr(mod, key)
435 user_ns[key] = getattr(mod, key)
436 else:
436 else:
437 user_ns[name] = sys.modules[name]
437 user_ns[name] = sys.modules[name]
438
438
439 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
439 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
440 """the drop-in replacement for __import__, that optionally imports
440 """the drop-in replacement for __import__, that optionally imports
441 locally as well.
441 locally as well.
442 """
442 """
443 # don't override nested imports
443 # don't override nested imports
444 save_import = __builtin__.__import__
444 save_import = __builtin__.__import__
445 __builtin__.__import__ = local_import
445 __builtin__.__import__ = local_import
446
446
447 if imp.lock_held():
447 if imp.lock_held():
448 # this is a side-effect import, don't do it remotely, or even
448 # this is a side-effect import, don't do it remotely, or even
449 # ignore the local effects
449 # ignore the local effects
450 return local_import(name, globals, locals, fromlist, level)
450 return local_import(name, globals, locals, fromlist, level)
451
451
452 imp.acquire_lock()
452 imp.acquire_lock()
453 if local:
453 if local:
454 mod = local_import(name, globals, locals, fromlist, level)
454 mod = local_import(name, globals, locals, fromlist, level)
455 else:
455 else:
456 raise NotImplementedError("remote-only imports not yet implemented")
456 raise NotImplementedError("remote-only imports not yet implemented")
457 imp.release_lock()
457 imp.release_lock()
458
458
459 key = name+':'+','.join(fromlist or [])
459 key = name+':'+','.join(fromlist or [])
460 if level == -1 and key not in modules:
460 if level == -1 and key not in modules:
461 modules.add(key)
461 modules.add(key)
462 if not quiet:
462 if not quiet:
463 if fromlist:
463 if fromlist:
464 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
464 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
465 else:
465 else:
466 print "importing %s on engine(s)"%name
466 print "importing %s on engine(s)"%name
467 results.append(self.apply_async(remote_import, name, fromlist, level))
467 results.append(self.apply_async(remote_import, name, fromlist, level))
468 # restore override
468 # restore override
469 __builtin__.__import__ = save_import
469 __builtin__.__import__ = save_import
470
470
471 return mod
471 return mod
472
472
473 # override __import__
473 # override __import__
474 __builtin__.__import__ = view_import
474 __builtin__.__import__ = view_import
475 try:
475 try:
476 # enter the block
476 # enter the block
477 yield
477 yield
478 except ImportError:
478 except ImportError:
479 if local:
479 if local:
480 raise
480 raise
481 else:
481 else:
482 # ignore import errors if not doing local imports
482 # ignore import errors if not doing local imports
483 pass
483 pass
484 finally:
484 finally:
485 # always restore __import__
485 # always restore __import__
486 __builtin__.__import__ = local_import
486 __builtin__.__import__ = local_import
487
487
488 for r in results:
488 for r in results:
489 # raise possible remote ImportErrors here
489 # raise possible remote ImportErrors here
490 r.get()
490 r.get()
491
491
492
492
493 @sync_results
493 @sync_results
494 @save_ids
494 @save_ids
495 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
495 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
496 """calls f(*args, **kwargs) on remote engines, returning the result.
496 """calls f(*args, **kwargs) on remote engines, returning the result.
497
497
498 This method sets all of `apply`'s flags via this View's attributes.
498 This method sets all of `apply`'s flags via this View's attributes.
499
499
500 Parameters
500 Parameters
501 ----------
501 ----------
502
502
503 f : callable
503 f : callable
504
504
505 args : list [default: empty]
505 args : list [default: empty]
506
506
507 kwargs : dict [default: empty]
507 kwargs : dict [default: empty]
508
508
509 targets : target list [default: self.targets]
509 targets : target list [default: self.targets]
510 where to run
510 where to run
511 block : bool [default: self.block]
511 block : bool [default: self.block]
512 whether to block
512 whether to block
513 track : bool [default: self.track]
513 track : bool [default: self.track]
514 whether to ask zmq to track the message, for safe non-copying sends
514 whether to ask zmq to track the message, for safe non-copying sends
515
515
516 Returns
516 Returns
517 -------
517 -------
518
518
519 if self.block is False:
519 if self.block is False:
520 returns AsyncResult
520 returns AsyncResult
521 else:
521 else:
522 returns actual result of f(*args, **kwargs) on the engine(s)
522 returns actual result of f(*args, **kwargs) on the engine(s)
523 This will be a list of self.targets is also a list (even length 1), or
523 This will be a list of self.targets is also a list (even length 1), or
524 the single result if self.targets is an integer engine id
524 the single result if self.targets is an integer engine id
525 """
525 """
526 args = [] if args is None else args
526 args = [] if args is None else args
527 kwargs = {} if kwargs is None else kwargs
527 kwargs = {} if kwargs is None else kwargs
528 block = self.block if block is None else block
528 block = self.block if block is None else block
529 track = self.track if track is None else track
529 track = self.track if track is None else track
530 targets = self.targets if targets is None else targets
530 targets = self.targets if targets is None else targets
531
531
532 _idents = self.client._build_targets(targets)[0]
532 _idents = self.client._build_targets(targets)[0]
533 msg_ids = []
533 msg_ids = []
534 trackers = []
534 trackers = []
535 for ident in _idents:
535 for ident in _idents:
536 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
536 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
537 ident=ident)
537 ident=ident)
538 if track:
538 if track:
539 trackers.append(msg['tracker'])
539 trackers.append(msg['tracker'])
540 msg_ids.append(msg['header']['msg_id'])
540 msg_ids.append(msg['header']['msg_id'])
541 tracker = None if track is False else zmq.MessageTracker(*trackers)
541 tracker = None if track is False else zmq.MessageTracker(*trackers)
542 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=targets, tracker=tracker)
542 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=targets, tracker=tracker)
543 if block:
543 if block:
544 try:
544 try:
545 return ar.get()
545 return ar.get()
546 except KeyboardInterrupt:
546 except KeyboardInterrupt:
547 pass
547 pass
548 return ar
548 return ar
549
549
550 @spin_after
550 @spin_after
551 def map(self, f, *sequences, **kwargs):
551 def map(self, f, *sequences, **kwargs):
552 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
552 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
553
553
554 Parallel version of builtin `map`, using this View's `targets`.
554 Parallel version of builtin `map`, using this View's `targets`.
555
555
556 There will be one task per target, so work will be chunked
556 There will be one task per target, so work will be chunked
557 if the sequences are longer than `targets`.
557 if the sequences are longer than `targets`.
558
558
559 Results can be iterated as they are ready, but will become available in chunks.
559 Results can be iterated as they are ready, but will become available in chunks.
560
560
561 Parameters
561 Parameters
562 ----------
562 ----------
563
563
564 f : callable
564 f : callable
565 function to be mapped
565 function to be mapped
566 *sequences: one or more sequences of matching length
566 *sequences: one or more sequences of matching length
567 the sequences to be distributed and passed to `f`
567 the sequences to be distributed and passed to `f`
568 block : bool
568 block : bool
569 whether to wait for the result or not [default self.block]
569 whether to wait for the result or not [default self.block]
570
570
571 Returns
571 Returns
572 -------
572 -------
573
573
574 if block=False:
574 if block=False:
575 AsyncMapResult
575 AsyncMapResult
576 An object like AsyncResult, but which reassembles the sequence of results
576 An object like AsyncResult, but which reassembles the sequence of results
577 into a single list. AsyncMapResults can be iterated through before all
577 into a single list. AsyncMapResults can be iterated through before all
578 results are complete.
578 results are complete.
579 else:
579 else:
580 list
580 list
581 the result of map(f,*sequences)
581 the result of map(f,*sequences)
582 """
582 """
583
583
584 block = kwargs.pop('block', self.block)
584 block = kwargs.pop('block', self.block)
585 for k in kwargs.keys():
585 for k in kwargs.keys():
586 if k not in ['block', 'track']:
586 if k not in ['block', 'track']:
587 raise TypeError("invalid keyword arg, %r"%k)
587 raise TypeError("invalid keyword arg, %r"%k)
588
588
589 assert len(sequences) > 0, "must have some sequences to map onto!"
589 assert len(sequences) > 0, "must have some sequences to map onto!"
590 pf = ParallelFunction(self, f, block=block, **kwargs)
590 pf = ParallelFunction(self, f, block=block, **kwargs)
591 return pf.map(*sequences)
591 return pf.map(*sequences)
592
592
593 def execute(self, code, targets=None, block=None):
593 def execute(self, code, targets=None, block=None):
594 """Executes `code` on `targets` in blocking or nonblocking manner.
594 """Executes `code` on `targets` in blocking or nonblocking manner.
595
595
596 ``execute`` is always `bound` (affects engine namespace)
596 ``execute`` is always `bound` (affects engine namespace)
597
597
598 Parameters
598 Parameters
599 ----------
599 ----------
600
600
601 code : str
601 code : str
602 the code string to be executed
602 the code string to be executed
603 block : bool
603 block : bool
604 whether or not to wait until done to return
604 whether or not to wait until done to return
605 default: self.block
605 default: self.block
606 """
606 """
607 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
607 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
608
608
609 def run(self, filename, targets=None, block=None):
609 def run(self, filename, targets=None, block=None):
610 """Execute contents of `filename` on my engine(s).
610 """Execute contents of `filename` on my engine(s).
611
611
612 This simply reads the contents of the file and calls `execute`.
612 This simply reads the contents of the file and calls `execute`.
613
613
614 Parameters
614 Parameters
615 ----------
615 ----------
616
616
617 filename : str
617 filename : str
618 The path to the file
618 The path to the file
619 targets : int/str/list of ints/strs
619 targets : int/str/list of ints/strs
620 the engines on which to execute
620 the engines on which to execute
621 default : all
621 default : all
622 block : bool
622 block : bool
623 whether or not to wait until done
623 whether or not to wait until done
624 default: self.block
624 default: self.block
625
625
626 """
626 """
627 with open(filename, 'r') as f:
627 with open(filename, 'r') as f:
628 # add newline in case of trailing indented whitespace
628 # add newline in case of trailing indented whitespace
629 # which will cause SyntaxError
629 # which will cause SyntaxError
630 code = f.read()+'\n'
630 code = f.read()+'\n'
631 return self.execute(code, block=block, targets=targets)
631 return self.execute(code, block=block, targets=targets)
632
632
633 def update(self, ns):
633 def update(self, ns):
634 """update remote namespace with dict `ns`
634 """update remote namespace with dict `ns`
635
635
636 See `push` for details.
636 See `push` for details.
637 """
637 """
638 return self.push(ns, block=self.block, track=self.track)
638 return self.push(ns, block=self.block, track=self.track)
639
639
640 def push(self, ns, targets=None, block=None, track=None):
640 def push(self, ns, targets=None, block=None, track=None):
641 """update remote namespace with dict `ns`
641 """update remote namespace with dict `ns`
642
642
643 Parameters
643 Parameters
644 ----------
644 ----------
645
645
646 ns : dict
646 ns : dict
647 dict of keys with which to update engine namespace(s)
647 dict of keys with which to update engine namespace(s)
648 block : bool [default : self.block]
648 block : bool [default : self.block]
649 whether to wait to be notified of engine receipt
649 whether to wait to be notified of engine receipt
650
650
651 """
651 """
652
652
653 block = block if block is not None else self.block
653 block = block if block is not None else self.block
654 track = track if track is not None else self.track
654 track = track if track is not None else self.track
655 targets = targets if targets is not None else self.targets
655 targets = targets if targets is not None else self.targets
656 # applier = self.apply_sync if block else self.apply_async
656 # applier = self.apply_sync if block else self.apply_async
657 if not isinstance(ns, dict):
657 if not isinstance(ns, dict):
658 raise TypeError("Must be a dict, not %s"%type(ns))
658 raise TypeError("Must be a dict, not %s"%type(ns))
659 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
659 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
660
660
661 def get(self, key_s):
661 def get(self, key_s):
662 """get object(s) by `key_s` from remote namespace
662 """get object(s) by `key_s` from remote namespace
663
663
664 see `pull` for details.
664 see `pull` for details.
665 """
665 """
666 # block = block if block is not None else self.block
666 # block = block if block is not None else self.block
667 return self.pull(key_s, block=True)
667 return self.pull(key_s, block=True)
668
668
669 def pull(self, names, targets=None, block=None):
669 def pull(self, names, targets=None, block=None):
670 """get object(s) by `name` from remote namespace
670 """get object(s) by `name` from remote namespace
671
671
672 will return one object if it is a key.
672 will return one object if it is a key.
673 can also take a list of keys, in which case it will return a list of objects.
673 can also take a list of keys, in which case it will return a list of objects.
674 """
674 """
675 block = block if block is not None else self.block
675 block = block if block is not None else self.block
676 targets = targets if targets is not None else self.targets
676 targets = targets if targets is not None else self.targets
677 applier = self.apply_sync if block else self.apply_async
677 applier = self.apply_sync if block else self.apply_async
678 if isinstance(names, basestring):
678 if isinstance(names, basestring):
679 pass
679 pass
680 elif isinstance(names, (list,tuple,set)):
680 elif isinstance(names, (list,tuple,set)):
681 for key in names:
681 for key in names:
682 if not isinstance(key, basestring):
682 if not isinstance(key, basestring):
683 raise TypeError("keys must be str, not type %r"%type(key))
683 raise TypeError("keys must be str, not type %r"%type(key))
684 else:
684 else:
685 raise TypeError("names must be strs, not %r"%names)
685 raise TypeError("names must be strs, not %r"%names)
686 return self._really_apply(util._pull, (names,), block=block, targets=targets)
686 return self._really_apply(util._pull, (names,), block=block, targets=targets)
687
687
688 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
688 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
689 """
689 """
690 Partition a Python sequence and send the partitions to a set of engines.
690 Partition a Python sequence and send the partitions to a set of engines.
691 """
691 """
692 block = block if block is not None else self.block
692 block = block if block is not None else self.block
693 track = track if track is not None else self.track
693 track = track if track is not None else self.track
694 targets = targets if targets is not None else self.targets
694 targets = targets if targets is not None else self.targets
695
696 # construct integer ID list:
697 targets = self.client._build_targets(targets)[1]
695
698
696 mapObject = Map.dists[dist]()
699 mapObject = Map.dists[dist]()
697 nparts = len(targets)
700 nparts = len(targets)
698 msg_ids = []
701 msg_ids = []
699 trackers = []
702 trackers = []
700 for index, engineid in enumerate(targets):
703 for index, engineid in enumerate(targets):
701 partition = mapObject.getPartition(seq, index, nparts)
704 partition = mapObject.getPartition(seq, index, nparts)
702 if flatten and len(partition) == 1:
705 if flatten and len(partition) == 1:
703 ns = {key: partition[0]}
706 ns = {key: partition[0]}
704 else:
707 else:
705 ns = {key: partition}
708 ns = {key: partition}
706 r = self.push(ns, block=False, track=track, targets=engineid)
709 r = self.push(ns, block=False, track=track, targets=engineid)
707 msg_ids.extend(r.msg_ids)
710 msg_ids.extend(r.msg_ids)
708 if track:
711 if track:
709 trackers.append(r._tracker)
712 trackers.append(r._tracker)
710
713
711 if track:
714 if track:
712 tracker = zmq.MessageTracker(*trackers)
715 tracker = zmq.MessageTracker(*trackers)
713 else:
716 else:
714 tracker = None
717 tracker = None
715
718
716 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
719 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
717 if block:
720 if block:
718 r.wait()
721 r.wait()
719 else:
722 else:
720 return r
723 return r
721
724
722 @sync_results
725 @sync_results
723 @save_ids
726 @save_ids
724 def gather(self, key, dist='b', targets=None, block=None):
727 def gather(self, key, dist='b', targets=None, block=None):
725 """
728 """
726 Gather a partitioned sequence on a set of engines as a single local seq.
729 Gather a partitioned sequence on a set of engines as a single local seq.
727 """
730 """
728 block = block if block is not None else self.block
731 block = block if block is not None else self.block
729 targets = targets if targets is not None else self.targets
732 targets = targets if targets is not None else self.targets
730 mapObject = Map.dists[dist]()
733 mapObject = Map.dists[dist]()
731 msg_ids = []
734 msg_ids = []
732
735
736 # construct integer ID list:
737 targets = self.client._build_targets(targets)[1]
738
733 for index, engineid in enumerate(targets):
739 for index, engineid in enumerate(targets):
734 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
740 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
735
741
736 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
742 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
737
743
738 if block:
744 if block:
739 try:
745 try:
740 return r.get()
746 return r.get()
741 except KeyboardInterrupt:
747 except KeyboardInterrupt:
742 pass
748 pass
743 return r
749 return r
744
750
745 def __getitem__(self, key):
751 def __getitem__(self, key):
746 return self.get(key)
752 return self.get(key)
747
753
748 def __setitem__(self,key, value):
754 def __setitem__(self,key, value):
749 self.update({key:value})
755 self.update({key:value})
750
756
751 def clear(self, targets=None, block=False):
757 def clear(self, targets=None, block=False):
752 """Clear the remote namespaces on my engines."""
758 """Clear the remote namespaces on my engines."""
753 block = block if block is not None else self.block
759 block = block if block is not None else self.block
754 targets = targets if targets is not None else self.targets
760 targets = targets if targets is not None else self.targets
755 return self.client.clear(targets=targets, block=block)
761 return self.client.clear(targets=targets, block=block)
756
762
757 def kill(self, targets=None, block=True):
763 def kill(self, targets=None, block=True):
758 """Kill my engines."""
764 """Kill my engines."""
759 block = block if block is not None else self.block
765 block = block if block is not None else self.block
760 targets = targets if targets is not None else self.targets
766 targets = targets if targets is not None else self.targets
761 return self.client.kill(targets=targets, block=block)
767 return self.client.kill(targets=targets, block=block)
762
768
763 #----------------------------------------
769 #----------------------------------------
764 # activate for %px,%autopx magics
770 # activate for %px,%autopx magics
765 #----------------------------------------
771 #----------------------------------------
766 def activate(self):
772 def activate(self):
767 """Make this `View` active for parallel magic commands.
773 """Make this `View` active for parallel magic commands.
768
774
769 IPython has a magic command syntax to work with `MultiEngineClient` objects.
775 IPython has a magic command syntax to work with `MultiEngineClient` objects.
770 In a given IPython session there is a single active one. While
776 In a given IPython session there is a single active one. While
771 there can be many `Views` created and used by the user,
777 there can be many `Views` created and used by the user,
772 there is only one active one. The active `View` is used whenever
778 there is only one active one. The active `View` is used whenever
773 the magic commands %px and %autopx are used.
779 the magic commands %px and %autopx are used.
774
780
775 The activate() method is called on a given `View` to make it
781 The activate() method is called on a given `View` to make it
776 active. Once this has been done, the magic commands can be used.
782 active. Once this has been done, the magic commands can be used.
777 """
783 """
778
784
779 try:
785 try:
780 # This is injected into __builtins__.
786 # This is injected into __builtins__.
781 ip = get_ipython()
787 ip = get_ipython()
782 except NameError:
788 except NameError:
783 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
789 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
784 else:
790 else:
785 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
791 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
786 if pmagic is None:
792 if pmagic is None:
787 ip.magic_load_ext('parallelmagic')
793 ip.magic_load_ext('parallelmagic')
788 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
794 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
789
795
790 pmagic.active_view = self
796 pmagic.active_view = self
791
797
792
798
793 @skip_doctest
799 @skip_doctest
794 class LoadBalancedView(View):
800 class LoadBalancedView(View):
795 """An load-balancing View that only executes via the Task scheduler.
801 """An load-balancing View that only executes via the Task scheduler.
796
802
797 Load-balanced views can be created with the client's `view` method:
803 Load-balanced views can be created with the client's `view` method:
798
804
799 >>> v = client.load_balanced_view()
805 >>> v = client.load_balanced_view()
800
806
801 or targets can be specified, to restrict the potential destinations:
807 or targets can be specified, to restrict the potential destinations:
802
808
803 >>> v = client.client.load_balanced_view([1,3])
809 >>> v = client.client.load_balanced_view([1,3])
804
810
805 which would restrict loadbalancing to between engines 1 and 3.
811 which would restrict loadbalancing to between engines 1 and 3.
806
812
807 """
813 """
808
814
809 follow=Any()
815 follow=Any()
810 after=Any()
816 after=Any()
811 timeout=CFloat()
817 timeout=CFloat()
812 retries = Integer(0)
818 retries = Integer(0)
813
819
814 _task_scheme = Any()
820 _task_scheme = Any()
815 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
821 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
816
822
817 def __init__(self, client=None, socket=None, **flags):
823 def __init__(self, client=None, socket=None, **flags):
818 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
824 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
819 self._task_scheme=client._task_scheme
825 self._task_scheme=client._task_scheme
820
826
821 def _validate_dependency(self, dep):
827 def _validate_dependency(self, dep):
822 """validate a dependency.
828 """validate a dependency.
823
829
824 For use in `set_flags`.
830 For use in `set_flags`.
825 """
831 """
826 if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
832 if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
827 return True
833 return True
828 elif isinstance(dep, (list,set, tuple)):
834 elif isinstance(dep, (list,set, tuple)):
829 for d in dep:
835 for d in dep:
830 if not isinstance(d, (basestring, AsyncResult)):
836 if not isinstance(d, (basestring, AsyncResult)):
831 return False
837 return False
832 elif isinstance(dep, dict):
838 elif isinstance(dep, dict):
833 if set(dep.keys()) != set(Dependency().as_dict().keys()):
839 if set(dep.keys()) != set(Dependency().as_dict().keys()):
834 return False
840 return False
835 if not isinstance(dep['msg_ids'], list):
841 if not isinstance(dep['msg_ids'], list):
836 return False
842 return False
837 for d in dep['msg_ids']:
843 for d in dep['msg_ids']:
838 if not isinstance(d, basestring):
844 if not isinstance(d, basestring):
839 return False
845 return False
840 else:
846 else:
841 return False
847 return False
842
848
843 return True
849 return True
844
850
845 def _render_dependency(self, dep):
851 def _render_dependency(self, dep):
846 """helper for building jsonable dependencies from various input forms."""
852 """helper for building jsonable dependencies from various input forms."""
847 if isinstance(dep, Dependency):
853 if isinstance(dep, Dependency):
848 return dep.as_dict()
854 return dep.as_dict()
849 elif isinstance(dep, AsyncResult):
855 elif isinstance(dep, AsyncResult):
850 return dep.msg_ids
856 return dep.msg_ids
851 elif dep is None:
857 elif dep is None:
852 return []
858 return []
853 else:
859 else:
854 # pass to Dependency constructor
860 # pass to Dependency constructor
855 return list(Dependency(dep))
861 return list(Dependency(dep))
856
862
857 def set_flags(self, **kwargs):
863 def set_flags(self, **kwargs):
858 """set my attribute flags by keyword.
864 """set my attribute flags by keyword.
859
865
860 A View is a wrapper for the Client's apply method, but with attributes
866 A View is a wrapper for the Client's apply method, but with attributes
861 that specify keyword arguments, those attributes can be set by keyword
867 that specify keyword arguments, those attributes can be set by keyword
862 argument with this method.
868 argument with this method.
863
869
864 Parameters
870 Parameters
865 ----------
871 ----------
866
872
867 block : bool
873 block : bool
868 whether to wait for results
874 whether to wait for results
869 track : bool
875 track : bool
870 whether to create a MessageTracker to allow the user to
876 whether to create a MessageTracker to allow the user to
871 safely edit after arrays and buffers during non-copying
877 safely edit after arrays and buffers during non-copying
872 sends.
878 sends.
873
879
874 after : Dependency or collection of msg_ids
880 after : Dependency or collection of msg_ids
875 Only for load-balanced execution (targets=None)
881 Only for load-balanced execution (targets=None)
876 Specify a list of msg_ids as a time-based dependency.
882 Specify a list of msg_ids as a time-based dependency.
877 This job will only be run *after* the dependencies
883 This job will only be run *after* the dependencies
878 have been met.
884 have been met.
879
885
880 follow : Dependency or collection of msg_ids
886 follow : Dependency or collection of msg_ids
881 Only for load-balanced execution (targets=None)
887 Only for load-balanced execution (targets=None)
882 Specify a list of msg_ids as a location-based dependency.
888 Specify a list of msg_ids as a location-based dependency.
883 This job will only be run on an engine where this dependency
889 This job will only be run on an engine where this dependency
884 is met.
890 is met.
885
891
886 timeout : float/int or None
892 timeout : float/int or None
887 Only for load-balanced execution (targets=None)
893 Only for load-balanced execution (targets=None)
888 Specify an amount of time (in seconds) for the scheduler to
894 Specify an amount of time (in seconds) for the scheduler to
889 wait for dependencies to be met before failing with a
895 wait for dependencies to be met before failing with a
890 DependencyTimeout.
896 DependencyTimeout.
891
897
892 retries : int
898 retries : int
893 Number of times a task will be retried on failure.
899 Number of times a task will be retried on failure.
894 """
900 """
895
901
896 super(LoadBalancedView, self).set_flags(**kwargs)
902 super(LoadBalancedView, self).set_flags(**kwargs)
897 for name in ('follow', 'after'):
903 for name in ('follow', 'after'):
898 if name in kwargs:
904 if name in kwargs:
899 value = kwargs[name]
905 value = kwargs[name]
900 if self._validate_dependency(value):
906 if self._validate_dependency(value):
901 setattr(self, name, value)
907 setattr(self, name, value)
902 else:
908 else:
903 raise ValueError("Invalid dependency: %r"%value)
909 raise ValueError("Invalid dependency: %r"%value)
904 if 'timeout' in kwargs:
910 if 'timeout' in kwargs:
905 t = kwargs['timeout']
911 t = kwargs['timeout']
906 if not isinstance(t, (int, long, float, type(None))):
912 if not isinstance(t, (int, long, float, type(None))):
907 raise TypeError("Invalid type for timeout: %r"%type(t))
913 raise TypeError("Invalid type for timeout: %r"%type(t))
908 if t is not None:
914 if t is not None:
909 if t < 0:
915 if t < 0:
910 raise ValueError("Invalid timeout: %s"%t)
916 raise ValueError("Invalid timeout: %s"%t)
911 self.timeout = t
917 self.timeout = t
912
918
913 @sync_results
919 @sync_results
914 @save_ids
920 @save_ids
915 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
921 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
916 after=None, follow=None, timeout=None,
922 after=None, follow=None, timeout=None,
917 targets=None, retries=None):
923 targets=None, retries=None):
918 """calls f(*args, **kwargs) on a remote engine, returning the result.
924 """calls f(*args, **kwargs) on a remote engine, returning the result.
919
925
920 This method temporarily sets all of `apply`'s flags for a single call.
926 This method temporarily sets all of `apply`'s flags for a single call.
921
927
922 Parameters
928 Parameters
923 ----------
929 ----------
924
930
925 f : callable
931 f : callable
926
932
927 args : list [default: empty]
933 args : list [default: empty]
928
934
929 kwargs : dict [default: empty]
935 kwargs : dict [default: empty]
930
936
931 block : bool [default: self.block]
937 block : bool [default: self.block]
932 whether to block
938 whether to block
933 track : bool [default: self.track]
939 track : bool [default: self.track]
934 whether to ask zmq to track the message, for safe non-copying sends
940 whether to ask zmq to track the message, for safe non-copying sends
935
941
936 !!!!!! TODO: THE REST HERE !!!!
942 !!!!!! TODO: THE REST HERE !!!!
937
943
938 Returns
944 Returns
939 -------
945 -------
940
946
941 if self.block is False:
947 if self.block is False:
942 returns AsyncResult
948 returns AsyncResult
943 else:
949 else:
944 returns actual result of f(*args, **kwargs) on the engine(s)
950 returns actual result of f(*args, **kwargs) on the engine(s)
945 This will be a list of self.targets is also a list (even length 1), or
951 This will be a list of self.targets is also a list (even length 1), or
946 the single result if self.targets is an integer engine id
952 the single result if self.targets is an integer engine id
947 """
953 """
948
954
949 # validate whether we can run
955 # validate whether we can run
950 if self._socket.closed:
956 if self._socket.closed:
951 msg = "Task farming is disabled"
957 msg = "Task farming is disabled"
952 if self._task_scheme == 'pure':
958 if self._task_scheme == 'pure':
953 msg += " because the pure ZMQ scheduler cannot handle"
959 msg += " because the pure ZMQ scheduler cannot handle"
954 msg += " disappearing engines."
960 msg += " disappearing engines."
955 raise RuntimeError(msg)
961 raise RuntimeError(msg)
956
962
957 if self._task_scheme == 'pure':
963 if self._task_scheme == 'pure':
958 # pure zmq scheme doesn't support extra features
964 # pure zmq scheme doesn't support extra features
959 msg = "Pure ZMQ scheduler doesn't support the following flags:"
965 msg = "Pure ZMQ scheduler doesn't support the following flags:"
960 "follow, after, retries, targets, timeout"
966 "follow, after, retries, targets, timeout"
961 if (follow or after or retries or targets or timeout):
967 if (follow or after or retries or targets or timeout):
962 # hard fail on Scheduler flags
968 # hard fail on Scheduler flags
963 raise RuntimeError(msg)
969 raise RuntimeError(msg)
964 if isinstance(f, dependent):
970 if isinstance(f, dependent):
965 # soft warn on functional dependencies
971 # soft warn on functional dependencies
966 warnings.warn(msg, RuntimeWarning)
972 warnings.warn(msg, RuntimeWarning)
967
973
968 # build args
974 # build args
969 args = [] if args is None else args
975 args = [] if args is None else args
970 kwargs = {} if kwargs is None else kwargs
976 kwargs = {} if kwargs is None else kwargs
971 block = self.block if block is None else block
977 block = self.block if block is None else block
972 track = self.track if track is None else track
978 track = self.track if track is None else track
973 after = self.after if after is None else after
979 after = self.after if after is None else after
974 retries = self.retries if retries is None else retries
980 retries = self.retries if retries is None else retries
975 follow = self.follow if follow is None else follow
981 follow = self.follow if follow is None else follow
976 timeout = self.timeout if timeout is None else timeout
982 timeout = self.timeout if timeout is None else timeout
977 targets = self.targets if targets is None else targets
983 targets = self.targets if targets is None else targets
978
984
979 if not isinstance(retries, int):
985 if not isinstance(retries, int):
980 raise TypeError('retries must be int, not %r'%type(retries))
986 raise TypeError('retries must be int, not %r'%type(retries))
981
987
982 if targets is None:
988 if targets is None:
983 idents = []
989 idents = []
984 else:
990 else:
985 idents = self.client._build_targets(targets)[0]
991 idents = self.client._build_targets(targets)[0]
986 # ensure *not* bytes
992 # ensure *not* bytes
987 idents = [ ident.decode() for ident in idents ]
993 idents = [ ident.decode() for ident in idents ]
988
994
989 after = self._render_dependency(after)
995 after = self._render_dependency(after)
990 follow = self._render_dependency(follow)
996 follow = self._render_dependency(follow)
991 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
997 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
992
998
993 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
999 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
994 subheader=subheader)
1000 subheader=subheader)
995 tracker = None if track is False else msg['tracker']
1001 tracker = None if track is False else msg['tracker']
996
1002
997 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1003 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
998
1004
999 if block:
1005 if block:
1000 try:
1006 try:
1001 return ar.get()
1007 return ar.get()
1002 except KeyboardInterrupt:
1008 except KeyboardInterrupt:
1003 pass
1009 pass
1004 return ar
1010 return ar
1005
1011
1006 @spin_after
1012 @spin_after
1007 @save_ids
1013 @save_ids
1008 def map(self, f, *sequences, **kwargs):
1014 def map(self, f, *sequences, **kwargs):
1009 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1015 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1010
1016
1011 Parallel version of builtin `map`, load-balanced by this View.
1017 Parallel version of builtin `map`, load-balanced by this View.
1012
1018
1013 `block`, and `chunksize` can be specified by keyword only.
1019 `block`, and `chunksize` can be specified by keyword only.
1014
1020
1015 Each `chunksize` elements will be a separate task, and will be
1021 Each `chunksize` elements will be a separate task, and will be
1016 load-balanced. This lets individual elements be available for iteration
1022 load-balanced. This lets individual elements be available for iteration
1017 as soon as they arrive.
1023 as soon as they arrive.
1018
1024
1019 Parameters
1025 Parameters
1020 ----------
1026 ----------
1021
1027
1022 f : callable
1028 f : callable
1023 function to be mapped
1029 function to be mapped
1024 *sequences: one or more sequences of matching length
1030 *sequences: one or more sequences of matching length
1025 the sequences to be distributed and passed to `f`
1031 the sequences to be distributed and passed to `f`
1026 block : bool [default self.block]
1032 block : bool [default self.block]
1027 whether to wait for the result or not
1033 whether to wait for the result or not
1028 track : bool
1034 track : bool
1029 whether to create a MessageTracker to allow the user to
1035 whether to create a MessageTracker to allow the user to
1030 safely edit after arrays and buffers during non-copying
1036 safely edit after arrays and buffers during non-copying
1031 sends.
1037 sends.
1032 chunksize : int [default 1]
1038 chunksize : int [default 1]
1033 how many elements should be in each task.
1039 how many elements should be in each task.
1034 ordered : bool [default True]
1040 ordered : bool [default True]
1035 Whether the results should be gathered as they arrive, or enforce
1041 Whether the results should be gathered as they arrive, or enforce
1036 the order of submission.
1042 the order of submission.
1037
1043
1038 Only applies when iterating through AsyncMapResult as results arrive.
1044 Only applies when iterating through AsyncMapResult as results arrive.
1039 Has no effect when block=True.
1045 Has no effect when block=True.
1040
1046
1041 Returns
1047 Returns
1042 -------
1048 -------
1043
1049
1044 if block=False:
1050 if block=False:
1045 AsyncMapResult
1051 AsyncMapResult
1046 An object like AsyncResult, but which reassembles the sequence of results
1052 An object like AsyncResult, but which reassembles the sequence of results
1047 into a single list. AsyncMapResults can be iterated through before all
1053 into a single list. AsyncMapResults can be iterated through before all
1048 results are complete.
1054 results are complete.
1049 else:
1055 else:
1050 the result of map(f,*sequences)
1056 the result of map(f,*sequences)
1051
1057
1052 """
1058 """
1053
1059
1054 # default
1060 # default
1055 block = kwargs.get('block', self.block)
1061 block = kwargs.get('block', self.block)
1056 chunksize = kwargs.get('chunksize', 1)
1062 chunksize = kwargs.get('chunksize', 1)
1057 ordered = kwargs.get('ordered', True)
1063 ordered = kwargs.get('ordered', True)
1058
1064
1059 keyset = set(kwargs.keys())
1065 keyset = set(kwargs.keys())
1060 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1066 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1061 if extra_keys:
1067 if extra_keys:
1062 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1068 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1063
1069
1064 assert len(sequences) > 0, "must have some sequences to map onto!"
1070 assert len(sequences) > 0, "must have some sequences to map onto!"
1065
1071
1066 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1072 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1067 return pf.map(*sequences)
1073 return pf.map(*sequences)
1068
1074
1069 __all__ = ['LoadBalancedView', 'DirectView']
1075 __all__ = ['LoadBalancedView', 'DirectView']
@@ -1,544 +1,553 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test View objects
2 """test View objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import time
20 import time
21 from tempfile import mktemp
21 from tempfile import mktemp
22 from StringIO import StringIO
22 from StringIO import StringIO
23
23
24 import zmq
24 import zmq
25 from nose import SkipTest
25 from nose import SkipTest
26
26
27 from IPython.testing import decorators as dec
27 from IPython.testing import decorators as dec
28
28
29 from IPython import parallel as pmod
29 from IPython import parallel as pmod
30 from IPython.parallel import error
30 from IPython.parallel import error
31 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
31 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
32 from IPython.parallel import DirectView
32 from IPython.parallel import DirectView
33 from IPython.parallel.util import interactive
33 from IPython.parallel.util import interactive
34
34
35 from IPython.parallel.tests import add_engines
35 from IPython.parallel.tests import add_engines
36
36
37 from .clienttest import ClusterTestCase, crash, wait, skip_without
37 from .clienttest import ClusterTestCase, crash, wait, skip_without
38
38
39 def setup():
39 def setup():
40 add_engines(3, total=True)
40 add_engines(3, total=True)
41
41
42 class TestView(ClusterTestCase):
42 class TestView(ClusterTestCase):
43
43
44 def test_z_crash_mux(self):
44 def test_z_crash_mux(self):
45 """test graceful handling of engine death (direct)"""
45 """test graceful handling of engine death (direct)"""
46 raise SkipTest("crash tests disabled, due to undesirable crash reports")
46 raise SkipTest("crash tests disabled, due to undesirable crash reports")
47 # self.add_engines(1)
47 # self.add_engines(1)
48 eid = self.client.ids[-1]
48 eid = self.client.ids[-1]
49 ar = self.client[eid].apply_async(crash)
49 ar = self.client[eid].apply_async(crash)
50 self.assertRaisesRemote(error.EngineError, ar.get, 10)
50 self.assertRaisesRemote(error.EngineError, ar.get, 10)
51 eid = ar.engine_id
51 eid = ar.engine_id
52 tic = time.time()
52 tic = time.time()
53 while eid in self.client.ids and time.time()-tic < 5:
53 while eid in self.client.ids and time.time()-tic < 5:
54 time.sleep(.01)
54 time.sleep(.01)
55 self.client.spin()
55 self.client.spin()
56 self.assertFalse(eid in self.client.ids, "Engine should have died")
56 self.assertFalse(eid in self.client.ids, "Engine should have died")
57
57
58 def test_push_pull(self):
58 def test_push_pull(self):
59 """test pushing and pulling"""
59 """test pushing and pulling"""
60 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
60 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
61 t = self.client.ids[-1]
61 t = self.client.ids[-1]
62 v = self.client[t]
62 v = self.client[t]
63 push = v.push
63 push = v.push
64 pull = v.pull
64 pull = v.pull
65 v.block=True
65 v.block=True
66 nengines = len(self.client)
66 nengines = len(self.client)
67 push({'data':data})
67 push({'data':data})
68 d = pull('data')
68 d = pull('data')
69 self.assertEquals(d, data)
69 self.assertEquals(d, data)
70 self.client[:].push({'data':data})
70 self.client[:].push({'data':data})
71 d = self.client[:].pull('data', block=True)
71 d = self.client[:].pull('data', block=True)
72 self.assertEquals(d, nengines*[data])
72 self.assertEquals(d, nengines*[data])
73 ar = push({'data':data}, block=False)
73 ar = push({'data':data}, block=False)
74 self.assertTrue(isinstance(ar, AsyncResult))
74 self.assertTrue(isinstance(ar, AsyncResult))
75 r = ar.get()
75 r = ar.get()
76 ar = self.client[:].pull('data', block=False)
76 ar = self.client[:].pull('data', block=False)
77 self.assertTrue(isinstance(ar, AsyncResult))
77 self.assertTrue(isinstance(ar, AsyncResult))
78 r = ar.get()
78 r = ar.get()
79 self.assertEquals(r, nengines*[data])
79 self.assertEquals(r, nengines*[data])
80 self.client[:].push(dict(a=10,b=20))
80 self.client[:].push(dict(a=10,b=20))
81 r = self.client[:].pull(('a','b'), block=True)
81 r = self.client[:].pull(('a','b'), block=True)
82 self.assertEquals(r, nengines*[[10,20]])
82 self.assertEquals(r, nengines*[[10,20]])
83
83
84 def test_push_pull_function(self):
84 def test_push_pull_function(self):
85 "test pushing and pulling functions"
85 "test pushing and pulling functions"
86 def testf(x):
86 def testf(x):
87 return 2.0*x
87 return 2.0*x
88
88
89 t = self.client.ids[-1]
89 t = self.client.ids[-1]
90 v = self.client[t]
90 v = self.client[t]
91 v.block=True
91 v.block=True
92 push = v.push
92 push = v.push
93 pull = v.pull
93 pull = v.pull
94 execute = v.execute
94 execute = v.execute
95 push({'testf':testf})
95 push({'testf':testf})
96 r = pull('testf')
96 r = pull('testf')
97 self.assertEqual(r(1.0), testf(1.0))
97 self.assertEqual(r(1.0), testf(1.0))
98 execute('r = testf(10)')
98 execute('r = testf(10)')
99 r = pull('r')
99 r = pull('r')
100 self.assertEquals(r, testf(10))
100 self.assertEquals(r, testf(10))
101 ar = self.client[:].push({'testf':testf}, block=False)
101 ar = self.client[:].push({'testf':testf}, block=False)
102 ar.get()
102 ar.get()
103 ar = self.client[:].pull('testf', block=False)
103 ar = self.client[:].pull('testf', block=False)
104 rlist = ar.get()
104 rlist = ar.get()
105 for r in rlist:
105 for r in rlist:
106 self.assertEqual(r(1.0), testf(1.0))
106 self.assertEqual(r(1.0), testf(1.0))
107 execute("def g(x): return x*x")
107 execute("def g(x): return x*x")
108 r = pull(('testf','g'))
108 r = pull(('testf','g'))
109 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
109 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
110
110
111 def test_push_function_globals(self):
111 def test_push_function_globals(self):
112 """test that pushed functions have access to globals"""
112 """test that pushed functions have access to globals"""
113 @interactive
113 @interactive
114 def geta():
114 def geta():
115 return a
115 return a
116 # self.add_engines(1)
116 # self.add_engines(1)
117 v = self.client[-1]
117 v = self.client[-1]
118 v.block=True
118 v.block=True
119 v['f'] = geta
119 v['f'] = geta
120 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
120 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
121 v.execute('a=5')
121 v.execute('a=5')
122 v.execute('b=f()')
122 v.execute('b=f()')
123 self.assertEquals(v['b'], 5)
123 self.assertEquals(v['b'], 5)
124
124
125 def test_push_function_defaults(self):
125 def test_push_function_defaults(self):
126 """test that pushed functions preserve default args"""
126 """test that pushed functions preserve default args"""
127 def echo(a=10):
127 def echo(a=10):
128 return a
128 return a
129 v = self.client[-1]
129 v = self.client[-1]
130 v.block=True
130 v.block=True
131 v['f'] = echo
131 v['f'] = echo
132 v.execute('b=f()')
132 v.execute('b=f()')
133 self.assertEquals(v['b'], 10)
133 self.assertEquals(v['b'], 10)
134
134
135 def test_get_result(self):
135 def test_get_result(self):
136 """test getting results from the Hub."""
136 """test getting results from the Hub."""
137 c = pmod.Client(profile='iptest')
137 c = pmod.Client(profile='iptest')
138 # self.add_engines(1)
138 # self.add_engines(1)
139 t = c.ids[-1]
139 t = c.ids[-1]
140 v = c[t]
140 v = c[t]
141 v2 = self.client[t]
141 v2 = self.client[t]
142 ar = v.apply_async(wait, 1)
142 ar = v.apply_async(wait, 1)
143 # give the monitor time to notice the message
143 # give the monitor time to notice the message
144 time.sleep(.25)
144 time.sleep(.25)
145 ahr = v2.get_result(ar.msg_ids)
145 ahr = v2.get_result(ar.msg_ids)
146 self.assertTrue(isinstance(ahr, AsyncHubResult))
146 self.assertTrue(isinstance(ahr, AsyncHubResult))
147 self.assertEquals(ahr.get(), ar.get())
147 self.assertEquals(ahr.get(), ar.get())
148 ar2 = v2.get_result(ar.msg_ids)
148 ar2 = v2.get_result(ar.msg_ids)
149 self.assertFalse(isinstance(ar2, AsyncHubResult))
149 self.assertFalse(isinstance(ar2, AsyncHubResult))
150 c.spin()
150 c.spin()
151 c.close()
151 c.close()
152
152
153 def test_run_newline(self):
153 def test_run_newline(self):
154 """test that run appends newline to files"""
154 """test that run appends newline to files"""
155 tmpfile = mktemp()
155 tmpfile = mktemp()
156 with open(tmpfile, 'w') as f:
156 with open(tmpfile, 'w') as f:
157 f.write("""def g():
157 f.write("""def g():
158 return 5
158 return 5
159 """)
159 """)
160 v = self.client[-1]
160 v = self.client[-1]
161 v.run(tmpfile, block=True)
161 v.run(tmpfile, block=True)
162 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
162 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
163
163
164 def test_apply_tracked(self):
164 def test_apply_tracked(self):
165 """test tracking for apply"""
165 """test tracking for apply"""
166 # self.add_engines(1)
166 # self.add_engines(1)
167 t = self.client.ids[-1]
167 t = self.client.ids[-1]
168 v = self.client[t]
168 v = self.client[t]
169 v.block=False
169 v.block=False
170 def echo(n=1024*1024, **kwargs):
170 def echo(n=1024*1024, **kwargs):
171 with v.temp_flags(**kwargs):
171 with v.temp_flags(**kwargs):
172 return v.apply(lambda x: x, 'x'*n)
172 return v.apply(lambda x: x, 'x'*n)
173 ar = echo(1, track=False)
173 ar = echo(1, track=False)
174 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
174 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
175 self.assertTrue(ar.sent)
175 self.assertTrue(ar.sent)
176 ar = echo(track=True)
176 ar = echo(track=True)
177 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
177 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
178 self.assertEquals(ar.sent, ar._tracker.done)
178 self.assertEquals(ar.sent, ar._tracker.done)
179 ar._tracker.wait()
179 ar._tracker.wait()
180 self.assertTrue(ar.sent)
180 self.assertTrue(ar.sent)
181
181
182 def test_push_tracked(self):
182 def test_push_tracked(self):
183 t = self.client.ids[-1]
183 t = self.client.ids[-1]
184 ns = dict(x='x'*1024*1024)
184 ns = dict(x='x'*1024*1024)
185 v = self.client[t]
185 v = self.client[t]
186 ar = v.push(ns, block=False, track=False)
186 ar = v.push(ns, block=False, track=False)
187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertTrue(ar.sent)
188 self.assertTrue(ar.sent)
189
189
190 ar = v.push(ns, block=False, track=True)
190 ar = v.push(ns, block=False, track=True)
191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
192 ar._tracker.wait()
192 ar._tracker.wait()
193 self.assertEquals(ar.sent, ar._tracker.done)
193 self.assertEquals(ar.sent, ar._tracker.done)
194 self.assertTrue(ar.sent)
194 self.assertTrue(ar.sent)
195 ar.get()
195 ar.get()
196
196
197 def test_scatter_tracked(self):
197 def test_scatter_tracked(self):
198 t = self.client.ids
198 t = self.client.ids
199 x='x'*1024*1024
199 x='x'*1024*1024
200 ar = self.client[t].scatter('x', x, block=False, track=False)
200 ar = self.client[t].scatter('x', x, block=False, track=False)
201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 self.assertTrue(ar.sent)
202 self.assertTrue(ar.sent)
203
203
204 ar = self.client[t].scatter('x', x, block=False, track=True)
204 ar = self.client[t].scatter('x', x, block=False, track=True)
205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
206 self.assertEquals(ar.sent, ar._tracker.done)
206 self.assertEquals(ar.sent, ar._tracker.done)
207 ar._tracker.wait()
207 ar._tracker.wait()
208 self.assertTrue(ar.sent)
208 self.assertTrue(ar.sent)
209 ar.get()
209 ar.get()
210
210
211 def test_remote_reference(self):
211 def test_remote_reference(self):
212 v = self.client[-1]
212 v = self.client[-1]
213 v['a'] = 123
213 v['a'] = 123
214 ra = pmod.Reference('a')
214 ra = pmod.Reference('a')
215 b = v.apply_sync(lambda x: x, ra)
215 b = v.apply_sync(lambda x: x, ra)
216 self.assertEquals(b, 123)
216 self.assertEquals(b, 123)
217
217
218
218
219 def test_scatter_gather(self):
219 def test_scatter_gather(self):
220 view = self.client[:]
220 view = self.client[:]
221 seq1 = range(16)
221 seq1 = range(16)
222 view.scatter('a', seq1)
222 view.scatter('a', seq1)
223 seq2 = view.gather('a', block=True)
223 seq2 = view.gather('a', block=True)
224 self.assertEquals(seq2, seq1)
224 self.assertEquals(seq2, seq1)
225 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
225 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
226
226
227 @skip_without('numpy')
227 @skip_without('numpy')
228 def test_scatter_gather_numpy(self):
228 def test_scatter_gather_numpy(self):
229 import numpy
229 import numpy
230 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
230 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
231 view = self.client[:]
231 view = self.client[:]
232 a = numpy.arange(64)
232 a = numpy.arange(64)
233 view.scatter('a', a)
233 view.scatter('a', a)
234 b = view.gather('a', block=True)
234 b = view.gather('a', block=True)
235 assert_array_equal(b, a)
235 assert_array_equal(b, a)
236
237 def test_scatter_gather_lazy(self):
238 """scatter/gather with targets='all'"""
239 view = self.client.direct_view(targets='all')
240 x = range(64)
241 view.scatter('x', x)
242 gathered = view.gather('x', block=True)
243 self.assertEquals(gathered, x)
244
236
245
237 @dec.known_failure_py3
246 @dec.known_failure_py3
238 @skip_without('numpy')
247 @skip_without('numpy')
239 def test_push_numpy_nocopy(self):
248 def test_push_numpy_nocopy(self):
240 import numpy
249 import numpy
241 view = self.client[:]
250 view = self.client[:]
242 a = numpy.arange(64)
251 a = numpy.arange(64)
243 view['A'] = a
252 view['A'] = a
244 @interactive
253 @interactive
245 def check_writeable(x):
254 def check_writeable(x):
246 return x.flags.writeable
255 return x.flags.writeable
247
256
248 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
257 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
249 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
258 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
250
259
251 view.push(dict(B=a))
260 view.push(dict(B=a))
252 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
261 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
253 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
262 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
254
263
255 @skip_without('numpy')
264 @skip_without('numpy')
256 def test_apply_numpy(self):
265 def test_apply_numpy(self):
257 """view.apply(f, ndarray)"""
266 """view.apply(f, ndarray)"""
258 import numpy
267 import numpy
259 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
268 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
260
269
261 A = numpy.random.random((100,100))
270 A = numpy.random.random((100,100))
262 view = self.client[-1]
271 view = self.client[-1]
263 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
272 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
264 B = A.astype(dt)
273 B = A.astype(dt)
265 C = view.apply_sync(lambda x:x, B)
274 C = view.apply_sync(lambda x:x, B)
266 assert_array_equal(B,C)
275 assert_array_equal(B,C)
267
276
268 def test_map(self):
277 def test_map(self):
269 view = self.client[:]
278 view = self.client[:]
270 def f(x):
279 def f(x):
271 return x**2
280 return x**2
272 data = range(16)
281 data = range(16)
273 r = view.map_sync(f, data)
282 r = view.map_sync(f, data)
274 self.assertEquals(r, map(f, data))
283 self.assertEquals(r, map(f, data))
275
284
276 def test_map_iterable(self):
285 def test_map_iterable(self):
277 """test map on iterables (direct)"""
286 """test map on iterables (direct)"""
278 view = self.client[:]
287 view = self.client[:]
279 # 101 is prime, so it won't be evenly distributed
288 # 101 is prime, so it won't be evenly distributed
280 arr = range(101)
289 arr = range(101)
281 # ensure it will be an iterator, even in Python 3
290 # ensure it will be an iterator, even in Python 3
282 it = iter(arr)
291 it = iter(arr)
283 r = view.map_sync(lambda x:x, arr)
292 r = view.map_sync(lambda x:x, arr)
284 self.assertEquals(r, list(arr))
293 self.assertEquals(r, list(arr))
285
294
286 def test_scatterGatherNonblocking(self):
295 def test_scatterGatherNonblocking(self):
287 data = range(16)
296 data = range(16)
288 view = self.client[:]
297 view = self.client[:]
289 view.scatter('a', data, block=False)
298 view.scatter('a', data, block=False)
290 ar = view.gather('a', block=False)
299 ar = view.gather('a', block=False)
291 self.assertEquals(ar.get(), data)
300 self.assertEquals(ar.get(), data)
292
301
293 @skip_without('numpy')
302 @skip_without('numpy')
294 def test_scatter_gather_numpy_nonblocking(self):
303 def test_scatter_gather_numpy_nonblocking(self):
295 import numpy
304 import numpy
296 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
305 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
297 a = numpy.arange(64)
306 a = numpy.arange(64)
298 view = self.client[:]
307 view = self.client[:]
299 ar = view.scatter('a', a, block=False)
308 ar = view.scatter('a', a, block=False)
300 self.assertTrue(isinstance(ar, AsyncResult))
309 self.assertTrue(isinstance(ar, AsyncResult))
301 amr = view.gather('a', block=False)
310 amr = view.gather('a', block=False)
302 self.assertTrue(isinstance(amr, AsyncMapResult))
311 self.assertTrue(isinstance(amr, AsyncMapResult))
303 assert_array_equal(amr.get(), a)
312 assert_array_equal(amr.get(), a)
304
313
305 def test_execute(self):
314 def test_execute(self):
306 view = self.client[:]
315 view = self.client[:]
307 # self.client.debug=True
316 # self.client.debug=True
308 execute = view.execute
317 execute = view.execute
309 ar = execute('c=30', block=False)
318 ar = execute('c=30', block=False)
310 self.assertTrue(isinstance(ar, AsyncResult))
319 self.assertTrue(isinstance(ar, AsyncResult))
311 ar = execute('d=[0,1,2]', block=False)
320 ar = execute('d=[0,1,2]', block=False)
312 self.client.wait(ar, 1)
321 self.client.wait(ar, 1)
313 self.assertEquals(len(ar.get()), len(self.client))
322 self.assertEquals(len(ar.get()), len(self.client))
314 for c in view['c']:
323 for c in view['c']:
315 self.assertEquals(c, 30)
324 self.assertEquals(c, 30)
316
325
317 def test_abort(self):
326 def test_abort(self):
318 view = self.client[-1]
327 view = self.client[-1]
319 ar = view.execute('import time; time.sleep(1)', block=False)
328 ar = view.execute('import time; time.sleep(1)', block=False)
320 ar2 = view.apply_async(lambda : 2)
329 ar2 = view.apply_async(lambda : 2)
321 ar3 = view.apply_async(lambda : 3)
330 ar3 = view.apply_async(lambda : 3)
322 view.abort(ar2)
331 view.abort(ar2)
323 view.abort(ar3.msg_ids)
332 view.abort(ar3.msg_ids)
324 self.assertRaises(error.TaskAborted, ar2.get)
333 self.assertRaises(error.TaskAborted, ar2.get)
325 self.assertRaises(error.TaskAborted, ar3.get)
334 self.assertRaises(error.TaskAborted, ar3.get)
326
335
327 def test_abort_all(self):
336 def test_abort_all(self):
328 """view.abort() aborts all outstanding tasks"""
337 """view.abort() aborts all outstanding tasks"""
329 view = self.client[-1]
338 view = self.client[-1]
330 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
339 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
331 view.abort()
340 view.abort()
332 view.wait(timeout=5)
341 view.wait(timeout=5)
333 for ar in ars[5:]:
342 for ar in ars[5:]:
334 self.assertRaises(error.TaskAborted, ar.get)
343 self.assertRaises(error.TaskAborted, ar.get)
335
344
336 def test_temp_flags(self):
345 def test_temp_flags(self):
337 view = self.client[-1]
346 view = self.client[-1]
338 view.block=True
347 view.block=True
339 with view.temp_flags(block=False):
348 with view.temp_flags(block=False):
340 self.assertFalse(view.block)
349 self.assertFalse(view.block)
341 self.assertTrue(view.block)
350 self.assertTrue(view.block)
342
351
343 @dec.known_failure_py3
352 @dec.known_failure_py3
344 def test_importer(self):
353 def test_importer(self):
345 view = self.client[-1]
354 view = self.client[-1]
346 view.clear(block=True)
355 view.clear(block=True)
347 with view.importer:
356 with view.importer:
348 import re
357 import re
349
358
350 @interactive
359 @interactive
351 def findall(pat, s):
360 def findall(pat, s):
352 # this globals() step isn't necessary in real code
361 # this globals() step isn't necessary in real code
353 # only to prevent a closure in the test
362 # only to prevent a closure in the test
354 re = globals()['re']
363 re = globals()['re']
355 return re.findall(pat, s)
364 return re.findall(pat, s)
356
365
357 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
366 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
358
367
359 # parallel magic tests
368 # parallel magic tests
360
369
361 def test_magic_px_blocking(self):
370 def test_magic_px_blocking(self):
362 ip = get_ipython()
371 ip = get_ipython()
363 v = self.client[-1]
372 v = self.client[-1]
364 v.activate()
373 v.activate()
365 v.block=True
374 v.block=True
366
375
367 ip.magic_px('a=5')
376 ip.magic_px('a=5')
368 self.assertEquals(v['a'], 5)
377 self.assertEquals(v['a'], 5)
369 ip.magic_px('a=10')
378 ip.magic_px('a=10')
370 self.assertEquals(v['a'], 10)
379 self.assertEquals(v['a'], 10)
371 sio = StringIO()
380 sio = StringIO()
372 savestdout = sys.stdout
381 savestdout = sys.stdout
373 sys.stdout = sio
382 sys.stdout = sio
374 # just 'print a' worst ~99% of the time, but this ensures that
383 # just 'print a' worst ~99% of the time, but this ensures that
375 # the stdout message has arrived when the result is finished:
384 # the stdout message has arrived when the result is finished:
376 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
385 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
377 sys.stdout = savestdout
386 sys.stdout = savestdout
378 buf = sio.getvalue()
387 buf = sio.getvalue()
379 self.assertTrue('[stdout:' in buf, buf)
388 self.assertTrue('[stdout:' in buf, buf)
380 self.assertTrue(buf.rstrip().endswith('10'))
389 self.assertTrue(buf.rstrip().endswith('10'))
381 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
390 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
382
391
383 def test_magic_px_nonblocking(self):
392 def test_magic_px_nonblocking(self):
384 ip = get_ipython()
393 ip = get_ipython()
385 v = self.client[-1]
394 v = self.client[-1]
386 v.activate()
395 v.activate()
387 v.block=False
396 v.block=False
388
397
389 ip.magic_px('a=5')
398 ip.magic_px('a=5')
390 self.assertEquals(v['a'], 5)
399 self.assertEquals(v['a'], 5)
391 ip.magic_px('a=10')
400 ip.magic_px('a=10')
392 self.assertEquals(v['a'], 10)
401 self.assertEquals(v['a'], 10)
393 sio = StringIO()
402 sio = StringIO()
394 savestdout = sys.stdout
403 savestdout = sys.stdout
395 sys.stdout = sio
404 sys.stdout = sio
396 ip.magic_px('print a')
405 ip.magic_px('print a')
397 sys.stdout = savestdout
406 sys.stdout = savestdout
398 buf = sio.getvalue()
407 buf = sio.getvalue()
399 self.assertFalse('[stdout:%i]'%v.targets in buf)
408 self.assertFalse('[stdout:%i]'%v.targets in buf)
400 ip.magic_px('1/0')
409 ip.magic_px('1/0')
401 ar = v.get_result(-1)
410 ar = v.get_result(-1)
402 self.assertRaisesRemote(ZeroDivisionError, ar.get)
411 self.assertRaisesRemote(ZeroDivisionError, ar.get)
403
412
404 def test_magic_autopx_blocking(self):
413 def test_magic_autopx_blocking(self):
405 ip = get_ipython()
414 ip = get_ipython()
406 v = self.client[-1]
415 v = self.client[-1]
407 v.activate()
416 v.activate()
408 v.block=True
417 v.block=True
409
418
410 sio = StringIO()
419 sio = StringIO()
411 savestdout = sys.stdout
420 savestdout = sys.stdout
412 sys.stdout = sio
421 sys.stdout = sio
413 ip.magic_autopx()
422 ip.magic_autopx()
414 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
423 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
415 ip.run_cell('print b')
424 ip.run_cell('print b')
416 ip.run_cell("b/c")
425 ip.run_cell("b/c")
417 ip.run_code(compile('b*=2', '', 'single'))
426 ip.run_code(compile('b*=2', '', 'single'))
418 ip.magic_autopx()
427 ip.magic_autopx()
419 sys.stdout = savestdout
428 sys.stdout = savestdout
420 output = sio.getvalue().strip()
429 output = sio.getvalue().strip()
421 self.assertTrue(output.startswith('%autopx enabled'))
430 self.assertTrue(output.startswith('%autopx enabled'))
422 self.assertTrue(output.endswith('%autopx disabled'))
431 self.assertTrue(output.endswith('%autopx disabled'))
423 self.assertTrue('RemoteError: ZeroDivisionError' in output)
432 self.assertTrue('RemoteError: ZeroDivisionError' in output)
424 ar = v.get_result(-2)
433 ar = v.get_result(-2)
425 self.assertEquals(v['a'], 5)
434 self.assertEquals(v['a'], 5)
426 self.assertEquals(v['b'], 20)
435 self.assertEquals(v['b'], 20)
427 self.assertRaisesRemote(ZeroDivisionError, ar.get)
436 self.assertRaisesRemote(ZeroDivisionError, ar.get)
428
437
429 def test_magic_autopx_nonblocking(self):
438 def test_magic_autopx_nonblocking(self):
430 ip = get_ipython()
439 ip = get_ipython()
431 v = self.client[-1]
440 v = self.client[-1]
432 v.activate()
441 v.activate()
433 v.block=False
442 v.block=False
434
443
435 sio = StringIO()
444 sio = StringIO()
436 savestdout = sys.stdout
445 savestdout = sys.stdout
437 sys.stdout = sio
446 sys.stdout = sio
438 ip.magic_autopx()
447 ip.magic_autopx()
439 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
448 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
440 ip.run_cell('print b')
449 ip.run_cell('print b')
441 ip.run_cell("b/c")
450 ip.run_cell("b/c")
442 ip.run_code(compile('b*=2', '', 'single'))
451 ip.run_code(compile('b*=2', '', 'single'))
443 ip.magic_autopx()
452 ip.magic_autopx()
444 sys.stdout = savestdout
453 sys.stdout = savestdout
445 output = sio.getvalue().strip()
454 output = sio.getvalue().strip()
446 self.assertTrue(output.startswith('%autopx enabled'))
455 self.assertTrue(output.startswith('%autopx enabled'))
447 self.assertTrue(output.endswith('%autopx disabled'))
456 self.assertTrue(output.endswith('%autopx disabled'))
448 self.assertFalse('ZeroDivisionError' in output)
457 self.assertFalse('ZeroDivisionError' in output)
449 ar = v.get_result(-2)
458 ar = v.get_result(-2)
450 self.assertEquals(v['a'], 5)
459 self.assertEquals(v['a'], 5)
451 self.assertEquals(v['b'], 20)
460 self.assertEquals(v['b'], 20)
452 self.assertRaisesRemote(ZeroDivisionError, ar.get)
461 self.assertRaisesRemote(ZeroDivisionError, ar.get)
453
462
454 def test_magic_result(self):
463 def test_magic_result(self):
455 ip = get_ipython()
464 ip = get_ipython()
456 v = self.client[-1]
465 v = self.client[-1]
457 v.activate()
466 v.activate()
458 v['a'] = 111
467 v['a'] = 111
459 ra = v['a']
468 ra = v['a']
460
469
461 ar = ip.magic_result()
470 ar = ip.magic_result()
462 self.assertEquals(ar.msg_ids, [v.history[-1]])
471 self.assertEquals(ar.msg_ids, [v.history[-1]])
463 self.assertEquals(ar.get(), 111)
472 self.assertEquals(ar.get(), 111)
464 ar = ip.magic_result('-2')
473 ar = ip.magic_result('-2')
465 self.assertEquals(ar.msg_ids, [v.history[-2]])
474 self.assertEquals(ar.msg_ids, [v.history[-2]])
466
475
467 def test_unicode_execute(self):
476 def test_unicode_execute(self):
468 """test executing unicode strings"""
477 """test executing unicode strings"""
469 v = self.client[-1]
478 v = self.client[-1]
470 v.block=True
479 v.block=True
471 if sys.version_info[0] >= 3:
480 if sys.version_info[0] >= 3:
472 code="a='é'"
481 code="a='é'"
473 else:
482 else:
474 code=u"a=u'é'"
483 code=u"a=u'é'"
475 v.execute(code)
484 v.execute(code)
476 self.assertEquals(v['a'], u'é')
485 self.assertEquals(v['a'], u'é')
477
486
478 def test_unicode_apply_result(self):
487 def test_unicode_apply_result(self):
479 """test unicode apply results"""
488 """test unicode apply results"""
480 v = self.client[-1]
489 v = self.client[-1]
481 r = v.apply_sync(lambda : u'é')
490 r = v.apply_sync(lambda : u'é')
482 self.assertEquals(r, u'é')
491 self.assertEquals(r, u'é')
483
492
484 def test_unicode_apply_arg(self):
493 def test_unicode_apply_arg(self):
485 """test passing unicode arguments to apply"""
494 """test passing unicode arguments to apply"""
486 v = self.client[-1]
495 v = self.client[-1]
487
496
488 @interactive
497 @interactive
489 def check_unicode(a, check):
498 def check_unicode(a, check):
490 assert isinstance(a, unicode), "%r is not unicode"%a
499 assert isinstance(a, unicode), "%r is not unicode"%a
491 assert isinstance(check, bytes), "%r is not bytes"%check
500 assert isinstance(check, bytes), "%r is not bytes"%check
492 assert a.encode('utf8') == check, "%s != %s"%(a,check)
501 assert a.encode('utf8') == check, "%s != %s"%(a,check)
493
502
494 for s in [ u'é', u'ßø®∫',u'asdf' ]:
503 for s in [ u'é', u'ßø®∫',u'asdf' ]:
495 try:
504 try:
496 v.apply_sync(check_unicode, s, s.encode('utf8'))
505 v.apply_sync(check_unicode, s, s.encode('utf8'))
497 except error.RemoteError as e:
506 except error.RemoteError as e:
498 if e.ename == 'AssertionError':
507 if e.ename == 'AssertionError':
499 self.fail(e.evalue)
508 self.fail(e.evalue)
500 else:
509 else:
501 raise e
510 raise e
502
511
503 def test_map_reference(self):
512 def test_map_reference(self):
504 """view.map(<Reference>, *seqs) should work"""
513 """view.map(<Reference>, *seqs) should work"""
505 v = self.client[:]
514 v = self.client[:]
506 v.scatter('n', self.client.ids, flatten=True)
515 v.scatter('n', self.client.ids, flatten=True)
507 v.execute("f = lambda x,y: x*y")
516 v.execute("f = lambda x,y: x*y")
508 rf = pmod.Reference('f')
517 rf = pmod.Reference('f')
509 nlist = list(range(10))
518 nlist = list(range(10))
510 mlist = nlist[::-1]
519 mlist = nlist[::-1]
511 expected = [ m*n for m,n in zip(mlist, nlist) ]
520 expected = [ m*n for m,n in zip(mlist, nlist) ]
512 result = v.map_sync(rf, mlist, nlist)
521 result = v.map_sync(rf, mlist, nlist)
513 self.assertEquals(result, expected)
522 self.assertEquals(result, expected)
514
523
515 def test_apply_reference(self):
524 def test_apply_reference(self):
516 """view.apply(<Reference>, *args) should work"""
525 """view.apply(<Reference>, *args) should work"""
517 v = self.client[:]
526 v = self.client[:]
518 v.scatter('n', self.client.ids, flatten=True)
527 v.scatter('n', self.client.ids, flatten=True)
519 v.execute("f = lambda x: n*x")
528 v.execute("f = lambda x: n*x")
520 rf = pmod.Reference('f')
529 rf = pmod.Reference('f')
521 result = v.apply_sync(rf, 5)
530 result = v.apply_sync(rf, 5)
522 expected = [ 5*id for id in self.client.ids ]
531 expected = [ 5*id for id in self.client.ids ]
523 self.assertEquals(result, expected)
532 self.assertEquals(result, expected)
524
533
525 def test_eval_reference(self):
534 def test_eval_reference(self):
526 v = self.client[self.client.ids[0]]
535 v = self.client[self.client.ids[0]]
527 v['g'] = range(5)
536 v['g'] = range(5)
528 rg = pmod.Reference('g[0]')
537 rg = pmod.Reference('g[0]')
529 echo = lambda x:x
538 echo = lambda x:x
530 self.assertEquals(v.apply_sync(echo, rg), 0)
539 self.assertEquals(v.apply_sync(echo, rg), 0)
531
540
532 def test_reference_nameerror(self):
541 def test_reference_nameerror(self):
533 v = self.client[self.client.ids[0]]
542 v = self.client[self.client.ids[0]]
534 r = pmod.Reference('elvis_has_left')
543 r = pmod.Reference('elvis_has_left')
535 echo = lambda x:x
544 echo = lambda x:x
536 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
545 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
537
546
538 def test_single_engine_map(self):
547 def test_single_engine_map(self):
539 e0 = self.client[self.client.ids[0]]
548 e0 = self.client[self.client.ids[0]]
540 r = range(5)
549 r = range(5)
541 check = [ -1*i for i in r ]
550 check = [ -1*i for i in r ]
542 result = e0.map_sync(lambda x: -1*x, r)
551 result = e0.map_sync(lambda x: -1*x, r)
543 self.assertEquals(result, check)
552 self.assertEquals(result, check)
544
553
General Comments 0
You need to be logged in to leave comments. Login now