##// END OF EJS Templates
enable non-copying sends in push...
MinRK -
Show More
@@ -1,1069 +1,1069 b''
1 """Views of remote engines.
1 """Views of remote engines.
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import imp
18 import imp
19 import sys
19 import sys
20 import warnings
20 import warnings
21 from contextlib import contextmanager
21 from contextlib import contextmanager
22 from types import ModuleType
22 from types import ModuleType
23
23
24 import zmq
24 import zmq
25
25
26 from IPython.testing.skipdoctest import skip_doctest
26 from IPython.testing.skipdoctest import skip_doctest
27 from IPython.utils.traitlets import (
27 from IPython.utils.traitlets import (
28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
29 )
29 )
30 from IPython.external.decorator import decorator
30 from IPython.external.decorator import decorator
31
31
32 from IPython.parallel import util
32 from IPython.parallel import util
33 from IPython.parallel.controller.dependency import Dependency, dependent
33 from IPython.parallel.controller.dependency import Dependency, dependent
34
34
35 from . import map as Map
35 from . import map as Map
36 from .asyncresult import AsyncResult, AsyncMapResult
36 from .asyncresult import AsyncResult, AsyncMapResult
37 from .remotefunction import ParallelFunction, parallel, remote, getname
37 from .remotefunction import ParallelFunction, parallel, remote, getname
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Decorators
40 # Decorators
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
@decorator
def save_ids(f, self, *args, **kwargs):
    """Keep our history and outstanding attributes up to date after a method call.

    Records any msg_ids the wrapped call appended to ``self.client.history``
    into this view's own ``history`` and ``outstanding`` sets, even if the
    call raises.
    """
    n_previous = len(self.client.history)
    try:
        ret = f(self, *args, **kwargs)
    finally:
        nmsgs = len(self.client.history) - n_previous
        # Guard the zero-message case explicitly: history[-0:] would be the
        # *entire* client history, which would wrongly re-add every past
        # msg_id to this view's history and outstanding sets.
        if nmsgs > 0:
            msg_ids = self.client.history[-nmsgs:]
            self.history.extend(msg_ids)
            # set.update instead of map(): map-for-side-effects builds and
            # discards a useless list
            self.outstanding.update(msg_ids)
    return ret
55
55
@decorator
def sync_results(f, self, *args, **kwargs):
    """sync relevant results from self.client to our results attribute."""
    ret = f(self, *args, **kwargs)
    # anything we consider outstanding that the client no longer does
    # has completed since our last sync
    delta = self.outstanding - self.client.outstanding
    completed = self.outstanding & delta
    self.outstanding = self.outstanding - completed
    for msg_id in completed:
        self.results[msg_id] = self.client.results[msg_id]
    return ret
66
66
@decorator
def spin_after(f, self, *args, **kwargs):
    """call spin after the method."""
    result = f(self, *args, **kwargs)
    self.spin()
    return result
73
73
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75 # Classes
75 # Classes
76 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
77
77
@skip_doctest
class View(HasTraits):
    """Base View class for more convenient apply(f,*args,**kwargs) syntax via attributes.

    Don't use this class, use subclasses.

    Methods
    -------

    spin
        flushes incoming results and registration state changes
        control methods spin, and requesting `ids` also ensures up to date

    wait
        wait on one or more msg_ids

    execution methods
        apply
        legacy: execute, run

    data movement
        push, pull, scatter, gather

    query methods
        get_result, queue_status, purge_results, result_status

    control methods
        abort, shutdown

    """
    # flags (settable in bulk via set_flags / temp_flags)
    block=Bool(False)   # whether calls wait for results
    track=Bool(True)    # whether to request zmq tracking for safe non-copying sends
    targets = Any()     # the engine target spec this view operates on

    history=List()      # msg_ids submitted through this view
    outstanding = Set() # msg_ids submitted but not yet complete
    results = Dict()    # completed results, keyed by msg_id
    client = Instance('IPython.parallel.Client')

    _socket = Instance('zmq.Socket')
    _flag_names = List(['targets', 'block', 'track'])
    _targets = Any()
    _idents = Any()

    def __init__(self, client=None, socket=None, **flags):
        super(View, self).__init__(client=client, _socket=socket)
        # default blocking behavior is inherited from the client
        self.block = client.block

        # `targets` (and any other flags) are applied here
        self.set_flags(**flags)

        assert not self.__class__ is View, "Don't use base View objects, use subclasses"


    def __repr__(self):
        strtargets = str(self.targets)
        # truncate long target lists so the repr stays readable
        if len(strtargets) > 16:
            strtargets = strtargets[:12]+'...]'
        return "<%s %s>"%(self.__class__.__name__, strtargets)

    def set_flags(self, **kwargs):
        """set my attribute flags by keyword.

        Views determine behavior with a few attributes (`block`, `track`, etc.).
        These attributes can be set all at once by name with this method.

        Parameters
        ----------

        block : bool
            whether to wait for results
        track : bool
            whether to create a MessageTracker to allow the user to
            safely edit after arrays and buffers during non-copying
            sends.

        Raises
        ------

        KeyError : if a keyword is not one of the names in `_flag_names`.
        """
        for name, value in kwargs.iteritems():
            if name not in self._flag_names:
                raise KeyError("Invalid name: %r"%name)
            else:
                setattr(self, name, value)

    @contextmanager
    def temp_flags(self, **kwargs):
        """temporarily set flags, for use in `with` statements.

        See set_flags for permanent setting of flags

        Examples
        --------

        >>> view.track=False
        ...
        >>> with view.temp_flags(track=True):
        ...    ar = view.apply(dostuff, my_big_array)
        ...    ar.tracker.wait() # wait for send to finish
        >>> view.track
        False

        """
        # preflight: save flags, and set temporaries
        saved_flags = {}
        for f in self._flag_names:
            saved_flags[f] = getattr(self, f)
        self.set_flags(**kwargs)
        # yield to the with-statement block
        try:
            yield
        finally:
            # postflight: restore saved flags even if the block raised
            self.set_flags(**saved_flags)


    #----------------------------------------------------------------
    # apply
    #----------------------------------------------------------------

    @sync_results
    @save_ids
    def _really_apply(self, f, args, kwargs, block=None, **options):
        """wrapper for client.send_apply_message — subclasses must implement."""
        raise NotImplementedError("Implement in subclasses")

    def apply(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) on remote engines, returning the result.

        This method sets all apply flags via this View's attributes.

        if self.block is False:
            returns AsyncResult
        else:
            returns actual result of f(*args, **kwargs)
        """
        return self._really_apply(f, args, kwargs)

    def apply_async(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) on remote engines in a nonblocking manner.

        returns AsyncResult
        """
        return self._really_apply(f, args, kwargs, block=False)

    @spin_after
    def apply_sync(self, f, *args, **kwargs):
        """calls f(*args, **kwargs) on remote engines in a blocking manner,
        returning the result.

        returns: actual result of f(*args, **kwargs)
        """
        return self._really_apply(f, args, kwargs, block=True)

    #----------------------------------------------------------------
    # wrappers for client and control methods
    #----------------------------------------------------------------
    @sync_results
    def spin(self):
        """spin the client, and sync"""
        self.client.spin()

    @sync_results
    def wait(self, jobs=None, timeout=-1):
        """waits on one or more `jobs`, for up to `timeout` seconds.

        Parameters
        ----------

        jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
            ints are indices to self.history
            strs are msg_ids
            default: wait on all outstanding messages
        timeout : float
            a time in seconds, after which to give up.
            default is -1, which means no timeout

        Returns
        -------

        True : when all msg_ids are done
        False : timeout reached, some msg_ids still outstanding
        """
        if jobs is None:
            # wait on everything ever submitted through this view
            jobs = self.history
        return self.client.wait(jobs, timeout)

    def abort(self, jobs=None, targets=None, block=None):
        """Abort jobs on my engines.

        Parameters
        ----------

        jobs : None, str, list of strs, optional
            if None: abort all jobs.
            else: abort specific msg_id(s).
        """
        block = block if block is not None else self.block
        targets = targets if targets is not None else self.targets
        # default: abort everything this view still has outstanding
        jobs = jobs if jobs is not None else list(self.outstanding)

        return self.client.abort(jobs=jobs, targets=targets, block=block)

    def queue_status(self, targets=None, verbose=False):
        """Fetch the Queue status of my engines"""
        targets = targets if targets is not None else self.targets
        return self.client.queue_status(targets=targets, verbose=verbose)

    def purge_results(self, jobs=[], targets=[]):
        """Instruct the controller to forget specific results.

        NOTE: the default [] arguments are never mutated here, only
        forwarded to client.purge_results.
        """
        if targets is None or targets == 'all':
            targets = self.targets
        return self.client.purge_results(jobs=jobs, targets=targets)

    def shutdown(self, targets=None, restart=False, hub=False, block=None):
        """Terminates one or more engine processes, optionally including the hub.
        """
        block = self.block if block is None else block
        if targets is None or targets == 'all':
            targets = self.targets
        return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)

    @spin_after
    def get_result(self, indices_or_msg_ids=None):
        """return one or more results, specified by history index or msg_id.

        See client.get_result for details.

        """

        if indices_or_msg_ids is None:
            # default: the most recent submission
            indices_or_msg_ids = -1
        # translate history indices (ints) into msg_ids before delegating
        if isinstance(indices_or_msg_ids, int):
            indices_or_msg_ids = self.history[indices_or_msg_ids]
        elif isinstance(indices_or_msg_ids, (list,tuple,set)):
            indices_or_msg_ids = list(indices_or_msg_ids)
            for i,index in enumerate(indices_or_msg_ids):
                if isinstance(index, int):
                    indices_or_msg_ids[i] = self.history[index]
        return self.client.get_result(indices_or_msg_ids)

    #-------------------------------------------------------------------
    # Map
    #-------------------------------------------------------------------

    def map(self, f, *sequences, **kwargs):
        """override in subclasses"""
        raise NotImplementedError

    def map_async(self, f, *sequences, **kwargs):
        """Parallel version of builtin `map`, using this view's engines.

        This is equivalent to map(...block=False)

        See `self.map` for details.
        """
        if 'block' in kwargs:
            raise TypeError("map_async doesn't take a `block` keyword argument.")
        kwargs['block'] = False
        return self.map(f,*sequences,**kwargs)

    def map_sync(self, f, *sequences, **kwargs):
        """Parallel version of builtin `map`, using this view's engines.

        This is equivalent to map(...block=True)

        See `self.map` for details.
        """
        if 'block' in kwargs:
            raise TypeError("map_sync doesn't take a `block` keyword argument.")
        kwargs['block'] = True
        return self.map(f,*sequences,**kwargs)

    def imap(self, f, *sequences, **kwargs):
        """Parallel version of `itertools.imap`.

        See `self.map` for details.

        """

        return iter(self.map_async(f,*sequences, **kwargs))

    #-------------------------------------------------------------------
    # Decorators
    #-------------------------------------------------------------------

    def remote(self, block=True, **flags):
        """Decorator for making a RemoteFunction

        NOTE(review): default is block=True here but block=None in
        `parallel` below — presumably intentional, but worth confirming.
        """
        block = self.block if block is None else block
        return remote(self, block=block, **flags)

    def parallel(self, dist='b', block=None, **flags):
        """Decorator for making a ParallelFunction"""
        block = self.block if block is None else block
        return parallel(self, dist=dist, block=block, **flags)
370
370
@skip_doctest
class DirectView(View):
    """Direct Multiplexer View of one or more engines.

    These are created via indexed access to a client:

    >>> dv_1 = client[1]
    >>> dv_all = client[:]
    >>> dv_even = client[::2]
    >>> dv_some = client[1:3]

    This object provides dictionary access to engine namespaces:

    # push a=5:
    >>> dv['a'] = 5
    # pull 'foo':
    >>> dv['foo']

    """

    def __init__(self, client=None, socket=None, targets=None):
        # `targets` is forwarded to View.__init__ as a flag
        super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
393
393
394 @property
394 @property
395 def importer(self):
395 def importer(self):
396 """sync_imports(local=True) as a property.
396 """sync_imports(local=True) as a property.
397
397
398 See sync_imports for details.
398 See sync_imports for details.
399
399
400 """
400 """
401 return self.sync_imports(True)
401 return self.sync_imports(True)
402
402
    @contextmanager
    def sync_imports(self, local=True, quiet=False):
        """Context Manager for performing simultaneous local and remote imports.

        'import x as y' will *not* work.  The 'as y' part will simply be ignored.

        If `local=True`, then the package will also be imported locally.

        If `quiet=True`, no output will be produced when attempting remote
        imports.

        Note that remote-only (`local=False`) imports have not been implemented.

        >>> with view.sync_imports():
        ...    from numpy import recarray
        importing recarray from numpy on engine(s)

        """
        import __builtin__
        local_import = __builtin__.__import__
        modules = set()   # keys of imports already forwarded to the engines
        results = []      # AsyncResults of the remote import calls
        @util.interactive
        def remote_import(name, fromlist, level):
            """the function to be passed to apply, that actually performs the import
            on the engine, and loads up the user namespace.
            """
            import sys
            user_ns = globals()
            mod = __import__(name, fromlist=fromlist, level=level)
            if fromlist:
                # `from name import a, b` — bind each requested attribute
                for key in fromlist:
                    user_ns[key] = getattr(mod, key)
            else:
                # plain `import name` — bind the top-level module
                user_ns[name] = sys.modules[name]

        def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
            """the drop-in replacement for __import__, that optionally imports
            locally as well.
            """
            # don't override nested imports: restore the real __import__
            # for the duration of this call
            save_import = __builtin__.__import__
            __builtin__.__import__ = local_import

            if imp.lock_held():
                # this is a side-effect import, don't do it remotely, or even
                # ignore the local effects
                return local_import(name, globals, locals, fromlist, level)

            imp.acquire_lock()
            if local:
                mod = local_import(name, globals, locals, fromlist, level)
            else:
                raise NotImplementedError("remote-only imports not yet implemented")
            imp.release_lock()

            # forward each unique (name, fromlist) pair to the engines only
            # once, and only for explicit (level == -1) imports
            key = name+':'+','.join(fromlist or [])
            if level == -1 and key not in modules:
                modules.add(key)
                if not quiet:
                    if fromlist:
                        print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
                    else:
                        print "importing %s on engine(s)"%name
                results.append(self.apply_async(remote_import, name, fromlist, level))
            # restore override
            __builtin__.__import__ = save_import

            return mod

        # override __import__ for the body of the with-statement
        __builtin__.__import__ = view_import
        try:
            # enter the block
            yield
        except ImportError:
            if local:
                raise
            else:
                # ignore import errors if not doing local imports
                pass
        finally:
            # always restore __import__
            __builtin__.__import__ = local_import

        for r in results:
            # raise possible remote ImportErrors here
            r.get()
491
491
492
492
493 @sync_results
@sync_results
@save_ids
def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
    """calls f(*args, **kwargs) on remote engines, returning the result.

    This method sets all of `apply`'s flags via this View's attributes.

    Parameters
    ----------

    f : callable

    args : list [default: empty]

    kwargs : dict [default: empty]

    targets : target list [default: self.targets]
        where to run
    block : bool [default: self.block]
        whether to block
    track : bool [default: self.track]
        whether to ask zmq to track the message, for safe non-copying sends

    Returns
    -------

    if self.block is False:
        returns AsyncResult
    else:
        returns actual result of f(*args, **kwargs) on the engine(s)
        This will be a list if self.targets is also a list (even length 1), or
        the single result if self.targets is an integer engine id
    """
    # fall back to this View's flags for anything left unspecified
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    if block is None:
        block = self.block
    if track is None:
        track = self.track
    if targets is None:
        targets = self.targets

    # one apply message per resolved engine identity
    _idents = self.client._build_targets(targets)[0]
    msg_ids = []
    trackers = []
    for ident in _idents:
        msg = self.client.send_apply_message(self._socket, f, args, kwargs,
                                             track=track, ident=ident)
        if track:
            trackers.append(msg['tracker'])
        msg_ids.append(msg['header']['msg_id'])

    # wrap the per-send trackers in a single MessageTracker
    tracker = zmq.MessageTracker(*trackers) if track else None
    ar = AsyncResult(self.client, msg_ids, fname=getname(f),
                     targets=targets, tracker=tracker)
    if not block:
        return ar
    try:
        return ar.get()
    except KeyboardInterrupt:
        # interrupted blocking wait still hands back the AsyncResult
        return ar
549
549
@spin_after
def map(self, f, *sequences, **kwargs):
    """view.map(f, *sequences, block=self.block) => list|AsyncMapResult

    Parallel version of builtin `map`, using this View's `targets`.

    There will be one task per target, so work will be chunked
    if the sequences are longer than `targets`.

    Results can be iterated as they are ready, but will become available in chunks.

    Parameters
    ----------

    f : callable
        function to be mapped
    *sequences: one or more sequences of matching length
        the sequences to be distributed and passed to `f`
    block : bool
        whether to wait for the result or not [default self.block]

    Returns
    -------

    if block=False:
        AsyncMapResult
            An object like AsyncResult, but which reassembles the sequence of results
            into a single list. AsyncMapResults can be iterated through before all
            results are complete.
    else:
        list
            the result of map(f,*sequences)
    """

    block = kwargs.pop('block', self.block)
    # after 'block' is popped, only 'track' is a legal keyword
    for key in kwargs:
        if key not in ('block', 'track'):
            raise TypeError("invalid keyword arg, %r" % key)

    assert len(sequences) > 0, "must have some sequences to map onto!"
    # ParallelFunction does the chunking and submission
    pf = ParallelFunction(self, f, block=block, **kwargs)
    return pf.map(*sequences)
592
592
def execute(self, code, targets=None, block=None):
    """Executes `code` on `targets` in blocking or nonblocking manner.

    ``execute`` is always `bound` (affects engine namespace)

    Parameters
    ----------

    code : str
        the code string to be executed
    block : bool
        whether or not to wait until done to return
        default: self.block
    """
    # delegate to the generic apply path with the remote-exec helper
    return self._really_apply(util._execute, args=(code,),
                              block=block, targets=targets)
608
608
def run(self, filename, targets=None, block=None):
    """Execute contents of `filename` on my engine(s).

    This simply reads the contents of the file and calls `execute`.

    Parameters
    ----------

    filename : str
        The path to the file
    targets : int/str/list of ints/strs
        the engines on which to execute
        default : all
    block : bool
        whether or not to wait until done
        default: self.block

    """
    with open(filename, 'r') as source:
        # a trailing newline guards against SyntaxError caused by
        # trailing indented whitespace in the file
        code = source.read() + '\n'
    return self.execute(code, block=block, targets=targets)
632
632
def update(self, ns):
    """update remote namespace with dict `ns`

    See `push` for details.
    """
    # pure convenience wrapper: push with this View's current flags
    return self.push(ns, block=self.block, track=self.track)
639
639
def push(self, ns, targets=None, block=None, track=None):
    """update remote namespace with dict `ns`

    Parameters
    ----------

    ns : dict
        dict of keys with which to update engine namespace(s)
    targets : target list [default: self.targets]
        where to push
    block : bool [default : self.block]
        whether to wait to be notified of engine receipt
    track : bool [default : self.track]
        whether to ask zmq to track the message, for safe non-copying sends

    """
    block = self.block if block is None else block
    track = self.track if track is None else track
    targets = self.targets if targets is None else targets
    # validate before building any messages
    if not isinstance(ns, dict):
        raise TypeError("Must be a dict, not %s"%type(ns))
    # pass ns as **kwargs (not as a positional tuple) so that buffer-like
    # values can be serialized individually, enabling zmq non-copying sends
    return self._really_apply(util._push, kwargs=ns, block=block,
                              track=track, targets=targets)
660
660
def get(self, key_s):
    """get object(s) by `key_s` from remote namespace

    see `pull` for details.
    """
    # dict-style access is always blocking: the caller expects a value back
    return self.pull(key_s, block=True)
668
668
def pull(self, names, targets=None, block=None):
    """get object(s) by `name` from remote namespace

    will return one object if it is a key.
    can also take a list of keys, in which case it will return a list of objects.
    """
    block = self.block if block is None else block
    targets = self.targets if targets is None else targets
    # validate: either a single name, or a flat collection of names
    if isinstance(names, basestring):
        pass
    elif isinstance(names, (list, tuple, set)):
        for key in names:
            if not isinstance(key, basestring):
                raise TypeError("keys must be str, not type %r"%type(key))
    else:
        raise TypeError("names must be strs, not %r"%names)
    # removed unused local `applier` (self.apply_sync/self.apply_async);
    # this method dispatches through _really_apply, which handles blocking
    return self._really_apply(util._pull, (names,), block=block, targets=targets)
687
687
def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
    """
    Partition a Python sequence and send the partitions to a set of engines.
    """
    block = self.block if block is None else block
    track = self.track if track is None else track
    targets = self.targets if targets is None else targets

    mapObject = Map.dists[dist]()
    nparts = len(targets)
    msg_ids = []
    trackers = []
    for idx, eid in enumerate(targets):
        partition = mapObject.getPartition(seq, idx, nparts)
        # optionally unwrap a length-1 partition to the bare element
        if flatten and len(partition) == 1:
            ns = {key: partition[0]}
        else:
            ns = {key: partition}
        ar = self.push(ns, block=False, track=track, targets=eid)
        msg_ids.extend(ar.msg_ids)
        if track:
            trackers.append(ar._tracker)

    tracker = zmq.MessageTracker(*trackers) if track else None

    r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
    if block:
        # NOTE(review): the blocking branch returns None and wait() does not
        # raise on engine errors — unlike gather(), which blocks via get().
        # Confirm this asymmetry is intended.
        r.wait()
    else:
        return r
721
721
@sync_results
@save_ids
def gather(self, key, dist='b', targets=None, block=None):
    """
    Gather a partitioned sequence on a set of engines as a single local seq.
    """
    block = self.block if block is None else block
    targets = self.targets if targets is None else targets
    mapObject = Map.dists[dist]()
    msg_ids = []

    # one non-blocking pull per engine; AsyncMapResult reassembles
    # the partitions into a single sequence
    for idx, eid in enumerate(targets):
        ar = self.pull(key, block=False, targets=eid)
        msg_ids.extend(ar.msg_ids)

    r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')

    if not block:
        return r
    try:
        return r.get()
    except KeyboardInterrupt:
        # interrupted blocking wait still hands back the result object
        return r
744
744
def __getitem__(self, key):
    """dict-style access: ``view[key]`` pulls `key` from the engines (blocking)."""
    return self.get(key)
747
747
def __setitem__(self, key, value):
    """dict-style assignment: ``view[key] = value`` pushes to the engines."""
    self.update({key: value})
750
750
def clear(self, targets=None, block=False):
    """Clear the remote namespaces on my engines."""
    # NOTE(review): with block defaulting to False (not None), the fallback
    # below only fires when a caller passes block=None explicitly; sibling
    # methods default block=None — confirm which default is intended.
    if block is None:
        block = self.block
    if targets is None:
        targets = self.targets
    return self.client.clear(targets=targets, block=block)
756
756
def kill(self, targets=None, block=True):
    """Kill my engines."""
    # NOTE(review): block defaults to True (not None), so the self.block
    # fallback below only applies when block=None is passed explicitly.
    if block is None:
        block = self.block
    if targets is None:
        targets = self.targets
    return self.client.kill(targets=targets, block=block)
762
762
763 #----------------------------------------
763 #----------------------------------------
764 # activate for %px,%autopx magics
764 # activate for %px,%autopx magics
765 #----------------------------------------
765 #----------------------------------------
766 def activate(self):
766 def activate(self):
767 """Make this `View` active for parallel magic commands.
767 """Make this `View` active for parallel magic commands.
768
768
769 IPython has a magic command syntax to work with `MultiEngineClient` objects.
769 IPython has a magic command syntax to work with `MultiEngineClient` objects.
770 In a given IPython session there is a single active one. While
770 In a given IPython session there is a single active one. While
771 there can be many `Views` created and used by the user,
771 there can be many `Views` created and used by the user,
772 there is only one active one. The active `View` is used whenever
772 there is only one active one. The active `View` is used whenever
773 the magic commands %px and %autopx are used.
773 the magic commands %px and %autopx are used.
774
774
775 The activate() method is called on a given `View` to make it
775 The activate() method is called on a given `View` to make it
776 active. Once this has been done, the magic commands can be used.
776 active. Once this has been done, the magic commands can be used.
777 """
777 """
778
778
779 try:
779 try:
780 # This is injected into __builtins__.
780 # This is injected into __builtins__.
781 ip = get_ipython()
781 ip = get_ipython()
782 except NameError:
782 except NameError:
783 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
783 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
784 else:
784 else:
785 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
785 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
786 if pmagic is None:
786 if pmagic is None:
787 ip.magic_load_ext('parallelmagic')
787 ip.magic_load_ext('parallelmagic')
788 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
788 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
789
789
790 pmagic.active_view = self
790 pmagic.active_view = self
791
791
792
792
@skip_doctest
class LoadBalancedView(View):
    """A load-balancing View that only executes via the Task scheduler.

    Load-balanced views can be created with the client's `view` method:

    >>> v = client.load_balanced_view()

    or targets can be specified, to restrict the potential destinations:

    >>> v = client.client.load_balanced_view([1,3])

    which would restrict loadbalancing to between engines 1 and 3.

    """

    # scheduler-specific flags, in addition to View's targets/block/track
    follow=Any()
    after=Any()
    timeout=CFloat()
    retries = Integer(0)

    _task_scheme = Any()
    _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])

    def __init__(self, client=None, socket=None, **flags):
        super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
        self._task_scheme=client._task_scheme

    def _validate_dependency(self, dep):
        """validate a dependency.

        For use in `set_flags`.
        """
        if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
            return True
        elif isinstance(dep, (list, set, tuple)):
            # a collection of msg_ids / AsyncResults
            for d in dep:
                if not isinstance(d, (basestring, AsyncResult)):
                    return False
        elif isinstance(dep, dict):
            # must look exactly like a Dependency dict
            if set(dep.keys()) != set(Dependency().as_dict().keys()):
                return False
            if not isinstance(dep['msg_ids'], list):
                return False
            for d in dep['msg_ids']:
                if not isinstance(d, basestring):
                    return False
        else:
            return False

        return True

    def _render_dependency(self, dep):
        """helper for building jsonable dependencies from various input forms."""
        if isinstance(dep, Dependency):
            return dep.as_dict()
        elif isinstance(dep, AsyncResult):
            return dep.msg_ids
        elif dep is None:
            return []
        else:
            # pass to Dependency constructor
            return list(Dependency(dep))

    def set_flags(self, **kwargs):
        """set my attribute flags by keyword.

        A View is a wrapper for the Client's apply method, but with attributes
        that specify keyword arguments, those attributes can be set by keyword
        argument with this method.

        Parameters
        ----------

        block : bool
            whether to wait for results
        track : bool
            whether to create a MessageTracker to allow the user to
            safely edit after arrays and buffers during non-copying
            sends.

        after : Dependency or collection of msg_ids
            Only for load-balanced execution (targets=None)
            Specify a list of msg_ids as a time-based dependency.
            This job will only be run *after* the dependencies
            have been met.

        follow : Dependency or collection of msg_ids
            Only for load-balanced execution (targets=None)
            Specify a list of msg_ids as a location-based dependency.
            This job will only be run on an engine where this dependency
            is met.

        timeout : float/int or None
            Only for load-balanced execution (targets=None)
            Specify an amount of time (in seconds) for the scheduler to
            wait for dependencies to be met before failing with a
            DependencyTimeout.

        retries : int
            Number of times a task will be retried on failure.
        """

        super(LoadBalancedView, self).set_flags(**kwargs)
        for name in ('follow', 'after'):
            if name in kwargs:
                value = kwargs[name]
                if self._validate_dependency(value):
                    setattr(self, name, value)
                else:
                    raise ValueError("Invalid dependency: %r"%value)
        if 'timeout' in kwargs:
            t = kwargs['timeout']
            if not isinstance(t, (int, long, float, type(None))):
                raise TypeError("Invalid type for timeout: %r"%type(t))
            if t is not None:
                if t < 0:
                    raise ValueError("Invalid timeout: %s"%t)
            self.timeout = t

    @sync_results
    @save_ids
    def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
                        after=None, follow=None, timeout=None,
                        targets=None, retries=None):
        """calls f(*args, **kwargs) on a remote engine, returning the result.

        This method temporarily sets all of `apply`'s flags for a single call.

        Parameters
        ----------

        f : callable

        args : list [default: empty]

        kwargs : dict [default: empty]

        block : bool [default: self.block]
            whether to block
        track : bool [default: self.track]
            whether to ask zmq to track the message, for safe non-copying sends

        after : Dependency or collection of msg_ids [default: self.after]
            time-based dependency: run only after these tasks complete
        follow : Dependency or collection of msg_ids [default: self.follow]
            location-based dependency: run only where these tasks ran
        timeout : float/int or None [default: self.timeout]
            seconds the scheduler waits for dependencies before failing
        targets : target list [default: self.targets]
            restrict the engines eligible for this task
        retries : int [default: self.retries]
            number of times the task is retried on failure

        Returns
        -------

        if self.block is False:
            returns AsyncResult
        else:
            returns actual result of f(*args, **kwargs) on the engine(s)
            This will be a list if self.targets is also a list (even length 1), or
            the single result if self.targets is an integer engine id
        """

        # validate whether we can run
        if self._socket.closed:
            msg = "Task farming is disabled"
            if self._task_scheme == 'pure':
                msg += " because the pure ZMQ scheduler cannot handle"
                msg += " disappearing engines."
            raise RuntimeError(msg)

        if self._task_scheme == 'pure':
            # pure zmq scheme doesn't support extra features
            # (the flag list is part of the message; the original left it as a
            # no-op bare string expression, truncating the error/warning text)
            msg = ("Pure ZMQ scheduler doesn't support the following flags:"
                   " follow, after, retries, targets, timeout")
            if (follow or after or retries or targets or timeout):
                # hard fail on Scheduler flags
                raise RuntimeError(msg)
            if isinstance(f, dependent):
                # soft warn on functional dependencies
                warnings.warn(msg, RuntimeWarning)

        # build args
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs
        block = self.block if block is None else block
        track = self.track if track is None else track
        after = self.after if after is None else after
        retries = self.retries if retries is None else retries
        follow = self.follow if follow is None else follow
        timeout = self.timeout if timeout is None else timeout
        targets = self.targets if targets is None else targets

        if not isinstance(retries, int):
            raise TypeError('retries must be int, not %r'%type(retries))

        if targets is None:
            idents = []
        else:
            idents = self.client._build_targets(targets)[0]
            # ensure *not* bytes
            idents = [ ident.decode() for ident in idents ]

        after = self._render_dependency(after)
        follow = self._render_dependency(follow)
        subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)

        msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
                                subheader=subheader)
        tracker = None if track is False else msg['tracker']

        ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)

        if block:
            try:
                return ar.get()
            except KeyboardInterrupt:
                pass
        return ar

    @spin_after
    @save_ids
    def map(self, f, *sequences, **kwargs):
        """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult

        Parallel version of builtin `map`, load-balanced by this View.

        `block`, and `chunksize` can be specified by keyword only.

        Each `chunksize` elements will be a separate task, and will be
        load-balanced. This lets individual elements be available for iteration
        as soon as they arrive.

        Parameters
        ----------

        f : callable
            function to be mapped
        *sequences: one or more sequences of matching length
            the sequences to be distributed and passed to `f`
        block : bool [default self.block]
            whether to wait for the result or not
        chunksize : int [default 1]
            how many elements should be in each task.
        ordered : bool [default True]
            Whether the results should be gathered as they arrive, or enforce
            the order of submission.

            Only applies when iterating through AsyncMapResult as results arrive.
            Has no effect when block=True.

        Returns
        -------

        if block=False:
            AsyncMapResult
                An object like AsyncResult, but which reassembles the sequence of results
                into a single list. AsyncMapResults can be iterated through before all
                results are complete.
        else:
            the result of map(f,*sequences)

        """

        # default
        block = kwargs.get('block', self.block)
        chunksize = kwargs.get('chunksize', 1)
        ordered = kwargs.get('ordered', True)

        # reject unknown keywords. The original used set.difference_update,
        # which mutates in place and returns None, so `if extra_keys:` never
        # fired and invalid kwargs were silently accepted; 'ordered' is read
        # above and therefore belongs in the allowed set.
        extra_keys = set(kwargs.keys()).difference(set(['block', 'chunksize', 'ordered']))
        if extra_keys:
            raise TypeError("Invalid kwargs: %s"%list(extra_keys))

        assert len(sequences) > 0, "must have some sequences to map onto!"

        pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
        return pf.map(*sequences)
1068
1068
# public API of this module: only the concrete View subclasses are exported
__all__ = ['LoadBalancedView', 'DirectView']
@@ -1,476 +1,476 b''
1 """some generic utilities for dealing with classes, urls, and serialization
1 """some generic utilities for dealing with classes, urls, and serialization
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23 import socket
23 import socket
24 import sys
24 import sys
25 from signal import signal, SIGINT, SIGABRT, SIGTERM
25 from signal import signal, SIGINT, SIGABRT, SIGTERM
26 try:
26 try:
27 from signal import SIGKILL
27 from signal import SIGKILL
28 except ImportError:
28 except ImportError:
29 SIGKILL=None
29 SIGKILL=None
30
30
31 try:
31 try:
32 import cPickle
32 import cPickle
33 pickle = cPickle
33 pickle = cPickle
34 except:
34 except:
35 cPickle = None
35 cPickle = None
36 import pickle
36 import pickle
37
37
38 # System library imports
38 # System library imports
39 import zmq
39 import zmq
40 from zmq.log import handlers
40 from zmq.log import handlers
41
41
42 # IPython imports
42 # IPython imports
43 from IPython.config.application import Application
43 from IPython.config.application import Application
44 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
44 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
45 from IPython.utils.newserialized import serialize, unserialize
45 from IPython.utils.newserialized import serialize, unserialize
46 from IPython.zmq.log import EnginePUBHandler
46 from IPython.zmq.log import EnginePUBHandler
47
47
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49 # Classes
49 # Classes
50 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
51
51
class Namespace(dict):
    """Subclass of dict for attribute access to keys."""

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # `key in self` is the direct membership test on a dict's keys —
        # identical to the old `key in self.iterkeys()` without building
        # an intermediate iterator.
        if key in self:
            return self[key]
        else:
            # NOTE: raises NameError (not AttributeError) on purpose;
            # missing keys are treated like undefined names.
            raise NameError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        # refuse to shadow dict's own attributes (keys, items, update, ...),
        # which would break normal dict usage of this object
        if hasattr(dict, key):
            raise KeyError("Cannot override dict keys %r"%key)
        self[key] = value
67
67
68
68
class ReverseDict(dict):
    """simple double-keyed subset of dict methods.

    Every (key, value) pair is mirrored as (value, key) in an internal
    reverse table, so lookups work in either direction.
    """

    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        # mirror table: value -> key
        self._reverse = dict()
        for key, value in self.items():
            self._reverse[value] = key

    def __getitem__(self, key):
        """Look up by key first, falling back to a reverse (value) lookup."""
        try:
            return dict.__getitem__(self, key)
        except KeyError:
            return self._reverse[key]

    def __setitem__(self, key, value):
        # forbid a key that is already registered as a value — it would make
        # __getitem__ ambiguous
        if key in self._reverse:
            raise KeyError("Can't have key %r on both sides!"%key)
        dict.__setitem__(self, key, value)
        self._reverse[value] = key

    def pop(self, key, *default):
        """Remove `key` and its reverse mapping, returning the value.

        Accepts an optional default, matching the ``dict.pop`` contract.
        """
        try:
            value = dict.pop(self, key)
        except KeyError:
            if default:
                return default[0]
            raise
        self._reverse.pop(value)
        return value

    def get(self, key, default=None):
        """Like __getitem__ (both directions), but return `default` on a miss."""
        try:
            return self[key]
        except KeyError:
            return default
100
100
101 #-----------------------------------------------------------------------------
101 #-----------------------------------------------------------------------------
102 # Functions
102 # Functions
103 #-----------------------------------------------------------------------------
103 #-----------------------------------------------------------------------------
104
104
def asbytes(s):
    """ensure that an object is ascii bytes"""
    # unicode input gets ascii-encoded; anything else passes through untouched
    return s.encode('ascii') if isinstance(s, unicode) else s
110
110
def is_url(url):
    """boolean check for whether a string is a zmq url"""
    if '://' not in url:
        return False
    scheme = url.split('://', 1)[0]
    # zmq only speaks these transports
    return scheme.lower() in ('tcp', 'pgm', 'epgm', 'ipc', 'inproc')
119
119
def validate_url(url):
    """validate a url for zeromq

    Raises TypeError for non-strings and AssertionError for malformed urls;
    returns True when the url looks valid.  Only tcp urls are fully checked.
    """
    if not isinstance(url, basestring):
        raise TypeError("url must be a string, not %r"%type(url))
    url = url.lower()

    proto_addr = url.split('://')
    assert len(proto_addr) == 2, 'Invalid url: %r'%url
    proto, addr = proto_addr
    assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto

    # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
    # author: Remi Sabourin
    pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')

    if proto == 'tcp':
        lis = addr.split(':')
        assert len(lis) == 2, 'Invalid url: %r'%url
        addr,s_port = lis
        try:
            port = int(s_port)
        except ValueError:
            # BUGFIX: format s_port, not port — `port` is unbound when
            # int() fails, so this raise used to die with a NameError
            raise AssertionError("Invalid port %r in url: %r"%(s_port, url))

        assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url

    else:
        # only validate tcp urls currently
        pass

    return True
151
151
152
152
def validate_url_container(container):
    """validate a potentially nested collection of urls."""
    # a bare string is a single url
    if isinstance(container, basestring):
        return validate_url(container)
    if isinstance(container, dict):
        # for dicts, the urls are the values, not the keys
        container = container.itervalues()

    for element in container:
        validate_url_container(element)
163
163
164
164
def split_url(url):
    """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
    pieces = url.split('://')
    assert len(pieces) == 2, 'Invalid url: %r'%url
    proto, addr = pieces
    host_port = addr.split(':')
    assert len(host_port) == 2, 'Invalid url: %r'%url
    host, port = host_port
    # port stays a string; callers convert if they need an int
    return proto, host, port
174
174
def disambiguate_ip_address(ip, location=None):
    """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
    ones, based on the location (default interpretation of location is localhost)."""
    # concrete addresses need no disambiguation
    if ip not in ('0.0.0.0', '*'):
        return ip
    try:
        external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
    except (socket.gaierror, IndexError):
        # couldn't identify this machine, assume localhost
        external_ips = []
    if location is None or location in external_ips or not external_ips:
        # If location is unspecified or cannot be determined, assume local
        return '127.0.0.1'
    if location:
        # a remote location we couldn't match to a local interface
        return location
    return ip
190
190
def disambiguate_url(url, location=None):
    """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
    ones, based on the location (default interpretation is localhost).

    This is for zeromq urls, such as tcp://*:10101."""
    try:
        proto, ip, port = split_url(url)
    except AssertionError:
        # probably not tcp url; could be ipc, etc.
        return url
    return "%s://%s:%s"%(proto, disambiguate_ip_address(ip, location), port)
205
205
def _extract_buffer(s, threshold, databuffers):
    """Strip large/raw data out of one serialized wrapper into `databuffers`.

    Any buffer/ndarray data, or data bigger than `threshold`, is appended to
    `databuffers` and cleared from the wrapper so it can travel as a separate
    (possibly zero-copy) message frame instead of being double-pickled.
    """
    if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
        databuffers.append(s.getData())
        s.data = None


def serialize_object(obj, threshold=64e-6):
    """Serialize an object into a list of sendable buffers.

    Parameters
    ----------

    obj : object
        The object to be serialized
    threshold : float
        The threshold for not double-pickling the content.


    Returns
    -------
    ('pmd', [bufs]) :
        where pmd is the pickled metadata wrapper,
        bufs is a list of data buffers
    """
    databuffers = []
    if isinstance(obj, (list, tuple)):
        # serialize each element, stripping out large data buffers
        clist = canSequence(obj)
        slist = map(serialize, clist)
        for s in slist:
            _extract_buffer(s, threshold, databuffers)
        return pickle.dumps(slist,-1), databuffers
    elif isinstance(obj, dict):
        # sort keys so the buffer order is deterministic on both ends
        sobj = {}
        for k in sorted(obj.iterkeys()):
            s = serialize(can(obj[k]))
            _extract_buffer(s, threshold, databuffers)
            sobj[k] = s
        return pickle.dumps(sobj,-1), databuffers
    else:
        s = serialize(can(obj))
        _extract_buffer(s, threshold, databuffers)
        return pickle.dumps(s,-1), databuffers
248
248
249
249
def unserialize_object(bufs):
    """reconstruct an object serialized by serialize_object from data buffers."""
    bufs = list(bufs)
    wrapper = pickle.loads(bufs.pop(0))
    if isinstance(wrapper, (list, tuple)):
        # refill each stripped element from the buffer stream, in order
        for s in wrapper:
            if s.data is None:
                s.data = bufs.pop(0)
        return uncanSequence(map(unserialize, wrapper)), bufs
    if isinstance(wrapper, dict):
        # keys were sorted at serialize time; consume buffers in the same order
        restored = {}
        for k in sorted(wrapper.iterkeys()):
            s = wrapper[k]
            if s.data is None:
                s.data = bufs.pop(0)
            restored[k] = uncan(unserialize(s))
        return restored, bufs
    # single object
    if wrapper.data is None:
        wrapper.data = bufs.pop(0)
    return uncan(unserialize(wrapper)), bufs
271
271
def pack_apply_message(f, args, kwargs, threshold=64e-6):
    """pack up a function, args, and kwargs to be sent over the wire
    as a series of buffers. Any object whose data is larger than `threshold`
    will not have their data copied (currently only numpy arrays support zero-copy)"""
    # frame layout: [pickled f, serialized args, serialized kwargs, *data buffers]
    sargs, arg_bufs = serialize_object(args,threshold)
    skwargs, kwarg_bufs = serialize_object(kwargs,threshold)
    msg = [pickle.dumps(can(f),-1), sargs, skwargs]
    msg.extend(arg_bufs)
    msg.extend(kwarg_bufs)
    return msg
286
286
def _restore_buffer(sa, bufs, copy):
    """Refill one stripped serialized object from the front of `bufs`.

    With copy=False the frames are message objects exposing `.bytes`
    (see the copy handling in unpack_apply_message); with copy=True
    they can be used directly.
    """
    if sa.data is not None:
        return
    m = bufs.pop(0)
    if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
        # always use a buffer, until memoryviews get sorted out
        sa.data = buffer(m)
    elif copy:
        sa.data = m
    else:
        sa.data = m.bytes


def unpack_apply_message(bufs, g=None, copy=True):
    """unpack f,args,kwargs from buffers packed by pack_apply_message()
    Returns: original f,args,kwargs"""
    bufs = list(bufs) # allow us to pop
    assert len(bufs) >= 3, "not enough buffers!"
    if not copy:
        # the first three frames are always unpickled, so extract real bytes
        for i in range(3):
            bufs[i] = bufs[i].bytes
    cf = pickle.loads(bufs.pop(0))
    sargs = list(pickle.loads(bufs.pop(0)))
    skwargs = dict(pickle.loads(bufs.pop(0)))
    f = uncan(cf, g)
    # the identical restore logic for args and kwargs lives in _restore_buffer
    for sa in sargs:
        _restore_buffer(sa, bufs, copy)
    args = uncanSequence(map(unserialize, sargs), g)
    kwargs = {}
    for k in sorted(skwargs.iterkeys()):
        sa = skwargs[k]
        _restore_buffer(sa, bufs, copy)
        kwargs[k] = uncan(unserialize(sa), g)

    return f,args,kwargs
340
340
341 #--------------------------------------------------------------------------
341 #--------------------------------------------------------------------------
342 # helpers for implementing old MEC API via view.apply
342 # helpers for implementing old MEC API via view.apply
343 #--------------------------------------------------------------------------
343 #--------------------------------------------------------------------------
344
344
def interactive(f):
    """decorator for making functions appear as interactively defined.
    This results in the function being linked to the user_ns as globals()
    instead of the module globals().
    """
    setattr(f, '__module__', '__main__')
    return f
352
352
@interactive
def _push(**ns):
    """helper method for implementing `client.push` via `client.apply`"""
    # merge the pushed variables into the user namespace
    for name, value in ns.items():
        globals()[name] = value
357
357
@interactive
def _pull(keys):
    """helper method for implementing `client.pull` via `client.apply`

    `keys` may be a single name or a list/tuple/set of names; a missing
    name raises NameError, mirroring interactive lookup.
    """
    user_ns = globals()
    if isinstance(keys, (list,tuple, set)):
        for key in keys:
            # `in` replaces the long-deprecated dict.has_key — identical check
            if key not in user_ns:
                raise NameError("name '%s' is not defined"%key)
        return map(user_ns.get, keys)
    else:
        if keys not in user_ns:
            raise NameError("name '%s' is not defined"%keys)
        return user_ns.get(keys)
371
371
@interactive
def _execute(code):
    """helper method for implementing `client.execute` via `client.apply`"""
    # function-call form of exec: identical to the Python 2 statement
    # (the tuple form is documented equivalent) and also valid Python 3
    exec(code, globals())
376
376
377 #--------------------------------------------------------------------------
377 #--------------------------------------------------------------------------
378 # extra process management utilities
378 # extra process management utilities
379 #--------------------------------------------------------------------------
379 #--------------------------------------------------------------------------
380
380
# ports handed out so far, so repeated calls never return duplicates
_random_ports = set()

def select_random_ports(n):
    """Selects and return n random ports that are available.

    All n sockets stay bound while selecting, so the returned ports are
    distinct; each port is remembered in _random_ports and never reused.
    """
    sockets = []
    for i in range(n):
        sock = socket.socket()
        sock.bind(('', 0))
        # re-roll if the OS hands us a port we've already given out
        while sock.getsockname()[1] in _random_ports:
            sock.close()
            sock = socket.socket()
            sock.bind(('', 0))
        sockets.append(sock)
    ports = []
    for sock in sockets:
        port = sock.getsockname()[1]
        sock.close()
        ports.append(port)
        _random_ports.add(port)
    return ports
400
400
def signal_children(children):
    """Relay interrupt/term signals to children, for more solid process cleanup.

    Installs a handler for SIGINT/SIGABRT/SIGTERM in *this* process that
    terminates every object in `children` (anything with a ``terminate()``
    method — presumably Popen-like; confirm against callers) and then exits.
    """
    def terminate_children(sig, frame):
        log = Application.instance().log
        log.critical("Got signal %i, terminating children..."%sig)
        for child in children:
            child.terminate()

        # exit status 0 for a user-requested SIGINT, 1 for other signals
        sys.exit(sig != SIGINT)
        # sys.exit(sig)
    for sig in (SIGINT, SIGABRT, SIGTERM):
        signal(sig, terminate_children)
413
413
def generate_exec_key(keyfile):
    """Write a fresh random execution key (a uuid4 string) to `keyfile`."""
    import uuid
    with open(keyfile, 'w') as f:
        f.write(str(uuid.uuid4())+'\n')
    # set user-only RW permissions (0600); this will have no effect on Windows
    os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
423
423
424
424
def integer_loglevel(loglevel):
    """Coerce a loglevel to an int: int-like values pass through int(),
    level names ('DEBUG', ...) are resolved via the logging module."""
    try:
        return int(loglevel)
    except ValueError:
        pass
    if isinstance(loglevel, str):
        loglevel = getattr(logging, loglevel)
    return loglevel
432
432
def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
    """Attach a zmq PUBHandler publishing to `iface` to the named logger."""
    logger = logging.getLogger(logname)
    if any(isinstance(h, handlers.PUBHandler) for h in logger.handlers):
        # already wired up; don't add a second PUBHandler
        return
    loglevel = integer_loglevel(loglevel)
    pub_socket = context.socket(zmq.PUB)
    pub_socket.connect(iface)
    pub_handler = handlers.PUBHandler(pub_socket)
    pub_handler.setLevel(loglevel)
    pub_handler.root_topic = root
    logger.addHandler(pub_handler)
    logger.setLevel(loglevel)
446
446
def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
    """Attach an EnginePUBHandler for `engine` to the root logger.

    Returns the root logger, or None if a PUBHandler was already attached.
    """
    logger = logging.getLogger()
    if any(isinstance(h, handlers.PUBHandler) for h in logger.handlers):
        # already wired up; don't add a second PUBHandler
        return
    loglevel = integer_loglevel(loglevel)
    pub_socket = context.socket(zmq.PUB)
    pub_socket.connect(iface)
    pub_handler = EnginePUBHandler(engine, pub_socket)
    pub_handler.setLevel(loglevel)
    logger.addHandler(pub_handler)
    logger.setLevel(loglevel)
    return logger
460
460
def local_logger(logname, loglevel=logging.DEBUG):
    """Attach a timestamped StreamHandler to the named logger.

    Always returns the logger.  (Previously the "handler already attached"
    branch returned None while the normal path returned the logger, so a
    second call for the same name surprised callers.)
    """
    loglevel = integer_loglevel(loglevel)
    logger = logging.getLogger(logname)
    if any(isinstance(h, logging.StreamHandler) for h in logger.handlers):
        # don't add a second StreamHandler, but still hand back the logger
        return logger
    handler = logging.StreamHandler()
    handler.setLevel(loglevel)
    formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S")
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger
476
476
General Comments 0
You need to be logged in to leave comments. Login now