##// END OF EJS Templates
add len(view)...
MinRK -
Show More
@@ -1,1093 +1,1100 b''
1 """Views of remote engines.
1 """Views of remote engines.
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import imp
18 import imp
19 import sys
19 import sys
20 import warnings
20 import warnings
21 from contextlib import contextmanager
21 from contextlib import contextmanager
22 from types import ModuleType
22 from types import ModuleType
23
23
24 import zmq
24 import zmq
25
25
26 from IPython.testing.skipdoctest import skip_doctest
26 from IPython.testing.skipdoctest import skip_doctest
27 from IPython.utils.traitlets import (
27 from IPython.utils.traitlets import (
28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
29 )
29 )
30 from IPython.external.decorator import decorator
30 from IPython.external.decorator import decorator
31
31
32 from IPython.parallel import util
32 from IPython.parallel import util
33 from IPython.parallel.controller.dependency import Dependency, dependent
33 from IPython.parallel.controller.dependency import Dependency, dependent
34
34
35 from . import map as Map
35 from . import map as Map
36 from .asyncresult import AsyncResult, AsyncMapResult
36 from .asyncresult import AsyncResult, AsyncMapResult
37 from .remotefunction import ParallelFunction, parallel, remote, getname
37 from .remotefunction import ParallelFunction, parallel, remote, getname
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Decorators
40 # Decorators
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43 @decorator
43 @decorator
44 def save_ids(f, self, *args, **kwargs):
44 def save_ids(f, self, *args, **kwargs):
45 """Keep our history and outstanding attributes up to date after a method call."""
45 """Keep our history and outstanding attributes up to date after a method call."""
46 n_previous = len(self.client.history)
46 n_previous = len(self.client.history)
47 try:
47 try:
48 ret = f(self, *args, **kwargs)
48 ret = f(self, *args, **kwargs)
49 finally:
49 finally:
50 nmsgs = len(self.client.history) - n_previous
50 nmsgs = len(self.client.history) - n_previous
51 msg_ids = self.client.history[-nmsgs:]
51 msg_ids = self.client.history[-nmsgs:]
52 self.history.extend(msg_ids)
52 self.history.extend(msg_ids)
53 map(self.outstanding.add, msg_ids)
53 map(self.outstanding.add, msg_ids)
54 return ret
54 return ret
55
55
56 @decorator
56 @decorator
57 def sync_results(f, self, *args, **kwargs):
57 def sync_results(f, self, *args, **kwargs):
58 """sync relevant results from self.client to our results attribute."""
58 """sync relevant results from self.client to our results attribute."""
59 ret = f(self, *args, **kwargs)
59 ret = f(self, *args, **kwargs)
60 delta = self.outstanding.difference(self.client.outstanding)
60 delta = self.outstanding.difference(self.client.outstanding)
61 completed = self.outstanding.intersection(delta)
61 completed = self.outstanding.intersection(delta)
62 self.outstanding = self.outstanding.difference(completed)
62 self.outstanding = self.outstanding.difference(completed)
63 for msg_id in completed:
63 for msg_id in completed:
64 self.results[msg_id] = self.client.results[msg_id]
64 self.results[msg_id] = self.client.results[msg_id]
65 return ret
65 return ret
66
66
67 @decorator
67 @decorator
68 def spin_after(f, self, *args, **kwargs):
68 def spin_after(f, self, *args, **kwargs):
69 """call spin after the method."""
69 """call spin after the method."""
70 ret = f(self, *args, **kwargs)
70 ret = f(self, *args, **kwargs)
71 self.spin()
71 self.spin()
72 return ret
72 return ret
73
73
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75 # Classes
75 # Classes
76 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
77
77
78 @skip_doctest
78 @skip_doctest
79 class View(HasTraits):
79 class View(HasTraits):
80 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
80 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
81
81
82 Don't use this class, use subclasses.
82 Don't use this class, use subclasses.
83
83
84 Methods
84 Methods
85 -------
85 -------
86
86
87 spin
87 spin
88 flushes incoming results and registration state changes
88 flushes incoming results and registration state changes
89 control methods spin, and requesting `ids` also ensures up to date
89 control methods spin, and requesting `ids` also ensures up to date
90
90
91 wait
91 wait
92 wait on one or more msg_ids
92 wait on one or more msg_ids
93
93
94 execution methods
94 execution methods
95 apply
95 apply
96 legacy: execute, run
96 legacy: execute, run
97
97
98 data movement
98 data movement
99 push, pull, scatter, gather
99 push, pull, scatter, gather
100
100
101 query methods
101 query methods
102 get_result, queue_status, purge_results, result_status
102 get_result, queue_status, purge_results, result_status
103
103
104 control methods
104 control methods
105 abort, shutdown
105 abort, shutdown
106
106
107 """
107 """
108 # flags
108 # flags
109 block=Bool(False)
109 block=Bool(False)
110 track=Bool(True)
110 track=Bool(True)
111 targets = Any()
111 targets = Any()
112
112
113 history=List()
113 history=List()
114 outstanding = Set()
114 outstanding = Set()
115 results = Dict()
115 results = Dict()
116 client = Instance('IPython.parallel.Client')
116 client = Instance('IPython.parallel.Client')
117
117
118 _socket = Instance('zmq.Socket')
118 _socket = Instance('zmq.Socket')
119 _flag_names = List(['targets', 'block', 'track'])
119 _flag_names = List(['targets', 'block', 'track'])
120 _targets = Any()
120 _targets = Any()
121 _idents = Any()
121 _idents = Any()
122
122
123 def __init__(self, client=None, socket=None, **flags):
123 def __init__(self, client=None, socket=None, **flags):
124 super(View, self).__init__(client=client, _socket=socket)
124 super(View, self).__init__(client=client, _socket=socket)
125 self.block = client.block
125 self.block = client.block
126
126
127 self.set_flags(**flags)
127 self.set_flags(**flags)
128
128
129 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
129 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
130
130
131
132 def __repr__(self):
131 def __repr__(self):
133 strtargets = str(self.targets)
132 strtargets = str(self.targets)
134 if len(strtargets) > 16:
133 if len(strtargets) > 16:
135 strtargets = strtargets[:12]+'...]'
134 strtargets = strtargets[:12]+'...]'
136 return "<%s %s>"%(self.__class__.__name__, strtargets)
135 return "<%s %s>"%(self.__class__.__name__, strtargets)
137
136
137 def __len__(self):
138 if isinstance(self.targets, list):
139 return len(self.targets)
140 elif isinstance(self.targets, int):
141 return 1
142 else:
143 return len(self.client)
144
138 def set_flags(self, **kwargs):
145 def set_flags(self, **kwargs):
139 """set my attribute flags by keyword.
146 """set my attribute flags by keyword.
140
147
141 Views determine behavior with a few attributes (`block`, `track`, etc.).
148 Views determine behavior with a few attributes (`block`, `track`, etc.).
142 These attributes can be set all at once by name with this method.
149 These attributes can be set all at once by name with this method.
143
150
144 Parameters
151 Parameters
145 ----------
152 ----------
146
153
147 block : bool
154 block : bool
148 whether to wait for results
155 whether to wait for results
149 track : bool
156 track : bool
150 whether to create a MessageTracker to allow the user to
157 whether to create a MessageTracker to allow the user to
151 safely edit after arrays and buffers during non-copying
158 safely edit after arrays and buffers during non-copying
152 sends.
159 sends.
153 """
160 """
154 for name, value in kwargs.iteritems():
161 for name, value in kwargs.iteritems():
155 if name not in self._flag_names:
162 if name not in self._flag_names:
156 raise KeyError("Invalid name: %r"%name)
163 raise KeyError("Invalid name: %r"%name)
157 else:
164 else:
158 setattr(self, name, value)
165 setattr(self, name, value)
159
166
160 @contextmanager
167 @contextmanager
161 def temp_flags(self, **kwargs):
168 def temp_flags(self, **kwargs):
162 """temporarily set flags, for use in `with` statements.
169 """temporarily set flags, for use in `with` statements.
163
170
164 See set_flags for permanent setting of flags
171 See set_flags for permanent setting of flags
165
172
166 Examples
173 Examples
167 --------
174 --------
168
175
169 >>> view.track=False
176 >>> view.track=False
170 ...
177 ...
171 >>> with view.temp_flags(track=True):
178 >>> with view.temp_flags(track=True):
172 ... ar = view.apply(dostuff, my_big_array)
179 ... ar = view.apply(dostuff, my_big_array)
173 ... ar.tracker.wait() # wait for send to finish
180 ... ar.tracker.wait() # wait for send to finish
174 >>> view.track
181 >>> view.track
175 False
182 False
176
183
177 """
184 """
178 # preflight: save flags, and set temporaries
185 # preflight: save flags, and set temporaries
179 saved_flags = {}
186 saved_flags = {}
180 for f in self._flag_names:
187 for f in self._flag_names:
181 saved_flags[f] = getattr(self, f)
188 saved_flags[f] = getattr(self, f)
182 self.set_flags(**kwargs)
189 self.set_flags(**kwargs)
183 # yield to the with-statement block
190 # yield to the with-statement block
184 try:
191 try:
185 yield
192 yield
186 finally:
193 finally:
187 # postflight: restore saved flags
194 # postflight: restore saved flags
188 self.set_flags(**saved_flags)
195 self.set_flags(**saved_flags)
189
196
190
197
191 #----------------------------------------------------------------
198 #----------------------------------------------------------------
192 # apply
199 # apply
193 #----------------------------------------------------------------
200 #----------------------------------------------------------------
194
201
195 @sync_results
202 @sync_results
196 @save_ids
203 @save_ids
197 def _really_apply(self, f, args, kwargs, block=None, **options):
204 def _really_apply(self, f, args, kwargs, block=None, **options):
198 """wrapper for client.send_apply_request"""
205 """wrapper for client.send_apply_request"""
199 raise NotImplementedError("Implement in subclasses")
206 raise NotImplementedError("Implement in subclasses")
200
207
201 def apply(self, f, *args, **kwargs):
208 def apply(self, f, *args, **kwargs):
202 """calls f(*args, **kwargs) on remote engines, returning the result.
209 """calls f(*args, **kwargs) on remote engines, returning the result.
203
210
204 This method sets all apply flags via this View's attributes.
211 This method sets all apply flags via this View's attributes.
205
212
206 if self.block is False:
213 if self.block is False:
207 returns AsyncResult
214 returns AsyncResult
208 else:
215 else:
209 returns actual result of f(*args, **kwargs)
216 returns actual result of f(*args, **kwargs)
210 """
217 """
211 return self._really_apply(f, args, kwargs)
218 return self._really_apply(f, args, kwargs)
212
219
213 def apply_async(self, f, *args, **kwargs):
220 def apply_async(self, f, *args, **kwargs):
214 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
221 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
215
222
216 returns AsyncResult
223 returns AsyncResult
217 """
224 """
218 return self._really_apply(f, args, kwargs, block=False)
225 return self._really_apply(f, args, kwargs, block=False)
219
226
220 @spin_after
227 @spin_after
221 def apply_sync(self, f, *args, **kwargs):
228 def apply_sync(self, f, *args, **kwargs):
222 """calls f(*args, **kwargs) on remote engines in a blocking manner,
229 """calls f(*args, **kwargs) on remote engines in a blocking manner,
223 returning the result.
230 returning the result.
224
231
225 returns: actual result of f(*args, **kwargs)
232 returns: actual result of f(*args, **kwargs)
226 """
233 """
227 return self._really_apply(f, args, kwargs, block=True)
234 return self._really_apply(f, args, kwargs, block=True)
228
235
229 #----------------------------------------------------------------
236 #----------------------------------------------------------------
230 # wrappers for client and control methods
237 # wrappers for client and control methods
231 #----------------------------------------------------------------
238 #----------------------------------------------------------------
232 @sync_results
239 @sync_results
233 def spin(self):
240 def spin(self):
234 """spin the client, and sync"""
241 """spin the client, and sync"""
235 self.client.spin()
242 self.client.spin()
236
243
237 @sync_results
244 @sync_results
238 def wait(self, jobs=None, timeout=-1):
245 def wait(self, jobs=None, timeout=-1):
239 """waits on one or more `jobs`, for up to `timeout` seconds.
246 """waits on one or more `jobs`, for up to `timeout` seconds.
240
247
241 Parameters
248 Parameters
242 ----------
249 ----------
243
250
244 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
251 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
245 ints are indices to self.history
252 ints are indices to self.history
246 strs are msg_ids
253 strs are msg_ids
247 default: wait on all outstanding messages
254 default: wait on all outstanding messages
248 timeout : float
255 timeout : float
249 a time in seconds, after which to give up.
256 a time in seconds, after which to give up.
250 default is -1, which means no timeout
257 default is -1, which means no timeout
251
258
252 Returns
259 Returns
253 -------
260 -------
254
261
255 True : when all msg_ids are done
262 True : when all msg_ids are done
256 False : timeout reached, some msg_ids still outstanding
263 False : timeout reached, some msg_ids still outstanding
257 """
264 """
258 if jobs is None:
265 if jobs is None:
259 jobs = self.history
266 jobs = self.history
260 return self.client.wait(jobs, timeout)
267 return self.client.wait(jobs, timeout)
261
268
262 def abort(self, jobs=None, targets=None, block=None):
269 def abort(self, jobs=None, targets=None, block=None):
263 """Abort jobs on my engines.
270 """Abort jobs on my engines.
264
271
265 Parameters
272 Parameters
266 ----------
273 ----------
267
274
268 jobs : None, str, list of strs, optional
275 jobs : None, str, list of strs, optional
269 if None: abort all jobs.
276 if None: abort all jobs.
270 else: abort specific msg_id(s).
277 else: abort specific msg_id(s).
271 """
278 """
272 block = block if block is not None else self.block
279 block = block if block is not None else self.block
273 targets = targets if targets is not None else self.targets
280 targets = targets if targets is not None else self.targets
274 jobs = jobs if jobs is not None else list(self.outstanding)
281 jobs = jobs if jobs is not None else list(self.outstanding)
275
282
276 return self.client.abort(jobs=jobs, targets=targets, block=block)
283 return self.client.abort(jobs=jobs, targets=targets, block=block)
277
284
278 def queue_status(self, targets=None, verbose=False):
285 def queue_status(self, targets=None, verbose=False):
279 """Fetch the Queue status of my engines"""
286 """Fetch the Queue status of my engines"""
280 targets = targets if targets is not None else self.targets
287 targets = targets if targets is not None else self.targets
281 return self.client.queue_status(targets=targets, verbose=verbose)
288 return self.client.queue_status(targets=targets, verbose=verbose)
282
289
283 def purge_results(self, jobs=[], targets=[]):
290 def purge_results(self, jobs=[], targets=[]):
284 """Instruct the controller to forget specific results."""
291 """Instruct the controller to forget specific results."""
285 if targets is None or targets == 'all':
292 if targets is None or targets == 'all':
286 targets = self.targets
293 targets = self.targets
287 return self.client.purge_results(jobs=jobs, targets=targets)
294 return self.client.purge_results(jobs=jobs, targets=targets)
288
295
289 def shutdown(self, targets=None, restart=False, hub=False, block=None):
296 def shutdown(self, targets=None, restart=False, hub=False, block=None):
290 """Terminates one or more engine processes, optionally including the hub.
297 """Terminates one or more engine processes, optionally including the hub.
291 """
298 """
292 block = self.block if block is None else block
299 block = self.block if block is None else block
293 if targets is None or targets == 'all':
300 if targets is None or targets == 'all':
294 targets = self.targets
301 targets = self.targets
295 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
302 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
296
303
297 @spin_after
304 @spin_after
298 def get_result(self, indices_or_msg_ids=None):
305 def get_result(self, indices_or_msg_ids=None):
299 """return one or more results, specified by history index or msg_id.
306 """return one or more results, specified by history index or msg_id.
300
307
301 See client.get_result for details.
308 See client.get_result for details.
302
309
303 """
310 """
304
311
305 if indices_or_msg_ids is None:
312 if indices_or_msg_ids is None:
306 indices_or_msg_ids = -1
313 indices_or_msg_ids = -1
307 if isinstance(indices_or_msg_ids, int):
314 if isinstance(indices_or_msg_ids, int):
308 indices_or_msg_ids = self.history[indices_or_msg_ids]
315 indices_or_msg_ids = self.history[indices_or_msg_ids]
309 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
316 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
310 indices_or_msg_ids = list(indices_or_msg_ids)
317 indices_or_msg_ids = list(indices_or_msg_ids)
311 for i,index in enumerate(indices_or_msg_ids):
318 for i,index in enumerate(indices_or_msg_ids):
312 if isinstance(index, int):
319 if isinstance(index, int):
313 indices_or_msg_ids[i] = self.history[index]
320 indices_or_msg_ids[i] = self.history[index]
314 return self.client.get_result(indices_or_msg_ids)
321 return self.client.get_result(indices_or_msg_ids)
315
322
316 #-------------------------------------------------------------------
323 #-------------------------------------------------------------------
317 # Map
324 # Map
318 #-------------------------------------------------------------------
325 #-------------------------------------------------------------------
319
326
320 def map(self, f, *sequences, **kwargs):
327 def map(self, f, *sequences, **kwargs):
321 """override in subclasses"""
328 """override in subclasses"""
322 raise NotImplementedError
329 raise NotImplementedError
323
330
324 def map_async(self, f, *sequences, **kwargs):
331 def map_async(self, f, *sequences, **kwargs):
325 """Parallel version of builtin `map`, using this view's engines.
332 """Parallel version of builtin `map`, using this view's engines.
326
333
327 This is equivalent to map(...block=False)
334 This is equivalent to map(...block=False)
328
335
329 See `self.map` for details.
336 See `self.map` for details.
330 """
337 """
331 if 'block' in kwargs:
338 if 'block' in kwargs:
332 raise TypeError("map_async doesn't take a `block` keyword argument.")
339 raise TypeError("map_async doesn't take a `block` keyword argument.")
333 kwargs['block'] = False
340 kwargs['block'] = False
334 return self.map(f,*sequences,**kwargs)
341 return self.map(f,*sequences,**kwargs)
335
342
336 def map_sync(self, f, *sequences, **kwargs):
343 def map_sync(self, f, *sequences, **kwargs):
337 """Parallel version of builtin `map`, using this view's engines.
344 """Parallel version of builtin `map`, using this view's engines.
338
345
339 This is equivalent to map(...block=True)
346 This is equivalent to map(...block=True)
340
347
341 See `self.map` for details.
348 See `self.map` for details.
342 """
349 """
343 if 'block' in kwargs:
350 if 'block' in kwargs:
344 raise TypeError("map_sync doesn't take a `block` keyword argument.")
351 raise TypeError("map_sync doesn't take a `block` keyword argument.")
345 kwargs['block'] = True
352 kwargs['block'] = True
346 return self.map(f,*sequences,**kwargs)
353 return self.map(f,*sequences,**kwargs)
347
354
348 def imap(self, f, *sequences, **kwargs):
355 def imap(self, f, *sequences, **kwargs):
349 """Parallel version of `itertools.imap`.
356 """Parallel version of `itertools.imap`.
350
357
351 See `self.map` for details.
358 See `self.map` for details.
352
359
353 """
360 """
354
361
355 return iter(self.map_async(f,*sequences, **kwargs))
362 return iter(self.map_async(f,*sequences, **kwargs))
356
363
357 #-------------------------------------------------------------------
364 #-------------------------------------------------------------------
358 # Decorators
365 # Decorators
359 #-------------------------------------------------------------------
366 #-------------------------------------------------------------------
360
367
361 def remote(self, block=True, **flags):
368 def remote(self, block=True, **flags):
362 """Decorator for making a RemoteFunction"""
369 """Decorator for making a RemoteFunction"""
363 block = self.block if block is None else block
370 block = self.block if block is None else block
364 return remote(self, block=block, **flags)
371 return remote(self, block=block, **flags)
365
372
366 def parallel(self, dist='b', block=None, **flags):
373 def parallel(self, dist='b', block=None, **flags):
367 """Decorator for making a ParallelFunction"""
374 """Decorator for making a ParallelFunction"""
368 block = self.block if block is None else block
375 block = self.block if block is None else block
369 return parallel(self, dist=dist, block=block, **flags)
376 return parallel(self, dist=dist, block=block, **flags)
370
377
371 @skip_doctest
378 @skip_doctest
372 class DirectView(View):
379 class DirectView(View):
373 """Direct Multiplexer View of one or more engines.
380 """Direct Multiplexer View of one or more engines.
374
381
375 These are created via indexed access to a client:
382 These are created via indexed access to a client:
376
383
377 >>> dv_1 = client[1]
384 >>> dv_1 = client[1]
378 >>> dv_all = client[:]
385 >>> dv_all = client[:]
379 >>> dv_even = client[::2]
386 >>> dv_even = client[::2]
380 >>> dv_some = client[1:3]
387 >>> dv_some = client[1:3]
381
388
382 This object provides dictionary access to engine namespaces:
389 This object provides dictionary access to engine namespaces:
383
390
384 # push a=5:
391 # push a=5:
385 >>> dv['a'] = 5
392 >>> dv['a'] = 5
386 # pull 'foo':
393 # pull 'foo':
387 >>> db['foo']
394 >>> db['foo']
388
395
389 """
396 """
390
397
391 def __init__(self, client=None, socket=None, targets=None):
398 def __init__(self, client=None, socket=None, targets=None):
392 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
399 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
393
400
394 @property
401 @property
395 def importer(self):
402 def importer(self):
396 """sync_imports(local=True) as a property.
403 """sync_imports(local=True) as a property.
397
404
398 See sync_imports for details.
405 See sync_imports for details.
399
406
400 """
407 """
401 return self.sync_imports(True)
408 return self.sync_imports(True)
402
409
403 @contextmanager
410 @contextmanager
404 def sync_imports(self, local=True, quiet=False):
411 def sync_imports(self, local=True, quiet=False):
405 """Context Manager for performing simultaneous local and remote imports.
412 """Context Manager for performing simultaneous local and remote imports.
406
413
407 'import x as y' will *not* work. The 'as y' part will simply be ignored.
414 'import x as y' will *not* work. The 'as y' part will simply be ignored.
408
415
409 If `local=True`, then the package will also be imported locally.
416 If `local=True`, then the package will also be imported locally.
410
417
411 If `quiet=True`, no output will be produced when attempting remote
418 If `quiet=True`, no output will be produced when attempting remote
412 imports.
419 imports.
413
420
414 Note that remote-only (`local=False`) imports have not been implemented.
421 Note that remote-only (`local=False`) imports have not been implemented.
415
422
416 >>> with view.sync_imports():
423 >>> with view.sync_imports():
417 ... from numpy import recarray
424 ... from numpy import recarray
418 importing recarray from numpy on engine(s)
425 importing recarray from numpy on engine(s)
419
426
420 """
427 """
421 import __builtin__
428 import __builtin__
422 local_import = __builtin__.__import__
429 local_import = __builtin__.__import__
423 modules = set()
430 modules = set()
424 results = []
431 results = []
425 @util.interactive
432 @util.interactive
426 def remote_import(name, fromlist, level):
433 def remote_import(name, fromlist, level):
427 """the function to be passed to apply, that actually performs the import
434 """the function to be passed to apply, that actually performs the import
428 on the engine, and loads up the user namespace.
435 on the engine, and loads up the user namespace.
429 """
436 """
430 import sys
437 import sys
431 user_ns = globals()
438 user_ns = globals()
432 mod = __import__(name, fromlist=fromlist, level=level)
439 mod = __import__(name, fromlist=fromlist, level=level)
433 if fromlist:
440 if fromlist:
434 for key in fromlist:
441 for key in fromlist:
435 user_ns[key] = getattr(mod, key)
442 user_ns[key] = getattr(mod, key)
436 else:
443 else:
437 user_ns[name] = sys.modules[name]
444 user_ns[name] = sys.modules[name]
438
445
439 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
446 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
440 """the drop-in replacement for __import__, that optionally imports
447 """the drop-in replacement for __import__, that optionally imports
441 locally as well.
448 locally as well.
442 """
449 """
443 # don't override nested imports
450 # don't override nested imports
444 save_import = __builtin__.__import__
451 save_import = __builtin__.__import__
445 __builtin__.__import__ = local_import
452 __builtin__.__import__ = local_import
446
453
447 if imp.lock_held():
454 if imp.lock_held():
448 # this is a side-effect import, don't do it remotely, or even
455 # this is a side-effect import, don't do it remotely, or even
449 # ignore the local effects
456 # ignore the local effects
450 return local_import(name, globals, locals, fromlist, level)
457 return local_import(name, globals, locals, fromlist, level)
451
458
452 imp.acquire_lock()
459 imp.acquire_lock()
453 if local:
460 if local:
454 mod = local_import(name, globals, locals, fromlist, level)
461 mod = local_import(name, globals, locals, fromlist, level)
455 else:
462 else:
456 raise NotImplementedError("remote-only imports not yet implemented")
463 raise NotImplementedError("remote-only imports not yet implemented")
457 imp.release_lock()
464 imp.release_lock()
458
465
459 key = name+':'+','.join(fromlist or [])
466 key = name+':'+','.join(fromlist or [])
460 if level == -1 and key not in modules:
467 if level == -1 and key not in modules:
461 modules.add(key)
468 modules.add(key)
462 if not quiet:
469 if not quiet:
463 if fromlist:
470 if fromlist:
464 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
471 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
465 else:
472 else:
466 print "importing %s on engine(s)"%name
473 print "importing %s on engine(s)"%name
467 results.append(self.apply_async(remote_import, name, fromlist, level))
474 results.append(self.apply_async(remote_import, name, fromlist, level))
468 # restore override
475 # restore override
469 __builtin__.__import__ = save_import
476 __builtin__.__import__ = save_import
470
477
471 return mod
478 return mod
472
479
473 # override __import__
480 # override __import__
474 __builtin__.__import__ = view_import
481 __builtin__.__import__ = view_import
475 try:
482 try:
476 # enter the block
483 # enter the block
477 yield
484 yield
478 except ImportError:
485 except ImportError:
479 if local:
486 if local:
480 raise
487 raise
481 else:
488 else:
482 # ignore import errors if not doing local imports
489 # ignore import errors if not doing local imports
483 pass
490 pass
484 finally:
491 finally:
485 # always restore __import__
492 # always restore __import__
486 __builtin__.__import__ = local_import
493 __builtin__.__import__ = local_import
487
494
488 for r in results:
495 for r in results:
489 # raise possible remote ImportErrors here
496 # raise possible remote ImportErrors here
490 r.get()
497 r.get()
491
498
492
499
493 @sync_results
500 @sync_results
494 @save_ids
501 @save_ids
495 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
502 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
496 """calls f(*args, **kwargs) on remote engines, returning the result.
503 """calls f(*args, **kwargs) on remote engines, returning the result.
497
504
498 This method sets all of `apply`'s flags via this View's attributes.
505 This method sets all of `apply`'s flags via this View's attributes.
499
506
500 Parameters
507 Parameters
501 ----------
508 ----------
502
509
503 f : callable
510 f : callable
504
511
505 args : list [default: empty]
512 args : list [default: empty]
506
513
507 kwargs : dict [default: empty]
514 kwargs : dict [default: empty]
508
515
509 targets : target list [default: self.targets]
516 targets : target list [default: self.targets]
510 where to run
517 where to run
511 block : bool [default: self.block]
518 block : bool [default: self.block]
512 whether to block
519 whether to block
513 track : bool [default: self.track]
520 track : bool [default: self.track]
514 whether to ask zmq to track the message, for safe non-copying sends
521 whether to ask zmq to track the message, for safe non-copying sends
515
522
516 Returns
523 Returns
517 -------
524 -------
518
525
519 if self.block is False:
526 if self.block is False:
520 returns AsyncResult
527 returns AsyncResult
521 else:
528 else:
522 returns actual result of f(*args, **kwargs) on the engine(s)
529 returns actual result of f(*args, **kwargs) on the engine(s)
523 This will be a list of self.targets is also a list (even length 1), or
530 This will be a list of self.targets is also a list (even length 1), or
524 the single result if self.targets is an integer engine id
531 the single result if self.targets is an integer engine id
525 """
532 """
526 args = [] if args is None else args
533 args = [] if args is None else args
527 kwargs = {} if kwargs is None else kwargs
534 kwargs = {} if kwargs is None else kwargs
528 block = self.block if block is None else block
535 block = self.block if block is None else block
529 track = self.track if track is None else track
536 track = self.track if track is None else track
530 targets = self.targets if targets is None else targets
537 targets = self.targets if targets is None else targets
531
538
532 _idents = self.client._build_targets(targets)[0]
539 _idents = self.client._build_targets(targets)[0]
533 msg_ids = []
540 msg_ids = []
534 trackers = []
541 trackers = []
535 for ident in _idents:
542 for ident in _idents:
536 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
543 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
537 ident=ident)
544 ident=ident)
538 if track:
545 if track:
539 trackers.append(msg['tracker'])
546 trackers.append(msg['tracker'])
540 msg_ids.append(msg['header']['msg_id'])
547 msg_ids.append(msg['header']['msg_id'])
541 tracker = None if track is False else zmq.MessageTracker(*trackers)
548 tracker = None if track is False else zmq.MessageTracker(*trackers)
542 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=targets, tracker=tracker)
549 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=targets, tracker=tracker)
543 if block:
550 if block:
544 try:
551 try:
545 return ar.get()
552 return ar.get()
546 except KeyboardInterrupt:
553 except KeyboardInterrupt:
547 pass
554 pass
548 return ar
555 return ar
549
556
550
557
551 @spin_after
558 @spin_after
552 def map(self, f, *sequences, **kwargs):
559 def map(self, f, *sequences, **kwargs):
553 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
560 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
554
561
555 Parallel version of builtin `map`, using this View's `targets`.
562 Parallel version of builtin `map`, using this View's `targets`.
556
563
557 There will be one task per target, so work will be chunked
564 There will be one task per target, so work will be chunked
558 if the sequences are longer than `targets`.
565 if the sequences are longer than `targets`.
559
566
560 Results can be iterated as they are ready, but will become available in chunks.
567 Results can be iterated as they are ready, but will become available in chunks.
561
568
562 Parameters
569 Parameters
563 ----------
570 ----------
564
571
565 f : callable
572 f : callable
566 function to be mapped
573 function to be mapped
567 *sequences: one or more sequences of matching length
574 *sequences: one or more sequences of matching length
568 the sequences to be distributed and passed to `f`
575 the sequences to be distributed and passed to `f`
569 block : bool
576 block : bool
570 whether to wait for the result or not [default self.block]
577 whether to wait for the result or not [default self.block]
571
578
572 Returns
579 Returns
573 -------
580 -------
574
581
575 if block=False:
582 if block=False:
576 AsyncMapResult
583 AsyncMapResult
577 An object like AsyncResult, but which reassembles the sequence of results
584 An object like AsyncResult, but which reassembles the sequence of results
578 into a single list. AsyncMapResults can be iterated through before all
585 into a single list. AsyncMapResults can be iterated through before all
579 results are complete.
586 results are complete.
580 else:
587 else:
581 list
588 list
582 the result of map(f,*sequences)
589 the result of map(f,*sequences)
583 """
590 """
584
591
585 block = kwargs.pop('block', self.block)
592 block = kwargs.pop('block', self.block)
586 for k in kwargs.keys():
593 for k in kwargs.keys():
587 if k not in ['block', 'track']:
594 if k not in ['block', 'track']:
588 raise TypeError("invalid keyword arg, %r"%k)
595 raise TypeError("invalid keyword arg, %r"%k)
589
596
590 assert len(sequences) > 0, "must have some sequences to map onto!"
597 assert len(sequences) > 0, "must have some sequences to map onto!"
591 pf = ParallelFunction(self, f, block=block, **kwargs)
598 pf = ParallelFunction(self, f, block=block, **kwargs)
592 return pf.map(*sequences)
599 return pf.map(*sequences)
593
600
594 @sync_results
601 @sync_results
595 @save_ids
602 @save_ids
596 def execute(self, code, silent=True, targets=None, block=None):
603 def execute(self, code, silent=True, targets=None, block=None):
597 """Executes `code` on `targets` in blocking or nonblocking manner.
604 """Executes `code` on `targets` in blocking or nonblocking manner.
598
605
599 ``execute`` is always `bound` (affects engine namespace)
606 ``execute`` is always `bound` (affects engine namespace)
600
607
601 Parameters
608 Parameters
602 ----------
609 ----------
603
610
604 code : str
611 code : str
605 the code string to be executed
612 the code string to be executed
606 block : bool
613 block : bool
607 whether or not to wait until done to return
614 whether or not to wait until done to return
608 default: self.block
615 default: self.block
609 """
616 """
610 block = self.block if block is None else block
617 block = self.block if block is None else block
611 targets = self.targets if targets is None else targets
618 targets = self.targets if targets is None else targets
612
619
613 _idents = self.client._build_targets(targets)[0]
620 _idents = self.client._build_targets(targets)[0]
614 msg_ids = []
621 msg_ids = []
615 trackers = []
622 trackers = []
616 for ident in _idents:
623 for ident in _idents:
617 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
624 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
618 msg_ids.append(msg['header']['msg_id'])
625 msg_ids.append(msg['header']['msg_id'])
619 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets)
626 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets)
620 if block:
627 if block:
621 try:
628 try:
622 ar.get()
629 ar.get()
623 except KeyboardInterrupt:
630 except KeyboardInterrupt:
624 pass
631 pass
625 return ar
632 return ar
626
633
627 def run(self, filename, targets=None, block=None):
634 def run(self, filename, targets=None, block=None):
628 """Execute contents of `filename` on my engine(s).
635 """Execute contents of `filename` on my engine(s).
629
636
630 This simply reads the contents of the file and calls `execute`.
637 This simply reads the contents of the file and calls `execute`.
631
638
632 Parameters
639 Parameters
633 ----------
640 ----------
634
641
635 filename : str
642 filename : str
636 The path to the file
643 The path to the file
637 targets : int/str/list of ints/strs
644 targets : int/str/list of ints/strs
638 the engines on which to execute
645 the engines on which to execute
639 default : all
646 default : all
640 block : bool
647 block : bool
641 whether or not to wait until done
648 whether or not to wait until done
642 default: self.block
649 default: self.block
643
650
644 """
651 """
645 with open(filename, 'r') as f:
652 with open(filename, 'r') as f:
646 # add newline in case of trailing indented whitespace
653 # add newline in case of trailing indented whitespace
647 # which will cause SyntaxError
654 # which will cause SyntaxError
648 code = f.read()+'\n'
655 code = f.read()+'\n'
649 return self.execute(code, block=block, targets=targets)
656 return self.execute(code, block=block, targets=targets)
650
657
651 def update(self, ns):
658 def update(self, ns):
652 """update remote namespace with dict `ns`
659 """update remote namespace with dict `ns`
653
660
654 See `push` for details.
661 See `push` for details.
655 """
662 """
656 return self.push(ns, block=self.block, track=self.track)
663 return self.push(ns, block=self.block, track=self.track)
657
664
658 def push(self, ns, targets=None, block=None, track=None):
665 def push(self, ns, targets=None, block=None, track=None):
659 """update remote namespace with dict `ns`
666 """update remote namespace with dict `ns`
660
667
661 Parameters
668 Parameters
662 ----------
669 ----------
663
670
664 ns : dict
671 ns : dict
665 dict of keys with which to update engine namespace(s)
672 dict of keys with which to update engine namespace(s)
666 block : bool [default : self.block]
673 block : bool [default : self.block]
667 whether to wait to be notified of engine receipt
674 whether to wait to be notified of engine receipt
668
675
669 """
676 """
670
677
671 block = block if block is not None else self.block
678 block = block if block is not None else self.block
672 track = track if track is not None else self.track
679 track = track if track is not None else self.track
673 targets = targets if targets is not None else self.targets
680 targets = targets if targets is not None else self.targets
674 # applier = self.apply_sync if block else self.apply_async
681 # applier = self.apply_sync if block else self.apply_async
675 if not isinstance(ns, dict):
682 if not isinstance(ns, dict):
676 raise TypeError("Must be a dict, not %s"%type(ns))
683 raise TypeError("Must be a dict, not %s"%type(ns))
677 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
684 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
678
685
679 def get(self, key_s):
686 def get(self, key_s):
680 """get object(s) by `key_s` from remote namespace
687 """get object(s) by `key_s` from remote namespace
681
688
682 see `pull` for details.
689 see `pull` for details.
683 """
690 """
684 # block = block if block is not None else self.block
691 # block = block if block is not None else self.block
685 return self.pull(key_s, block=True)
692 return self.pull(key_s, block=True)
686
693
687 def pull(self, names, targets=None, block=None):
694 def pull(self, names, targets=None, block=None):
688 """get object(s) by `name` from remote namespace
695 """get object(s) by `name` from remote namespace
689
696
690 will return one object if it is a key.
697 will return one object if it is a key.
691 can also take a list of keys, in which case it will return a list of objects.
698 can also take a list of keys, in which case it will return a list of objects.
692 """
699 """
693 block = block if block is not None else self.block
700 block = block if block is not None else self.block
694 targets = targets if targets is not None else self.targets
701 targets = targets if targets is not None else self.targets
695 applier = self.apply_sync if block else self.apply_async
702 applier = self.apply_sync if block else self.apply_async
696 if isinstance(names, basestring):
703 if isinstance(names, basestring):
697 pass
704 pass
698 elif isinstance(names, (list,tuple,set)):
705 elif isinstance(names, (list,tuple,set)):
699 for key in names:
706 for key in names:
700 if not isinstance(key, basestring):
707 if not isinstance(key, basestring):
701 raise TypeError("keys must be str, not type %r"%type(key))
708 raise TypeError("keys must be str, not type %r"%type(key))
702 else:
709 else:
703 raise TypeError("names must be strs, not %r"%names)
710 raise TypeError("names must be strs, not %r"%names)
704 return self._really_apply(util._pull, (names,), block=block, targets=targets)
711 return self._really_apply(util._pull, (names,), block=block, targets=targets)
705
712
706 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
713 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
707 """
714 """
708 Partition a Python sequence and send the partitions to a set of engines.
715 Partition a Python sequence and send the partitions to a set of engines.
709 """
716 """
710 block = block if block is not None else self.block
717 block = block if block is not None else self.block
711 track = track if track is not None else self.track
718 track = track if track is not None else self.track
712 targets = targets if targets is not None else self.targets
719 targets = targets if targets is not None else self.targets
713
720
714 # construct integer ID list:
721 # construct integer ID list:
715 targets = self.client._build_targets(targets)[1]
722 targets = self.client._build_targets(targets)[1]
716
723
717 mapObject = Map.dists[dist]()
724 mapObject = Map.dists[dist]()
718 nparts = len(targets)
725 nparts = len(targets)
719 msg_ids = []
726 msg_ids = []
720 trackers = []
727 trackers = []
721 for index, engineid in enumerate(targets):
728 for index, engineid in enumerate(targets):
722 partition = mapObject.getPartition(seq, index, nparts)
729 partition = mapObject.getPartition(seq, index, nparts)
723 if flatten and len(partition) == 1:
730 if flatten and len(partition) == 1:
724 ns = {key: partition[0]}
731 ns = {key: partition[0]}
725 else:
732 else:
726 ns = {key: partition}
733 ns = {key: partition}
727 r = self.push(ns, block=False, track=track, targets=engineid)
734 r = self.push(ns, block=False, track=track, targets=engineid)
728 msg_ids.extend(r.msg_ids)
735 msg_ids.extend(r.msg_ids)
729 if track:
736 if track:
730 trackers.append(r._tracker)
737 trackers.append(r._tracker)
731
738
732 if track:
739 if track:
733 tracker = zmq.MessageTracker(*trackers)
740 tracker = zmq.MessageTracker(*trackers)
734 else:
741 else:
735 tracker = None
742 tracker = None
736
743
737 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
744 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
738 if block:
745 if block:
739 r.wait()
746 r.wait()
740 else:
747 else:
741 return r
748 return r
742
749
743 @sync_results
750 @sync_results
744 @save_ids
751 @save_ids
745 def gather(self, key, dist='b', targets=None, block=None):
752 def gather(self, key, dist='b', targets=None, block=None):
746 """
753 """
747 Gather a partitioned sequence on a set of engines as a single local seq.
754 Gather a partitioned sequence on a set of engines as a single local seq.
748 """
755 """
749 block = block if block is not None else self.block
756 block = block if block is not None else self.block
750 targets = targets if targets is not None else self.targets
757 targets = targets if targets is not None else self.targets
751 mapObject = Map.dists[dist]()
758 mapObject = Map.dists[dist]()
752 msg_ids = []
759 msg_ids = []
753
760
754 # construct integer ID list:
761 # construct integer ID list:
755 targets = self.client._build_targets(targets)[1]
762 targets = self.client._build_targets(targets)[1]
756
763
757 for index, engineid in enumerate(targets):
764 for index, engineid in enumerate(targets):
758 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
765 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
759
766
760 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
767 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
761
768
762 if block:
769 if block:
763 try:
770 try:
764 return r.get()
771 return r.get()
765 except KeyboardInterrupt:
772 except KeyboardInterrupt:
766 pass
773 pass
767 return r
774 return r
768
775
769 def __getitem__(self, key):
776 def __getitem__(self, key):
770 return self.get(key)
777 return self.get(key)
771
778
772 def __setitem__(self,key, value):
779 def __setitem__(self,key, value):
773 self.update({key:value})
780 self.update({key:value})
774
781
775 def clear(self, targets=None, block=False):
782 def clear(self, targets=None, block=False):
776 """Clear the remote namespaces on my engines."""
783 """Clear the remote namespaces on my engines."""
777 block = block if block is not None else self.block
784 block = block if block is not None else self.block
778 targets = targets if targets is not None else self.targets
785 targets = targets if targets is not None else self.targets
779 return self.client.clear(targets=targets, block=block)
786 return self.client.clear(targets=targets, block=block)
780
787
781 def kill(self, targets=None, block=True):
788 def kill(self, targets=None, block=True):
782 """Kill my engines."""
789 """Kill my engines."""
783 block = block if block is not None else self.block
790 block = block if block is not None else self.block
784 targets = targets if targets is not None else self.targets
791 targets = targets if targets is not None else self.targets
785 return self.client.kill(targets=targets, block=block)
792 return self.client.kill(targets=targets, block=block)
786
793
787 #----------------------------------------
794 #----------------------------------------
788 # activate for %px,%autopx magics
795 # activate for %px,%autopx magics
789 #----------------------------------------
796 #----------------------------------------
790 def activate(self):
797 def activate(self):
791 """Make this `View` active for parallel magic commands.
798 """Make this `View` active for parallel magic commands.
792
799
793 IPython has a magic command syntax to work with `MultiEngineClient` objects.
800 IPython has a magic command syntax to work with `MultiEngineClient` objects.
794 In a given IPython session there is a single active one. While
801 In a given IPython session there is a single active one. While
795 there can be many `Views` created and used by the user,
802 there can be many `Views` created and used by the user,
796 there is only one active one. The active `View` is used whenever
803 there is only one active one. The active `View` is used whenever
797 the magic commands %px and %autopx are used.
804 the magic commands %px and %autopx are used.
798
805
799 The activate() method is called on a given `View` to make it
806 The activate() method is called on a given `View` to make it
800 active. Once this has been done, the magic commands can be used.
807 active. Once this has been done, the magic commands can be used.
801 """
808 """
802
809
803 try:
810 try:
804 # This is injected into __builtins__.
811 # This is injected into __builtins__.
805 ip = get_ipython()
812 ip = get_ipython()
806 except NameError:
813 except NameError:
807 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
814 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
808 else:
815 else:
809 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
816 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
810 if pmagic is None:
817 if pmagic is None:
811 ip.magic_load_ext('parallelmagic')
818 ip.magic_load_ext('parallelmagic')
812 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
819 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
813
820
814 pmagic.active_view = self
821 pmagic.active_view = self
815
822
816
823
817 @skip_doctest
824 @skip_doctest
818 class LoadBalancedView(View):
825 class LoadBalancedView(View):
819 """An load-balancing View that only executes via the Task scheduler.
826 """An load-balancing View that only executes via the Task scheduler.
820
827
821 Load-balanced views can be created with the client's `view` method:
828 Load-balanced views can be created with the client's `view` method:
822
829
823 >>> v = client.load_balanced_view()
830 >>> v = client.load_balanced_view()
824
831
825 or targets can be specified, to restrict the potential destinations:
832 or targets can be specified, to restrict the potential destinations:
826
833
827 >>> v = client.client.load_balanced_view([1,3])
834 >>> v = client.client.load_balanced_view([1,3])
828
835
829 which would restrict loadbalancing to between engines 1 and 3.
836 which would restrict loadbalancing to between engines 1 and 3.
830
837
831 """
838 """
832
839
833 follow=Any()
840 follow=Any()
834 after=Any()
841 after=Any()
835 timeout=CFloat()
842 timeout=CFloat()
836 retries = Integer(0)
843 retries = Integer(0)
837
844
838 _task_scheme = Any()
845 _task_scheme = Any()
839 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
846 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
840
847
841 def __init__(self, client=None, socket=None, **flags):
848 def __init__(self, client=None, socket=None, **flags):
842 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
849 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
843 self._task_scheme=client._task_scheme
850 self._task_scheme=client._task_scheme
844
851
845 def _validate_dependency(self, dep):
852 def _validate_dependency(self, dep):
846 """validate a dependency.
853 """validate a dependency.
847
854
848 For use in `set_flags`.
855 For use in `set_flags`.
849 """
856 """
850 if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
857 if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
851 return True
858 return True
852 elif isinstance(dep, (list,set, tuple)):
859 elif isinstance(dep, (list,set, tuple)):
853 for d in dep:
860 for d in dep:
854 if not isinstance(d, (basestring, AsyncResult)):
861 if not isinstance(d, (basestring, AsyncResult)):
855 return False
862 return False
856 elif isinstance(dep, dict):
863 elif isinstance(dep, dict):
857 if set(dep.keys()) != set(Dependency().as_dict().keys()):
864 if set(dep.keys()) != set(Dependency().as_dict().keys()):
858 return False
865 return False
859 if not isinstance(dep['msg_ids'], list):
866 if not isinstance(dep['msg_ids'], list):
860 return False
867 return False
861 for d in dep['msg_ids']:
868 for d in dep['msg_ids']:
862 if not isinstance(d, basestring):
869 if not isinstance(d, basestring):
863 return False
870 return False
864 else:
871 else:
865 return False
872 return False
866
873
867 return True
874 return True
868
875
869 def _render_dependency(self, dep):
876 def _render_dependency(self, dep):
870 """helper for building jsonable dependencies from various input forms."""
877 """helper for building jsonable dependencies from various input forms."""
871 if isinstance(dep, Dependency):
878 if isinstance(dep, Dependency):
872 return dep.as_dict()
879 return dep.as_dict()
873 elif isinstance(dep, AsyncResult):
880 elif isinstance(dep, AsyncResult):
874 return dep.msg_ids
881 return dep.msg_ids
875 elif dep is None:
882 elif dep is None:
876 return []
883 return []
877 else:
884 else:
878 # pass to Dependency constructor
885 # pass to Dependency constructor
879 return list(Dependency(dep))
886 return list(Dependency(dep))
880
887
881 def set_flags(self, **kwargs):
888 def set_flags(self, **kwargs):
882 """set my attribute flags by keyword.
889 """set my attribute flags by keyword.
883
890
884 A View is a wrapper for the Client's apply method, but with attributes
891 A View is a wrapper for the Client's apply method, but with attributes
885 that specify keyword arguments, those attributes can be set by keyword
892 that specify keyword arguments, those attributes can be set by keyword
886 argument with this method.
893 argument with this method.
887
894
888 Parameters
895 Parameters
889 ----------
896 ----------
890
897
891 block : bool
898 block : bool
892 whether to wait for results
899 whether to wait for results
893 track : bool
900 track : bool
894 whether to create a MessageTracker to allow the user to
901 whether to create a MessageTracker to allow the user to
895 safely edit after arrays and buffers during non-copying
902 safely edit after arrays and buffers during non-copying
896 sends.
903 sends.
897
904
898 after : Dependency or collection of msg_ids
905 after : Dependency or collection of msg_ids
899 Only for load-balanced execution (targets=None)
906 Only for load-balanced execution (targets=None)
900 Specify a list of msg_ids as a time-based dependency.
907 Specify a list of msg_ids as a time-based dependency.
901 This job will only be run *after* the dependencies
908 This job will only be run *after* the dependencies
902 have been met.
909 have been met.
903
910
904 follow : Dependency or collection of msg_ids
911 follow : Dependency or collection of msg_ids
905 Only for load-balanced execution (targets=None)
912 Only for load-balanced execution (targets=None)
906 Specify a list of msg_ids as a location-based dependency.
913 Specify a list of msg_ids as a location-based dependency.
907 This job will only be run on an engine where this dependency
914 This job will only be run on an engine where this dependency
908 is met.
915 is met.
909
916
910 timeout : float/int or None
917 timeout : float/int or None
911 Only for load-balanced execution (targets=None)
918 Only for load-balanced execution (targets=None)
912 Specify an amount of time (in seconds) for the scheduler to
919 Specify an amount of time (in seconds) for the scheduler to
913 wait for dependencies to be met before failing with a
920 wait for dependencies to be met before failing with a
914 DependencyTimeout.
921 DependencyTimeout.
915
922
916 retries : int
923 retries : int
917 Number of times a task will be retried on failure.
924 Number of times a task will be retried on failure.
918 """
925 """
919
926
920 super(LoadBalancedView, self).set_flags(**kwargs)
927 super(LoadBalancedView, self).set_flags(**kwargs)
921 for name in ('follow', 'after'):
928 for name in ('follow', 'after'):
922 if name in kwargs:
929 if name in kwargs:
923 value = kwargs[name]
930 value = kwargs[name]
924 if self._validate_dependency(value):
931 if self._validate_dependency(value):
925 setattr(self, name, value)
932 setattr(self, name, value)
926 else:
933 else:
927 raise ValueError("Invalid dependency: %r"%value)
934 raise ValueError("Invalid dependency: %r"%value)
928 if 'timeout' in kwargs:
935 if 'timeout' in kwargs:
929 t = kwargs['timeout']
936 t = kwargs['timeout']
930 if not isinstance(t, (int, long, float, type(None))):
937 if not isinstance(t, (int, long, float, type(None))):
931 raise TypeError("Invalid type for timeout: %r"%type(t))
938 raise TypeError("Invalid type for timeout: %r"%type(t))
932 if t is not None:
939 if t is not None:
933 if t < 0:
940 if t < 0:
934 raise ValueError("Invalid timeout: %s"%t)
941 raise ValueError("Invalid timeout: %s"%t)
935 self.timeout = t
942 self.timeout = t
936
943
937 @sync_results
944 @sync_results
938 @save_ids
945 @save_ids
939 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
946 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
940 after=None, follow=None, timeout=None,
947 after=None, follow=None, timeout=None,
941 targets=None, retries=None):
948 targets=None, retries=None):
942 """calls f(*args, **kwargs) on a remote engine, returning the result.
949 """calls f(*args, **kwargs) on a remote engine, returning the result.
943
950
944 This method temporarily sets all of `apply`'s flags for a single call.
951 This method temporarily sets all of `apply`'s flags for a single call.
945
952
946 Parameters
953 Parameters
947 ----------
954 ----------
948
955
949 f : callable
956 f : callable
950
957
951 args : list [default: empty]
958 args : list [default: empty]
952
959
953 kwargs : dict [default: empty]
960 kwargs : dict [default: empty]
954
961
955 block : bool [default: self.block]
962 block : bool [default: self.block]
956 whether to block
963 whether to block
957 track : bool [default: self.track]
964 track : bool [default: self.track]
958 whether to ask zmq to track the message, for safe non-copying sends
965 whether to ask zmq to track the message, for safe non-copying sends
959
966
960 !!!!!! TODO: THE REST HERE !!!!
967 !!!!!! TODO: THE REST HERE !!!!
961
968
962 Returns
969 Returns
963 -------
970 -------
964
971
965 if self.block is False:
972 if self.block is False:
966 returns AsyncResult
973 returns AsyncResult
967 else:
974 else:
968 returns actual result of f(*args, **kwargs) on the engine(s)
975 returns actual result of f(*args, **kwargs) on the engine(s)
969 This will be a list of self.targets is also a list (even length 1), or
976 This will be a list of self.targets is also a list (even length 1), or
970 the single result if self.targets is an integer engine id
977 the single result if self.targets is an integer engine id
971 """
978 """
972
979
973 # validate whether we can run
980 # validate whether we can run
974 if self._socket.closed:
981 if self._socket.closed:
975 msg = "Task farming is disabled"
982 msg = "Task farming is disabled"
976 if self._task_scheme == 'pure':
983 if self._task_scheme == 'pure':
977 msg += " because the pure ZMQ scheduler cannot handle"
984 msg += " because the pure ZMQ scheduler cannot handle"
978 msg += " disappearing engines."
985 msg += " disappearing engines."
979 raise RuntimeError(msg)
986 raise RuntimeError(msg)
980
987
981 if self._task_scheme == 'pure':
988 if self._task_scheme == 'pure':
982 # pure zmq scheme doesn't support extra features
989 # pure zmq scheme doesn't support extra features
983 msg = "Pure ZMQ scheduler doesn't support the following flags:"
990 msg = "Pure ZMQ scheduler doesn't support the following flags:"
984 "follow, after, retries, targets, timeout"
991 "follow, after, retries, targets, timeout"
985 if (follow or after or retries or targets or timeout):
992 if (follow or after or retries or targets or timeout):
986 # hard fail on Scheduler flags
993 # hard fail on Scheduler flags
987 raise RuntimeError(msg)
994 raise RuntimeError(msg)
988 if isinstance(f, dependent):
995 if isinstance(f, dependent):
989 # soft warn on functional dependencies
996 # soft warn on functional dependencies
990 warnings.warn(msg, RuntimeWarning)
997 warnings.warn(msg, RuntimeWarning)
991
998
992 # build args
999 # build args
993 args = [] if args is None else args
1000 args = [] if args is None else args
994 kwargs = {} if kwargs is None else kwargs
1001 kwargs = {} if kwargs is None else kwargs
995 block = self.block if block is None else block
1002 block = self.block if block is None else block
996 track = self.track if track is None else track
1003 track = self.track if track is None else track
997 after = self.after if after is None else after
1004 after = self.after if after is None else after
998 retries = self.retries if retries is None else retries
1005 retries = self.retries if retries is None else retries
999 follow = self.follow if follow is None else follow
1006 follow = self.follow if follow is None else follow
1000 timeout = self.timeout if timeout is None else timeout
1007 timeout = self.timeout if timeout is None else timeout
1001 targets = self.targets if targets is None else targets
1008 targets = self.targets if targets is None else targets
1002
1009
1003 if not isinstance(retries, int):
1010 if not isinstance(retries, int):
1004 raise TypeError('retries must be int, not %r'%type(retries))
1011 raise TypeError('retries must be int, not %r'%type(retries))
1005
1012
1006 if targets is None:
1013 if targets is None:
1007 idents = []
1014 idents = []
1008 else:
1015 else:
1009 idents = self.client._build_targets(targets)[0]
1016 idents = self.client._build_targets(targets)[0]
1010 # ensure *not* bytes
1017 # ensure *not* bytes
1011 idents = [ ident.decode() for ident in idents ]
1018 idents = [ ident.decode() for ident in idents ]
1012
1019
1013 after = self._render_dependency(after)
1020 after = self._render_dependency(after)
1014 follow = self._render_dependency(follow)
1021 follow = self._render_dependency(follow)
1015 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1022 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1016
1023
1017 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1024 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1018 subheader=subheader)
1025 subheader=subheader)
1019 tracker = None if track is False else msg['tracker']
1026 tracker = None if track is False else msg['tracker']
1020
1027
1021 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1028 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1022
1029
1023 if block:
1030 if block:
1024 try:
1031 try:
1025 return ar.get()
1032 return ar.get()
1026 except KeyboardInterrupt:
1033 except KeyboardInterrupt:
1027 pass
1034 pass
1028 return ar
1035 return ar
1029
1036
1030 @spin_after
1037 @spin_after
1031 @save_ids
1038 @save_ids
1032 def map(self, f, *sequences, **kwargs):
1039 def map(self, f, *sequences, **kwargs):
1033 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1040 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1034
1041
1035 Parallel version of builtin `map`, load-balanced by this View.
1042 Parallel version of builtin `map`, load-balanced by this View.
1036
1043
1037 `block`, and `chunksize` can be specified by keyword only.
1044 `block`, and `chunksize` can be specified by keyword only.
1038
1045
1039 Each `chunksize` elements will be a separate task, and will be
1046 Each `chunksize` elements will be a separate task, and will be
1040 load-balanced. This lets individual elements be available for iteration
1047 load-balanced. This lets individual elements be available for iteration
1041 as soon as they arrive.
1048 as soon as they arrive.
1042
1049
1043 Parameters
1050 Parameters
1044 ----------
1051 ----------
1045
1052
1046 f : callable
1053 f : callable
1047 function to be mapped
1054 function to be mapped
1048 *sequences: one or more sequences of matching length
1055 *sequences: one or more sequences of matching length
1049 the sequences to be distributed and passed to `f`
1056 the sequences to be distributed and passed to `f`
1050 block : bool [default self.block]
1057 block : bool [default self.block]
1051 whether to wait for the result or not
1058 whether to wait for the result or not
1052 track : bool
1059 track : bool
1053 whether to create a MessageTracker to allow the user to
1060 whether to create a MessageTracker to allow the user to
1054 safely edit after arrays and buffers during non-copying
1061 safely edit after arrays and buffers during non-copying
1055 sends.
1062 sends.
1056 chunksize : int [default 1]
1063 chunksize : int [default 1]
1057 how many elements should be in each task.
1064 how many elements should be in each task.
1058 ordered : bool [default True]
1065 ordered : bool [default True]
1059 Whether the results should be gathered as they arrive, or enforce
1066 Whether the results should be gathered as they arrive, or enforce
1060 the order of submission.
1067 the order of submission.
1061
1068
1062 Only applies when iterating through AsyncMapResult as results arrive.
1069 Only applies when iterating through AsyncMapResult as results arrive.
1063 Has no effect when block=True.
1070 Has no effect when block=True.
1064
1071
1065 Returns
1072 Returns
1066 -------
1073 -------
1067
1074
1068 if block=False:
1075 if block=False:
1069 AsyncMapResult
1076 AsyncMapResult
1070 An object like AsyncResult, but which reassembles the sequence of results
1077 An object like AsyncResult, but which reassembles the sequence of results
1071 into a single list. AsyncMapResults can be iterated through before all
1078 into a single list. AsyncMapResults can be iterated through before all
1072 results are complete.
1079 results are complete.
1073 else:
1080 else:
1074 the result of map(f,*sequences)
1081 the result of map(f,*sequences)
1075
1082
1076 """
1083 """
1077
1084
1078 # default
1085 # default
1079 block = kwargs.get('block', self.block)
1086 block = kwargs.get('block', self.block)
1080 chunksize = kwargs.get('chunksize', 1)
1087 chunksize = kwargs.get('chunksize', 1)
1081 ordered = kwargs.get('ordered', True)
1088 ordered = kwargs.get('ordered', True)
1082
1089
1083 keyset = set(kwargs.keys())
1090 keyset = set(kwargs.keys())
1084 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1091 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1085 if extra_keys:
1092 if extra_keys:
1086 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1093 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1087
1094
1088 assert len(sequences) > 0, "must have some sequences to map onto!"
1095 assert len(sequences) > 0, "must have some sequences to map onto!"
1089
1096
1090 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1097 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1091 return pf.map(*sequences)
1098 return pf.map(*sequences)
1092
1099
1093 __all__ = ['LoadBalancedView', 'DirectView']
1100 __all__ = ['LoadBalancedView', 'DirectView']
@@ -1,553 +1,571 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test View objects
2 """test View objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import time
20 import time
21 from tempfile import mktemp
21 from tempfile import mktemp
22 from StringIO import StringIO
22 from StringIO import StringIO
23
23
24 import zmq
24 import zmq
25 from nose import SkipTest
25 from nose import SkipTest
26
26
27 from IPython.testing import decorators as dec
27 from IPython.testing import decorators as dec
28 from IPython.testing.ipunittest import ParametricTestCase
28
29
29 from IPython import parallel as pmod
30 from IPython import parallel as pmod
30 from IPython.parallel import error
31 from IPython.parallel import error
31 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
32 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
32 from IPython.parallel import DirectView
33 from IPython.parallel import DirectView
33 from IPython.parallel.util import interactive
34 from IPython.parallel.util import interactive
34
35
35 from IPython.parallel.tests import add_engines
36 from IPython.parallel.tests import add_engines
36
37
37 from .clienttest import ClusterTestCase, crash, wait, skip_without
38 from .clienttest import ClusterTestCase, crash, wait, skip_without
38
39
39 def setup():
40 def setup():
40 add_engines(3, total=True)
41 add_engines(3, total=True)
41
42
42 class TestView(ClusterTestCase):
43 class TestView(ClusterTestCase, ParametricTestCase):
43
44
44 def test_z_crash_mux(self):
45 def test_z_crash_mux(self):
45 """test graceful handling of engine death (direct)"""
46 """test graceful handling of engine death (direct)"""
46 raise SkipTest("crash tests disabled, due to undesirable crash reports")
47 raise SkipTest("crash tests disabled, due to undesirable crash reports")
47 # self.add_engines(1)
48 # self.add_engines(1)
48 eid = self.client.ids[-1]
49 eid = self.client.ids[-1]
49 ar = self.client[eid].apply_async(crash)
50 ar = self.client[eid].apply_async(crash)
50 self.assertRaisesRemote(error.EngineError, ar.get, 10)
51 self.assertRaisesRemote(error.EngineError, ar.get, 10)
51 eid = ar.engine_id
52 eid = ar.engine_id
52 tic = time.time()
53 tic = time.time()
53 while eid in self.client.ids and time.time()-tic < 5:
54 while eid in self.client.ids and time.time()-tic < 5:
54 time.sleep(.01)
55 time.sleep(.01)
55 self.client.spin()
56 self.client.spin()
56 self.assertFalse(eid in self.client.ids, "Engine should have died")
57 self.assertFalse(eid in self.client.ids, "Engine should have died")
57
58
58 def test_push_pull(self):
59 def test_push_pull(self):
59 """test pushing and pulling"""
60 """test pushing and pulling"""
60 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
61 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
61 t = self.client.ids[-1]
62 t = self.client.ids[-1]
62 v = self.client[t]
63 v = self.client[t]
63 push = v.push
64 push = v.push
64 pull = v.pull
65 pull = v.pull
65 v.block=True
66 v.block=True
66 nengines = len(self.client)
67 nengines = len(self.client)
67 push({'data':data})
68 push({'data':data})
68 d = pull('data')
69 d = pull('data')
69 self.assertEquals(d, data)
70 self.assertEquals(d, data)
70 self.client[:].push({'data':data})
71 self.client[:].push({'data':data})
71 d = self.client[:].pull('data', block=True)
72 d = self.client[:].pull('data', block=True)
72 self.assertEquals(d, nengines*[data])
73 self.assertEquals(d, nengines*[data])
73 ar = push({'data':data}, block=False)
74 ar = push({'data':data}, block=False)
74 self.assertTrue(isinstance(ar, AsyncResult))
75 self.assertTrue(isinstance(ar, AsyncResult))
75 r = ar.get()
76 r = ar.get()
76 ar = self.client[:].pull('data', block=False)
77 ar = self.client[:].pull('data', block=False)
77 self.assertTrue(isinstance(ar, AsyncResult))
78 self.assertTrue(isinstance(ar, AsyncResult))
78 r = ar.get()
79 r = ar.get()
79 self.assertEquals(r, nengines*[data])
80 self.assertEquals(r, nengines*[data])
80 self.client[:].push(dict(a=10,b=20))
81 self.client[:].push(dict(a=10,b=20))
81 r = self.client[:].pull(('a','b'), block=True)
82 r = self.client[:].pull(('a','b'), block=True)
82 self.assertEquals(r, nengines*[[10,20]])
83 self.assertEquals(r, nengines*[[10,20]])
83
84
84 def test_push_pull_function(self):
85 def test_push_pull_function(self):
85 "test pushing and pulling functions"
86 "test pushing and pulling functions"
86 def testf(x):
87 def testf(x):
87 return 2.0*x
88 return 2.0*x
88
89
89 t = self.client.ids[-1]
90 t = self.client.ids[-1]
90 v = self.client[t]
91 v = self.client[t]
91 v.block=True
92 v.block=True
92 push = v.push
93 push = v.push
93 pull = v.pull
94 pull = v.pull
94 execute = v.execute
95 execute = v.execute
95 push({'testf':testf})
96 push({'testf':testf})
96 r = pull('testf')
97 r = pull('testf')
97 self.assertEqual(r(1.0), testf(1.0))
98 self.assertEqual(r(1.0), testf(1.0))
98 execute('r = testf(10)')
99 execute('r = testf(10)')
99 r = pull('r')
100 r = pull('r')
100 self.assertEquals(r, testf(10))
101 self.assertEquals(r, testf(10))
101 ar = self.client[:].push({'testf':testf}, block=False)
102 ar = self.client[:].push({'testf':testf}, block=False)
102 ar.get()
103 ar.get()
103 ar = self.client[:].pull('testf', block=False)
104 ar = self.client[:].pull('testf', block=False)
104 rlist = ar.get()
105 rlist = ar.get()
105 for r in rlist:
106 for r in rlist:
106 self.assertEqual(r(1.0), testf(1.0))
107 self.assertEqual(r(1.0), testf(1.0))
107 execute("def g(x): return x*x")
108 execute("def g(x): return x*x")
108 r = pull(('testf','g'))
109 r = pull(('testf','g'))
109 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
110 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
110
111
111 def test_push_function_globals(self):
112 def test_push_function_globals(self):
112 """test that pushed functions have access to globals"""
113 """test that pushed functions have access to globals"""
113 @interactive
114 @interactive
114 def geta():
115 def geta():
115 return a
116 return a
116 # self.add_engines(1)
117 # self.add_engines(1)
117 v = self.client[-1]
118 v = self.client[-1]
118 v.block=True
119 v.block=True
119 v['f'] = geta
120 v['f'] = geta
120 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
121 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
121 v.execute('a=5')
122 v.execute('a=5')
122 v.execute('b=f()')
123 v.execute('b=f()')
123 self.assertEquals(v['b'], 5)
124 self.assertEquals(v['b'], 5)
124
125
125 def test_push_function_defaults(self):
126 def test_push_function_defaults(self):
126 """test that pushed functions preserve default args"""
127 """test that pushed functions preserve default args"""
127 def echo(a=10):
128 def echo(a=10):
128 return a
129 return a
129 v = self.client[-1]
130 v = self.client[-1]
130 v.block=True
131 v.block=True
131 v['f'] = echo
132 v['f'] = echo
132 v.execute('b=f()')
133 v.execute('b=f()')
133 self.assertEquals(v['b'], 10)
134 self.assertEquals(v['b'], 10)
134
135
135 def test_get_result(self):
136 def test_get_result(self):
136 """test getting results from the Hub."""
137 """test getting results from the Hub."""
137 c = pmod.Client(profile='iptest')
138 c = pmod.Client(profile='iptest')
138 # self.add_engines(1)
139 # self.add_engines(1)
139 t = c.ids[-1]
140 t = c.ids[-1]
140 v = c[t]
141 v = c[t]
141 v2 = self.client[t]
142 v2 = self.client[t]
142 ar = v.apply_async(wait, 1)
143 ar = v.apply_async(wait, 1)
143 # give the monitor time to notice the message
144 # give the monitor time to notice the message
144 time.sleep(.25)
145 time.sleep(.25)
145 ahr = v2.get_result(ar.msg_ids)
146 ahr = v2.get_result(ar.msg_ids)
146 self.assertTrue(isinstance(ahr, AsyncHubResult))
147 self.assertTrue(isinstance(ahr, AsyncHubResult))
147 self.assertEquals(ahr.get(), ar.get())
148 self.assertEquals(ahr.get(), ar.get())
148 ar2 = v2.get_result(ar.msg_ids)
149 ar2 = v2.get_result(ar.msg_ids)
149 self.assertFalse(isinstance(ar2, AsyncHubResult))
150 self.assertFalse(isinstance(ar2, AsyncHubResult))
150 c.spin()
151 c.spin()
151 c.close()
152 c.close()
152
153
153 def test_run_newline(self):
154 def test_run_newline(self):
154 """test that run appends newline to files"""
155 """test that run appends newline to files"""
155 tmpfile = mktemp()
156 tmpfile = mktemp()
156 with open(tmpfile, 'w') as f:
157 with open(tmpfile, 'w') as f:
157 f.write("""def g():
158 f.write("""def g():
158 return 5
159 return 5
159 """)
160 """)
160 v = self.client[-1]
161 v = self.client[-1]
161 v.run(tmpfile, block=True)
162 v.run(tmpfile, block=True)
162 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
163 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
163
164
164 def test_apply_tracked(self):
165 def test_apply_tracked(self):
165 """test tracking for apply"""
166 """test tracking for apply"""
166 # self.add_engines(1)
167 # self.add_engines(1)
167 t = self.client.ids[-1]
168 t = self.client.ids[-1]
168 v = self.client[t]
169 v = self.client[t]
169 v.block=False
170 v.block=False
170 def echo(n=1024*1024, **kwargs):
171 def echo(n=1024*1024, **kwargs):
171 with v.temp_flags(**kwargs):
172 with v.temp_flags(**kwargs):
172 return v.apply(lambda x: x, 'x'*n)
173 return v.apply(lambda x: x, 'x'*n)
173 ar = echo(1, track=False)
174 ar = echo(1, track=False)
174 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
175 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
175 self.assertTrue(ar.sent)
176 self.assertTrue(ar.sent)
176 ar = echo(track=True)
177 ar = echo(track=True)
177 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
178 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
178 self.assertEquals(ar.sent, ar._tracker.done)
179 self.assertEquals(ar.sent, ar._tracker.done)
179 ar._tracker.wait()
180 ar._tracker.wait()
180 self.assertTrue(ar.sent)
181 self.assertTrue(ar.sent)
181
182
182 def test_push_tracked(self):
183 def test_push_tracked(self):
183 t = self.client.ids[-1]
184 t = self.client.ids[-1]
184 ns = dict(x='x'*1024*1024)
185 ns = dict(x='x'*1024*1024)
185 v = self.client[t]
186 v = self.client[t]
186 ar = v.push(ns, block=False, track=False)
187 ar = v.push(ns, block=False, track=False)
187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertTrue(ar.sent)
189 self.assertTrue(ar.sent)
189
190
190 ar = v.push(ns, block=False, track=True)
191 ar = v.push(ns, block=False, track=True)
191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
192 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
192 ar._tracker.wait()
193 ar._tracker.wait()
193 self.assertEquals(ar.sent, ar._tracker.done)
194 self.assertEquals(ar.sent, ar._tracker.done)
194 self.assertTrue(ar.sent)
195 self.assertTrue(ar.sent)
195 ar.get()
196 ar.get()
196
197
197 def test_scatter_tracked(self):
198 def test_scatter_tracked(self):
198 t = self.client.ids
199 t = self.client.ids
199 x='x'*1024*1024
200 x='x'*1024*1024
200 ar = self.client[t].scatter('x', x, block=False, track=False)
201 ar = self.client[t].scatter('x', x, block=False, track=False)
201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 self.assertTrue(ar.sent)
203 self.assertTrue(ar.sent)
203
204
204 ar = self.client[t].scatter('x', x, block=False, track=True)
205 ar = self.client[t].scatter('x', x, block=False, track=True)
205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
206 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
206 self.assertEquals(ar.sent, ar._tracker.done)
207 self.assertEquals(ar.sent, ar._tracker.done)
207 ar._tracker.wait()
208 ar._tracker.wait()
208 self.assertTrue(ar.sent)
209 self.assertTrue(ar.sent)
209 ar.get()
210 ar.get()
210
211
211 def test_remote_reference(self):
212 def test_remote_reference(self):
212 v = self.client[-1]
213 v = self.client[-1]
213 v['a'] = 123
214 v['a'] = 123
214 ra = pmod.Reference('a')
215 ra = pmod.Reference('a')
215 b = v.apply_sync(lambda x: x, ra)
216 b = v.apply_sync(lambda x: x, ra)
216 self.assertEquals(b, 123)
217 self.assertEquals(b, 123)
217
218
218
219
219 def test_scatter_gather(self):
220 def test_scatter_gather(self):
220 view = self.client[:]
221 view = self.client[:]
221 seq1 = range(16)
222 seq1 = range(16)
222 view.scatter('a', seq1)
223 view.scatter('a', seq1)
223 seq2 = view.gather('a', block=True)
224 seq2 = view.gather('a', block=True)
224 self.assertEquals(seq2, seq1)
225 self.assertEquals(seq2, seq1)
225 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
226 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
226
227
227 @skip_without('numpy')
228 @skip_without('numpy')
228 def test_scatter_gather_numpy(self):
229 def test_scatter_gather_numpy(self):
229 import numpy
230 import numpy
230 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
231 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
231 view = self.client[:]
232 view = self.client[:]
232 a = numpy.arange(64)
233 a = numpy.arange(64)
233 view.scatter('a', a)
234 view.scatter('a', a)
234 b = view.gather('a', block=True)
235 b = view.gather('a', block=True)
235 assert_array_equal(b, a)
236 assert_array_equal(b, a)
236
237
237 def test_scatter_gather_lazy(self):
238 def test_scatter_gather_lazy(self):
238 """scatter/gather with targets='all'"""
239 """scatter/gather with targets='all'"""
239 view = self.client.direct_view(targets='all')
240 view = self.client.direct_view(targets='all')
240 x = range(64)
241 x = range(64)
241 view.scatter('x', x)
242 view.scatter('x', x)
242 gathered = view.gather('x', block=True)
243 gathered = view.gather('x', block=True)
243 self.assertEquals(gathered, x)
244 self.assertEquals(gathered, x)
244
245
245
246
246 @dec.known_failure_py3
247 @dec.known_failure_py3
247 @skip_without('numpy')
248 @skip_without('numpy')
248 def test_push_numpy_nocopy(self):
249 def test_push_numpy_nocopy(self):
249 import numpy
250 import numpy
250 view = self.client[:]
251 view = self.client[:]
251 a = numpy.arange(64)
252 a = numpy.arange(64)
252 view['A'] = a
253 view['A'] = a
253 @interactive
254 @interactive
254 def check_writeable(x):
255 def check_writeable(x):
255 return x.flags.writeable
256 return x.flags.writeable
256
257
257 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
258 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
258 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
259 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
259
260
260 view.push(dict(B=a))
261 view.push(dict(B=a))
261 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
262 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
262 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
263 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
263
264
264 @skip_without('numpy')
265 @skip_without('numpy')
265 def test_apply_numpy(self):
266 def test_apply_numpy(self):
266 """view.apply(f, ndarray)"""
267 """view.apply(f, ndarray)"""
267 import numpy
268 import numpy
268 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
269 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
269
270
270 A = numpy.random.random((100,100))
271 A = numpy.random.random((100,100))
271 view = self.client[-1]
272 view = self.client[-1]
272 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
273 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
273 B = A.astype(dt)
274 B = A.astype(dt)
274 C = view.apply_sync(lambda x:x, B)
275 C = view.apply_sync(lambda x:x, B)
275 assert_array_equal(B,C)
276 assert_array_equal(B,C)
276
277
277 def test_map(self):
278 def test_map(self):
278 view = self.client[:]
279 view = self.client[:]
279 def f(x):
280 def f(x):
280 return x**2
281 return x**2
281 data = range(16)
282 data = range(16)
282 r = view.map_sync(f, data)
283 r = view.map_sync(f, data)
283 self.assertEquals(r, map(f, data))
284 self.assertEquals(r, map(f, data))
284
285
285 def test_map_iterable(self):
286 def test_map_iterable(self):
286 """test map on iterables (direct)"""
287 """test map on iterables (direct)"""
287 view = self.client[:]
288 view = self.client[:]
288 # 101 is prime, so it won't be evenly distributed
289 # 101 is prime, so it won't be evenly distributed
289 arr = range(101)
290 arr = range(101)
290 # ensure it will be an iterator, even in Python 3
291 # ensure it will be an iterator, even in Python 3
291 it = iter(arr)
292 it = iter(arr)
292 r = view.map_sync(lambda x:x, arr)
293 r = view.map_sync(lambda x:x, arr)
293 self.assertEquals(r, list(arr))
294 self.assertEquals(r, list(arr))
294
295
295 def test_scatterGatherNonblocking(self):
296 def test_scatterGatherNonblocking(self):
296 data = range(16)
297 data = range(16)
297 view = self.client[:]
298 view = self.client[:]
298 view.scatter('a', data, block=False)
299 view.scatter('a', data, block=False)
299 ar = view.gather('a', block=False)
300 ar = view.gather('a', block=False)
300 self.assertEquals(ar.get(), data)
301 self.assertEquals(ar.get(), data)
301
302
302 @skip_without('numpy')
303 @skip_without('numpy')
303 def test_scatter_gather_numpy_nonblocking(self):
304 def test_scatter_gather_numpy_nonblocking(self):
304 import numpy
305 import numpy
305 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
306 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
306 a = numpy.arange(64)
307 a = numpy.arange(64)
307 view = self.client[:]
308 view = self.client[:]
308 ar = view.scatter('a', a, block=False)
309 ar = view.scatter('a', a, block=False)
309 self.assertTrue(isinstance(ar, AsyncResult))
310 self.assertTrue(isinstance(ar, AsyncResult))
310 amr = view.gather('a', block=False)
311 amr = view.gather('a', block=False)
311 self.assertTrue(isinstance(amr, AsyncMapResult))
312 self.assertTrue(isinstance(amr, AsyncMapResult))
312 assert_array_equal(amr.get(), a)
313 assert_array_equal(amr.get(), a)
313
314
314 def test_execute(self):
315 def test_execute(self):
315 view = self.client[:]
316 view = self.client[:]
316 # self.client.debug=True
317 # self.client.debug=True
317 execute = view.execute
318 execute = view.execute
318 ar = execute('c=30', block=False)
319 ar = execute('c=30', block=False)
319 self.assertTrue(isinstance(ar, AsyncResult))
320 self.assertTrue(isinstance(ar, AsyncResult))
320 ar = execute('d=[0,1,2]', block=False)
321 ar = execute('d=[0,1,2]', block=False)
321 self.client.wait(ar, 1)
322 self.client.wait(ar, 1)
322 self.assertEquals(len(ar.get()), len(self.client))
323 self.assertEquals(len(ar.get()), len(self.client))
323 for c in view['c']:
324 for c in view['c']:
324 self.assertEquals(c, 30)
325 self.assertEquals(c, 30)
325
326
326 def test_abort(self):
327 def test_abort(self):
327 view = self.client[-1]
328 view = self.client[-1]
328 ar = view.execute('import time; time.sleep(1)', block=False)
329 ar = view.execute('import time; time.sleep(1)', block=False)
329 ar2 = view.apply_async(lambda : 2)
330 ar2 = view.apply_async(lambda : 2)
330 ar3 = view.apply_async(lambda : 3)
331 ar3 = view.apply_async(lambda : 3)
331 view.abort(ar2)
332 view.abort(ar2)
332 view.abort(ar3.msg_ids)
333 view.abort(ar3.msg_ids)
333 self.assertRaises(error.TaskAborted, ar2.get)
334 self.assertRaises(error.TaskAborted, ar2.get)
334 self.assertRaises(error.TaskAborted, ar3.get)
335 self.assertRaises(error.TaskAborted, ar3.get)
335
336
336 def test_abort_all(self):
337 def test_abort_all(self):
337 """view.abort() aborts all outstanding tasks"""
338 """view.abort() aborts all outstanding tasks"""
338 view = self.client[-1]
339 view = self.client[-1]
339 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
340 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
340 view.abort()
341 view.abort()
341 view.wait(timeout=5)
342 view.wait(timeout=5)
342 for ar in ars[5:]:
343 for ar in ars[5:]:
343 self.assertRaises(error.TaskAborted, ar.get)
344 self.assertRaises(error.TaskAborted, ar.get)
344
345
345 def test_temp_flags(self):
346 def test_temp_flags(self):
346 view = self.client[-1]
347 view = self.client[-1]
347 view.block=True
348 view.block=True
348 with view.temp_flags(block=False):
349 with view.temp_flags(block=False):
349 self.assertFalse(view.block)
350 self.assertFalse(view.block)
350 self.assertTrue(view.block)
351 self.assertTrue(view.block)
351
352
352 @dec.known_failure_py3
353 @dec.known_failure_py3
353 def test_importer(self):
354 def test_importer(self):
354 view = self.client[-1]
355 view = self.client[-1]
355 view.clear(block=True)
356 view.clear(block=True)
356 with view.importer:
357 with view.importer:
357 import re
358 import re
358
359
359 @interactive
360 @interactive
360 def findall(pat, s):
361 def findall(pat, s):
361 # this globals() step isn't necessary in real code
362 # this globals() step isn't necessary in real code
362 # only to prevent a closure in the test
363 # only to prevent a closure in the test
363 re = globals()['re']
364 re = globals()['re']
364 return re.findall(pat, s)
365 return re.findall(pat, s)
365
366
366 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
367 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
367
368
368 # parallel magic tests
369 # parallel magic tests
369
370
370 def test_magic_px_blocking(self):
371 def test_magic_px_blocking(self):
371 ip = get_ipython()
372 ip = get_ipython()
372 v = self.client[-1]
373 v = self.client[-1]
373 v.activate()
374 v.activate()
374 v.block=True
375 v.block=True
375
376
376 ip.magic_px('a=5')
377 ip.magic_px('a=5')
377 self.assertEquals(v['a'], 5)
378 self.assertEquals(v['a'], 5)
378 ip.magic_px('a=10')
379 ip.magic_px('a=10')
379 self.assertEquals(v['a'], 10)
380 self.assertEquals(v['a'], 10)
380 sio = StringIO()
381 sio = StringIO()
381 savestdout = sys.stdout
382 savestdout = sys.stdout
382 sys.stdout = sio
383 sys.stdout = sio
383 # just 'print a' worst ~99% of the time, but this ensures that
384 # just 'print a' worst ~99% of the time, but this ensures that
384 # the stdout message has arrived when the result is finished:
385 # the stdout message has arrived when the result is finished:
385 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
386 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
386 sys.stdout = savestdout
387 sys.stdout = savestdout
387 buf = sio.getvalue()
388 buf = sio.getvalue()
388 self.assertTrue('[stdout:' in buf, buf)
389 self.assertTrue('[stdout:' in buf, buf)
389 self.assertTrue(buf.rstrip().endswith('10'))
390 self.assertTrue(buf.rstrip().endswith('10'))
390 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
391 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
391
392
392 def test_magic_px_nonblocking(self):
393 def test_magic_px_nonblocking(self):
393 ip = get_ipython()
394 ip = get_ipython()
394 v = self.client[-1]
395 v = self.client[-1]
395 v.activate()
396 v.activate()
396 v.block=False
397 v.block=False
397
398
398 ip.magic_px('a=5')
399 ip.magic_px('a=5')
399 self.assertEquals(v['a'], 5)
400 self.assertEquals(v['a'], 5)
400 ip.magic_px('a=10')
401 ip.magic_px('a=10')
401 self.assertEquals(v['a'], 10)
402 self.assertEquals(v['a'], 10)
402 sio = StringIO()
403 sio = StringIO()
403 savestdout = sys.stdout
404 savestdout = sys.stdout
404 sys.stdout = sio
405 sys.stdout = sio
405 ip.magic_px('print a')
406 ip.magic_px('print a')
406 sys.stdout = savestdout
407 sys.stdout = savestdout
407 buf = sio.getvalue()
408 buf = sio.getvalue()
408 self.assertFalse('[stdout:%i]'%v.targets in buf)
409 self.assertFalse('[stdout:%i]'%v.targets in buf)
409 ip.magic_px('1/0')
410 ip.magic_px('1/0')
410 ar = v.get_result(-1)
411 ar = v.get_result(-1)
411 self.assertRaisesRemote(ZeroDivisionError, ar.get)
412 self.assertRaisesRemote(ZeroDivisionError, ar.get)
412
413
413 def test_magic_autopx_blocking(self):
414 def test_magic_autopx_blocking(self):
414 ip = get_ipython()
415 ip = get_ipython()
415 v = self.client[-1]
416 v = self.client[-1]
416 v.activate()
417 v.activate()
417 v.block=True
418 v.block=True
418
419
419 sio = StringIO()
420 sio = StringIO()
420 savestdout = sys.stdout
421 savestdout = sys.stdout
421 sys.stdout = sio
422 sys.stdout = sio
422 ip.magic_autopx()
423 ip.magic_autopx()
423 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
424 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
424 ip.run_cell('b*=2')
425 ip.run_cell('b*=2')
425 ip.run_cell('print (b)')
426 ip.run_cell('print (b)')
426 ip.run_cell("b/c")
427 ip.run_cell("b/c")
427 ip.magic_autopx()
428 ip.magic_autopx()
428 sys.stdout = savestdout
429 sys.stdout = savestdout
429 output = sio.getvalue().strip()
430 output = sio.getvalue().strip()
430 self.assertTrue(output.startswith('%autopx enabled'))
431 self.assertTrue(output.startswith('%autopx enabled'))
431 self.assertTrue(output.endswith('%autopx disabled'))
432 self.assertTrue(output.endswith('%autopx disabled'))
432 self.assertTrue('RemoteError: ZeroDivisionError' in output)
433 self.assertTrue('RemoteError: ZeroDivisionError' in output)
433 ar = v.get_result(-1)
434 ar = v.get_result(-1)
434 self.assertEquals(v['a'], 5)
435 self.assertEquals(v['a'], 5)
435 self.assertEquals(v['b'], 20)
436 self.assertEquals(v['b'], 20)
436 self.assertRaisesRemote(ZeroDivisionError, ar.get)
437 self.assertRaisesRemote(ZeroDivisionError, ar.get)
437
438
438 def test_magic_autopx_nonblocking(self):
439 def test_magic_autopx_nonblocking(self):
439 ip = get_ipython()
440 ip = get_ipython()
440 v = self.client[-1]
441 v = self.client[-1]
441 v.activate()
442 v.activate()
442 v.block=False
443 v.block=False
443
444
444 sio = StringIO()
445 sio = StringIO()
445 savestdout = sys.stdout
446 savestdout = sys.stdout
446 sys.stdout = sio
447 sys.stdout = sio
447 ip.magic_autopx()
448 ip.magic_autopx()
448 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
449 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
449 ip.run_cell('print (b)')
450 ip.run_cell('print (b)')
450 ip.run_cell("b/c")
451 ip.run_cell("b/c")
451 ip.run_cell('b*=2')
452 ip.run_cell('b*=2')
452 ip.magic_autopx()
453 ip.magic_autopx()
453 sys.stdout = savestdout
454 sys.stdout = savestdout
454 output = sio.getvalue().strip()
455 output = sio.getvalue().strip()
455 self.assertTrue(output.startswith('%autopx enabled'))
456 self.assertTrue(output.startswith('%autopx enabled'))
456 self.assertTrue(output.endswith('%autopx disabled'))
457 self.assertTrue(output.endswith('%autopx disabled'))
457 self.assertFalse('ZeroDivisionError' in output)
458 self.assertFalse('ZeroDivisionError' in output)
458 ar = v.get_result(-2)
459 ar = v.get_result(-2)
459 self.assertEquals(v['a'], 5)
460 self.assertEquals(v['a'], 5)
460 self.assertEquals(v['b'], 20)
461 self.assertEquals(v['b'], 20)
461 self.assertRaisesRemote(ZeroDivisionError, ar.get)
462 self.assertRaisesRemote(ZeroDivisionError, ar.get)
462
463
463 def test_magic_result(self):
464 def test_magic_result(self):
464 ip = get_ipython()
465 ip = get_ipython()
465 v = self.client[-1]
466 v = self.client[-1]
466 v.activate()
467 v.activate()
467 v['a'] = 111
468 v['a'] = 111
468 ra = v['a']
469 ra = v['a']
469
470
470 ar = ip.magic_result()
471 ar = ip.magic_result()
471 self.assertEquals(ar.msg_ids, [v.history[-1]])
472 self.assertEquals(ar.msg_ids, [v.history[-1]])
472 self.assertEquals(ar.get(), 111)
473 self.assertEquals(ar.get(), 111)
473 ar = ip.magic_result('-2')
474 ar = ip.magic_result('-2')
474 self.assertEquals(ar.msg_ids, [v.history[-2]])
475 self.assertEquals(ar.msg_ids, [v.history[-2]])
475
476
476 def test_unicode_execute(self):
477 def test_unicode_execute(self):
477 """test executing unicode strings"""
478 """test executing unicode strings"""
478 v = self.client[-1]
479 v = self.client[-1]
479 v.block=True
480 v.block=True
480 if sys.version_info[0] >= 3:
481 if sys.version_info[0] >= 3:
481 code="a='é'"
482 code="a='é'"
482 else:
483 else:
483 code=u"a=u'é'"
484 code=u"a=u'é'"
484 v.execute(code)
485 v.execute(code)
485 self.assertEquals(v['a'], u'é')
486 self.assertEquals(v['a'], u'é')
486
487
487 def test_unicode_apply_result(self):
488 def test_unicode_apply_result(self):
488 """test unicode apply results"""
489 """test unicode apply results"""
489 v = self.client[-1]
490 v = self.client[-1]
490 r = v.apply_sync(lambda : u'é')
491 r = v.apply_sync(lambda : u'é')
491 self.assertEquals(r, u'é')
492 self.assertEquals(r, u'é')
492
493
493 def test_unicode_apply_arg(self):
494 def test_unicode_apply_arg(self):
494 """test passing unicode arguments to apply"""
495 """test passing unicode arguments to apply"""
495 v = self.client[-1]
496 v = self.client[-1]
496
497
497 @interactive
498 @interactive
498 def check_unicode(a, check):
499 def check_unicode(a, check):
499 assert isinstance(a, unicode), "%r is not unicode"%a
500 assert isinstance(a, unicode), "%r is not unicode"%a
500 assert isinstance(check, bytes), "%r is not bytes"%check
501 assert isinstance(check, bytes), "%r is not bytes"%check
501 assert a.encode('utf8') == check, "%s != %s"%(a,check)
502 assert a.encode('utf8') == check, "%s != %s"%(a,check)
502
503
503 for s in [ u'é', u'ßø®∫',u'asdf' ]:
504 for s in [ u'é', u'ßø®∫',u'asdf' ]:
504 try:
505 try:
505 v.apply_sync(check_unicode, s, s.encode('utf8'))
506 v.apply_sync(check_unicode, s, s.encode('utf8'))
506 except error.RemoteError as e:
507 except error.RemoteError as e:
507 if e.ename == 'AssertionError':
508 if e.ename == 'AssertionError':
508 self.fail(e.evalue)
509 self.fail(e.evalue)
509 else:
510 else:
510 raise e
511 raise e
511
512
512 def test_map_reference(self):
513 def test_map_reference(self):
513 """view.map(<Reference>, *seqs) should work"""
514 """view.map(<Reference>, *seqs) should work"""
514 v = self.client[:]
515 v = self.client[:]
515 v.scatter('n', self.client.ids, flatten=True)
516 v.scatter('n', self.client.ids, flatten=True)
516 v.execute("f = lambda x,y: x*y")
517 v.execute("f = lambda x,y: x*y")
517 rf = pmod.Reference('f')
518 rf = pmod.Reference('f')
518 nlist = list(range(10))
519 nlist = list(range(10))
519 mlist = nlist[::-1]
520 mlist = nlist[::-1]
520 expected = [ m*n for m,n in zip(mlist, nlist) ]
521 expected = [ m*n for m,n in zip(mlist, nlist) ]
521 result = v.map_sync(rf, mlist, nlist)
522 result = v.map_sync(rf, mlist, nlist)
522 self.assertEquals(result, expected)
523 self.assertEquals(result, expected)
523
524
524 def test_apply_reference(self):
525 def test_apply_reference(self):
525 """view.apply(<Reference>, *args) should work"""
526 """view.apply(<Reference>, *args) should work"""
526 v = self.client[:]
527 v = self.client[:]
527 v.scatter('n', self.client.ids, flatten=True)
528 v.scatter('n', self.client.ids, flatten=True)
528 v.execute("f = lambda x: n*x")
529 v.execute("f = lambda x: n*x")
529 rf = pmod.Reference('f')
530 rf = pmod.Reference('f')
530 result = v.apply_sync(rf, 5)
531 result = v.apply_sync(rf, 5)
531 expected = [ 5*id for id in self.client.ids ]
532 expected = [ 5*id for id in self.client.ids ]
532 self.assertEquals(result, expected)
533 self.assertEquals(result, expected)
533
534
534 def test_eval_reference(self):
535 def test_eval_reference(self):
535 v = self.client[self.client.ids[0]]
536 v = self.client[self.client.ids[0]]
536 v['g'] = range(5)
537 v['g'] = range(5)
537 rg = pmod.Reference('g[0]')
538 rg = pmod.Reference('g[0]')
538 echo = lambda x:x
539 echo = lambda x:x
539 self.assertEquals(v.apply_sync(echo, rg), 0)
540 self.assertEquals(v.apply_sync(echo, rg), 0)
540
541
541 def test_reference_nameerror(self):
542 def test_reference_nameerror(self):
542 v = self.client[self.client.ids[0]]
543 v = self.client[self.client.ids[0]]
543 r = pmod.Reference('elvis_has_left')
544 r = pmod.Reference('elvis_has_left')
544 echo = lambda x:x
545 echo = lambda x:x
545 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
546 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
546
547
547 def test_single_engine_map(self):
548 def test_single_engine_map(self):
548 e0 = self.client[self.client.ids[0]]
549 e0 = self.client[self.client.ids[0]]
549 r = range(5)
550 r = range(5)
550 check = [ -1*i for i in r ]
551 check = [ -1*i for i in r ]
551 result = e0.map_sync(lambda x: -1*x, r)
552 result = e0.map_sync(lambda x: -1*x, r)
552 self.assertEquals(result, check)
553 self.assertEquals(result, check)
554
555 def test_len(self):
556 """len(view) makes sense"""
557 e0 = self.client[self.client.ids[0]]
558 yield self.assertEquals(len(e0), 1)
559 v = self.client[:]
560 yield self.assertEquals(len(v), len(self.client.ids))
561 v = self.client.direct_view('all')
562 yield self.assertEquals(len(v), len(self.client.ids))
563 v = self.client[:2]
564 yield self.assertEquals(len(v), 2)
565 v = self.client[:1]
566 yield self.assertEquals(len(v), 1)
567 v = self.client.load_balanced_view()
568 yield self.assertEquals(len(v), len(self.client.ids))
569 # parametric tests seem to require manual closing?
570 self.client.close()
553
571
General Comments 0
You need to be logged in to leave comments. Login now