reflect revised apply_bound pattern
MinRK
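The revised pattern changes what `bound` means: instead of executing `f` inside the engine's namespace, a bound call now receives the engine's user namespace as an explicit first argument, and `apply` defaults to `bound=False`. A minimal sketch of the new calling convention, exercised locally with a plain dict standing in for the engine namespace (the `_push` helper is the one from this diff; the engine-side injection of `user_ns` is assumed, not shown here):

    def _push(user_ns, **ns):
        """helper run remotely via client.apply(..., bound=True)"""
        user_ns.update(ns)

    engine_ns = {}                # stands in for the engine's user namespace
    _push(engine_ns, a=1, b=2)    # what apply(_push, kwargs=ns, bound=True) does
    assert engine_ns == {'a': 1, 'b': 2}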
@@ -1,169 +1,169 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.test.test_newserialized -*-
3 3
4 4 """Refactored serialization classes and interfaces."""
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 # Tell nose to skip this module
9 9 __test__ = {}
10 10
11 11 #-------------------------------------------------------------------------------
12 12 # Copyright (C) 2008 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-------------------------------------------------------------------------------
17 17
18 18 #-------------------------------------------------------------------------------
19 19 # Imports
20 20 #-------------------------------------------------------------------------------
21 21
22 22 import cPickle as pickle
23 23
24 24 try:
25 25 import numpy
26 26 except ImportError:
27 27 pass
28 28
29 29 class SerializationError(Exception):
30 30 pass
31 31
32 32 #-----------------------------------------------------------------------------
33 33 # Classes and functions
34 34 #-----------------------------------------------------------------------------
35 35
36 36 class ISerialized:
37 37
38 38 def getData():
39 39 """"""
40 40
41 41 def getDataSize(units=10.0**6):
42 42 """"""
43 43
44 44 def getTypeDescriptor():
45 45 """"""
46 46
47 47 def getMetadata():
48 48 """"""
49 49
50 50
51 51 class IUnSerialized:
52 52
53 53 def getObject():
54 54 """"""
55 55
56 56 class Serialized(object):
57 57
58 58 # implements(ISerialized)
59 59
60 60 def __init__(self, data, typeDescriptor, metadata={}):
61 61 self.data = data
62 62 self.typeDescriptor = typeDescriptor
63 63 self.metadata = metadata
64 64
65 65 def getData(self):
66 66 return self.data
67 67
68 68 def getDataSize(self, units=10.0**6):
69 69 return len(self.data)/units
70 70
71 71 def getTypeDescriptor(self):
72 72 return self.typeDescriptor
73 73
74 74 def getMetadata(self):
75 75 return self.metadata
76 76
77 77
78 78 class UnSerialized(object):
79 79
80 80 # implements(IUnSerialized)
81 81
82 82 def __init__(self, obj):
83 83 self.obj = obj
84 84
85 85 def getObject(self):
86 86 return self.obj
87 87
88 88
89 89 class SerializeIt(object):
90 90
91 91 # implements(ISerialized)
92 92
93 93 def __init__(self, unSerialized):
94 94 self.data = None
95 95 self.obj = unSerialized.getObject()
96 96 if globals().has_key('numpy') and isinstance(self.obj, numpy.ndarray):
97 97 if len(self.obj.shape) == 0: # length 0 arrays are just pickled
98 98 self.typeDescriptor = 'pickle'
99 99 self.metadata = {}
100 100 else:
101 101 self.obj = numpy.ascontiguousarray(self.obj, dtype=None)
102 102 self.typeDescriptor = 'ndarray'
103 103 self.metadata = {'shape':self.obj.shape,
104 104 'dtype':self.obj.dtype.str}
105 elif isinstance(self.obj, bytes):
105 elif isinstance(self.obj, str):
106 106 self.typeDescriptor = 'bytes'
107 107 self.metadata = {}
108 108 elif isinstance(self.obj, buffer):
109 109 self.typeDescriptor = 'buffer'
110 110 self.metadata = {}
111 111 else:
112 112 self.typeDescriptor = 'pickle'
113 113 self.metadata = {}
114 114 self._generateData()
115 115
116 116 def _generateData(self):
117 117 if self.typeDescriptor == 'ndarray':
118 118 self.data = numpy.getbuffer(self.obj)
119 119 elif self.typeDescriptor in ('bytes', 'buffer'):
120 120 self.data = self.obj
121 121 elif self.typeDescriptor == 'pickle':
122 122 self.data = pickle.dumps(self.obj, -1)
123 123 else:
124 124 raise SerializationError("Really weird serialization error.")
125 125 del self.obj
126 126
127 127 def getData(self):
128 128 return self.data
129 129
130 130 def getDataSize(self, units=10.0**6):
131 131 return 1.0*len(self.data)/units
132 132
133 133 def getTypeDescriptor(self):
134 134 return self.typeDescriptor
135 135
136 136 def getMetadata(self):
137 137 return self.metadata
138 138
139 139
140 140 class UnSerializeIt(UnSerialized):
141 141
142 142 # implements(IUnSerialized)
143 143
144 144 def __init__(self, serialized):
145 145 self.serialized = serialized
146 146
147 147 def getObject(self):
148 148 typeDescriptor = self.serialized.getTypeDescriptor()
149 149 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
150 150 buf = self.serialized.getData()
151 if isinstance(buf, (buffer,bytes)):
151 if isinstance(buf, (str, buffer)):
152 152 result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype'])
153 153 else:
154 154 # memoryview
155 155 result = numpy.array(buf, dtype = self.serialized.metadata['dtype'])
156 156 result.shape = self.serialized.metadata['shape']
157 157 elif typeDescriptor == 'pickle':
158 158 result = pickle.loads(self.serialized.getData())
159 159 elif typeDescriptor in ('bytes', 'buffer'):
160 160 result = self.serialized.getData()
161 161 else:
162 162 raise SerializationError("Really weird serialization error.")
163 163 return result
164 164
165 165 def serialize(obj):
166 166 return SerializeIt(UnSerialized(obj))
167 167
168 168 def unserialize(serialized):
169 169 return UnSerializeIt(serialized).getObject()
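For context, a round-trip through the helpers above behaves like the sketch below. This assumes Python 2 with an older numpy that still provides `numpy.getbuffer`; the printed dtype string also depends on platform endianness:

    import numpy

    a = numpy.arange(6, dtype='int32').reshape(2, 3)
    s = serialize(a)               # SerializeIt picks the 'ndarray' descriptor
    print s.getTypeDescriptor()    # 'ndarray'
    print s.getMetadata()          # {'shape': (2, 3), 'dtype': '<i4'} on little-endian

    b = unserialize(s)             # rebuilt via numpy.frombuffer, then reshaped
    assert (a == b).all()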
@@ -1,142 +1,143 b''
1 1 # encoding: utf-8
2 2
3 3 """Pickle related utilities. Perhaps this should be called 'can'."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-------------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-------------------------------------------------------------------------------
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 18 from types import FunctionType
19 19 import copy
20 20
21 21 from IPython.zmq.parallel.dependency import dependent
22 22
23 23 import codeutil
24 24
25 25 #-------------------------------------------------------------------------------
26 26 # Classes
27 27 #-------------------------------------------------------------------------------
28 28
29 29
30 30 class CannedObject(object):
31 31 def __init__(self, obj, keys=[]):
32 32 self.keys = keys
33 33 self.obj = copy.copy(obj)
34 34 for key in keys:
35 35 setattr(self.obj, key, can(getattr(obj, key)))
36 36
37 37
38 38 def getObject(self, g=None):
39 39 if g is None:
40 40 g = globals()
41 41 for key in self.keys:
42 42 setattr(self.obj, key, uncan(getattr(self.obj, key), g))
43 43 return self.obj
44 44
45 45 class Reference(CannedObject):
46 46 """object for wrapping a remote reference by name."""
47 47 def __init__(self, name):
48 48 if not isinstance(name, basestring):
49 49 raise TypeError("illegal name: %r"%name)
50 50 self.name = name
51 51
52 52 def __repr__(self):
53 53 return "<Reference: %r>"%self.name
54 54
55 55 def getObject(self, g=None):
56 56 if g is None:
57 57 g = globals()
58 58 try:
59 59 return g[self.name]
60 60 except KeyError:
61 61 raise NameError("name %r is not defined"%self.name)
62 62
63 63
64 64 class CannedFunction(CannedObject):
65 65
66 66 def __init__(self, f):
67 67 self._checkType(f)
68 68 self.code = f.func_code
69 self.defaults = f.func_defaults
69 70 self.__name__ = f.__name__
70 71
71 72 def _checkType(self, obj):
72 73 assert isinstance(obj, FunctionType), "Not a function type"
73 74
74 75 def getObject(self, g=None):
75 76 if g is None:
76 77 g = globals()
77 newFunc = FunctionType(self.code, g)
78 newFunc = FunctionType(self.code, g, self.__name__, self.defaults)
78 79 return newFunc
79 80
80 81 #-------------------------------------------------------------------------------
81 82 # Functions
82 83 #-------------------------------------------------------------------------------
83 84
84 85
85 86 def can(obj):
86 87 if isinstance(obj, dependent):
87 88 keys = ('f','df')
88 89 return CannedObject(obj, keys=keys)
89 90 elif isinstance(obj, FunctionType):
90 91 return CannedFunction(obj)
91 92 elif isinstance(obj,dict):
92 93 return canDict(obj)
93 94 elif isinstance(obj, (list,tuple)):
94 95 return canSequence(obj)
95 96 else:
96 97 return obj
97 98
98 99 def canDict(obj):
99 100 if isinstance(obj, dict):
100 101 newobj = {}
101 102 for k, v in obj.iteritems():
102 103 newobj[k] = can(v)
103 104 return newobj
104 105 else:
105 106 return obj
106 107
107 108 def canSequence(obj):
108 109 if isinstance(obj, (list, tuple)):
109 110 t = type(obj)
110 111 return t([can(i) for i in obj])
111 112 else:
112 113 return obj
113 114
114 115 def uncan(obj, g=None):
115 116 if isinstance(obj, CannedObject):
116 117 return obj.getObject(g)
117 118 elif isinstance(obj,dict):
118 119 return uncanDict(obj, g)
119 120 elif isinstance(obj, (list,tuple)):
120 121 return uncanSequence(obj, g)
121 122 else:
122 123 return obj
123 124
124 125 def uncanDict(obj, g=None):
125 126 if isinstance(obj, dict):
126 127 newobj = {}
127 128 for k, v in obj.iteritems():
128 129 newobj[k] = uncan(v,g)
129 130 return newobj
130 131 else:
131 132 return obj
132 133
133 134 def uncanSequence(obj, g=None):
134 135 if isinstance(obj, (list, tuple)):
135 136 t = type(obj)
136 137 return t([uncan(i,g) for i in obj])
137 138 else:
138 139 return obj
139 140
140 141
141 142 def rebindFunctionGlobals(f, glbls):
142 143 return FunctionType(f.func_code, glbls)
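A small sketch of why `func_defaults` now travels with `func_code` (Python 2): a FunctionType rebuilt from the code object alone silently drops default argument values, so the round trip below would fail without this change:

    def greet(name, greeting='hello'):
        return '%s, %s' % (greeting, name)

    cf = can(greet)     # CannedFunction now carries code, defaults, and __name__
    g = uncan(cf, {})   # rebuilt as FunctionType(code, g, name, defaults)
    assert g.__name__ == 'greet'
    assert g('world') == 'hello, world'   # the default survived the round trip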
@@ -1,1569 +1,1569 b''
1 1 """A semi-synchronous Client for the ZMQ controller"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 import os
14 14 import json
15 15 import time
16 16 import warnings
17 17 from datetime import datetime
18 18 from getpass import getpass
19 19 from pprint import pprint
20 20
21 21 pjoin = os.path.join
22 22
23 23 import zmq
24 24 # from zmq.eventloop import ioloop, zmqstream
25 25
26 26 from IPython.utils.path import get_ipython_dir
27 27 from IPython.utils.pickleutil import Reference
28 28 from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode,
29 29 Dict, List, Bool, Str, Set)
30 30 from IPython.external.decorator import decorator
31 31 from IPython.external.ssh import tunnel
32 32
33 33 from . import error
34 34 from . import map as Map
35 35 from . import util
36 36 from . import streamsession as ss
37 37 from .asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult
38 38 from .clusterdir import ClusterDir, ClusterDirError
39 39 from .dependency import Dependency, depend, require, dependent
40 40 from .remotefunction import remote, parallel, ParallelFunction, RemoteFunction
41 41 from .util import ReverseDict, validate_url, disambiguate_url
42 42 from .view import DirectView, LoadBalancedView
43 43
44 44 #--------------------------------------------------------------------------
45 45 # helpers for implementing old MEC API via client.apply
46 46 #--------------------------------------------------------------------------
47 47
48 def _push(ns):
48 def _push(user_ns, **ns):
49 49 """helper method for implementing `client.push` via `client.apply`"""
50 globals().update(ns)
50 user_ns.update(ns)
51 51
52 def _pull(keys):
52 def _pull(user_ns, keys):
53 53 """helper method for implementing `client.pull` via `client.apply`"""
54 g = globals()
55 54 if isinstance(keys, (list,tuple, set)):
56 55 for key in keys:
57 if not g.has_key(key):
56 if not user_ns.has_key(key):
58 57 raise NameError("name '%s' is not defined"%key)
59 return map(g.get, keys)
58 return map(user_ns.get, keys)
60 59 else:
61 if not g.has_key(keys):
60 if not user_ns.has_key(keys):
62 61 raise NameError("name '%s' is not defined"%keys)
63 return g.get(keys)
62 return user_ns.get(keys)
64 63
65 def _clear():
64 def _clear(user_ns):
66 65 """helper method for implementing `client.clear` via `client.apply`"""
67 globals().clear()
66 user_ns.clear()
68 67
69 def _execute(code):
68 def _execute(user_ns, code):
70 69 """helper method for implementing `client.execute` via `client.apply`"""
71 exec code in globals()
70 exec code in user_ns
72 71
73 72
74 73 #--------------------------------------------------------------------------
75 74 # Decorators for Client methods
76 75 #--------------------------------------------------------------------------
77 76
78 77 @decorator
79 78 def spinfirst(f, self, *args, **kwargs):
80 79 """Call spin() to sync state prior to calling the method."""
81 80 self.spin()
82 81 return f(self, *args, **kwargs)
83 82
84 83 @decorator
85 84 def defaultblock(f, self, *args, **kwargs):
86 85 """Default to self.block; preserve self.block."""
87 86 block = kwargs.get('block',None)
88 87 block = self.block if block is None else block
89 88 saveblock = self.block
90 89 self.block = block
91 90 try:
92 91 ret = f(self, *args, **kwargs)
93 92 finally:
94 93 self.block = saveblock
95 94 return ret
96 95
97 96
98 97 #--------------------------------------------------------------------------
99 98 # Classes
100 99 #--------------------------------------------------------------------------
101 100
102 101 class Metadata(dict):
103 102 """Subclass of dict for initializing metadata values.
104 103
105 104 Attribute access works on keys.
106 105
107 106 These objects have a strict set of keys - an error is raised if you
108 107 try to add a new key.
109 108 """
110 109 def __init__(self, *args, **kwargs):
111 110 dict.__init__(self)
112 111 md = {'msg_id' : None,
113 112 'submitted' : None,
114 113 'started' : None,
115 114 'completed' : None,
116 115 'received' : None,
117 116 'engine_uuid' : None,
118 117 'engine_id' : None,
119 118 'follow' : None,
120 119 'after' : None,
121 120 'status' : None,
122 121
123 122 'pyin' : None,
124 123 'pyout' : None,
125 124 'pyerr' : None,
126 125 'stdout' : '',
127 126 'stderr' : '',
128 127 }
129 128 self.update(md)
130 129 self.update(dict(*args, **kwargs))
131 130
132 131 def __getattr__(self, key):
133 132 """getattr aliased to getitem"""
134 133 if key in self.iterkeys():
135 134 return self[key]
136 135 else:
137 136 raise AttributeError(key)
138 137
139 138 def __setattr__(self, key, value):
140 139 """setattr aliased to setitem, with strict"""
141 140 if key in self.iterkeys():
142 141 self[key] = value
143 142 else:
144 143 raise AttributeError(key)
145 144
146 145 def __setitem__(self, key, value):
147 146 """strict static key enforcement"""
148 147 if key in self.iterkeys():
149 148 dict.__setitem__(self, key, value)
150 149 else:
151 150 raise KeyError(key)
152 151
153 152
154 153 class Client(HasTraits):
155 154 """A semi-synchronous client to the IPython ZMQ controller
156 155
157 156 Parameters
158 157 ----------
159 158
160 159 url_or_file : bytes; zmq url or path to ipcontroller-client.json
161 160 Connection information for the Hub's registration. If a json connector
162 161 file is given, then likely no further configuration is necessary.
163 162 [Default: use profile]
164 163 profile : bytes
165 164 The name of the Cluster profile to be used to find connector information.
166 165 [Default: 'default']
167 166 context : zmq.Context
168 167 Pass an existing zmq.Context instance, otherwise the client will create its own.
169 168 username : bytes
170 169 set username to be passed to the Session object
171 170 debug : bool
172 171 flag for lots of message printing for debug purposes
173 172
174 173 #-------------- ssh related args ----------------
175 174 # These are args for configuring the ssh tunnel to be used
176 175 # credentials are used to forward connections over ssh to the Controller
177 176 # Note that the ip given in `addr` needs to be relative to sshserver
178 177 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
179 178 # and set sshserver as the same machine the Controller is on. However,
180 179 # the only requirement is that sshserver is able to see the Controller
181 180 # (i.e. is within the same trusted network).
182 181
183 182 sshserver : str
184 183 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
185 184 If keyfile or password is specified, and this is not, it will default to
186 185 the ip given in addr.
187 186 sshkey : str; path to public ssh key file
188 187 This specifies a key to be used in ssh login, default None.
189 188 Regular default ssh keys will be used without specifying this argument.
190 189 password : str
191 190 Your ssh password to sshserver. Note that if this is left None,
192 191 you will be prompted for it if passwordless key based login is unavailable.
193 192 paramiko : bool
194 193 flag for whether to use paramiko instead of shell ssh for tunneling.
195 194 [default: True on win32, False else]
196 195
197 196 #------- exec authentication args -------
198 197 # If even localhost is untrusted, you can have some protection against
199 198 # unauthorized execution by using a key. Messages are still sent
200 199 # as cleartext, so if someone can snoop your loopback traffic this will
201 200 # not help against malicious attacks.
202 201
203 202 exec_key : str
204 203 an authentication key or file containing a key
205 204 default: None
206 205
207 206
208 207 Attributes
209 208 ----------
210 209
211 210 ids : set of int engine IDs
212 211 requesting the ids attribute always synchronizes
213 212 the registration state. To request ids without synchronization,
214 213 use semi-private _ids attributes.
215 214
216 215 history : list of msg_ids
217 216 a list of msg_ids, keeping track of all the execution
218 217 messages you have submitted in order.
219 218
220 219 outstanding : set of msg_ids
221 220 a set of msg_ids that have been submitted, but whose
222 221 results have not yet been received.
223 222
224 223 results : dict
225 224 a dict of all our results, keyed by msg_id
226 225
227 226 block : bool
228 227 determines default behavior when block not specified
229 228 in execution methods
230 229
231 230 Methods
232 231 -------
233 232
234 233 spin
235 234 flushes incoming results and registration state changes
236 235 control methods spin, and requesting `ids` also ensures up to date
237 236
238 237 barrier
239 238 wait on one or more msg_ids
240 239
241 240 execution methods
242 241 apply
243 242 legacy: execute, run
244 243
245 244 query methods
246 245 queue_status, get_result, purge
247 246
248 247 control methods
249 248 abort, shutdown
250 249
251 250 """
252 251
253 252
254 253 block = Bool(False)
255 254 outstanding = Set()
256 255 results = Instance('collections.defaultdict', (dict,))
257 256 metadata = Instance('collections.defaultdict', (Metadata,))
258 257 history = List()
259 258 debug = Bool(False)
260 259 profile=CUnicode('default')
261 260
262 261 _outstanding_dict = Instance('collections.defaultdict', (set,))
263 262 _ids = List()
264 263 _connected=Bool(False)
265 264 _ssh=Bool(False)
266 265 _context = Instance('zmq.Context')
267 266 _config = Dict()
268 267 _engines=Instance(ReverseDict, (), {})
269 268 _registration_socket=Instance('zmq.Socket')
270 269 _query_socket=Instance('zmq.Socket')
271 270 _control_socket=Instance('zmq.Socket')
272 271 _iopub_socket=Instance('zmq.Socket')
273 272 _notification_socket=Instance('zmq.Socket')
274 273 _mux_socket=Instance('zmq.Socket')
275 274 _task_socket=Instance('zmq.Socket')
276 275 _task_scheme=Str()
277 276 _balanced_views=Dict()
278 277 _direct_views=Dict()
279 278 _closed = False
280 279
281 280 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
282 281 context=None, username=None, debug=False, exec_key=None,
283 282 sshserver=None, sshkey=None, password=None, paramiko=None,
284 283 ):
285 284 super(Client, self).__init__(debug=debug, profile=profile)
286 285 if context is None:
287 286 context = zmq.Context()
288 287 self._context = context
289 288
290 289
291 290 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
292 291 if self._cd is not None:
293 292 if url_or_file is None:
294 293 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
295 294 assert url_or_file is not None, "I can't find enough information to connect to a controller!"\
296 295 " Please specify at least one of url_or_file or profile."
297 296
298 297 try:
299 298 validate_url(url_or_file)
300 299 except AssertionError:
301 300 if not os.path.exists(url_or_file):
302 301 if self._cd:
303 302 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
304 303 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
305 304 with open(url_or_file) as f:
306 305 cfg = json.loads(f.read())
307 306 else:
308 307 cfg = {'url':url_or_file}
309 308
310 309 # sync defaults from args, json:
311 310 if sshserver:
312 311 cfg['ssh'] = sshserver
313 312 if exec_key:
314 313 cfg['exec_key'] = exec_key
315 314 exec_key = cfg['exec_key']
316 315 sshserver=cfg['ssh']
317 316 url = cfg['url']
318 317 location = cfg.setdefault('location', None)
319 318 cfg['url'] = disambiguate_url(cfg['url'], location)
320 319 url = cfg['url']
321 320
322 321 self._config = cfg
323 322
324 323 self._ssh = bool(sshserver or sshkey or password)
325 324 if self._ssh and sshserver is None:
326 325 # default to ssh via localhost
327 326 sshserver = url.split('://')[1].split(':')[0]
328 327 if self._ssh and password is None:
329 328 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
330 329 password=False
331 330 else:
332 331 password = getpass("SSH Password for %s: "%sshserver)
333 332 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
334 333 if exec_key is not None and os.path.isfile(exec_key):
335 334 arg = 'keyfile'
336 335 else:
337 336 arg = 'key'
338 337 key_arg = {arg:exec_key}
339 338 if username is None:
340 339 self.session = ss.StreamSession(**key_arg)
341 340 else:
342 341 self.session = ss.StreamSession(username, **key_arg)
343 342 self._registration_socket = self._context.socket(zmq.XREQ)
344 343 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
345 344 if self._ssh:
346 345 tunnel.tunnel_connection(self._registration_socket, url, sshserver, **ssh_kwargs)
347 346 else:
348 347 self._registration_socket.connect(url)
349 348
350 349 self.session.debug = self.debug
351 350
352 351 self._notification_handlers = {'registration_notification' : self._register_engine,
353 352 'unregistration_notification' : self._unregister_engine,
354 353 }
355 354 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
356 355 'apply_reply' : self._handle_apply_reply}
357 356 self._connect(sshserver, ssh_kwargs)
358 357
359 358 def __del__(self):
360 359 """cleanup sockets, but _not_ context."""
361 360 self.close()
362 361
363 362 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
364 363 if ipython_dir is None:
365 364 ipython_dir = get_ipython_dir()
366 365 if cluster_dir is not None:
367 366 try:
368 367 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
369 368 return
370 369 except ClusterDirError:
371 370 pass
372 371 elif profile is not None:
373 372 try:
374 373 self._cd = ClusterDir.find_cluster_dir_by_profile(
375 374 ipython_dir, profile)
376 375 return
377 376 except ClusterDirError:
378 377 pass
379 378 self._cd = None
380 379
381 380 @property
382 381 def ids(self):
383 382 """Always up-to-date ids property."""
384 383 self._flush_notifications()
385 384 # always copy:
386 385 return list(self._ids)
387 386
388 387 def close(self):
389 388 if self._closed:
390 389 return
391 390 snames = filter(lambda n: n.endswith('socket'), dir(self))
392 391 for socket in map(lambda name: getattr(self, name), snames):
393 392 if isinstance(socket, zmq.Socket) and not socket.closed:
394 393 socket.close()
395 394 self._closed = True
396 395
397 396 def _update_engines(self, engines):
398 397 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
399 398 for k,v in engines.iteritems():
400 399 eid = int(k)
401 400 self._engines[eid] = bytes(v) # force not unicode
402 401 self._ids.append(eid)
403 402 self._ids = sorted(self._ids)
404 403 if sorted(self._engines.keys()) != range(len(self._engines)) and \
405 404 self._task_scheme == 'pure' and self._task_socket:
406 405 self._stop_scheduling_tasks()
407 406
408 407 def _stop_scheduling_tasks(self):
409 408 """Stop scheduling tasks because an engine has been unregistered
410 409 from a pure ZMQ scheduler.
411 410 """
412 411
413 412 self._task_socket.close()
414 413 self._task_socket = None
415 414 msg = "An engine has been unregistered, and we are using pure " +\
416 415 "ZMQ task scheduling. Task farming will be disabled."
417 416 if self.outstanding:
418 417 msg += " If you were running tasks when this happened, " +\
419 418 "some `outstanding` msg_ids may never resolve."
420 419 warnings.warn(msg, RuntimeWarning)
421 420
422 421 def _build_targets(self, targets):
423 422 """Turn valid target IDs or 'all' into two lists:
424 423 (int_ids, uuids).
425 424 """
426 425 if targets is None:
427 426 targets = self._ids
428 427 elif isinstance(targets, str):
429 428 if targets.lower() == 'all':
430 429 targets = self._ids
431 430 else:
432 431 raise TypeError("%r not valid str target, must be 'all'"%(targets))
433 432 elif isinstance(targets, int):
434 433 targets = [targets]
435 434 return [self._engines[t] for t in targets], list(targets)
436 435
437 436 def _connect(self, sshserver, ssh_kwargs):
438 437 """setup all our socket connections to the controller. This is called from
439 438 __init__."""
440 439
441 440 # Maybe allow reconnecting?
442 441 if self._connected:
443 442 return
444 443 self._connected=True
445 444
446 445 def connect_socket(s, url):
447 446 url = disambiguate_url(url, self._config['location'])
448 447 if self._ssh:
449 448 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
450 449 else:
451 450 return s.connect(url)
452 451
453 452 self.session.send(self._registration_socket, 'connection_request')
454 453 idents,msg = self.session.recv(self._registration_socket,mode=0)
455 454 if self.debug:
456 455 pprint(msg)
457 456 msg = ss.Message(msg)
458 457 content = msg.content
459 458 self._config['registration'] = dict(content)
460 459 if content.status == 'ok':
461 460 if content.mux:
462 461 self._mux_socket = self._context.socket(zmq.PAIR)
463 462 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
464 463 connect_socket(self._mux_socket, content.mux)
465 464 if content.task:
466 465 self._task_scheme, task_addr = content.task
467 466 self._task_socket = self._context.socket(zmq.PAIR)
468 467 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
469 468 connect_socket(self._task_socket, task_addr)
470 469 if content.notification:
471 470 self._notification_socket = self._context.socket(zmq.SUB)
472 471 connect_socket(self._notification_socket, content.notification)
473 472 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
474 473 if content.query:
475 474 self._query_socket = self._context.socket(zmq.PAIR)
476 475 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
477 476 connect_socket(self._query_socket, content.query)
478 477 if content.control:
479 478 self._control_socket = self._context.socket(zmq.PAIR)
480 479 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
481 480 connect_socket(self._control_socket, content.control)
482 481 if content.iopub:
483 482 self._iopub_socket = self._context.socket(zmq.SUB)
484 483 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
485 484 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
486 485 connect_socket(self._iopub_socket, content.iopub)
487 486 self._update_engines(dict(content.engines))
488 487
489 488 else:
490 489 self._connected = False
491 490 raise Exception("Failed to connect!")
492 491
493 492 #--------------------------------------------------------------------------
494 493 # handlers and callbacks for incoming messages
495 494 #--------------------------------------------------------------------------
496 495
497 496 def _unwrap_exception(self, content):
498 497 """unwrap exception, and remap engineid to int."""
499 498 e = error.unwrap_exception(content)
500 499 if e.engine_info:
501 500 e_uuid = e.engine_info['engine_uuid']
502 501 eid = self._engines[e_uuid]
503 502 e.engine_info['engine_id'] = eid
504 503 return e
505 504
506 505 def _extract_metadata(self, header, parent, content):
507 506 md = {'msg_id' : parent['msg_id'],
508 507 'received' : datetime.now(),
509 508 'engine_uuid' : header.get('engine', None),
510 509 'follow' : parent.get('follow', []),
511 510 'after' : parent.get('after', []),
512 511 'status' : content['status'],
513 512 }
514 513
515 514 if md['engine_uuid'] is not None:
516 515 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
517 516
518 517 if 'date' in parent:
519 518 md['submitted'] = datetime.strptime(parent['date'], util.ISO8601)
520 519 if 'started' in header:
521 520 md['started'] = datetime.strptime(header['started'], util.ISO8601)
522 521 if 'date' in header:
523 522 md['completed'] = datetime.strptime(header['date'], util.ISO8601)
524 523 return md
525 524
526 525 def _register_engine(self, msg):
527 526 """Register a new engine, and update our connection info."""
528 527 content = msg['content']
529 528 eid = content['id']
530 529 d = {eid : content['queue']}
531 530 self._update_engines(d)
532 531
533 532 def _unregister_engine(self, msg):
534 533 """Unregister an engine that has died."""
535 534 content = msg['content']
536 535 eid = int(content['id'])
537 536 if eid in self._ids:
538 537 self._ids.remove(eid)
539 538 uuid = self._engines.pop(eid)
540 539
541 540 self._handle_stranded_msgs(eid, uuid)
542 541
543 542 if self._task_socket and self._task_scheme == 'pure':
544 543 self._stop_scheduling_tasks()
545 544
546 545 def _handle_stranded_msgs(self, eid, uuid):
547 546 """Handle messages known to be on an engine when the engine unregisters.
548 547
549 548 It is possible that this will fire prematurely - that is, an engine will
550 549 go down after completing a result, and the client will be notified
551 550 of the unregistration and later receive the successful result.
552 551 """
553 552
554 553 outstanding = self._outstanding_dict[uuid]
555 554
556 555 for msg_id in list(outstanding):
557 556 if msg_id in self.results:
558 557 # we already have this result
559 558 continue
560 559 try:
561 560 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
562 561 except:
563 562 content = error.wrap_exception()
564 563 # build a fake message:
565 564 parent = {}
566 565 header = {}
567 566 parent['msg_id'] = msg_id
568 567 header['engine'] = uuid
569 568 header['date'] = datetime.now().strftime(util.ISO8601)
570 569 msg = dict(parent_header=parent, header=header, content=content)
571 570 self._handle_apply_reply(msg)
572 571
573 572 def _handle_execute_reply(self, msg):
574 573 """Save the reply to an execute_request into our results.
575 574
576 575 execute messages are never actually used. apply is used instead.
577 576 """
578 577
579 578 parent = msg['parent_header']
580 579 msg_id = parent['msg_id']
581 580 if msg_id not in self.outstanding:
582 581 if msg_id in self.history:
583 582 print ("got stale result: %s"%msg_id)
584 583 else:
585 584 print ("got unknown result: %s"%msg_id)
586 585 else:
587 586 self.outstanding.remove(msg_id)
588 587 self.results[msg_id] = self._unwrap_exception(msg['content'])
589 588
590 589 def _handle_apply_reply(self, msg):
591 590 """Save the reply to an apply_request into our results."""
592 591 parent = msg['parent_header']
593 592 msg_id = parent['msg_id']
594 593 if msg_id not in self.outstanding:
595 594 if msg_id in self.history:
596 595 print ("got stale result: %s"%msg_id)
597 596 print self.results[msg_id]
598 597 print msg
599 598 else:
600 599 print ("got unknown result: %s"%msg_id)
601 600 else:
602 601 self.outstanding.remove(msg_id)
603 602 content = msg['content']
604 603 header = msg['header']
605 604
606 605 # construct metadata:
607 606 md = self.metadata[msg_id]
608 607 md.update(self._extract_metadata(header, parent, content))
609 608 # is this redundant?
610 609 self.metadata[msg_id] = md
611 610
612 611 e_outstanding = self._outstanding_dict[md['engine_uuid']]
613 612 if msg_id in e_outstanding:
614 613 e_outstanding.remove(msg_id)
615 614
616 615 # construct result:
617 616 if content['status'] == 'ok':
618 617 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
619 618 elif content['status'] == 'aborted':
620 619 self.results[msg_id] = error.AbortedTask(msg_id)
621 620 elif content['status'] == 'resubmitted':
622 621 # TODO: handle resubmission
623 622 pass
624 623 else:
625 624 self.results[msg_id] = self._unwrap_exception(content)
626 625
627 626 def _flush_notifications(self):
628 627 """Flush notifications of engine registrations waiting
629 628 in ZMQ queue."""
630 629 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
631 630 while msg is not None:
632 631 if self.debug:
633 632 pprint(msg)
634 633 msg = msg[-1]
635 634 msg_type = msg['msg_type']
636 635 handler = self._notification_handlers.get(msg_type, None)
637 636 if handler is None:
638 637 raise Exception("Unhandled message type: %s"%msg_type)
639 638 else:
640 639 handler(msg)
641 640 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
642 641
643 642 def _flush_results(self, sock):
644 643 """Flush task or queue results waiting in ZMQ queue."""
645 644 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
646 645 while msg is not None:
647 646 if self.debug:
648 647 pprint(msg)
649 648 msg = msg[-1]
650 649 msg_type = msg['msg_type']
651 650 handler = self._queue_handlers.get(msg_type, None)
652 651 if handler is None:
653 652 raise Exception("Unhandled message type: %s"%msg_type)
654 653 else:
655 654 handler(msg)
656 655 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
657 656
658 657 def _flush_control(self, sock):
659 658 """Flush replies from the control channel waiting
660 659 in the ZMQ queue.
661 660
662 661 Currently: ignore them."""
663 662 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
664 663 while msg is not None:
665 664 if self.debug:
666 665 pprint(msg)
667 666 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
668 667
669 668 def _flush_iopub(self, sock):
670 669 """Flush replies from the iopub channel waiting
671 670 in the ZMQ queue.
672 671 """
673 672 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
674 673 while msg is not None:
675 674 if self.debug:
676 675 pprint(msg)
677 676 msg = msg[-1]
678 677 parent = msg['parent_header']
679 678 msg_id = parent['msg_id']
680 679 content = msg['content']
681 680 header = msg['header']
682 681 msg_type = msg['msg_type']
683 682
684 683 # init metadata:
685 684 md = self.metadata[msg_id]
686 685
687 686 if msg_type == 'stream':
688 687 name = content['name']
689 688 s = md[name] or ''
690 689 md[name] = s + content['data']
691 690 elif msg_type == 'pyerr':
692 691 md.update({'pyerr' : self._unwrap_exception(content)})
693 692 else:
694 693 md.update({msg_type : content['data']})
695 694
696 695 # redundant?
697 696 self.metadata[msg_id] = md
698 697
699 698 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
700 699
701 700 #--------------------------------------------------------------------------
702 701 # len, getitem
703 702 #--------------------------------------------------------------------------
704 703
705 704 def __len__(self):
706 705 """len(client) returns # of engines."""
707 706 return len(self.ids)
708 707
709 708 def __getitem__(self, key):
710 709 """index access returns DirectView multiplexer objects
711 710
712 711 Must be int, slice, or list/tuple/xrange of ints"""
713 712 if not isinstance(key, (int, slice, tuple, list, xrange)):
714 713 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
715 714 else:
716 715 return self.view(key, balanced=False)
717 716
718 717 #--------------------------------------------------------------------------
719 718 # Begin public methods
720 719 #--------------------------------------------------------------------------
721 720
722 721 def spin(self):
723 722 """Flush any registration notifications and execution results
724 723 waiting in the ZMQ queue.
725 724 """
726 725 if self._notification_socket:
727 726 self._flush_notifications()
728 727 if self._mux_socket:
729 728 self._flush_results(self._mux_socket)
730 729 if self._task_socket:
731 730 self._flush_results(self._task_socket)
732 731 if self._control_socket:
733 732 self._flush_control(self._control_socket)
734 733 if self._iopub_socket:
735 734 self._flush_iopub(self._iopub_socket)
736 735
737 736 def barrier(self, jobs=None, timeout=-1):
738 737 """waits on one or more `jobs`, for up to `timeout` seconds.
739 738
740 739 Parameters
741 740 ----------
742 741
743 742 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
744 743 ints are indices to self.history
745 744 strs are msg_ids
746 745 default: wait on all outstanding messages
747 746 timeout : float
748 747 a time in seconds, after which to give up.
749 748 default is -1, which means no timeout
750 749
751 750 Returns
752 751 -------
753 752
754 753 True : when all msg_ids are done
755 754 False : timeout reached, some msg_ids still outstanding
756 755 """
757 756 tic = time.time()
758 757 if jobs is None:
759 758 theids = self.outstanding
760 759 else:
761 760 if isinstance(jobs, (int, str, AsyncResult)):
762 761 jobs = [jobs]
763 762 theids = set()
764 763 for job in jobs:
765 764 if isinstance(job, int):
766 765 # index access
767 766 job = self.history[job]
768 767 elif isinstance(job, AsyncResult):
769 768 map(theids.add, job.msg_ids)
770 769 continue
771 770 theids.add(job)
772 771 if not theids.intersection(self.outstanding):
773 772 return True
774 773 self.spin()
775 774 while theids.intersection(self.outstanding):
776 775 if timeout >= 0 and ( time.time()-tic ) > timeout:
777 776 break
778 777 time.sleep(1e-3)
779 778 self.spin()
780 779 return len(theids.intersection(self.outstanding)) == 0
781 780
782 781 #--------------------------------------------------------------------------
783 782 # Control methods
784 783 #--------------------------------------------------------------------------
785 784
786 785 @spinfirst
787 786 @defaultblock
788 787 def clear(self, targets=None, block=None):
789 788 """Clear the namespace in target(s)."""
790 789 targets = self._build_targets(targets)[0]
791 790 for t in targets:
792 791 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
793 792 error = False
794 793 if self.block:
795 794 for i in range(len(targets)):
796 795 idents,msg = self.session.recv(self._control_socket,0)
797 796 if self.debug:
798 797 pprint(msg)
799 798 if msg['content']['status'] != 'ok':
800 799 error = self._unwrap_exception(msg['content'])
801 800 if error:
802 801 raise error
803 802
804 803
805 804 @spinfirst
806 805 @defaultblock
807 806 def abort(self, jobs=None, targets=None, block=None):
808 807 """Abort specific jobs from the execution queues of target(s).
809 808
810 809 This is a mechanism to prevent jobs that have already been submitted
811 810 from executing.
812 811
813 812 Parameters
814 813 ----------
815 814
816 815 jobs : msg_id, list of msg_ids, or AsyncResult
817 816 The jobs to be aborted
818 817
819 818
820 819 """
821 820 targets = self._build_targets(targets)[0]
822 821 msg_ids = []
823 822 if isinstance(jobs, (basestring,AsyncResult)):
824 823 jobs = [jobs]
825 824 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
826 825 if bad_ids:
827 826 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
828 827 for j in jobs:
829 828 if isinstance(j, AsyncResult):
830 829 msg_ids.extend(j.msg_ids)
831 830 else:
832 831 msg_ids.append(j)
833 832 content = dict(msg_ids=msg_ids)
834 833 for t in targets:
835 834 self.session.send(self._control_socket, 'abort_request',
836 835 content=content, ident=t)
837 836 error = False
838 837 if self.block:
839 838 for i in range(len(targets)):
840 839 idents,msg = self.session.recv(self._control_socket,0)
841 840 if self.debug:
842 841 pprint(msg)
843 842 if msg['content']['status'] != 'ok':
844 843 error = self._unwrap_exception(msg['content'])
845 844 if error:
846 845 raise error
847 846
848 847 @spinfirst
849 848 @defaultblock
850 849 def shutdown(self, targets=None, restart=False, controller=False, block=None):
851 850 """Terminates one or more engine processes, optionally including the controller."""
852 851 if controller:
853 852 targets = 'all'
854 853 targets = self._build_targets(targets)[0]
855 854 for t in targets:
856 855 self.session.send(self._control_socket, 'shutdown_request',
857 856 content={'restart':restart},ident=t)
858 857 error = False
859 858 if block or controller:
860 859 for i in range(len(targets)):
861 860 idents,msg = self.session.recv(self._control_socket,0)
862 861 if self.debug:
863 862 pprint(msg)
864 863 if msg['content']['status'] != 'ok':
865 864 error = self._unwrap_exception(msg['content'])
866 865
867 866 if controller:
868 867 time.sleep(0.25)
869 868 self.session.send(self._query_socket, 'shutdown_request')
870 869 idents,msg = self.session.recv(self._query_socket, 0)
871 870 if self.debug:
872 871 pprint(msg)
873 872 if msg['content']['status'] != 'ok':
874 873 error = self._unwrap_exception(msg['content'])
875 874
876 875 if error:
877 876 raise error
878 877
879 878 #--------------------------------------------------------------------------
880 879 # Execution methods
881 880 #--------------------------------------------------------------------------
882 881
883 882 @defaultblock
884 883 def execute(self, code, targets='all', block=None):
885 884 """Executes `code` on `targets` in blocking or nonblocking manner.
886 885
887 886 ``execute`` is always `bound` (affects engine namespace)
888 887
889 888 Parameters
890 889 ----------
891 890
892 891 code : str
893 892 the code string to be executed
894 893 targets : int/str/list of ints/strs
895 894 the engines on which to execute
896 895 default : all
897 896 block : bool
898 897 whether or not to wait until done to return
899 898 default: self.block
900 899 """
901 900 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False)
902 901 if not block:
903 902 return result
904 903
905 904 def run(self, filename, targets='all', block=None):
906 905 """Execute contents of `filename` on engine(s).
907 906
908 907 This simply reads the contents of the file and calls `execute`.
909 908
910 909 Parameters
911 910 ----------
912 911
913 912 filename : str
914 913 The path to the file
915 914 targets : int/str/list of ints/strs
916 915 the engines on which to execute
917 916 default : all
918 917 block : bool
919 918 whether or not to wait until done
920 919 default: self.block
921 920
922 921 """
923 922 with open(filename, 'r') as f:
924 923 # add newline in case of trailing indented whitespace
925 924 # which will cause SyntaxError
926 925 code = f.read()+'\n'
927 926 return self.execute(code, targets=targets, block=block)
928 927
929 928 def _maybe_raise(self, result):
930 929 """wrapper for maybe raising an exception if apply failed."""
931 930 if isinstance(result, error.RemoteError):
932 931 raise result
933 932
934 933 return result
935 934
936 935 def _build_dependency(self, dep):
937 936 """helper for building jsonable dependencies from various input forms"""
938 937 if isinstance(dep, Dependency):
939 938 return dep.as_dict()
940 939 elif isinstance(dep, AsyncResult):
941 940 return dep.msg_ids
942 941 elif dep is None:
943 942 return []
944 943 else:
945 944 # pass to Dependency constructor
946 945 return list(Dependency(dep))
947 946
948 947 @defaultblock
949 def apply(self, f, args=None, kwargs=None, bound=True, block=None,
948 def apply(self, f, args=None, kwargs=None, bound=False, block=None,
950 949 targets=None, balanced=None,
951 950 after=None, follow=None, timeout=None,
952 951 track=False):
953 952 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
954 953
955 954 This is the central execution command for the client.
956 955
957 956 Parameters
958 957 ----------
959 958
960 959 f : function
961 960 The function to be called remotely
962 961 args : tuple/list
963 962 The positional arguments passed to `f`
964 963 kwargs : dict
965 964 The keyword arguments passed to `f`
966 bound : bool (default: True)
967 Whether to execute in the Engine(s) namespace, or in a clean
968 namespace not affecting the engine.
965 bound : bool (default: False)
966 Whether to pass the Engine(s) Namespace as the first argument to `f`.
969 967 block : bool (default: self.block)
970 968 Whether to wait for the result, or return immediately.
971 969 False:
972 970 returns AsyncResult
973 971 True:
974 972 returns actual result(s) of f(*args, **kwargs)
975 973 if multiple targets:
976 974 list of results, matching `targets`
977 975 targets : int,list of ints, 'all', None
978 976 Specify the destination of the job.
979 977 if None:
980 978 Submit via Task queue for load-balancing.
981 979 if 'all':
982 980 Run on all active engines
983 981 if list:
984 982 Run on each specified engine
985 983 if int:
986 984 Run on single engine
987 985
988 986 balanced : bool, default None
989 987 whether to load-balance. This will default to True
990 988 if targets is unspecified, or False if targets is specified.
991 989
992 990 The following arguments are only used when balanced is True:
993 991 after : Dependency or collection of msg_ids
994 992 Only for load-balanced execution (targets=None)
995 993 Specify a list of msg_ids as a time-based dependency.
996 994 This job will only be run *after* the dependencies
997 995 have been met.
998 996
999 997 follow : Dependency or collection of msg_ids
1000 998 Only for load-balanced execution (targets=None)
1001 999 Specify a list of msg_ids as a location-based dependency.
1002 1000 This job will only be run on an engine where this dependency
1003 1001 is met.
1004 1002
1005 1003 timeout : float/int or None
1006 1004 Only for load-balanced execution (targets=None)
1007 1005 Specify an amount of time (in seconds) for the scheduler to
1008 1006 wait for dependencies to be met before failing with a
1009 1007 DependencyTimeout.
1010 1008 track : bool
1011 1009 whether to track non-copying sends.
1012 1010 [default False]
1013 1011
1014 1012 after,follow,timeout only used if `balanced=True`.
1015 1013
1016 1014 Returns
1017 1015 -------
1018 1016
1019 1017 if block is False:
1020 1018 return AsyncResult wrapping msg_ids
1021 1019 output of AsyncResult.get() is identical to that of `apply(...block=True)`
1022 1020 else:
1023 1021 if single target:
1024 1022 return result of `f(*args, **kwargs)`
1025 1023 else:
1026 1024 return list of results, matching `targets`
1027 1025 """
1028 1026 assert not self._closed, "cannot use me anymore, I'm closed!"
1029 1027 # defaults:
1030 1028 block = block if block is not None else self.block
1031 1029 args = args if args is not None else []
1032 1030 kwargs = kwargs if kwargs is not None else {}
1033 1031
1034 1032 if balanced is None:
1035 1033 if targets is None:
1036 1034 # default to balanced if targets unspecified
1037 1035 balanced = True
1038 1036 else:
1039 1037 # otherwise default to multiplexing
1040 1038 balanced = False
1041 1039
1042 1040 if targets is None and balanced is False:
1043 1041 # default to all if *not* balanced, and targets is unspecified
1044 1042 targets = 'all'
1045 1043
1046 1044 # enforce types of f,args,kwargs
1047 1045 if not callable(f):
1048 1046 raise TypeError("f must be callable, not %s"%type(f))
1049 1047 if not isinstance(args, (tuple, list)):
1050 1048 raise TypeError("args must be tuple or list, not %s"%type(args))
1051 1049 if not isinstance(kwargs, dict):
1052 1050 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1053 1051
1054 1052 options = dict(bound=bound, block=block, targets=targets, track=track)
1055 1053
1056 1054 if balanced:
1057 1055 return self._apply_balanced(f, args, kwargs, timeout=timeout,
1058 1056 after=after, follow=follow, **options)
1059 1057 elif follow or after or timeout:
1060 1058 msg = "follow, after, and timeout args are only used for"
1061 1059 msg += " load-balanced execution."
1062 1060 raise ValueError(msg)
1063 1061 else:
1064 1062 return self._apply_direct(f, args, kwargs, **options)
1065 1063
1066 1064 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
1067 1065 after=None, follow=None, timeout=None, track=None):
1068 1066 """call f(*args, **kwargs) remotely in a load-balanced manner.
1069 1067
1070 1068 This is a private method, see `apply` for details.
1071 1069 Not to be called directly!
1072 1070 """
1073 1071
1074 1072 loc = locals()
1075 1073 for name in ('bound', 'block', 'track'):
1076 1074 assert loc[name] is not None, "kwarg %r must be specified!"%name
1077 1075
1078 1076 if self._task_socket is None:
1079 1077 msg = "Task farming is disabled"
1080 1078 if self._task_scheme == 'pure':
1081 1079 msg += " because the pure ZMQ scheduler cannot handle"
1082 1080 msg += " disappearing engines."
1083 1081 raise RuntimeError(msg)
1084 1082
1085 1083 if self._task_scheme == 'pure':
1086 1084 # pure zmq scheme doesn't support dependencies
1087 1085 msg = "Pure ZMQ scheduler doesn't support dependencies"
1088 1086 if (follow or after):
1089 1087 # hard fail on DAG dependencies
1090 1088 raise RuntimeError(msg)
1091 1089 if isinstance(f, dependent):
1092 1090 # soft warn on functional dependencies
1093 1091 warnings.warn(msg, RuntimeWarning)
1094 1092
1095 1093 # defaults:
1096 1094 args = args if args is not None else []
1097 1095 kwargs = kwargs if kwargs is not None else {}
1098 1096
1099 1097 if targets:
1100 1098 idents,_ = self._build_targets(targets)
1101 1099 else:
1102 1100 idents = []
1103 1101
1104 1102 after = self._build_dependency(after)
1105 1103 follow = self._build_dependency(follow)
1106 1104 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
1107 1105 bufs = util.pack_apply_message(f,args,kwargs)
1108 1106 content = dict(bound=bound)
1109 1107
1110 1108 msg = self.session.send(self._task_socket, "apply_request",
1111 1109 content=content, buffers=bufs, subheader=subheader, track=track)
1112 1110 msg_id = msg['msg_id']
1113 1111 self.outstanding.add(msg_id)
1114 1112 self.history.append(msg_id)
1115 1113 self.metadata[msg_id]['submitted'] = datetime.now()
1116 1114 tracker = None if track is False else msg['tracker']
1117 1115 ar = AsyncResult(self, [msg_id], fname=f.__name__, targets=targets, tracker=tracker)
1118 1116 if block:
1119 1117 try:
1120 1118 return ar.get()
1121 1119 except KeyboardInterrupt:
1122 1120 return ar
1123 1121 else:
1124 1122 return ar
1125 1123
1126 1124 def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None,
1127 1125 track=None):
1128 1126 """Then underlying method for applying functions to specific engines
1129 1127 via the MUX queue.
1130 1128
1131 1129 This is a private method, see `apply` for details.
1132 1130 Not to be called directly!
1133 1131 """
1134 1132 loc = locals()
1135 1133 for name in ('bound', 'block', 'targets', 'track'):
1136 1134 assert loc[name] is not None, "kwarg %r must be specified!"%name
1137 1135
1138 1136 idents,targets = self._build_targets(targets)
1139 1137
1140 1138 subheader = {}
1141 1139 content = dict(bound=bound)
1142 1140 bufs = util.pack_apply_message(f,args,kwargs)
1143 1141
1144 1142 msg_ids = []
1145 1143 trackers = []
1146 1144 for ident in idents:
1147 1145 msg = self.session.send(self._mux_socket, "apply_request",
1148 1146 content=content, buffers=bufs, ident=ident, subheader=subheader,
1149 1147 track=track)
1150 1148 if track:
1151 1149 trackers.append(msg['tracker'])
1152 1150 msg_id = msg['msg_id']
1153 1151 self.outstanding.add(msg_id)
1154 1152 self._outstanding_dict[ident].add(msg_id)
1155 1153 self.history.append(msg_id)
1156 1154 msg_ids.append(msg_id)
1157 1155
1158 1156 tracker = None if track is False else zmq.MessageTracker(*trackers)
1159 1157 ar = AsyncResult(self, msg_ids, fname=f.__name__, targets=targets, tracker=tracker)
1160 1158
1161 1159 if block:
1162 1160 try:
1163 1161 return ar.get()
1164 1162 except KeyboardInterrupt:
1165 1163 return ar
1166 1164 else:
1167 1165 return ar
1168 1166
1169 1167 #--------------------------------------------------------------------------
1170 1168 # construct a View object
1171 1169 #--------------------------------------------------------------------------
1172 1170
1173 1171 @defaultblock
1174 def remote(self, bound=True, block=None, targets=None, balanced=None):
1172 def remote(self, bound=False, block=None, targets=None, balanced=None):
1175 1173 """Decorator for making a RemoteFunction"""
1176 1174 return remote(self, bound=bound, targets=targets, block=block, balanced=balanced)
1177 1175
1178 1176 @defaultblock
1179 def parallel(self, dist='b', bound=True, block=None, targets=None, balanced=None):
1177 def parallel(self, dist='b', bound=False, block=None, targets=None, balanced=None):
1180 1178 """Decorator for making a ParallelFunction"""
1181 1179 return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced)
1182 1180
1183 1181 def _cache_view(self, targets, balanced):
1184 1182 """save views, so subsequent requests don't create new objects."""
1185 1183 if balanced:
1186 1184 view_class = LoadBalancedView
1187 1185 view_cache = self._balanced_views
1188 1186 else:
1189 1187 view_class = DirectView
1190 1188 view_cache = self._direct_views
1191 1189
1192 1190 # use str, since often targets will be a list
1193 1191 key = str(targets)
1194 1192 if key not in view_cache:
1195 1193 view_cache[key] = view_class(client=self, targets=targets)
1196 1194
1197 1195 return view_cache[key]
1198 1196
1199 1197 def view(self, targets=None, balanced=None):
1200 1198 """Method for constructing View objects.
1201 1199
1202 1200 If no arguments are specified, create a LoadBalancedView
1203 1201 using all engines. If only `targets` specified, it will
1204 1202 be a DirectView. This method is the underlying implementation
1205 1203 of ``client.__getitem__``.
1206 1204
1207 1205 Parameters
1208 1206 ----------
1209 1207
1210 1208 targets: list,slice,int,etc. [default: use all engines]
1211 1209 The engines to use for the View
1212 1210 balanced : bool [default: False if targets specified, True else]
1213 1211 whether to build a LoadBalancedView or a DirectView
1214 1212
1215 1213 """
1216 1214
1217 1215 balanced = (targets is None) if balanced is None else balanced
1218 1216
1219 1217 if targets is None:
1220 1218 if balanced:
1221 1219 return self._cache_view(None,True)
1222 1220 else:
1223 1221 targets = slice(None)
1224 1222
1225 1223 if isinstance(targets, int):
1226 1224 if targets < 0:
1227 1225 targets = self.ids[targets]
1228 1226 if targets not in self.ids:
1229 1227 raise IndexError("No such engine: %i"%targets)
1230 1228 return self._cache_view(targets, balanced)
1231 1229
1232 1230 if isinstance(targets, slice):
1233 1231 indices = range(len(self.ids))[targets]
1234 1232 ids = sorted(self._ids)
1235 1233 targets = [ ids[i] for i in indices ]
1236 1234
1237 1235 if isinstance(targets, (tuple, list, xrange)):
1238 1236 _,targets = self._build_targets(list(targets))
1239 1237 return self._cache_view(targets, balanced)
1240 1238 else:
1241 1239 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
1242 1240
1243 1241 #--------------------------------------------------------------------------
1244 1242 # Data movement
1245 1243 #--------------------------------------------------------------------------
1246 1244
1247 1245 @defaultblock
1248 1246 def push(self, ns, targets='all', block=None, track=False):
1249 1247 """Push the contents of `ns` into the namespace on `target`"""
1250 1248 if not isinstance(ns, dict):
1251 1249 raise TypeError("Must be a dict, not %s"%type(ns))
1252 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False, track=track)
1250 result = self.apply(_push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track)
1253 1251 if not block:
1254 1252 return result
1255 1253
1256 1254 @defaultblock
1257 1255 def pull(self, keys, targets='all', block=None):
1258 1256 """Pull objects from `target`'s namespace by `keys`"""
1259 if isinstance(keys, str):
1257 if isinstance(keys, basestring):
1260 1258 pass
1261 1259 elif isinstance(keys, (list,tuple,set)):
1262 1260 for key in keys:
1263 if not isinstance(key, str):
1264 raise TypeError
1261 if not isinstance(key, basestring):
1262 raise TypeError("keys must be str, not type %r"%type(key))
1263 else:
1264 raise TypeError("keys must be strs, not %r"%keys)
1265 1265 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1266 1266 return result
1267 1267
1268 1268 @defaultblock
1269 1269 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None, track=False):
1270 1270 """
1271 1271 Partition a Python sequence and send the partitions to a set of engines.
1272 1272 """
1273 1273 targets = self._build_targets(targets)[-1]
1274 1274 mapObject = Map.dists[dist]()
1275 1275 nparts = len(targets)
1276 1276 msg_ids = []
1277 1277 trackers = []
1278 1278 for index, engineid in enumerate(targets):
1279 1279 partition = mapObject.getPartition(seq, index, nparts)
1280 1280 if flatten and len(partition) == 1:
1281 1281 r = self.push({key: partition[0]}, targets=engineid, block=False, track=track)
1282 1282 else:
1283 1283 r = self.push({key: partition}, targets=engineid, block=False, track=track)
1284 1284 msg_ids.extend(r.msg_ids)
1285 1285 if track:
1286 1286 trackers.append(r._tracker)
1287 1287
1288 1288 if track:
1289 1289 tracker = zmq.MessageTracker(*trackers)
1290 1290 else:
1291 1291 tracker = None
1292 1292
1293 1293 r = AsyncResult(self, msg_ids, fname='scatter', targets=targets, tracker=tracker)
1294 1294 if block:
1295 1295 r.wait()
1296 1296 else:
1297 1297 return r
1298 1298
1299 1299 @defaultblock
1300 1300 def gather(self, key, dist='b', targets='all', block=None):
1301 1301 """
1302 1302 Gather a partitioned sequence on a set of engines as a single local seq.
1303 1303 """
1304 1304
1305 1305 targets = self._build_targets(targets)[-1]
1306 1306 mapObject = Map.dists[dist]()
1307 1307 msg_ids = []
1308 1308 for index, engineid in enumerate(targets):
1309 1309 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1310 1310
1311 1311 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1312 1312 if block:
1313 1313 return r.get()
1314 1314 else:
1315 1315 return r
1316 1316
1317 1317 #--------------------------------------------------------------------------
1318 1318 # Query methods
1319 1319 #--------------------------------------------------------------------------
1320 1320
1321 1321 @spinfirst
1322 1322 @defaultblock
1323 1323 def get_result(self, indices_or_msg_ids=None, block=None):
1324 1324 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1325 1325
1326 1326 If the client already has the results, no request to the Hub will be made.
1327 1327
1328 1328 This is a convenient way to construct AsyncResult objects, which are wrappers
1329 1329 that include metadata about execution, and allow for awaiting results that
1330 1330 were not submitted by this Client.
1331 1331
1332 1332 It can also be a convenient way to retrieve the metadata associated with
1333 1333         blocking execution, since it always retrieves the full record from the Hub.
1334 1334
1335 1335 Examples
1336 1336 --------
1337 1337 ::
1338 1338
1339 1339             In [10]: ar = client.get_result(-1) # AsyncResult for the most recent request
1340 1340
1341 1341 Parameters
1342 1342 ----------
1343 1343
1344 1344 indices_or_msg_ids : integer history index, str msg_id, or list of either
1345 1345             The indices or msg_ids of the results to be retrieved
1346 1346
1347 1347 block : bool
1348 1348 Whether to wait for the result to be done
1349 1349
1350 1350 Returns
1351 1351 -------
1352 1352
1353 1353 AsyncResult
1354 1354 A single AsyncResult object will always be returned.
1355 1355
1356 1356 AsyncHubResult
1357 1357 A subclass of AsyncResult that retrieves results from the Hub
1358 1358
1359 1359 """
1360 1360 if indices_or_msg_ids is None:
1361 1361 indices_or_msg_ids = -1
1362 1362
1363 1363 if not isinstance(indices_or_msg_ids, (list,tuple)):
1364 1364 indices_or_msg_ids = [indices_or_msg_ids]
1365 1365
1366 1366 theids = []
1367 1367 for id in indices_or_msg_ids:
1368 1368 if isinstance(id, int):
1369 1369 id = self.history[id]
1370 1370             if not isinstance(id, basestring):
1371 1371 raise TypeError("indices must be str or int, not %r"%id)
1372 1372 theids.append(id)
1373 1373
1374 1374 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1375 1375 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1376 1376
1377 1377 if remote_ids:
1378 1378 ar = AsyncHubResult(self, msg_ids=theids)
1379 1379 else:
1380 1380 ar = AsyncResult(self, msg_ids=theids)
1381 1381
1382 1382 if block:
1383 1383 ar.wait()
1384 1384
1385 1385 return ar
1386 1386
1387 1387 @spinfirst
1388 1388 def result_status(self, msg_ids, status_only=True):
1389 1389 """Check on the status of the result(s) of the apply request with `msg_ids`.
1390 1390
1391 1391 If status_only is False, then the actual results will be retrieved, else
1392 1392 only the status of the results will be checked.
1393 1393
1394 1394 Parameters
1395 1395 ----------
1396 1396
1397 1397 msg_ids : list of msg_ids
1398 1398 if int:
1399 1399 Passed as index to self.history for convenience.
1400 1400 status_only : bool (default: True)
1401 1401 if False:
1402 1402 Retrieve the actual results of completed tasks.
1403 1403
1404 1404 Returns
1405 1405 -------
1406 1406
1407 1407 results : dict
1408 1408 There will always be the keys 'pending' and 'completed', which will
1409 1409 be lists of msg_ids that are incomplete or complete. If `status_only`
1410 1410 is False, then completed results will be keyed by their `msg_id`.
1411 1411 """
1412 1412 if not isinstance(msg_ids, (list,tuple)):
1413 1413 msg_ids = [msg_ids]
1414 1414
1415 1415 theids = []
1416 1416 for msg_id in msg_ids:
1417 1417 if isinstance(msg_id, int):
1418 1418 msg_id = self.history[msg_id]
1419 1419 if not isinstance(msg_id, basestring):
1420 1420 raise TypeError("msg_ids must be str, not %r"%msg_id)
1421 1421 theids.append(msg_id)
1422 1422
1423 1423 completed = []
1424 1424 local_results = {}
1425 1425
1426 1426 # comment this block out to temporarily disable local shortcut:
1427 1427         for msg_id in list(theids): # iterate over a copy, since we remove below
1428 1428 if msg_id in self.results:
1429 1429 completed.append(msg_id)
1430 1430 local_results[msg_id] = self.results[msg_id]
1431 1431 theids.remove(msg_id)
1432 1432
1433 1433 if theids: # some not locally cached
1434 1434 content = dict(msg_ids=theids, status_only=status_only)
1435 1435 msg = self.session.send(self._query_socket, "result_request", content=content)
1436 1436 zmq.select([self._query_socket], [], [])
1437 1437 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1438 1438 if self.debug:
1439 1439 pprint(msg)
1440 1440 content = msg['content']
1441 1441 if content['status'] != 'ok':
1442 1442 raise self._unwrap_exception(content)
1443 1443 buffers = msg['buffers']
1444 1444 else:
1445 1445 content = dict(completed=[],pending=[])
1446 1446
1447 1447 content['completed'].extend(completed)
1448 1448
1449 1449 if status_only:
1450 1450 return content
1451 1451
1452 1452 failures = []
1453 1453 # load cached results into result:
1454 1454 content.update(local_results)
1455 1455 # update cache with results:
1456 1456 for msg_id in sorted(theids):
1457 1457 if msg_id in content['completed']:
1458 1458 rec = content[msg_id]
1459 1459 parent = rec['header']
1460 1460 header = rec['result_header']
1461 1461 rcontent = rec['result_content']
1462 1462 iodict = rec['io']
1463 1463 if isinstance(rcontent, str):
1464 1464 rcontent = self.session.unpack(rcontent)
1465 1465
1466 1466 md = self.metadata[msg_id]
1467 1467 md.update(self._extract_metadata(header, parent, rcontent))
1468 1468 md.update(iodict)
1469 1469
1470 1470 if rcontent['status'] == 'ok':
1471 1471 res,buffers = util.unserialize_object(buffers)
1472 1472 else:
1473 1473 print rcontent
1474 1474 res = self._unwrap_exception(rcontent)
1475 1475 failures.append(res)
1476 1476
1477 1477 self.results[msg_id] = res
1478 1478 content[msg_id] = res
1479 1479
1480 1480 if len(theids) == 1 and failures:
1481 1481 raise failures[0]
1482 1482
1483 1483 error.collect_exceptions(failures, "result_status")
1484 1484 return content
1485 1485
1486 1486 @spinfirst
1487 1487 def queue_status(self, targets='all', verbose=False):
1488 1488 """Fetch the status of engine queues.
1489 1489
1490 1490 Parameters
1491 1491 ----------
1492 1492
1493 1493 targets : int/str/list of ints/strs
1494 1494 the engines whose states are to be queried.
1495 1495 default : all
1496 1496 verbose : bool
1497 1497 Whether to return lengths only, or lists of ids for each element
1498 1498 """
1499 1499 targets = self._build_targets(targets)[1]
1500 1500 content = dict(targets=targets, verbose=verbose)
1501 1501 self.session.send(self._query_socket, "queue_request", content=content)
1502 1502 idents,msg = self.session.recv(self._query_socket, 0)
1503 1503 if self.debug:
1504 1504 pprint(msg)
1505 1505 content = msg['content']
1506 1506 status = content.pop('status')
1507 1507 if status != 'ok':
1508 1508 raise self._unwrap_exception(content)
1509 1509 return util.rekey(content)
1510 1510
1511 1511 @spinfirst
1512 1512 def purge_results(self, jobs=[], targets=[]):
1513 1513 """Tell the controller to forget results.
1514 1514
1515 1515 Individual results can be purged by msg_id, or the entire
1516 1516 history of specific targets can be purged.
1517 1517
1518 1518 Parameters
1519 1519 ----------
1520 1520
1521 1521 jobs : str or list of strs or AsyncResult objects
1522 1522 the msg_ids whose results should be forgotten.
1523 1523 targets : int/str/list of ints/strs
1524 1524 The targets, by uuid or int_id, whose entire history is to be purged.
1525 1525 Use `targets='all'` to scrub everything from the controller's memory.
1526 1526
1527 1527 default : None
1528 1528 """
1529 1529 if not targets and not jobs:
1530 1530 raise ValueError("Must specify at least one of `targets` and `jobs`")
1531 1531 if targets:
1532 1532 targets = self._build_targets(targets)[1]
1533 1533
1534 1534 # construct msg_ids from jobs
1535 1535 msg_ids = []
1536 1536 if isinstance(jobs, (basestring,AsyncResult)):
1537 1537 jobs = [jobs]
1538 1538 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1539 1539 if bad_ids:
1540 1540 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1541 1541 for j in jobs:
1542 1542 if isinstance(j, AsyncResult):
1543 1543 msg_ids.extend(j.msg_ids)
1544 1544 else:
1545 1545 msg_ids.append(j)
1546 1546
1547 1547 content = dict(targets=targets, msg_ids=msg_ids)
1548 1548 self.session.send(self._query_socket, "purge_request", content=content)
1549 1549 idents, msg = self.session.recv(self._query_socket, 0)
1550 1550 if self.debug:
1551 1551 pprint(msg)
1552 1552 content = msg['content']
1553 1553 if content['status'] != 'ok':
1554 1554 raise self._unwrap_exception(content)
1555 1555
1556 1556
1557 1557 __all__ = [ 'Client',
1558 1558 'depend',
1559 1559 'require',
1560 1560 'remote',
1561 1561 'parallel',
1562 1562 'RemoteFunction',
1563 1563 'ParallelFunction',
1564 1564 'DirectView',
1565 1565 'LoadBalancedView',
1566 1566 'AsyncResult',
1567 1567 'AsyncMapResult',
1568 1568 'Reference'
1569 1569 ]
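
Reviewer note: a minimal usage sketch of the view/data-movement API this file settles on. The running controller, the two engines, and the `iptest` profile are assumptions for illustration, not part of the diff.

    from IPython.zmq.parallel.client import Client

    client = Client(profile='iptest')   # assumes a running controller + 2 engines
    client.block = True

    # view() picks the class: DirectView when targets are given,
    # LoadBalancedView otherwise; _cache_view makes repeat calls return the same object.
    dv = client.view(targets=[0, 1])
    assert dv is client.view(targets=[0, 1])
    lb = client.view()                  # balanced defaults to True here

    # push/pull move named objects to and from engine namespaces,
    client.push(dict(a=5), targets=0)
    print client.pull('a', targets=0)   # -> 5

    # while scatter/gather partition and reassemble a sequence ('b' = contiguous blocks).
    client.scatter('x', range(10))
    print client.gather('x')            # -> [0, 1, ..., 9]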
@@ -1,203 +1,203 b''
1 1 """Remote Functions and decorators for the client."""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 import warnings
14 14
15 15 from IPython.testing import decorators as testdec
16 16
17 17 from . import map as Map
18 18 from .asyncresult import AsyncMapResult
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Decorators
22 22 #-----------------------------------------------------------------------------
23 23
24 24 @testdec.skip_doctest
25 def remote(client, bound=True, block=None, targets=None, balanced=None):
25 def remote(client, bound=False, block=None, targets=None, balanced=None):
26 26 """Turn a function into a remote function.
27 27
28 28     This decorator can be used as follows:
29 29
30 30 In [1]: @remote(client,block=True)
31 31 ...: def func(a):
32 32 ...: pass
33 33 """
34 34
35 35 def remote_function(f):
36 36 return RemoteFunction(client, f, bound, block, targets, balanced)
37 37 return remote_function
38 38
39 39 @testdec.skip_doctest
40 def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None):
40 def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None):
41 41 """Turn a function into a parallel remote function.
42 42
43 43 This method can be used for map:
44 44
45 45 In [1]: @parallel(client,block=True)
46 46 ...: def func(a):
47 47 ...: pass
48 48 """
49 49
50 50 def parallel_function(f):
51 51 return ParallelFunction(client, f, dist, bound, block, targets, balanced)
52 52 return parallel_function
53 53
54 54 #--------------------------------------------------------------------------
55 55 # Classes
56 56 #--------------------------------------------------------------------------
57 57
58 58 class RemoteFunction(object):
59 59 """Turn an existing function into a remote function.
60 60
61 61 Parameters
62 62 ----------
63 63
64 64 client : Client instance
65 65 The client to be used to connect to engines
66 66 f : callable
67 67 The function to be wrapped into a remote function
68 68 bound : bool [default: False]
69 69         Whether to affect the remote namespace when called
70 70 block : bool [default: None]
71 71 Whether to wait for results or not. The default behavior is
72 72 to use the current `block` attribute of `client`
73 73 targets : valid target list [default: all]
74 74 The targets on which to execute.
75 75 balanced : bool
76 76 Whether to load-balance with the Task scheduler or not
77 77 """
78 78
79 79 client = None # the remote connection
80 80 func = None # the wrapped function
81 81 block = None # whether to block
82 82 bound = None # whether to affect the namespace
83 83 targets = None # where to execute
84 84 balanced = None # whether to load-balance
85 85
86 86 def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None):
87 87 self.client = client
88 88 self.func = f
89 89 self.block=block
90 90 self.bound=bound
91 91 self.targets=targets
92 92 if balanced is None:
93 93 if targets is None:
94 94 balanced = True
95 95 else:
96 96 balanced = False
97 97 self.balanced = balanced
98 98
99 99 def __call__(self, *args, **kwargs):
100 100 return self.client.apply(self.func, args=args, kwargs=kwargs,
101 101 block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced)
102 102
103 103
104 104 class ParallelFunction(RemoteFunction):
105 105 """Class for mapping a function to sequences.
106 106
107 107     This will distribute the sequences according to a mapper, and call
108 108     the function on each sub-sequence. If called via map, then the function
109 109     will be called once on each element, rather than on each sub-sequence.
110 110
111 111 Parameters
112 112 ----------
113 113
114 114 client : Client instance
115 115 The client to be used to connect to engines
116 116 f : callable
117 117 The function to be wrapped into a remote function
118 118 bound : bool [default: False]
119 119         Whether to affect the remote namespace when called
120 120 block : bool [default: None]
121 121 Whether to wait for results or not. The default behavior is
122 122 to use the current `block` attribute of `client`
123 123 targets : valid target list [default: all]
124 124 The targets on which to execute.
125 125 balanced : bool
126 126 Whether to load-balance with the Task scheduler or not
127 127 chunk_size : int or None
128 128 The size of chunk to use when breaking up sequences in a load-balanced manner
129 129 """
130 130 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None, chunk_size=None):
131 131 super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced)
132 132 self.chunk_size = chunk_size
133 133
134 134 mapClass = Map.dists[dist]
135 135 self.mapObject = mapClass()
136 136
137 137 def __call__(self, *sequences):
138 138 len_0 = len(sequences[0])
139 139 for s in sequences:
140 140 if len(s)!=len_0:
141 141 msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s))
142 142 raise ValueError(msg)
143 143
144 144 if self.balanced:
145 145 if self.chunk_size:
146 146 nparts = len_0/self.chunk_size + int(len_0%self.chunk_size > 0)
147 147 else:
148 148 nparts = len_0
149 149 targets = [self.targets]*nparts
150 150 else:
151 151 if self.chunk_size:
152 152                 warnings.warn("`chunk_size` is ignored when `balanced=False`", UserWarning)
153 153 # multiplexed:
154 154 targets = self.client._build_targets(self.targets)[-1]
155 155 nparts = len(targets)
156 156
157 157 msg_ids = []
158 158 # my_f = lambda *a: map(self.func, *a)
159 159 for index, t in enumerate(targets):
160 160 args = []
161 161 for seq in sequences:
162 162 part = self.mapObject.getPartition(seq, index, nparts)
163 163 if len(part) == 0:
164 164 continue
165 165 else:
166 166 args.append(part)
167 167 if not args:
168 168 continue
169 169
170 170 # print (args)
171 171 if hasattr(self, '_map'):
172 172 f = map
173 173 args = [self.func]+args
174 174 else:
175 175 f=self.func
176 176 ar = self.client.apply(f, args=args, block=False, bound=self.bound,
177 177 targets=t, balanced=self.balanced)
178 178
179 179 msg_ids.append(ar.msg_ids[0])
180 180
181 181 r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__)
182 182 if self.block:
183 183 try:
184 184 return r.get()
185 185 except KeyboardInterrupt:
186 186 return r
187 187 else:
188 188 return r
189 189
190 190 def map(self, *sequences):
191 191 """call a function on each element of a sequence remotely.
192 192 This should behave very much like the builtin map, but return an AsyncMapResult
193 193 if self.block is False.
194 194 """
195 195 # set _map as a flag for use inside self.__call__
196 196 self._map = True
197 197 try:
198 198 ret = self.__call__(*sequences)
199 199 finally:
200 200 del self._map
201 201 return ret
202 202
203 203 __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction'] No newline at end of file
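
Reviewer note: both decorators above now default to bound=False. A short sketch of each, assuming a connected client on the (hypothetical) iptest profile; the functions are made up for illustration.

    from IPython.zmq.parallel.client import Client
    from IPython.zmq.parallel.remotefunction import remote, parallel

    client = Client(profile='iptest')   # assumed connected client

    @remote(client, block=True)         # no targets -> balanced (task) execution
    def getpid():
        import os
        return os.getpid()

    pid = getpid()                      # runs remotely; block=True returns the result

    @parallel(client, dist='b', block=True)
    def scale(xs):
        return [2 * x for x in xs]      # called once per block of the input

    print scale(range(8))               # blocks merged in order -> [0, 2, ..., 14]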
@@ -1,484 +1,487 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 Kernel adapted from kernel.py to use ZMQ Streams
4 4 """
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Imports
8 8 #-----------------------------------------------------------------------------
9 9
10 10 # Standard library imports.
11 11 from __future__ import print_function
12 12
13 13 import sys
14 14 import time
15 15
16 16 from code import CommandCompiler
17 17 from datetime import datetime
18 18 from pprint import pprint
19 19 from signal import SIGTERM, SIGKILL
20 20
21 21 # System library imports.
22 22 import zmq
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 # Local imports.
26 26 from IPython.core import ultratb
27 27 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str
28 28 from IPython.zmq.completer import KernelCompleter
29 29 from IPython.zmq.iostream import OutStream
30 30 from IPython.zmq.displayhook import DisplayHook
31 31
32 32 from . import heartmonitor
33 33 from .client import Client
34 34 from .error import wrap_exception
35 35 from .factory import SessionFactory
36 36 from .streamsession import StreamSession
37 from .util import serialize_object, unpack_apply_message, ISO8601
37 from .util import serialize_object, unpack_apply_message, ISO8601, Namespace
38 38
39 39 def printer(*args):
40 40 pprint(args, stream=sys.__stdout__)
41 41
42 42
43 43 class _Passer:
44 44 """Empty class that implements `send()` that does nothing."""
45 45 def send(self, *args, **kwargs):
46 46 pass
47 47 send_multipart = send
48 48
49 49
50 50 #-----------------------------------------------------------------------------
51 51 # Main kernel class
52 52 #-----------------------------------------------------------------------------
53 53
54 54 class Kernel(SessionFactory):
55 55
56 56 #---------------------------------------------------------------------------
57 57 # Kernel interface
58 58 #---------------------------------------------------------------------------
59 59
60 60 # kwargs:
61 61 int_id = Int(-1, config=True)
62 62 user_ns = Dict(config=True)
63 63 exec_lines = List(config=True)
64 64
65 65 control_stream = Instance(zmqstream.ZMQStream)
66 66 task_stream = Instance(zmqstream.ZMQStream)
67 67 iopub_stream = Instance(zmqstream.ZMQStream)
68 68 client = Instance('IPython.zmq.parallel.client.Client')
69 69
70 70 # internals
71 71 shell_streams = List()
72 72 compiler = Instance(CommandCompiler, (), {})
73 73 completer = Instance(KernelCompleter)
74 74
75 75 aborted = Set()
76 76 shell_handlers = Dict()
77 77 control_handlers = Dict()
78 78
79 79 def _set_prefix(self):
80 80 self.prefix = "engine.%s"%self.int_id
81 81
82 82 def _connect_completer(self):
83 83 self.completer = KernelCompleter(self.user_ns)
84 84
85 85 def __init__(self, **kwargs):
86 86 super(Kernel, self).__init__(**kwargs)
87 87 self._set_prefix()
88 88 self._connect_completer()
89 89
90 90 self.on_trait_change(self._set_prefix, 'id')
91 91 self.on_trait_change(self._connect_completer, 'user_ns')
92 92
93 93 # Build dict of handlers for message types
94 94 for msg_type in ['execute_request', 'complete_request', 'apply_request',
95 95 'clear_request']:
96 96 self.shell_handlers[msg_type] = getattr(self, msg_type)
97 97
98 98 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
99 99 self.control_handlers[msg_type] = getattr(self, msg_type)
100 100
101 101 self._initial_exec_lines()
102 102
103 103 def _wrap_exception(self, method=None):
104 104 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
105 105 content=wrap_exception(e_info)
106 106 return content
107 107
108 108 def _initial_exec_lines(self):
109 109 s = _Passer()
110 110         content = dict(silent=True, user_variables=[], user_expressions=[])
111 111 for line in self.exec_lines:
112 112 self.log.debug("executing initialization: %s"%line)
113 113 content.update({'code':line})
114 114 msg = self.session.msg('execute_request', content)
115 115 self.execute_request(s, [], msg)
116 116
117 117
118 118 #-------------------- control handlers -----------------------------
119 119 def abort_queues(self):
120 120 for stream in self.shell_streams:
121 121 if stream:
122 122 self.abort_queue(stream)
123 123
124 124 def abort_queue(self, stream):
125 125 while True:
126 126 try:
127 127 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
128 128 except zmq.ZMQError as e:
129 129 if e.errno == zmq.EAGAIN:
130 130 break
131 131 else:
132 132 return
133 133 else:
134 134 if msg is None:
135 135 return
136 136 else:
137 137 idents,msg = msg
138 138
139 139 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
140 140 # msg = self.reply_socket.recv_json()
141 141 self.log.info("Aborting:")
142 142 self.log.info(str(msg))
143 143 msg_type = msg['msg_type']
144 144 reply_type = msg_type.split('_')[0] + '_reply'
145 145 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
146 146 # self.reply_socket.send(ident,zmq.SNDMORE)
147 147 # self.reply_socket.send_json(reply_msg)
148 148 reply_msg = self.session.send(stream, reply_type,
149 149 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
150 150 self.log.debug(str(reply_msg))
151 151 # We need to wait a bit for requests to come in. This can probably
152 152 # be set shorter for true asynchronous clients.
153 153 time.sleep(0.05)
154 154
155 155 def abort_request(self, stream, ident, parent):
156 156 """abort a specifig msg by id"""
157 157 msg_ids = parent['content'].get('msg_ids', None)
158 158 if isinstance(msg_ids, basestring):
159 159 msg_ids = [msg_ids]
160 160 if not msg_ids:
161 161 self.abort_queues()
162 162 for mid in msg_ids:
163 163 self.aborted.add(str(mid))
164 164
165 165 content = dict(status='ok')
166 166 reply_msg = self.session.send(stream, 'abort_reply', content=content,
167 167 parent=parent, ident=ident)[0]
168 168 self.log.debug(str(reply_msg))
169 169
170 170 def shutdown_request(self, stream, ident, parent):
171 171 """kill ourself. This should really be handled in an external process"""
172 172 try:
173 173 self.abort_queues()
174 174 except:
175 175 content = self._wrap_exception('shutdown')
176 176 else:
177 177 content = dict(parent['content'])
178 178 content['status'] = 'ok'
179 179 msg = self.session.send(stream, 'shutdown_reply',
180 180 content=content, parent=parent, ident=ident)
181 181 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
182 182 # content, parent, ident)
183 183 # print >> sys.__stdout__, msg
184 184 # time.sleep(0.2)
185 185 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
186 186 dc.start()
187 187
188 188 def dispatch_control(self, msg):
189 189 idents,msg = self.session.feed_identities(msg, copy=False)
190 190 try:
191 191 msg = self.session.unpack_message(msg, content=True, copy=False)
192 192 except:
193 193 self.log.error("Invalid Message", exc_info=True)
194 194 return
195 195
196 196 header = msg['header']
197 197 msg_id = header['msg_id']
198 198
199 199 handler = self.control_handlers.get(msg['msg_type'], None)
200 200 if handler is None:
201 201 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
202 202 else:
203 203 handler(self.control_stream, idents, msg)
204 204
205 205
206 206 #-------------------- queue helpers ------------------------------
207 207
208 208 def check_dependencies(self, dependencies):
209 209 if not dependencies:
210 210 return True
211 211 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
212 212 anyorall = dependencies[0]
213 213 dependencies = dependencies[1]
214 214 else:
215 215 anyorall = 'all'
216 216         results = self.client.result_status(dependencies, status_only=True)
217 217 if results['status'] != 'ok':
218 218 return False
219 219
220 220 if anyorall == 'any':
221 221 if not results['completed']:
222 222 return False
223 223 else:
224 224 if results['pending']:
225 225 return False
226 226
227 227 return True
228 228
229 229 def check_aborted(self, msg_id):
230 230 return msg_id in self.aborted
231 231
232 232 #-------------------- queue handlers -----------------------------
233 233
234 234 def clear_request(self, stream, idents, parent):
235 235 """Clear our namespace."""
236 236 self.user_ns = {}
237 237 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
238 238 content = dict(status='ok'))
239 239 self._initial_exec_lines()
240 240
241 241 def execute_request(self, stream, ident, parent):
242 242 self.log.debug('execute request %s'%parent)
243 243 try:
244 244 code = parent[u'content'][u'code']
245 245 except:
246 246 self.log.error("Got bad msg: %s"%parent, exc_info=True)
247 247 return
248 248 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
249 249 ident='%s.pyin'%self.prefix)
250 250 started = datetime.now().strftime(ISO8601)
251 251 try:
252 252 comp_code = self.compiler(code, '<zmq-kernel>')
253 253 # allow for not overriding displayhook
254 254 if hasattr(sys.displayhook, 'set_parent'):
255 255 sys.displayhook.set_parent(parent)
256 256 sys.stdout.set_parent(parent)
257 257 sys.stderr.set_parent(parent)
258 258 exec comp_code in self.user_ns, self.user_ns
259 259 except:
260 260 exc_content = self._wrap_exception('execute')
261 261 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
262 262 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
263 263 ident='%s.pyerr'%self.prefix)
264 264 reply_content = exc_content
265 265 else:
266 266 reply_content = {'status' : 'ok'}
267 267
268 268 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
269 269 ident=ident, subheader = dict(started=started))
270 270 self.log.debug(str(reply_msg))
271 271 if reply_msg['content']['status'] == u'error':
272 272 self.abort_queues()
273 273
274 274 def complete_request(self, stream, ident, parent):
275 275 matches = {'matches' : self.complete(parent),
276 276 'status' : 'ok'}
277 277 completion_msg = self.session.send(stream, 'complete_reply',
278 278 matches, parent, ident)
279 279 # print >> sys.__stdout__, completion_msg
280 280
281 281 def complete(self, msg):
282 282 return self.completer.complete(msg.content.line, msg.content.text)
283 283
284 284 def apply_request(self, stream, ident, parent):
285 285 # flush previous reply, so this request won't block it
286 286 stream.flush(zmq.POLLOUT)
287 287
288 288 try:
289 289 content = parent[u'content']
290 290 bufs = parent[u'buffers']
291 291 msg_id = parent['header']['msg_id']
292 292 bound = content.get('bound', False)
293 293 except:
294 294 self.log.error("Got bad msg: %s"%parent, exc_info=True)
295 295 return
296 296 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
297 297 # self.iopub_stream.send(pyin_msg)
298 298 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
299 299 sub = {'dependencies_met' : True, 'engine' : self.ident,
300 300 'started': datetime.now().strftime(ISO8601)}
301 301 try:
302 302 # allow for not overriding displayhook
303 303 if hasattr(sys.displayhook, 'set_parent'):
304 304 sys.displayhook.set_parent(parent)
305 305 sys.stdout.set_parent(parent)
306 306 sys.stderr.set_parent(parent)
307 307 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
308 if bound:
309 working = self.user_ns
310 suffix = str(msg_id).replace("-","")
311 prefix = "_"
312
313 else:
314 working = dict()
315 suffix = prefix = "_" # prevent keyword collisions with lambda
308 working = self.user_ns
309 # suffix =
310 prefix = "_"+str(msg_id).replace("-","")+"_"
311 # if bound:
312 #
313 # else:
314 # working = dict()
315 # suffix = prefix = "_" # prevent keyword collisions with lambda
316 316 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
317 if bound:
318 bound_ns = Namespace(working)
319 args = [bound_ns]+list(args)
317 320 # if f.fun
318 321 fname = getattr(f, '__name__', 'f')
319 322
320 fname = prefix+fname.strip('<>')+suffix
321 argname = prefix+"args"+suffix
322 kwargname = prefix+"kwargs"+suffix
323 resultname = prefix+"result"+suffix
323 fname = prefix+"f"
324 argname = prefix+"args"
325 kwargname = prefix+"kwargs"
326 resultname = prefix+"result"
324 327
325 ns = { fname : f, argname : args, kwargname : kwargs }
328 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
326 329 # print ns
327 330 working.update(ns)
328 331 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
329 exec code in working, working
330 result = working.get(resultname)
331 # clear the namespace
332 if bound:
332 try:
333 exec code in working,working
334 result = working.get(resultname)
335 finally:
333 336 for key in ns.iterkeys():
334 self.user_ns.pop(key)
335 else:
336 del working
337 working.pop(key)
338 if bound:
339 working.update(bound_ns)
337 340
338 341 packed_result,buf = serialize_object(result)
339 342 result_buf = [packed_result]+buf
340 343 except:
341 344 exc_content = self._wrap_exception('apply')
342 345 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
343 346 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
344 347 ident='%s.pyerr'%self.prefix)
345 348 reply_content = exc_content
346 349 result_buf = []
347 350
348 351 if exc_content['ename'] == 'UnmetDependency':
349 352 sub['dependencies_met'] = False
350 353 else:
351 354 reply_content = {'status' : 'ok'}
352 355
353 356 # put 'ok'/'error' status in header, for scheduler introspection:
354 357 sub['status'] = reply_content['status']
355 358
356 359 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
357 360 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
358 361
359 362 # if reply_msg['content']['status'] == u'error':
360 363 # self.abort_queues()
361 364
362 365 def dispatch_queue(self, stream, msg):
363 366 self.control_stream.flush()
364 367 idents,msg = self.session.feed_identities(msg, copy=False)
365 368 try:
366 369 msg = self.session.unpack_message(msg, content=True, copy=False)
367 370 except:
368 371 self.log.error("Invalid Message", exc_info=True)
369 372 return
370 373
371 374
372 375 header = msg['header']
373 376 msg_id = header['msg_id']
374 377 if self.check_aborted(msg_id):
375 378 self.aborted.remove(msg_id)
376 379 # is it safe to assume a msg_id will not be resubmitted?
377 380 reply_type = msg['msg_type'].split('_')[0] + '_reply'
378 381 reply_msg = self.session.send(stream, reply_type,
379 382 content={'status' : 'aborted'}, parent=msg, ident=idents)
380 383 return
381 384 handler = self.shell_handlers.get(msg['msg_type'], None)
382 385 if handler is None:
383 386 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
384 387 else:
385 388 handler(stream, idents, msg)
386 389
387 390 def start(self):
388 391 #### stream mode:
389 392 if self.control_stream:
390 393 self.control_stream.on_recv(self.dispatch_control, copy=False)
391 394 self.control_stream.on_err(printer)
392 395
393 396 def make_dispatcher(stream):
394 397 def dispatcher(msg):
395 398 return self.dispatch_queue(stream, msg)
396 399 return dispatcher
397 400
398 401 for s in self.shell_streams:
399 402 s.on_recv(make_dispatcher(s), copy=False)
400 403 s.on_err(printer)
401 404
402 405 if self.iopub_stream:
403 406 self.iopub_stream.on_err(printer)
404 407
405 408 #### while True mode:
406 409 # while True:
407 410 # idle = True
408 411 # try:
409 412 # msg = self.shell_stream.socket.recv_multipart(
410 413 # zmq.NOBLOCK, copy=False)
411 414 # except zmq.ZMQError, e:
412 415 # if e.errno != zmq.EAGAIN:
413 416 # raise e
414 417 # else:
415 418 # idle=False
416 419 # self.dispatch_queue(self.shell_stream, msg)
417 420 #
418 421 # if not self.task_stream.empty():
419 422 # idle=False
420 423 # msg = self.task_stream.recv_multipart()
421 424 # self.dispatch_queue(self.task_stream, msg)
422 425 # if idle:
423 426 # # don't busywait
424 427 # time.sleep(1e-3)
425 428
426 429 def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
427 430 client_addr=None, loop=None, context=None, key=None,
428 431 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
429 432 """NO LONGER IN USE"""
430 433 # create loop, context, and session:
431 434 if loop is None:
432 435 loop = ioloop.IOLoop.instance()
433 436 if context is None:
434 437 context = zmq.Context()
435 438 c = context
436 439 session = StreamSession(key=key)
437 440 # print (session.key)
438 441 # print (control_addr, shell_addrs, iopub_addr, hb_addrs)
439 442
440 443 # create Control Stream
441 444 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
442 445 control_stream.setsockopt(zmq.IDENTITY, identity)
443 446 control_stream.connect(control_addr)
444 447
445 448 # create Shell Streams (MUX, Task, etc.):
446 449 shell_streams = []
447 450 for addr in shell_addrs:
448 451 stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
449 452 stream.setsockopt(zmq.IDENTITY, identity)
450 453 stream.connect(addr)
451 454 shell_streams.append(stream)
452 455
453 456 # create iopub stream:
454 457 iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
455 458 iopub_stream.setsockopt(zmq.IDENTITY, identity)
456 459 iopub_stream.connect(iopub_addr)
457 460
458 461 # Redirect input streams and set a display hook.
459 462 if out_stream_factory:
460 463 sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
461 464 sys.stdout.topic = 'engine.%i.stdout'%int_id
462 465 sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
463 466 sys.stderr.topic = 'engine.%i.stderr'%int_id
464 467 if display_hook_factory:
465 468 sys.displayhook = display_hook_factory(session, iopub_stream)
466 469 sys.displayhook.topic = 'engine.%i.pyout'%int_id
467 470
468 471
469 472 # launch heartbeat
470 473 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
471 474 heart.start()
472 475
473 476 # create (optional) Client
474 477 if client_addr:
475 478 client = Client(client_addr, username=identity)
476 479 else:
477 480 client = None
478 481
479 482 kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
480 483 shell_streams=shell_streams, iopub_stream=iopub_stream,
481 484 client=client, loop=loop)
482 485 kernel.start()
483 486 return loop, c, kernel
484 487
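
Reviewer note: the revised apply_request always executes under msg_id-derived names in user_ns, and for bound calls prepends a Namespace view of it, then merges mutations back. A self-contained sketch of that pattern; the msg_id and the function are made up for illustration.

    from IPython.zmq.parallel.util import Namespace

    working = {}                         # stands in for self.user_ns
    msg_id = 'abcd-1234'                 # illustrative msg_id
    prefix = "_" + msg_id.replace("-", "") + "_"

    def f(ns, x):                        # a "bound" function: Namespace comes first
        ns['seen'] = x                   # mutations are merged back below
        return 2 * x

    bound_ns = Namespace(working)
    args, kwargs = [bound_ns, 21], {}

    fname = prefix + "f"
    argname = prefix + "args"
    kwargname = prefix + "kwargs"
    resultname = prefix + "result"
    ns = {fname: f, argname: args, kwargname: kwargs, resultname: None}
    working.update(ns)
    code = "%s=%s(*%s,**%s)" % (resultname, fname, argname, kwargname)
    try:
        exec code in working, working
        result = working.get(resultname)
    finally:
        for key in ns.iterkeys():        # always scrub the temporaries
            working.pop(key)
    working.update(bound_ns)             # bound case: merge namespace mutations back

    assert result == 42 and working['seen'] == 21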
@@ -1,46 +1,48 b''
1 1 """toplevel setup/teardown for parallel tests."""
2 2
3 3 import tempfile
4 4 import time
5 5 from subprocess import Popen, PIPE, STDOUT
6 6
7 from IPython.zmq.parallel.ipcluster import launch_process
8 from IPython.zmq.parallel.entry_point import select_random_ports
7 from IPython.zmq.parallel import client
9 8
10 9 processes = []
11 10 blackhole = tempfile.TemporaryFile()
12 11
13 12 # nose setup/teardown
14 13
15 14 def setup():
16 15 cp = Popen('ipcontrollerz --profile iptest -r --log-level 40'.split(), stdout=blackhole, stderr=STDOUT)
17 16 processes.append(cp)
18 17 time.sleep(.5)
19 18 add_engine()
20 time.sleep(2)
19 c = client.Client(profile='iptest')
20 while not c.ids:
21 time.sleep(.1)
22 c.spin()
21 23
22 24 def add_engine(profile='iptest'):
23 25 ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '40'], stdout=blackhole, stderr=STDOUT)
24 26 # ep.start()
25 27 processes.append(ep)
26 28 return ep
27 29
28 30 def teardown():
29 31 time.sleep(1)
30 32 while processes:
31 33 p = processes.pop()
32 34 if p.poll() is None:
33 35 try:
34 36 p.terminate()
35 37 except Exception, e:
36 38 print e
37 39 pass
38 40 if p.poll() is None:
39 41 time.sleep(.25)
40 42 if p.poll() is None:
41 43 try:
42 44 print 'killing'
43 45 p.kill()
44 46 except:
45 print "couldn't shutdown process: ",p
47 print "couldn't shutdown process: ", p
46 48
@@ -1,100 +1,100 b''
1 1 import time
2 2 from signal import SIGINT
3 3 from multiprocessing import Process
4 4
5 5 from nose import SkipTest
6 6
7 7 from zmq.tests import BaseZMQTestCase
8 8
9 9 from IPython.external.decorator import decorator
10 10
11 11 from IPython.zmq.parallel import error
12 12 from IPython.zmq.parallel.client import Client
13 13 from IPython.zmq.parallel.ipcluster import launch_process
14 14 from IPython.zmq.parallel.entry_point import select_random_ports
15 15 from IPython.zmq.parallel.tests import processes,add_engine
16 16
17 17 # simple tasks for use in apply tests
18 18
19 19 def segfault():
20 20 """this will segfault"""
21 21 import ctypes
22 22 ctypes.memset(-1,0,1)
23 23
24 24 def wait(n):
25 25 """sleep for a time"""
26 26 import time
27 27 time.sleep(n)
28 28 return n
29 29
30 30 def raiser(eclass):
31 31 """raise an exception"""
32 32 raise eclass()
33 33
34 34 # test decorator for skipping tests when libraries are unavailable
35 35 def skip_without(*names):
36 36 """skip a test if some names are not importable"""
37 37 @decorator
38 38 def skip_without_names(f, *args, **kwargs):
39 39 """decorator to skip tests in the absence of numpy."""
40 40 for name in names:
41 41 try:
42 42 __import__(name)
43 43 except ImportError:
44 44 raise SkipTest
45 45 return f(*args, **kwargs)
46 46 return skip_without_names
47 47
48 48
49 49 class ClusterTestCase(BaseZMQTestCase):
50 50
51 51 def add_engines(self, n=1, block=True):
52 52 """add multiple engines to our cluster"""
53 53 for i in range(n):
54 54 self.engines.append(add_engine())
55 55 if block:
56 56 self.wait_on_engines()
57 57
58 58 def wait_on_engines(self, timeout=5):
59 59 """wait for our engines to connect."""
60 60 n = len(self.engines)+self.base_engine_count
61 61 tic = time.time()
62 62 while time.time()-tic < timeout and len(self.client.ids) < n:
63 63 time.sleep(0.1)
64 64
65 65         assert len(self.client.ids) >= n, "waiting for engines timed out"
66 66
67 67 def connect_client(self):
68 68 """connect a client with my Context, and track its sockets for cleanup"""
69 69 c = Client(profile='iptest',context=self.context)
70 70 for name in filter(lambda n:n.endswith('socket'), dir(c)):
71 71 self.sockets.append(getattr(c, name))
72 72 return c
73 73
74 74 def assertRaisesRemote(self, etype, f, *args, **kwargs):
75 75 try:
76 76 try:
77 77 f(*args, **kwargs)
78 78 except error.CompositeError as e:
79 79 e.raise_exception()
80 80 except error.RemoteError as e:
81 81             self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(etype.__name__, e.ename))
82 82 else:
83 83 self.fail("should have raised a RemoteError")
84 84
85 85 def setUp(self):
86 86 BaseZMQTestCase.setUp(self)
87 87 self.client = self.connect_client()
88 88 self.base_engine_count=len(self.client.ids)
89 89 self.engines=[]
90 90
91 91 def tearDown(self):
92 92 self.client.close()
93 93 BaseZMQTestCase.tearDown(self)
94 # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
95 # [ e.wait() for e in self.engines ]
94 # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
95 # [ e.wait() for e in self.engines ]
96 96 # while len(self.client.ids) > self.base_engine_count:
97 97 # time.sleep(.1)
98 98 # del self.engines
99 99 # BaseZMQTestCase.tearDown(self)
100 100 No newline at end of file
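
Reviewer note: skip_without is useful beyond this file; a usage sketch follows. The test body is made up, and the full import path assumes the tests package layout (the file itself uses a bare `from clienttest import ...`).

    from IPython.zmq.parallel.tests.clienttest import skip_without

    @skip_without('numpy')               # raises SkipTest if numpy can't be imported
    def test_numpy_available():
        import numpy
        assert numpy.arange(4).sum() == 6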
@@ -1,252 +1,262 b''
1 1 import time
2 2 from tempfile import mktemp
3 3
4 4 import nose.tools as nt
5 5 import zmq
6 6
7 7 from IPython.zmq.parallel import client as clientmod
8 8 from IPython.zmq.parallel import error
9 9 from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult
10 10 from IPython.zmq.parallel.view import LoadBalancedView, DirectView
11 11
12 12 from clienttest import ClusterTestCase, segfault, wait
13 13
14 14 class TestClient(ClusterTestCase):
15 15
16 16 def test_ids(self):
17 17 n = len(self.client.ids)
18 18 self.add_engines(3)
19 19 self.assertEquals(len(self.client.ids), n+3)
20 20         self.assertTrue(isinstance(self.client.ids, list))
21 21
22 22 def test_segfault_task(self):
23 23 """test graceful handling of engine death (balanced)"""
24 24 self.add_engines(1)
25 25 ar = self.client.apply(segfault, block=False)
26 26 self.assertRaisesRemote(error.EngineError, ar.get)
27 27 eid = ar.engine_id
28 28 while eid in self.client.ids:
29 29 time.sleep(.01)
30 30 self.client.spin()
31 31
32 32 def test_segfault_mux(self):
33 33 """test graceful handling of engine death (direct)"""
34 34 self.add_engines(1)
35 35 eid = self.client.ids[-1]
36 36 ar = self.client[eid].apply_async(segfault)
37 37 self.assertRaisesRemote(error.EngineError, ar.get)
38 38 eid = ar.engine_id
39 39 while eid in self.client.ids:
40 40 time.sleep(.01)
41 41 self.client.spin()
42 42
43 43 def test_view_indexing(self):
44 44 """test index access for views"""
45 45 self.add_engines(2)
46 46 targets = self.client._build_targets('all')[-1]
47 47 v = self.client[:]
48 48 self.assertEquals(v.targets, targets)
49 49 t = self.client.ids[2]
50 50 v = self.client[t]
51 51 self.assert_(isinstance(v, DirectView))
52 52 self.assertEquals(v.targets, t)
53 53 t = self.client.ids[2:4]
54 54 v = self.client[t]
55 55 self.assert_(isinstance(v, DirectView))
56 56 self.assertEquals(v.targets, t)
57 57 v = self.client[::2]
58 58 self.assert_(isinstance(v, DirectView))
59 59 self.assertEquals(v.targets, targets[::2])
60 60 v = self.client[1::3]
61 61 self.assert_(isinstance(v, DirectView))
62 62 self.assertEquals(v.targets, targets[1::3])
63 63 v = self.client[:-3]
64 64 self.assert_(isinstance(v, DirectView))
65 65 self.assertEquals(v.targets, targets[:-3])
66 66 v = self.client[-1]
67 67 self.assert_(isinstance(v, DirectView))
68 68 self.assertEquals(v.targets, targets[-1])
69 69 nt.assert_raises(TypeError, lambda : self.client[None])
70 70
71 71 def test_view_cache(self):
72 72 """test that multiple view requests return the same object"""
73 73 v = self.client[:2]
74 74 v2 =self.client[:2]
75 75 self.assertTrue(v is v2)
76 76 v = self.client.view()
77 77 v2 = self.client.view(balanced=True)
78 78 self.assertTrue(v is v2)
79 79
80 80 def test_targets(self):
81 81 """test various valid targets arguments"""
82 82 build = self.client._build_targets
83 83 ids = self.client.ids
84 84 idents,targets = build(None)
85 85 self.assertEquals(ids, targets)
86 86
87 87 def test_clear(self):
88 88 """test clear behavior"""
89 89 self.add_engines(2)
90 90 self.client.block=True
91 91 self.client.push(dict(a=5))
92 92 self.client.pull('a')
93 93 id0 = self.client.ids[-1]
94 94 self.client.clear(targets=id0)
95 95 self.client.pull('a', targets=self.client.ids[:-1])
96 96 self.assertRaisesRemote(NameError, self.client.pull, 'a')
97 97 self.client.clear()
98 98 for i in self.client.ids:
99 99 self.assertRaisesRemote(NameError, self.client.pull, 'a', targets=i)
100 100
101 101
102 102 def test_push_pull(self):
103 103 """test pushing and pulling"""
104 104 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
105 105 t = self.client.ids[-1]
106 106 self.add_engines(2)
107 107 push = self.client.push
108 108 pull = self.client.pull
109 109 self.client.block=True
110 110 nengines = len(self.client)
111 111 push({'data':data}, targets=t)
112 112 d = pull('data', targets=t)
113 113 self.assertEquals(d, data)
114 114 push({'data':data})
115 115 d = pull('data')
116 116 self.assertEquals(d, nengines*[data])
117 117 ar = push({'data':data}, block=False)
118 118 self.assertTrue(isinstance(ar, AsyncResult))
119 119 r = ar.get()
120 120 ar = pull('data', block=False)
121 121 self.assertTrue(isinstance(ar, AsyncResult))
122 122 r = ar.get()
123 123 self.assertEquals(r, nengines*[data])
124 124 push(dict(a=10,b=20))
125 125 r = pull(('a','b'))
126 126 self.assertEquals(r, nengines*[[10,20]])
127 127
128 128 def test_push_pull_function(self):
129 129 "test pushing and pulling functions"
130 130 def testf(x):
131 131 return 2.0*x
132 132
133 133 self.add_engines(4)
134 134 t = self.client.ids[-1]
135 135 self.client.block=True
136 136 push = self.client.push
137 137 pull = self.client.pull
138 138 execute = self.client.execute
139 139 push({'testf':testf}, targets=t)
140 140 r = pull('testf', targets=t)
141 141 self.assertEqual(r(1.0), testf(1.0))
142 142 execute('r = testf(10)', targets=t)
143 143 r = pull('r', targets=t)
144 144 self.assertEquals(r, testf(10))
145 145 ar = push({'testf':testf}, block=False)
146 146 ar.get()
147 147 ar = pull('testf', block=False)
148 148 rlist = ar.get()
149 149 for r in rlist:
150 150 self.assertEqual(r(1.0), testf(1.0))
151 151 execute("def g(x): return x*x", targets=t)
152 152 r = pull(('testf','g'),targets=t)
153 153 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
154 154
155 155 def test_push_function_globals(self):
156 156 """test that pushed functions have access to globals"""
157 157 def geta():
158 158 return a
159 159 self.add_engines(1)
160 160 v = self.client[-1]
161 161 v.block=True
162 162 v['f'] = geta
163 163 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
164 164 v.execute('a=5')
165 165 v.execute('b=f()')
166 166 self.assertEquals(v['b'], 5)
167 167
168 def test_push_function_defaults(self):
169 """test that pushed functions preserve default args"""
170 def echo(a=10):
171 return a
172 self.add_engines(1)
173 v = self.client[-1]
174 v.block=True
175 v['f'] = echo
176 v.execute('b=f()')
177 self.assertEquals(v['b'], 10)
178
168 179 def test_get_result(self):
169 180 """test getting results from the Hub."""
170 181 c = clientmod.Client(profile='iptest')
171 182 t = self.client.ids[-1]
172 183 ar = c.apply(wait, (1,), block=False, targets=t)
173 184 time.sleep(.25)
174 185 ahr = self.client.get_result(ar.msg_ids)
175 186 self.assertTrue(isinstance(ahr, AsyncHubResult))
176 187 self.assertEquals(ahr.get(), ar.get())
177 188 ar2 = self.client.get_result(ar.msg_ids)
178 189 self.assertFalse(isinstance(ar2, AsyncHubResult))
179 190
180 191 def test_ids_list(self):
181 192 """test client.ids"""
182 193 self.add_engines(2)
183 194 ids = self.client.ids
184 195 self.assertEquals(ids, self.client._ids)
185 196 self.assertFalse(ids is self.client._ids)
186 197 ids.remove(ids[-1])
187 198 self.assertNotEquals(ids, self.client._ids)
188 199
189 200 def test_run_newline(self):
190 201 """test that run appends newline to files"""
191 202 tmpfile = mktemp()
192 203 with open(tmpfile, 'w') as f:
193 204 f.write("""def g():
194 205 return 5
195 206 """)
196 207 v = self.client[-1]
197 208 v.run(tmpfile, block=True)
198 self.assertEquals(v.apply_sync_bound(lambda : g()), 5)
209 self.assertEquals(v.apply_sync(lambda : g()), 5)
199 210
200 211 def test_apply_tracked(self):
201 212 """test tracking for apply"""
202 213 # self.add_engines(1)
203 214 t = self.client.ids[-1]
204 215 self.client.block=False
205 216 def echo(n=1024*1024, **kwargs):
206 217 return self.client.apply(lambda x: x, args=('x'*n,), targets=t, **kwargs)
207 218 ar = echo(1)
208 219 self.assertTrue(ar._tracker is None)
209 220 self.assertTrue(ar.sent)
210 221 ar = echo(track=True)
211 222 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
212 223 self.assertEquals(ar.sent, ar._tracker.done)
213 224 ar._tracker.wait()
214 225 self.assertTrue(ar.sent)
215 226
216 227 def test_push_tracked(self):
217 228 t = self.client.ids[-1]
218 229 ns = dict(x='x'*1024*1024)
219 230 ar = self.client.push(ns, targets=t, block=False)
220 231 self.assertTrue(ar._tracker is None)
221 232 self.assertTrue(ar.sent)
222 233
223 234 ar = self.client.push(ns, targets=t, block=False, track=True)
224 235 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
225 236 self.assertEquals(ar.sent, ar._tracker.done)
226 237 ar._tracker.wait()
227 238 self.assertTrue(ar.sent)
228 239 ar.get()
229 240
230 241 def test_scatter_tracked(self):
231 242 t = self.client.ids
232 243 x='x'*1024*1024
233 244 ar = self.client.scatter('x', x, targets=t, block=False)
234 245 self.assertTrue(ar._tracker is None)
235 246 self.assertTrue(ar.sent)
236 247
237 248 ar = self.client.scatter('x', x, targets=t, block=False, track=True)
238 249 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
239 250 self.assertEquals(ar.sent, ar._tracker.done)
240 251 ar._tracker.wait()
241 252 self.assertTrue(ar.sent)
242 253 ar.get()
243 254
244 255 def test_remote_reference(self):
245 256 v = self.client[-1]
246 257 v['a'] = 123
247 258 ra = clientmod.Reference('a')
248 b = v.apply_sync_bound(lambda x: x, ra)
259 b = v.apply_sync(lambda x: x, ra)
249 260 self.assertEquals(b, 123)
250 self.assertRaisesRemote(NameError, v.apply_sync, lambda x: x, ra)
251 261
252 262
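
Reviewer note: the tracking tests above encode the intended usage pattern for track=True; a condensed sketch, assuming a connected client (names are illustrative).

    x = 'x' * 1024 * 1024                       # large enough to be sent zero-copy
    ar = client.push(dict(x=x), targets=0, block=False, track=True)
    ar._tracker.wait()                          # block until zmq has actually sent the buffer
    assert ar.sent                              # now safe to mutate/free x locally
    ar.get()                                    # and still wait on the remote result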
@@ -1,271 +1,299 b''
1 1 """some generic utilities for dealing with classes, urls, and serialization"""
2 2 import re
3 3 import socket
4 4
5 5 try:
6 6 import cPickle
7 7 pickle = cPickle
8 8 except:
9 9 cPickle = None
10 10 import pickle
11 11
12 12
13 13 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
14 14 from IPython.utils.newserialized import serialize, unserialize
15 15
16 16 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
17 17
18 class Namespace(dict):
19 """Subclass of dict for attribute access to keys."""
20
21 def __getattr__(self, key):
22 """getattr aliased to getitem"""
23 if key in self.iterkeys():
24 return self[key]
25 else:
26 raise NameError(key)
27
28 def __setattr__(self, key, value):
29 """setattr aliased to setitem, with strict"""
30 if hasattr(dict, key):
31 raise KeyError("Cannot override dict keys %r"%key)
32 self[key] = value
33
34
18 35 class ReverseDict(dict):
19 36 """simple double-keyed subset of dict methods."""
20 37
21 38 def __init__(self, *args, **kwargs):
22 39 dict.__init__(self, *args, **kwargs)
23 40 self._reverse = dict()
24 41 for key, value in self.iteritems():
25 42 self._reverse[value] = key
26 43
27 44 def __getitem__(self, key):
28 45 try:
29 46 return dict.__getitem__(self, key)
30 47 except KeyError:
31 48 return self._reverse[key]
32 49
33 50 def __setitem__(self, key, value):
34 51 if key in self._reverse:
35 52 raise KeyError("Can't have key %r on both sides!"%key)
36 53 dict.__setitem__(self, key, value)
37 54 self._reverse[value] = key
38 55
39 56 def pop(self, key):
40 57 value = dict.pop(self, key)
41 58 self._reverse.pop(value)
42 59 return value
43 60
44 61 def get(self, key, default=None):
45 62 try:
46 63 return self[key]
47 64 except KeyError:
48 65 return default
49 66
50 67 def validate_url(url):
51 68 """validate a url for zeromq"""
52 69 if not isinstance(url, basestring):
53 70 raise TypeError("url must be a string, not %r"%type(url))
54 71 url = url.lower()
55 72
56 73 proto_addr = url.split('://')
57 74 assert len(proto_addr) == 2, 'Invalid url: %r'%url
58 75 proto, addr = proto_addr
59 76 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
60 77
61 78 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
62 79 # author: Remi Sabourin
63 80 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
64 81
65 82 if proto == 'tcp':
66 83 lis = addr.split(':')
67 84 assert len(lis) == 2, 'Invalid url: %r'%url
68 85 addr,s_port = lis
69 86 try:
70 87 port = int(s_port)
71 88 except ValueError:
72 89             raise AssertionError("Invalid port %r in url: %r"%(s_port, url))
73 90
74 91 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
75 92
76 93 else:
77 94 # only validate tcp urls currently
78 95 pass
79 96
80 97 return True
81 98
82 99
83 100 def validate_url_container(container):
84 101 """validate a potentially nested collection of urls."""
85 102 if isinstance(container, basestring):
86 103 url = container
87 104 return validate_url(url)
88 105 elif isinstance(container, dict):
89 106 container = container.itervalues()
90 107
91 108 for element in container:
92 109 validate_url_container(element)
93 110
94 111
95 112 def split_url(url):
96 113 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
97 114 proto_addr = url.split('://')
98 115 assert len(proto_addr) == 2, 'Invalid url: %r'%url
99 116 proto, addr = proto_addr
100 117 lis = addr.split(':')
101 118 assert len(lis) == 2, 'Invalid url: %r'%url
102 119 addr,s_port = lis
103 120 return proto,addr,s_port
104 121
105 122 def disambiguate_ip_address(ip, location=None):
106 123 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
107 124 ones, based on the location (default interpretation of location is localhost)."""
108 125 if ip in ('0.0.0.0', '*'):
109 126 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
110 127 if location is None or location in external_ips:
111 128 ip='127.0.0.1'
112 129 elif location:
113 130 return location
114 131 return ip
115 132
116 133 def disambiguate_url(url, location=None):
117 134 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
118 135 ones, based on the location (default interpretation is localhost).
119 136
120 137 This is for zeromq urls, such as tcp://*:10101."""
121 138 try:
122 139 proto,ip,port = split_url(url)
123 140 except AssertionError:
124 141 # probably not tcp url; could be ipc, etc.
125 142 return url
126 143
127 144 ip = disambiguate_ip_address(ip,location)
128 145
129 146 return "%s://%s:%s"%(proto,ip,port)
130 147
131 148
132 149 def rekey(dikt):
133 150 """Rekey a dict that has been forced to use str keys where there should be
134 151 ints by json. This belongs in the jsonutil added by fperez."""
135 152 for k in dikt.iterkeys():
136 153 if isinstance(k, str):
137 154 ik=fk=None
138 155 try:
139 156 ik = int(k)
140 157 except ValueError:
141 158 try:
142 159 fk = float(k)
143 160 except ValueError:
144 161 continue
145 162 if ik is not None:
146 163 nk = ik
147 164 else:
148 165 nk = fk
149 166 if nk in dikt:
150 167 raise KeyError("already have key %r"%nk)
151 168 dikt[nk] = dikt.pop(k)
152 169 return dikt
153 170
154 171 def serialize_object(obj, threshold=64e-6):
155 172 """Serialize an object into a list of sendable buffers.
156 173
157 174 Parameters
158 175 ----------
159 176
160 177 obj : object
161 178 The object to be serialized
162 179 threshold : float
163 180 The threshold for not double-pickling the content.
164 181
165 182
166 183 Returns
167 184 -------
168 185 ('pmd', [bufs]) :
169 186 where pmd is the pickled metadata wrapper,
170 187 bufs is a list of data buffers
171 188 """
172 189 databuffers = []
173 190 if isinstance(obj, (list, tuple)):
174 191 clist = canSequence(obj)
175 192 slist = map(serialize, clist)
176 193 for s in slist:
177 194 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
178 195 databuffers.append(s.getData())
179 196 s.data = None
180 197 return pickle.dumps(slist,-1), databuffers
181 198 elif isinstance(obj, dict):
182 199 sobj = {}
183 200 for k in sorted(obj.iterkeys()):
184 201 s = serialize(can(obj[k]))
185 202 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
186 203 databuffers.append(s.getData())
187 204 s.data = None
188 205 sobj[k] = s
189 206 return pickle.dumps(sobj,-1),databuffers
190 207 else:
191 208 s = serialize(can(obj))
192 209 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
193 210 databuffers.append(s.getData())
194 211 s.data = None
195 212 return pickle.dumps(s,-1),databuffers
196 213
197 214
198 215 def unserialize_object(bufs):
199 216 """reconstruct an object serialized by serialize_object from data buffers."""
200 217 bufs = list(bufs)
201 218 sobj = pickle.loads(bufs.pop(0))
202 219 if isinstance(sobj, (list, tuple)):
203 220 for s in sobj:
204 221 if s.data is None:
205 222 s.data = bufs.pop(0)
206 223 return uncanSequence(map(unserialize, sobj)), bufs
207 224 elif isinstance(sobj, dict):
208 225 newobj = {}
209 226 for k in sorted(sobj.iterkeys()):
210 227 s = sobj[k]
211 228 if s.data is None:
212 229 s.data = bufs.pop(0)
213 230 newobj[k] = uncan(unserialize(s))
214 231 return newobj, bufs
215 232 else:
216 233 if sobj.data is None:
217 234 sobj.data = bufs.pop(0)
218 235 return uncan(unserialize(sobj)), bufs
219 236
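# round-trip sketch (assumes numpy is importable; sizes are illustrative):
#   pmd, bufs = serialize_object(numpy.arange(1024)) # the array travels in `bufs`
#   obj, rest = unserialize_object([pmd] + bufs)     # rest == [] afterwards
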
220 237 def pack_apply_message(f, args, kwargs, threshold=64e-6):
221 238 """pack up a function, args, and kwargs to be sent over the wire
222 239 as a series of buffers. Any object whose data is larger than `threshold`
223 240     will not have its data copied (currently only numpy arrays support zero-copy)."""
224 241 msg = [pickle.dumps(can(f),-1)]
225 242 databuffers = [] # for large objects
226 243 sargs, bufs = serialize_object(args,threshold)
227 244 msg.append(sargs)
228 245 databuffers.extend(bufs)
229 246 skwargs, bufs = serialize_object(kwargs,threshold)
230 247 msg.append(skwargs)
231 248 databuffers.extend(bufs)
232 249 msg.extend(databuffers)
233 250 return msg
234 251
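# wire-format sketch of the message list built above:
#   [pickled_f, pickled_args_meta, pickled_kwargs_meta, buf0, buf1, ...]
# where the trailing buffers carry the data of any large or zero-copy objects.
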
235 252 def unpack_apply_message(bufs, g=None, copy=True):
236 253 """unpack f,args,kwargs from buffers packed by pack_apply_message()
237 254 Returns: original f,args,kwargs"""
238 255 bufs = list(bufs) # allow us to pop
239 256 assert len(bufs) >= 3, "not enough buffers!"
240 257 if not copy:
241 258 for i in range(3):
242 259 bufs[i] = bufs[i].bytes
243 260 cf = pickle.loads(bufs.pop(0))
244 261 sargs = list(pickle.loads(bufs.pop(0)))
245 262 skwargs = dict(pickle.loads(bufs.pop(0)))
246 263 # print sargs, skwargs
247 264 f = uncan(cf, g)
248 265 for sa in sargs:
249 266 if sa.data is None:
250 267 m = bufs.pop(0)
251 268 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
252 269 if copy:
253 270 sa.data = buffer(m)
254 271 else:
255 272 sa.data = m.buffer
256 273 else:
257 274 if copy:
258 275 sa.data = m
259 276 else:
260 277 sa.data = m.bytes
261 278
262 279 args = uncanSequence(map(unserialize, sargs), g)
263 280 kwargs = {}
264 281 for k in sorted(skwargs.iterkeys()):
265 282 sa = skwargs[k]
266 283 if sa.data is None:
267 sa.data = bufs.pop(0)
284 m = bufs.pop(0)
285 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
286 if copy:
287 sa.data = buffer(m)
288 else:
289 sa.data = m.buffer
290 else:
291 if copy:
292 sa.data = m
293 else:
294 sa.data = m.bytes
295
268 296 kwargs[k] = uncan(unserialize(sa), g)
269 297
270 298 return f,args,kwargs
271 299
@@ -1,658 +1,670 b''
1 1 """Views of remote engines."""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 from IPython.testing import decorators as testdec
14 14 from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance
15 15
16 16 from IPython.external.decorator import decorator
17 17
18 18 from .asyncresult import AsyncResult
19 19 from .dependency import Dependency
20 20 from .remotefunction import ParallelFunction, parallel, remote
21 21
22 22 #-----------------------------------------------------------------------------
23 23 # Decorators
24 24 #-----------------------------------------------------------------------------
25 25
26 26 @decorator
27 27 def myblock(f, self, *args, **kwargs):
28 28 """override client.block with self.block during a call"""
29 29 block = self.client.block
30 30 self.client.block = self.block
31 31 try:
32 32 ret = f(self, *args, **kwargs)
33 33 finally:
34 34 self.client.block = block
35 35 return ret
36 36
37 37 @decorator
38 38 def save_ids(f, self, *args, **kwargs):
39 39 """Keep our history and outstanding attributes up to date after a method call."""
40 40 n_previous = len(self.client.history)
41 41 ret = f(self, *args, **kwargs)
42 42 nmsgs = len(self.client.history) - n_previous
43 43 msg_ids = self.client.history[-nmsgs:]
44 44 self.history.extend(msg_ids)
45 45 map(self.outstanding.add, msg_ids)
46 46 return ret
47 47
48 48 @decorator
49 49 def sync_results(f, self, *args, **kwargs):
50 50 """sync relevant results from self.client to our results attribute."""
51 51 ret = f(self, *args, **kwargs)
52 52 delta = self.outstanding.difference(self.client.outstanding)
53 53 completed = self.outstanding.intersection(delta)
54 54 self.outstanding = self.outstanding.difference(completed)
55 55 for msg_id in completed:
56 56 self.results[msg_id] = self.client.results[msg_id]
57 57 return ret
58 58
59 59 @decorator
60 60 def spin_after(f, self, *args, **kwargs):
61 61 """call spin after the method."""
62 62 ret = f(self, *args, **kwargs)
63 63 self.spin()
64 64 return ret
65 65
66 66 #-----------------------------------------------------------------------------
67 67 # Classes
68 68 #-----------------------------------------------------------------------------
69 69
70 70 class View(HasTraits):
71 71     """Base View class for more convenient apply(f,*args,**kwargs) syntax via attributes.
72 72
73 73 Don't use this class, use subclasses.
74 74 """
75 75 block=Bool(False)
76 76 bound=Bool(False)
77 track=Bool(False)
77 78 history=List()
78 79 outstanding = Set()
79 80 results = Dict()
80 81 client = Instance('IPython.zmq.parallel.client.Client')
81 82
82 83 _ntargets = Int(1)
83 84 _balanced = Bool(False)
84 _default_names = List(['block', 'bound'])
85 _default_names = List(['block', 'bound', 'track'])
85 86 _targets = Any()
86 87
87 88 def __init__(self, client=None, targets=None):
88 89 super(View, self).__init__(client=client)
89 90 self._targets = targets
90 91 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
91 92 self.block = client.block
92 93
93 94 for name in self._default_names:
94 95 setattr(self, name, getattr(self, name, None))
95 96
96 97         assert self.__class__ is not View, "Don't use base View objects, use subclasses"
97 98
98 99
99 100 def __repr__(self):
100 101 strtargets = str(self._targets)
101 102 if len(strtargets) > 16:
102 103 strtargets = strtargets[:12]+'...]'
103 104 return "<%s %s>"%(self.__class__.__name__, strtargets)
104 105
105 106 @property
106 107 def targets(self):
107 108 return self._targets
108 109
109 110 @targets.setter
110 111 def targets(self, value):
111 112 raise AttributeError("Cannot set View `targets` after construction!")
112 113
113 114 @property
114 115 def balanced(self):
115 116 return self._balanced
116 117
117 118 @balanced.setter
118 119 def balanced(self, value):
119 120 raise AttributeError("Cannot set View `balanced` after construction!")
120 121
121 122 def _defaults(self, *excludes):
122 123 """return dict of our default attributes, excluding names given."""
123 124 d = dict(balanced=self._balanced, targets=self._targets)
124 125 for name in self._default_names:
125 126 if name not in excludes:
126 127 d[name] = getattr(self, name)
127 128 return d
128 129
129 130 def set_flags(self, **kwargs):
130 131 """set my attribute flags by keyword.
131 132
132 133 A View is a wrapper for the Client's apply method, but
133 134 with attributes that specify keyword arguments, those attributes
134 135 can be set by keyword argument with this method.
135 136
136 137 Parameters
137 138 ----------
138 139
139 140 block : bool
140 141 whether to wait for results
141 142 bound : bool
142 whether to use the client's namespace
143             whether to pass the engine's Namespace as the first argument
144 to functions called via `apply`.
145 track : bool
146             whether to create a MessageTracker so the user can tell when
147             it is safe to edit arrays and buffers again after a
148             non-copying send has completed.
143 149 """
144 150 for key in kwargs:
145 151 if key not in self._default_names:
146 152 raise KeyError("Invalid name: %r"%key)
147 153         for name in ('block', 'bound', 'track'):
148 154 if name in kwargs:
149 155 setattr(self, name, kwargs[name])
150 156
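    # usage sketch (hypothetical view `v`):
    #   v.set_flags(block=True, track=True) # later applies wait and track their sends
    #   v.set_flags(bogus=True)             # raises KeyError("Invalid name: 'bogus'")
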
151 157 #----------------------------------------------------------------
152 158 # wrappers for client methods:
153 159 #----------------------------------------------------------------
154 160 @sync_results
155 161 def spin(self):
156 162 """spin the client, and sync"""
157 163 self.client.spin()
158 164
159 165 @sync_results
160 166 @save_ids
161 167 def apply(self, f, *args, **kwargs):
162 168 """calls f(*args, **kwargs) on remote engines, returning the result.
163 169
164 This method does not involve the engine's namespace.
170 This method sets all of `client.apply`'s keyword arguments via this
171 View's attributes.
165 172
166 173 if self.block is False:
167 returns msg_id
174 returns AsyncResult
168 175 else:
169 176 returns actual result of f(*args, **kwargs)
170 177 """
171 178 return self.client.apply(f, args, kwargs, **self._defaults())
172 179
173 180 @save_ids
174 181 def apply_async(self, f, *args, **kwargs):
175 182 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
176 183
177 This method does not involve the engine's namespace.
178
179 returns msg_id
184 returns AsyncResult
180 185 """
181 186 d = self._defaults('block', 'bound')
182 187 return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
183 188
184 189 @spin_after
185 190 @save_ids
186 191 def apply_sync(self, f, *args, **kwargs):
187 192 """calls f(*args, **kwargs) on remote engines in a blocking manner,
188 193 returning the result.
189 194
190 This method does not involve the engine's namespace.
191
192 195 returns: actual result of f(*args, **kwargs)
193 196 """
194 d = self._defaults('block', 'bound')
197 d = self._defaults('block', 'bound', 'track')
195 198 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
196 199
197 200 # @sync_results
198 201 # @save_ids
199 202 # def apply_bound(self, f, *args, **kwargs):
200 203 # """calls f(*args, **kwargs) bound to engine namespace(s).
201 204 #
202 205 # if self.block is False:
203 206 # returns msg_id
204 207 # else:
205 208 # returns actual result of f(*args, **kwargs)
206 209 #
207 210 # This method has access to the targets' namespace via globals()
208 211 #
209 212 # """
210 213 # d = self._defaults('bound')
211 214 # return self.client.apply(f, args, kwargs, bound=True, **d)
212 215 #
213 216 @sync_results
214 217 @save_ids
215 218 def apply_async_bound(self, f, *args, **kwargs):
216 219 """calls f(*args, **kwargs) bound to engine namespace(s)
217 220 in a nonblocking manner.
218 221
219 returns: msg_id
222 The first argument to `f` will be the Engine's Namespace
220 223
221 This method has access to the targets' namespace via globals()
224 returns: AsyncResult
222 225
223 226 """
224 227 d = self._defaults('block', 'bound')
225 228 return self.client.apply(f, args, kwargs, block=False, bound=True, **d)
226 229
227 230 @spin_after
228 231 @save_ids
229 232 def apply_sync_bound(self, f, *args, **kwargs):
230 233 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
231 234
232 returns: actual result of f(*args, **kwargs)
235 The first argument to `f` will be the Engine's Namespace
233 236
234 This method has access to the targets' namespace via globals()
237 returns: actual result of f(*args, **kwargs)
235 238
236 239 """
237 240 d = self._defaults('block', 'bound')
238 241 return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
239 242
240 243 def abort(self, jobs=None, block=None):
241 244 """Abort jobs on my engines.
242 245
243 246 Parameters
244 247 ----------
245 248
246 249 jobs : None, str, list of strs, optional
247 250 if None: abort all jobs.
248 251 else: abort specific msg_id(s).
249 252 """
250 253 block = block if block is not None else self.block
251 254 return self.client.abort(jobs=jobs, targets=self._targets, block=block)
252 255
253 256 def queue_status(self, verbose=False):
254 257 """Fetch the Queue status of my engines"""
255 258 return self.client.queue_status(targets=self._targets, verbose=verbose)
256 259
257 260 def purge_results(self, jobs=[], targets=[]):
258 261 """Instruct the controller to forget specific results."""
259 262 if targets is None or targets == 'all':
260 263 targets = self._targets
261 264 return self.client.purge_results(jobs=jobs, targets=targets)
262 265
263 266 @spin_after
264 267 def get_result(self, indices_or_msg_ids=None):
265 268 """return one or more results, specified by history index or msg_id.
266 269
267 270 See client.get_result for details.
268 271
269 272 """
270 273
271 274 if indices_or_msg_ids is None:
272 275 indices_or_msg_ids = -1
273 276 if isinstance(indices_or_msg_ids, int):
274 277 indices_or_msg_ids = self.history[indices_or_msg_ids]
275 278 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
276 279 indices_or_msg_ids = list(indices_or_msg_ids)
277 280 for i,index in enumerate(indices_or_msg_ids):
278 281 if isinstance(index, int):
279 282 indices_or_msg_ids[i] = self.history[index]
280 283 return self.client.get_result(indices_or_msg_ids)
281 284
282 285 #-------------------------------------------------------------------
283 286 # Map
284 287 #-------------------------------------------------------------------
285 288
286 289 def map(self, f, *sequences, **kwargs):
287 290 """override in subclasses"""
288 291 raise NotImplementedError
289 292
290 293 def map_async(self, f, *sequences, **kwargs):
291 294 """Parallel version of builtin `map`, using this view's engines.
292 295
293 296 This is equivalent to map(...block=False)
294 297
295 298 See `self.map` for details.
296 299 """
297 300 if 'block' in kwargs:
298 301 raise TypeError("map_async doesn't take a `block` keyword argument.")
299 302 kwargs['block'] = False
300 303 return self.map(f,*sequences,**kwargs)
301 304
302 305 def map_sync(self, f, *sequences, **kwargs):
303 306 """Parallel version of builtin `map`, using this view's engines.
304 307
305 308 This is equivalent to map(...block=True)
306 309
307 310 See `self.map` for details.
308 311 """
309 312 if 'block' in kwargs:
310 313 raise TypeError("map_sync doesn't take a `block` keyword argument.")
311 314 kwargs['block'] = True
312 315 return self.map(f,*sequences,**kwargs)
313 316
314 317 def imap(self, f, *sequences, **kwargs):
315 318 """Parallel version of `itertools.imap`.
316 319
317 320 See `self.map` for details.
318 321 """
319 322
320 323 return iter(self.map_async(f,*sequences, **kwargs))
321 324
322 325 #-------------------------------------------------------------------
323 326 # Decorators
324 327 #-------------------------------------------------------------------
325 328
326 def remote(self, bound=True, block=True):
329 def remote(self, bound=False, block=True):
327 330 """Decorator for making a RemoteFunction"""
328 331 return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
329 332
330 def parallel(self, dist='b', bound=True, block=None):
333 def parallel(self, dist='b', bound=False, block=None):
331 334 """Decorator for making a ParallelFunction"""
332 335 block = self.block if block is None else block
333 336 return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
334 337
335 338 @testdec.skip_doctest
336 339 class DirectView(View):
337 340 """Direct Multiplexer View of one or more engines.
338 341
339 342 These are created via indexed access to a client:
340 343
341 344 >>> dv_1 = client[1]
342 345 >>> dv_all = client[:]
343 346 >>> dv_even = client[::2]
344 347 >>> dv_some = client[1:3]
345 348
346 349 This object provides dictionary access to engine namespaces:
347 350
348 351 # push a=5:
349 352 >>> dv['a'] = 5
350 353 # pull 'foo':
351 354     >>> dv['foo']
352 355
353 356 """
354 357
355 358 def __init__(self, client=None, targets=None):
356 359 super(DirectView, self).__init__(client=client, targets=targets)
357 360 self._balanced = False
358 361
359 362 @spin_after
360 363 @save_ids
361 364 def map(self, f, *sequences, **kwargs):
362 365 """view.map(f, *sequences, block=self.block, bound=self.bound) => list|AsyncMapResult
363 366
364 367 Parallel version of builtin `map`, using this View's `targets`.
365 368
366 369 There will be one task per target, so work will be chunked
367 370 if the sequences are longer than `targets`.
368 371
369 372 Results can be iterated as they are ready, but will become available in chunks.
370 373
371 374 Parameters
372 375 ----------
373 376
374 377 f : callable
375 378 function to be mapped
376 379 *sequences: one or more sequences of matching length
377 380 the sequences to be distributed and passed to `f`
378 381 block : bool
379 382 whether to wait for the result or not [default self.block]
380 383 bound : bool
381 whether to have access to the engines' namespaces [default self.bound]
384             whether to pass the engine's Namespace as the first argument to `f`
382 385
383 386 Returns
384 387 -------
385 388
386 389 if block=False:
387 390 AsyncMapResult
388 391 An object like AsyncResult, but which reassembles the sequence of results
389 392 into a single list. AsyncMapResults can be iterated through before all
390 393 results are complete.
391 394 else:
392 395 list
393 396 the result of map(f,*sequences)
394 397 """
395 398
396 399 block = kwargs.get('block', self.block)
397 400 bound = kwargs.get('bound', self.bound)
398 401 for k in kwargs.keys():
399 402 if k not in ['block', 'bound']:
400 403 raise TypeError("invalid keyword arg, %r"%k)
401 404
402 405 assert len(sequences) > 0, "must have some sequences to map onto!"
403 406 pf = ParallelFunction(self.client, f, block=block, bound=bound,
404 407 targets=self._targets, balanced=False)
405 408 return pf.map(*sequences)
406 409
407 410 @sync_results
408 411 @save_ids
409 412 def execute(self, code, block=None):
410 413 """execute some code on my targets."""
411 414
412 415 block = block if block is not None else self.block
413 416
414 417 return self.client.execute(code, block=block, targets=self._targets)
415 418
416 419 @sync_results
417 420 @save_ids
418 421 def run(self, fname, block=None):
419 422 """execute the code in a file on my targets."""
420 423
421 424 block = block if block is not None else self.block
422 425
423 426 return self.client.run(fname, block=block, targets=self._targets)
424 427
425 428 def update(self, ns):
426 429 """update remote namespace with dict `ns`"""
427 430 return self.client.push(ns, targets=self._targets, block=self.block)
428 431
429 432 def push(self, ns, block=None):
430 433 """update remote namespace with dict `ns`"""
431 434
432 435 block = block if block is not None else self.block
433 436
434 437 return self.client.push(ns, targets=self._targets, block=block)
435 438
436 439 def get(self, key_s):
437 440 """get object(s) by `key_s` from remote namespace
438 441 will return one object if it is a key.
439 442 It also takes a list of keys, and will return a list of objects."""
440 443 # block = block if block is not None else self.block
441 444 return self.client.pull(key_s, block=True, targets=self._targets)
442 445
443 446 @sync_results
444 447 @save_ids
445 448 def pull(self, key_s, block=True):
446 449 """get object(s) by `key_s` from remote namespace
447 450 will return one object if it is a key.
448 451 It also takes a list of keys, and will return a list of objects."""
449 452 block = block if block is not None else self.block
450 453 return self.client.pull(key_s, block=block, targets=self._targets)
451 454
452 455 def scatter(self, key, seq, dist='b', flatten=False, block=None):
453 456 """
454 457 Partition a Python sequence and send the partitions to a set of engines.
455 458 """
456 459 block = block if block is not None else self.block
457 460
458 461 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
459 462 targets=self._targets, block=block)
460 463
461 464 @sync_results
462 465 @save_ids
463 466 def gather(self, key, dist='b', block=None):
464 467 """
465 468 Gather a partitioned sequence on a set of engines as a single local seq.
466 469 """
467 470 block = block if block is not None else self.block
468 471
469 472 return self.client.gather(key, dist=dist, targets=self._targets, block=block)
470 473
471 474 def __getitem__(self, key):
472 475 return self.get(key)
473 476
474 477 def __setitem__(self,key, value):
475 478 self.update({key:value})
476 479
477 480 def clear(self, block=False):
478 481 """Clear the remote namespaces on my engines."""
479 482 block = block if block is not None else self.block
480 483 return self.client.clear(targets=self._targets, block=block)
481 484
482 485 def kill(self, block=True):
483 486 """Kill my engines."""
484 487 block = block if block is not None else self.block
485 488 return self.client.kill(targets=self._targets, block=block)
486 489
487 490 #----------------------------------------
488 491 # activate for %px,%autopx magics
489 492 #----------------------------------------
490 493 def activate(self):
491 494 """Make this `View` active for parallel magic commands.
492 495
493 496 IPython has a magic command syntax to work with `MultiEngineClient` objects.
494 497 In a given IPython session there is a single active one. While
495 498 there can be many `Views` created and used by the user,
496 499 there is only one active one. The active `View` is used whenever
497 500 the magic commands %px and %autopx are used.
498 501
499 502 The activate() method is called on a given `View` to make it
500 503 active. Once this has been done, the magic commands can be used.
501 504 """
502 505
503 506 try:
504 507 # This is injected into __builtins__.
505 508 ip = get_ipython()
506 509 except NameError:
507 510 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
508 511 else:
509 512 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
510 513 if pmagic is not None:
511 514 pmagic.active_multiengine_client = self
512 515 else:
513 516 print "You must first load the parallelmagic extension " \
514 517 "by doing '%load_ext parallelmagic'"
515 518
516 519
517 520 @testdec.skip_doctest
518 521 class LoadBalancedView(View):
519 522     """A load-balanced View that only executes via the Task scheduler.
520 523
521 524 Load-balanced views can be created with the client's `view` method:
522 525
523 526 >>> v = client.view(balanced=True)
524 527
525 528 or targets can be specified, to restrict the potential destinations:
526 529
527 530 >>> v = client.view([1,3],balanced=True)
528 531
529 532 which would restrict loadbalancing to between engines 1 and 3.
530 533
531 534 """
532 535
533 536     _default_names = ['block', 'bound', 'track', 'follow', 'after', 'timeout']
534 537
535 538 def __init__(self, client=None, targets=None):
536 539 super(LoadBalancedView, self).__init__(client=client, targets=targets)
537 540 self._ntargets = 1
538 541 self._balanced = True
539 542
540 543 def _validate_dependency(self, dep):
541 544 """validate a dependency.
542 545
543 546 For use in `set_flags`.
544 547 """
545 548 if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
546 549 return True
547 550         elif isinstance(dep, (list, set, tuple)):
548 551             # every element must itself be a valid dependency id
549 552             return all(isinstance(d, (str, AsyncResult)) for d in dep)
551 554 elif isinstance(dep, dict):
552 555 if set(dep.keys()) != set(Dependency().as_dict().keys()):
553 556 return False
554 557 if not isinstance(dep['msg_ids'], list):
555 558 return False
556 559             # every msg_id must be a string
557 560             return all(isinstance(d, str) for d in dep['msg_ids'])
559 562 else:
560 563 return False
561 564
562 565 def set_flags(self, **kwargs):
563 566 """set my attribute flags by keyword.
564 567
565 568 A View is a wrapper for the Client's apply method, but with attributes
566 569 that specify keyword arguments, those attributes can be set by keyword
567 570 argument with this method.
568 571
569 572 Parameters
570 573 ----------
571 574
572 575 block : bool
573 576 whether to wait for results
574 577 bound : bool
575 whether to use the engine's namespace
578             whether to pass the engine's Namespace as the first argument
579 to functions called via `apply`.
580 track : bool
581             whether to create a MessageTracker so the user can tell when
582             it is safe to edit arrays and buffers again after a
583             non-copying send has completed.
576 584 follow : Dependency, list, msg_id, AsyncResult
577 585 the location dependencies of tasks
578 586 after : Dependency, list, msg_id, AsyncResult
579 587 the time dependencies of tasks
580 588         timeout : int, float, or None
581 589 the timeout to be used for tasks
582 590 """
583 591
584 592 super(LoadBalancedView, self).set_flags(**kwargs)
585 593 for name in ('follow', 'after'):
586 594 if name in kwargs:
587 595 value = kwargs[name]
588 596 if self._validate_dependency(value):
589 597 setattr(self, name, value)
590 598 else:
591 599 raise ValueError("Invalid dependency: %r"%value)
592 600 if 'timeout' in kwargs:
593 601 t = kwargs['timeout']
594 602             if not isinstance(t, (int, long, float, type(None))):
595 603 raise TypeError("Invalid type for timeout: %r"%type(t))
596 604 if t is not None:
597 605 if t < 0:
598 606 raise ValueError("Invalid timeout: %s"%t)
599 607 self.timeout = t
600 608
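    # usage sketch (hypothetical): run the next task only where, and only after,
    # `ar`'s result was computed, giving up if it is queued for more than 10s:
    #   ar = lbview.apply(build_table)
    #   lbview.set_flags(follow=ar, after=ar, timeout=10)
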
601 609 @spin_after
602 610 @save_ids
603 611 def map(self, f, *sequences, **kwargs):
604 612 """view.map(f, *sequences, block=self.block, bound=self.bound, chunk_size=1) => list|AsyncMapResult
605 613
606 614 Parallel version of builtin `map`, load-balanced by this View.
607 615
608 616 `block`, `bound`, and `chunk_size` can be specified by keyword only.
609 617
610 618 Each `chunk_size` elements will be a separate task, and will be
611 619 load-balanced. This lets individual elements be available for iteration
612 620 as soon as they arrive.
613 621
614 622 Parameters
615 623 ----------
616 624
617 625 f : callable
618 626 function to be mapped
619 627 *sequences: one or more sequences of matching length
620 628 the sequences to be distributed and passed to `f`
621 629 block : bool
622 630 whether to wait for the result or not [default self.block]
623 631 bound : bool
624 whether to use the engine's namespace [default self.bound]
632             whether to pass the engine's Namespace as the first argument to `f`
633         track : bool
634             whether to create a MessageTracker so the user can tell when
635             it is safe to edit arrays and buffers again after a
636             non-copying send has completed.
625 637 chunk_size : int
626 638 how many elements should be in each task [default 1]
627 639
628 640 Returns
629 641 -------
630 642
631 643 if block=False:
632 644 AsyncMapResult
633 645 An object like AsyncResult, but which reassembles the sequence of results
634 646 into a single list. AsyncMapResults can be iterated through before all
635 647 results are complete.
636 648 else:
637 649 the result of map(f,*sequences)
638 650
639 651 """
640 652
641 653 # default
642 654 block = kwargs.get('block', self.block)
643 655 bound = kwargs.get('bound', self.bound)
644 656 chunk_size = kwargs.get('chunk_size', 1)
645 657
646 658 keyset = set(kwargs.keys())
647 659         extra_keys = keyset.difference(set(['block', 'bound', 'chunk_size']))
648 660 if extra_keys:
649 661 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
650 662
651 663 assert len(sequences) > 0, "must have some sequences to map onto!"
652 664
653 665 pf = ParallelFunction(self.client, f, block=block, bound=bound,
654 666 targets=self._targets, balanced=True,
655 667 chunk_size=chunk_size)
656 668 return pf.map(*sequences)
657 669
658 670 __all__ = ['LoadBalancedView', 'DirectView'] No newline at end of file
@@ -1,43 +1,43 b''
1 1 import sys
2 2
3 3 from IPython.zmq.parallel import client
4 4
5 5
6 6 rc = client.Client()
7 7 rc.block=True
8 8 view = rc[:]
9 9 view.run('communicator.py')
10 10 view.execute('com = EngineCommunicator()')
11 11
12 12 # gather the connection information into a dict
13 ar = view.apply_async_bound(lambda : com.info)
13 ar = view.apply_async(lambda : com.info)
14 14 peers = ar.get_dict()
15 15 # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
16 16
17 17 # connect the engines to each other:
18 view.apply_sync_bound(lambda pdict: com.connect(pdict), peers)
18 view.apply_sync(lambda pdict: com.connect(pdict), peers)
19 19
20 20 # now all the engines are connected, and we can communicate between them:
21 21
22 22 def broadcast(client, sender, msg_name, dest_name=None, block=None):
23 23 """broadcast a message from one engine to all others."""
24 24 dest_name = msg_name if dest_name is None else dest_name
25 25     client[sender].execute('com.publish(%s)'%msg_name, block=block)
26 26 targets = client.ids
27 27 targets.remove(sender)
28 28     return client[targets].execute('%s=com.consume()'%dest_name, block=block)
29 29
30 30 def send(client, sender, targets, msg_name, dest_name=None, block=None):
31 31 """send a message from one to one-or-more engines."""
32 32 dest_name = msg_name if dest_name is None else dest_name
33 33 def _send(targets, m_name):
34 34 msg = globals()[m_name]
35 35 return com.send(targets, msg)
36 36
37 client[sender].apply_async_bound(_send, targets, msg_name)
37 client[sender].apply_async(_send, targets, msg_name)
38 38
39 39     return client[targets].execute('%s=com.recv()'%dest_name, block=block)
40 40
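# usage sketch (hypothetical engine ids):
#   broadcast(rc, 0, 'data')   # publish `data` from engine 0 to all others
#   send(rc, 0, [1,2], 'data') # send `data` from engine 0 to engines 1 and 2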
41 41
42 42
43 43
@@ -1,64 +1,64 b''
1 1 """Calculate statistics on the digits of pi in parallel.
2 2
3 3 This program uses the functions in :file:`pidigits.py` to calculate
4 4 the frequencies of 2 digit sequences in the digits of pi. The
5 5 results are plotted using matplotlib.
6 6
7 7 To run, text files from http://www.super-computing.org/
8 8 must be installed in the working directory of the IPython engines.
9 9 The actual filenames to be used can be set with the ``filestring``
10 10 variable below.
11 11
12 12 The dataset we have been using for this is the 200 million digit one here:
13 13 ftp://pi.super-computing.org/.2/pi200m/
14 14
15 15 and the files used will be downloaded if they are not in the working directory
16 16 of the IPython engines.
17 17 """
18 18
19 19 from IPython.zmq.parallel import client
20 20 from matplotlib import pyplot as plt
21 21 import numpy as np
22 22 from pidigits import *
23 23 from timeit import default_timer as clock
24 24
25 25 # Files with digits of pi (10m digits each)
26 26 filestring = 'pi200m.ascii.%(i)02dof20'
27 27 files = [filestring % {'i':i} for i in range(1,16)]
28 28
29 29 # Connect to the IPython cluster
30 30 c = client.Client(profile='edison')
31 31 c.run('pidigits.py')
32 32
33 33 # the number of engines
34 34 n = len(c)
35 35 id0 = c.ids[0]
36 36 v = c[:]
37 37 v.set_flags(bound=False,block=True)
38 38 # fetch the pi-files
39 39 print "downloading %i files of pi"%n
40 40 v.map(fetch_pi_file, files[:n])
41 41 print "done"
42 42
43 43 # Run 10m digits on 1 engine
44 44 t1 = clock()
45 freqs10m = c[id0].apply_sync_bound(compute_two_digit_freqs, files[0])
45 freqs10m = c[id0].apply_sync(compute_two_digit_freqs, files[0])
46 46 t2 = clock()
47 47 digits_per_second1 = 10.0e6/(t2-t1)
48 48 print "Digits per second (1 core, 10m digits): ", digits_per_second1
49 49
50 50
51 51 # Run n*10m digits on all engines
52 52 t1 = clock()
53 53 freqs_all = v.map(compute_two_digit_freqs, files[:n])
54 54 freqs150m = reduce_freqs(freqs_all)
55 55 t2 = clock()
56 56 digits_per_second8 = n*10.0e6/(t2-t1)
57 57 print "Digits per second (%i engines, %i0m digits): "%(n,n), digits_per_second8
58 58
59 59 print "Speedup: ", digits_per_second8/digits_per_second1
60 60
61 61 plot_two_digit_freqs(freqs150m)
62 62 plt.title("2 digit sequences in %i0m digits of pi"%n)
63 63 plt.show()
64 64
@@ -1,854 +1,845 b''
1 1 .. _parallelmultiengine:
2 2
3 3 ==========================
4 4 IPython's Direct interface
5 5 ==========================
6 6
7 7 The direct, or multiengine, interface represents one possible way of working with a set of
8 8 IPython engines. The basic idea behind the multiengine interface is that the
9 9 capabilities of each engine are directly and explicitly exposed to the user.
10 10 Thus, in the multiengine interface, each engine is given an id that is used to
11 11 identify the engine and give it work to do. This interface is very intuitive
12 12 and is designed with interactive usage in mind, making it the best place for
13 13 new users of IPython to begin.
14 14
15 15 Starting the IPython controller and engines
16 16 ===========================================
17 17
18 18 To follow along with this tutorial, you will need to start the IPython
19 19 controller and four IPython engines. The simplest way of doing this is to use
20 20 the :command:`ipclusterz` command::
21 21
22 22 $ ipclusterz start -n 4
23 23
24 24 For more detailed information about starting the controller and engines, see
25 25 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
26 26
27 27 Creating a ``Client`` instance
28 28 ==============================
29 29
30 30 The first step is to import the IPython :mod:`IPython.zmq.parallel.client`
31 31 module and then create a :class:`.Client` instance:
32 32
33 33 .. sourcecode:: ipython
34 34
35 35 In [1]: from IPython.zmq.parallel import client
36 36
37 37 In [2]: rc = client.Client()
38 38
39 39 This form assumes that the default connection information (stored in
40 40 :file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/clusterz_default/security`) is
41 41 accurate. If the controller was started on a remote machine, you must copy that connection
42 42 file to the client machine, or enter its contents as arguments to the Client constructor:
43 43
44 44 .. sourcecode:: ipython
45 45
46 46 # If you have copied the json connector file from the controller:
47 47 In [2]: rc = client.Client('/path/to/ipcontroller-client.json')
48 48 # or for a remote controller at 10.0.1.5, visible from my.server.com:
49 49 In [3]: rc = client.Client('tcp://10.0.1.5:12345', sshserver='my.server.com')
50 50
51 51
52 52 To make sure there are engines connected to the controller, users can get a list
53 53 of engine ids:
54 54
55 55 .. sourcecode:: ipython
56 56
57 57 In [3]: rc.ids
58 58 Out[3]: [0, 1, 2, 3]
59 59
60 60 Here we see that there are four engines ready to do work for us.
61 61
62 62 For direct execution, we will make use of a :class:`DirectView` object, which can be
63 63 constructed via list-access to the client:
64 64
65 65 .. sourcecode:: ipython
66 66
67 67 In [4]: dview = rc[:] # use all engines
68 68
69 69 .. seealso::
70 70
71 For more information, see the in-depth explanation of :ref:`Views <parallel_view>`.
71 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
72 72
73 73
74 74 Quick and easy parallelism
75 75 ==========================
76 76
77 77 In many cases, you simply want to apply a Python function to a sequence of
78 78 objects, but *in parallel*. The client interface provides a simple way
79 79 of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
80 80
81 81 Parallel map
82 82 ------------
83 83
84 84 Python's builtin :func:`map` function allows a function to be applied to a
85 85 sequence element-by-element. This type of code is typically trivial to
86 86 parallelize. In fact, since IPython's interface is all about functions anyway,
87 87 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
88 88 DirectView's :meth:`map` method:
89 89
90 90 .. sourcecode:: ipython
91 91
92 92 In [62]: serial_result = map(lambda x:x**10, range(32))
93 93
94 94 In [63]: dview.block = True
95 95
96 96 In [66]: parallel_result = dview.map(lambda x: x**10, range(32))
97 97
98 98 In [67]: serial_result==parallel_result
99 99 Out[67]: True
100 100
101 101
102 102 .. note::
103 103
104 104 The :class:`DirectView`'s version of :meth:`map` does
105 105 not do dynamic load balancing. For a load balanced version, use a
106 106 :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
107 107 `balanced=True`.
108 108
109 109 .. seealso::
110 110
111 111 :meth:`map` is implemented via :class:`ParallelFunction`.
112 112
113 113 Remote function decorators
114 114 --------------------------
115 115
116 116 Remote functions are just like normal functions, but when they are called,
117 117 they execute on one or more engines, rather than locally. IPython provides
118 118 two decorators:
119 119
120 120 .. sourcecode:: ipython
121 121
122 122 In [10]: @rc.remote(block=True, targets='all')
123 123 ...: def getpid():
124 124 ...: import os
125 125 ...: return os.getpid()
126 126 ...:
127 127
128 128 In [11]: getpid()
129 129 Out[11]: [12345, 12346, 12347, 12348]
130 130
131 131 The ``@parallel`` decorator creates parallel functions that break up element-wise
132 132 operations and distribute them, reconstructing the result.
133 133
134 134 .. sourcecode:: ipython
135 135
136 136 In [12]: import numpy as np
137 137
138 138 In [13]: A = np.random.random((64,48))
139 139
140 140 In [14]: @rc.parallel(block=True, targets='all')
141 141 ...: def pmul(A,B):
142 142 ...: return A*B
143 143
144 144 In [15]: C_local = A*A
145 145
146 146     In [16]: C_remote = pmul(A,A)
147 147
148 148 In [17]: (C_local == C_remote).all()
149 149 Out[17]: True
150 150
151 151 .. seealso::
152 152
153 153 See the docstrings for the :func:`parallel` and :func:`remote` decorators for
154 154 options.
155 155
156 156 Calling Python functions
157 157 ========================
158 158
159 159 The most basic type of operation that can be performed on the engines is to
160 160 execute Python code or call Python functions. Executing Python code can be
161 161 done in blocking or non-blocking mode (non-blocking is default) using the
162 162 :meth:`execute` method, and calling functions can be done via the
163 163 :meth:`.View.apply` method.
164 164
165 165 apply
166 166 -----
167 167
168 168 The main method for doing remote execution (in fact, all methods that
169 169 communicate with the engines are built on top of it), is :meth:`Client.apply`.
170 170 Ideally, :meth:`apply` would have the signature ``apply(f,*args,**kwargs)``,
171 171 which would call ``f(*args,**kwargs)`` remotely. However, since :class:`Clients`
172 172 require some more options, they cannot easily provide this interface.
173 173 Instead, they provide the signature:
174 174
175 175 .. sourcecode:: python
176 176
177 177 c.apply(f, args=None, kwargs=None, bound=True, block=None, targets=None,
178 178 after=None, follow=None, timeout=None)
179 179
180 180 where the various behaviors are controlled via keyword arguments. This means that in the client,
181 181 you must pass `args` as a tuple, and `kwargs` as a dict.
182 182
183 183 In order to provide the nicer interface, we have :class:`View` classes, which wrap
184 184 :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
185 185 the extra keyword arguments. This means that the views can have the desired pattern:
186 186
187 187 .. sourcecode:: python
188 188
189 189 v.apply(f, *args, **kwargs)
190 190
191 191
192 192 For instance, performing index-access on a client creates a
193 193 :class:`.DirectView`.
194 194
195 195 .. sourcecode:: ipython
196 196
197 197 In [4]: view = rc[1:3]
198 198 Out[4]: <DirectView [1, 2]>
199 199
200 200 In [5]: view.apply<tab>
201 201 view.apply view.apply_async view.apply_async_bound view.apply_sync view.apply_sync_bound
202 202
203 203 A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound`
204 204 and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x`
205 205 methods allow specifying `bound` and `block` via the different methods.
206 206
207 207 ================== ========== ==========
208 208 method block bound
209 209 ================== ========== ==========
210 210 apply self.block self.bound
211 211 apply_sync True False
212 212 apply_async False False
213 213 apply_sync_bound True True
214 214 apply_async_bound False True
215 215 ================== ========== ==========
216 216
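For instance (a sketch, where ``v`` is any View and ``f`` any picklable function):

.. sourcecode:: python

    v.apply(f, 1, 2)       # honors v.block and v.bound as currently set
    v.apply_async(f, 1, 2) # returns an AsyncResult immediately; f runs unbound
    v.apply_sync(f, 1, 2)  # waits for and returns the result; f runs unbound
    v.apply_sync_bound(f)  # waits; f receives the engine's Namespace first
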
217 217 For explanation of these values, read on.
218 218
219 219 Blocking execution
220 220 ------------------
221 221
222 222 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
223 223 these examples) submits the command to the controller, which places the
224 224 command in the engines' queues for execution. The :meth:`apply` call then
225 225 blocks until the engines are done executing the command:
226 226
227 227 .. sourcecode:: ipython
228 228
229 229 In [2]: dview = rc[:] # A DirectView of all engines
230 230 In [3]: dview.block=True
231 231 In [4]: dview['a'] = 5
232 232
233 233 In [5]: dview['b'] = 10
234 234
235 In [6]: dview.apply_bound(lambda x: a+b+x, 27)
236 Out[6]: [42, 42, 42, 42]%exit
235 In [6]: dview.apply_sync(lambda x: a+b+x, 27)
236 Out[6]: [42, 42, 42, 42]
237 237
238 238 Python commands can be executed on specific engines by calling execute using the ``targets``
239 239 keyword argument in :meth:`client.execute`, or by creating a :class:`DirectView` instance
240 240 via index-access to the client:
241 241
242 242 .. sourcecode:: ipython
243 243
244 244 In [6]: rc.execute('c=a+b', targets=[0,2])
245 245
246 246 In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
247 247
248 248 In [8]: rc[:]['c'] # shorthand for rc.pull('c',targets='all')
249 249 Out[8]: [15, -5, 15, -5]
250 250
251 251 .. note::
252 252
253 253 Note that every call to ``rc.<meth>(...,targets=x)`` can be made via
254 254 ``rc[<x>].<meth>(...)``, which constructs a View object. The only place
255 255 where this differs in in :meth:`apply`. The :class:`Client` takes many
256 256 arguments to apply, so it requires `args` and `kwargs` to be passed as
257 257 individual arguments. Extended options such as `bound`,`targets`, and
258 258 `block` are controlled by the attributes of the :class:`View` objects, so
259 259 they can provide the much more convenient
260 260 :meth:`View.apply(f,*args,**kwargs)`, which simply calls
261 261 ``f(*args,**kwargs)`` remotely.
262 262
263 263 Bound and unbound execution
264 264 ---------------------------
265 265
266 266 The previous example also shows one of the most important things about the IPython
267 267 engines: they have a persistent user namespaces. The :meth:`apply` method can
268 be run in either a bound or unbound manner:
268 be run in either a bound or unbound manner.
269
270 When applying a function in a `bound` manner, the first argument to that function
271 will be the Engine's namespace, which is a :class:`Namespace` object, a dictionary
272 also providing attribute-access to keys.
273
274 In all (unbound and bound) execution
269 275
270 276 .. sourcecode:: ipython
271 277
272 278 In [9]: dview['b'] = 5 # assign b to 5 everywhere
273 279
274 280 In [10]: v0 = rc[0]
275 281
276 In [12]: v0.apply_sync_bound(lambda : b)
277 Out[12]: 5
282     # double b in-place (a lambda cannot contain an assignment, so update dict-style)
283     In [12]: v0.apply_sync_bound(lambda ns: ns.update(b=ns.b*2))
278 284
279 In [13]: v0.apply_sync(lambda : b)
280 ---------------------------------------------------------------------------
281 RemoteError Traceback (most recent call last)
282 /home/you/<ipython-input-34-21a468eb10f0> in <module>()
283 ----> 1 v0.apply(lambda : b)
284 ...
285 RemoteError: NameError(global name 'b' is not defined)
286 Traceback (most recent call last):
287 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 294, in apply_request
288 exec code in working, working
289 File "<string>", line 1, in <module>
290 File "<ipython-input-34-21a468eb10f0>", line 1, in <lambda>
291 NameError: global name 'b' is not defined
292
293
294 Specifically, `bound=True` specifies that the engine's namespace is to be used
295 as the `globals` when the function is called, and `bound=False` specifies that
296 the engine's namespace is not to be used (hence, 'b' is undefined during unbound
297 execution, since the function is called in an empty namespace). Unbound execution is
298 often useful for large numbers of atomic tasks, which prevents bloating the engine's
299 memory, while bound execution lets you build on your previous work.
285 # b is still available in globals during unbound execution
286 In [13]: v0.apply_sync(lambda a: a*b, 3)
287 Out[13]: 30
300 288
289 `bound=True` specifies that the engine's namespace is to be passed as the first argument
290 when the function is called; the default `bound=False` omits that argument. In both cases,
291 the engine's namespace serves as the ``globals()`` in which the function runs.
301 292
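For anything beyond a one-liner, a plain function reads better than a lambda
(a sketch; ``scale_b`` is a hypothetical helper):

.. sourcecode:: ipython

    In [14]: def scale_b(ns, factor):
       ....:     ns['b'] = ns['b'] * factor
       ....:     return ns['b']

    In [15]: v0.apply_sync_bound(scale_b, 3)
    Out[15]: 30
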
302 293 Non-blocking execution
303 294 ----------------------
304 295
305 296 In non-blocking mode, :meth:`apply` submits the command to be executed and
306 297 then returns a :class:`AsyncResult` object immediately. The
307 298 :class:`AsyncResult` object gives you a way of getting a result at a later
308 299 time through its :meth:`get` method.
309 300
310 301 .. Note::
311 302
312 303 The :class:`AsyncResult` object provides a superset of the interface in
313 304 :py:class:`multiprocessing.pool.AsyncResult`. See the
314 305 `official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
315 306 for more.
316 307
317 308
318 309 This allows you to quickly submit long running commands without blocking your
319 310 local Python/IPython session:
320 311
321 312 .. sourcecode:: ipython
322 313
323 314 # define our function
324 315 In [6]: def wait(t):
325 316 ...: import time
326 317 ...: tic = time.time()
327 318 ...: time.sleep(t)
328 319 ...: return time.time()-tic
329 320
330 321 # In non-blocking mode
331 322 In [7]: ar = dview.apply_async(wait, 2)
332 323
333 324 # Now block for the result
334 325 In [8]: ar.get()
335 326 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
336 327
337 328 # Again in non-blocking mode
338 329 In [9]: ar = dview.apply_async(wait, 10)
339 330
340 331 # Poll to see if the result is ready
341 332 In [10]: ar.ready()
342 333 Out[10]: False
343 334
344 335 # ask for the result, but wait a maximum of 1 second:
345 336 In [45]: ar.get(1)
346 337 ---------------------------------------------------------------------------
347 338 TimeoutError Traceback (most recent call last)
348 339 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
349 340 ----> 1 ar.get(1)
350 341
351 342 /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
352 343 62 raise self._exception
353 344 63 else:
354 345 ---> 64 raise error.TimeoutError("Result not ready.")
355 346 65
356 347 66 def ready(self):
357 348
358 349 TimeoutError: Result not ready.
359 350
360 351 .. Note::
361 352
362 353 Note the import inside the function. This is a common model, to ensure
363 354 that the appropriate modules are imported where the task is run.
364 355
365 356 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
366 357 are done. For this, there is the method :meth:`barrier`. This method takes a
367 358 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
368 359 and blocks until all of the associated results are ready:
369 360
370 361 .. sourcecode:: ipython
371 362
372 363 In [72]: rc.block=False
373 364
374 365 # A trivial list of AsyncResults objects
375 366 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
376 367
377 368 # Wait until all of them are done
378 369 In [74]: rc.barrier(pr_list)
379 370
380 371 # Then, their results are ready using get() or the `.r` attribute
381 372 In [75]: pr_list[0].get()
382 373 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
383 374
384 375
385 376
386 377 The ``block`` keyword argument and attributes
387 378 ---------------------------------------------
388 379
389 380 Most client methods (like :meth:`apply`) accept
390 381 ``block`` as a keyword argument. As we have seen above, these
391 382 keyword arguments control the blocking mode. The :class:`Client` class also has
392 383 a :attr:`block` attribute that controls the default behavior when the keyword
393 384 argument is not provided. Thus the following logic is used for :attr:`block`:
394 385
395 386 * If no keyword argument is provided, the instance attributes are used.
396 387 * Keyword arguments, if provided, override the instance attributes for
397 388 the duration of a single call.
398 389
399 390 DirectView objects also have a ``bound`` attribute, which is used in the same way.
400 391
401 392 The following examples demonstrate how to use the instance attributes:
402 393
403 394 .. sourcecode:: ipython
404 395
405 396 In [17]: rc.block = False
406 397
407 398 In [18]: ar = rc.apply(lambda : 10, targets=[0,2])
408 399
409 400 In [19]: ar.get()
410 401 Out[19]: [10,10]
411 402
412 403 In [21]: rc.block = True
413 404
414 405 # Note targets='all' means all engines
415 406 In [22]: rc.apply(lambda : 42, targets='all')
416 407 Out[22]: [42, 42, 42, 42]
417 408
418 409 The :attr:`block`, :attr:`bound`, and :attr:`targets` instance attributes of the
419 410 :class:`.DirectView` also determine the behavior of the parallel magic commands.
420 411
421 412
422 413 Parallel magic commands
423 414 -----------------------
424 415
425 416 .. warning::
426 417
427 418 The magics have not been changed to work with the zeromq system. ``%px``
428 419 and ``%autopx`` do work, but ``%result`` does not. %px and %autopx *do
429 420 not* print stdout.
430 421
431 422 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
432 423 that make it more pleasant to execute Python commands on the engines
433 424 interactively. These are simply shortcuts to :meth:`execute` and
434 425 :meth:`get_result` of the :class:`DirectView`. The ``%px`` magic executes a single
435 426 Python command on the engines specified by the :attr:`targets` attribute of the
436 427 :class:`DirectView` instance:
437 428
438 429 .. sourcecode:: ipython
439 430
440 431 # Create a DirectView for all targets
441 432 In [22]: dv = rc[:]
442 433
443 434 # Make this DirectView active for parallel magic commands
444 435 In [23]: dv.activate()
445 436
446 437 In [24]: dv.block=True
447 438
448 439 In [25]: import numpy
449 440
450 441 In [26]: %px import numpy
451 442 Parallel execution on engines: [0, 1, 2, 3]
452 443
453 444 In [27]: %px a = numpy.random.rand(2,2)
454 445 Parallel execution on engines: [0, 1, 2, 3]
455 446
456 447 In [28]: %px ev = numpy.linalg.eigvals(a)
457 448 Parallel execution on engines: [0, 1, 2, 3]
458 449
459 450 In [28]: dv['ev']
460 451 Out[28]: [ array([ 1.09522024, -0.09645227]),
461 452 array([ 1.21435496, -0.35546712]),
462 453 array([ 0.72180653, 0.07133042]),
463 454 array([ 1.46384341e+00, 1.04353244e-04])
464 455 ]
465 456
466 457 The ``%result`` magic gets the most recent result, or takes an argument
467 458 specifying the index of the result to be requested. It is simply a shortcut to the
468 459 :meth:`get_result` method:
469 460
470 461 .. sourcecode:: ipython
471 462
472 In [29]: dv.apply_async_bound(lambda : ev)
463 In [29]: dv.apply_async(lambda : ev)
473 464
474 465 In [30]: %result
475 466 Out[30]: [ [ 1.28167017 0.14197338],
476 467 [-0.14093616 1.27877273],
477 468 [-0.37023573 1.06779409],
478 469 [ 0.83664764 -0.25602658] ]
479 470
480 471 The ``%autopx`` magic switches to a mode where everything you type is executed
481 472 on the engines given by the :attr:`targets` attribute:
482 473
483 474 .. sourcecode:: ipython
484 475
485 476 In [30]: dv.block=False
486 477
487 478 In [31]: %autopx
488 479 Auto Parallel Enabled
489 480 Type %autopx to disable
490 481
491 482 In [32]: max_evals = []
492 483 <IPython.zmq.parallel.asyncresult.AsyncResult object at 0x17b8a70>
493 484
494 485 In [33]: for i in range(100):
495 486 ....: a = numpy.random.rand(10,10)
496 487 ....: a = a+a.transpose()
497 488 ....: evals = numpy.linalg.eigvals(a)
498 489 ....: max_evals.append(evals[0].real)
499 490 ....:
500 491 ....:
501 492 <IPython.zmq.parallel.asyncresult.AsyncResult object at 0x17af8f0>
502 493
503 494 In [34]: %autopx
504 495 Auto Parallel Disabled
505 496
506 497 In [35]: dv.block=True
507 498
508 499     In [36]: %px ans = "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals))
509 500 Parallel execution on engines: [0, 1, 2, 3]
510 501
511 502 In [37]: dv['ans']
512 503 Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
513 504 'Average max eigenvalue is: 10.2076902286',
514 505 'Average max eigenvalue is: 10.1891484655',
515 506 'Average max eigenvalue is: 10.1158837784',]
516 507
517 508
518 509 Moving Python objects around
519 510 ============================
520 511
521 512 In addition to calling functions and executing code on engines, you can
522 513 transfer Python objects to and from your IPython session and the engines. In
523 514 IPython, these operations are called :meth:`push` (sending an object to the
524 515 engines) and :meth:`pull` (getting an object from the engines).
525 516
526 517 Basic push and pull
527 518 -------------------
528 519
529 520 Here are some examples of how you use :meth:`push` and :meth:`pull`:
530 521
531 522 .. sourcecode:: ipython
532 523
533 524 In [38]: rc.push(dict(a=1.03234,b=3453))
534 525 Out[38]: [None,None,None,None]
535 526
536 527 In [39]: rc.pull('a')
537 528 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
538 529
539 530 In [40]: rc.pull('b',targets=0)
540 531 Out[40]: 3453
541 532
542 533 In [41]: rc.pull(('a','b'))
543 534 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
544 535
545 536 # zmq client does not have zip_pull
546 537 In [42]: rc.zip_pull(('a','b'))
547 538 Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]
548 539
549 540 In [43]: rc.push(dict(c='speed'))
550 541 Out[43]: [None,None,None,None]
551 542
552 543 In non-blocking mode :meth:`push` and :meth:`pull` also return
553 544 :class:`AsyncResult` objects:
554 545
555 546 .. sourcecode:: ipython
556 547
557 548 In [47]: rc.block=False
558 549
559 550 In [48]: ar = rc.pull('a')
560 551
561 552 In [49]: ar.get()
562 553 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
563 554
564 555
565 556 Dictionary interface
566 557 --------------------
567 558
568 559 Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
569 560 dictionary-style access by key and methods such as :meth:`get` and
570 561 :meth:`update` for convenience. This makes the remote namespaces of the engines
571 562 appear as a local dictionary. Underneath, these methods call :meth:`apply`:
572 563
573 564 .. sourcecode:: ipython
574 565
575 566 In [50]: dview.block=True
576 567
577 568 In [51]: dview['a']=['foo','bar']
578 569
579 570 In [52]: dview['a']
580 571 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
581 572
582 573 Scatter and gather
583 574 ------------------
584 575
585 576 Sometimes it is useful to partition a sequence and push the partitions to
586 577 different engines. In MPI language, this is known as scatter/gather and we
587 578 follow that terminology. However, it is important to remember that in
588 579 IPython's :class:`Client` class, :meth:`scatter` is from the
589 580 interactive IPython session to the engines and :meth:`gather` is from the
590 581 engines back to the interactive IPython session. For scatter/gather operations
591 582 between engines, MPI should be used:
592 583
593 584 .. sourcecode:: ipython
594 585
595 586 In [58]: dview.scatter('a',range(16))
596 587 Out[58]: [None,None,None,None]
597 588
598 589 In [59]: dview['a']
599 590 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
600 591
601 592 In [60]: dview.gather('a')
602 593 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
603 594
604 595 Other things to look at
605 596 =======================
606 597
607 598 How to do parallel list comprehensions
608 599 --------------------------------------
609 600
610 601 In many cases list comprehensions are nicer than using the map function. While
611 602 we don't have fully parallel list comprehensions, it is simple to get the
612 603 basic effect using :meth:`scatter` and :meth:`gather`:
613 604
614 605 .. sourcecode:: ipython
615 606
616 607 In [66]: dview.scatter('x',range(64))
617 608
618 609 In [67]: px y = [i**10 for i in x]
619 610 Parallel execution on engines: [0, 1, 2, 3]
620 611 Out[67]:
621 612
622 613 In [68]: y = dview.gather('y')
623 614
624 615 In [69]: print y
625 616 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
626 617
627 618 Parallel exceptions
628 619 -------------------
629 620
630 621 In the multiengine interface, parallel commands can raise Python exceptions,
631 622 just like serial commands. But it is a little subtle, because a single
632 623 parallel command can actually raise multiple exceptions (one for each engine
633 624 the command was run on). To express this idea, we have a
634 625 :exc:`CompositeError` exception class that will be raised in most cases. The
635 626 :exc:`CompositeError` class is a special type of exception that wraps one or
636 627 more other types of exceptions. Here is how it works:
637 628
638 629 .. sourcecode:: ipython
639 630
640 631 In [76]: dview.block=True
641 632
642 633 In [77]: dview.execute('1/0')
643 634 ---------------------------------------------------------------------------
644 635 CompositeError Traceback (most recent call last)
645 636 /Users/minrk/<ipython-input-10-5d56b303a66c> in <module>()
646 637 ----> 1 dview.execute('1/0')
647 638
648 639 ...
649 640
650 641 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in apply(self, f, args, kwargs, bound, block, targets, balanced, after, follow, timeout)
651 642 1012 raise ValueError(msg)
652 643 1013 else:
653 644 -> 1014 return self._apply_direct(f, args, kwargs, **options)
654 645 1015
655 646 1016 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
656 647
657 648 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in _apply_direct(self, f, args, kwargs, bound, block, targets)
658 649 1100 if block:
659 650 1101 try:
660 651 -> 1102 return ar.get()
661 652 1103 except KeyboardInterrupt:
662 653 1104 return ar
663 654
664 655 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
665 656 78 return self._result
666 657 79 else:
667 658 ---> 80 raise self._exception
668 659 81 else:
669 660 82 raise error.TimeoutError("Result not ready.")
670 661
671 662 CompositeError: one or more exceptions from call to method: _execute
672 663 [0:apply]: ZeroDivisionError: integer division or modulo by zero
673 664 [1:apply]: ZeroDivisionError: integer division or modulo by zero
674 665 [2:apply]: ZeroDivisionError: integer division or modulo by zero
675 666 [3:apply]: ZeroDivisionError: integer division or modulo by zero
676 667
677 668 Notice how the error message printed when :exc:`CompositeError` is raised has
678 669 information about the individual exceptions that were raised on each engine.
679 670 If you want, you can even raise one of these original exceptions:
680 671
681 672 .. sourcecode:: ipython
682 673
683 674 In [80]: try:
684 675 ....: rc.execute('1/0')
685 676 ....: except client.CompositeError, e:
686 677 ....: e.raise_exception()
687 678 ....:
688 679 ....:
689 680 ---------------------------------------------------------------------------
690 681 ZeroDivisionError Traceback (most recent call last)
691 682
692 683 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
693 684
694 685 /ipython1-client-r3021/ipython1/kernel/error.pyc in raise_exception(self, excid)
695 686 156 raise IndexError("an exception with index %i does not exist"%excid)
696 687 157 else:
697 688 --> 158 raise et, ev, etb
698 689 159
699 690 160 def collect_exceptions(rlist, method):
700 691
701 692 ZeroDivisionError: integer division or modulo by zero
702 693
703 694 If you are working in IPython, you can simply type ``%debug`` after one of
704 695 these :exc:`CompositeError` exceptions is raised, and inspect the exception
705 696 instance:
706 697
707 698 .. sourcecode:: ipython
708 699
709 700 In [81]: rc.execute('1/0')
710 701 ---------------------------------------------------------------------------
711 702 CompositeError Traceback (most recent call last)
712 703 /Users/minrk/<ipython-input-5-b0c7a2b62c52> in <module>()
713 704 ----> 1 rc.execute('1/0')
714 705
715 706 /Users/minrk/<string> in execute(self, code, targets, block)
716 707
717 708 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in defaultblock(f, self, *args, **kwargs)
718 709 88 self.block = block
719 710 89 try:
720 711 ---> 90 ret = f(self, *args, **kwargs)
721 712 91 finally:
722 713 92 self.block = saveblock
723 714
724 715 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in execute(self, code, targets, block)
725 716 855 default: self.block
726 717 856 """
727 718 --> 857 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False)
728 719 858 if not block:
729 720 859 return result
730 721
731 722 /Users/minrk/<string> in apply(self, f, args, kwargs, bound, block, targets, balanced, after, follow, timeout)
732 723
733 724 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in defaultblock(f, self, *args, **kwargs)
734 725 88 self.block = block
735 726 89 try:
736 727 ---> 90 ret = f(self, *args, **kwargs)
737 728 91 finally:
738 729 92 self.block = saveblock
739 730
740 731 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in apply(self, f, args, kwargs, bound, block, targets, balanced, after, follow, timeout)
741 732 1012 raise ValueError(msg)
742 733 1013 else:
743 734 -> 1014 return self._apply_direct(f, args, kwargs, **options)
744 735 1015
745 736 1016 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
746 737
747 738 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in _apply_direct(self, f, args, kwargs, bound, block, targets)
748 739 1100 if block:
749 740 1101 try:
750 741 -> 1102 return ar.get()
751 742 1103 except KeyboardInterrupt:
752 743 1104 return ar
753 744
754 745 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
755 746 78 return self._result
756 747 79 else:
757 748 ---> 80 raise self._exception
758 749 81 else:
759 750 82 raise error.TimeoutError("Result not ready.")
760 751
761 752 CompositeError: one or more exceptions from call to method: _execute
762 753 [0:apply]: ZeroDivisionError: integer division or modulo by zero
763 754 [1:apply]: ZeroDivisionError: integer division or modulo by zero
764 755 [2:apply]: ZeroDivisionError: integer division or modulo by zero
765 756 [3:apply]: ZeroDivisionError: integer division or modulo by zero
766 757
767 758 In [82]: %debug
768 759 > /Users/minrk/dev/ip/mine/IPython/zmq/parallel/asyncresult.py(80)get()
769 760 79 else:
770 761 ---> 80 raise self._exception
771 762 81 else:
772 763
773 764
774 765 # With the debugger running, e is the exception instance. We can tab complete
775 766 # on it and see the extra methods that are available.
776 767 ipdb> e.
777 768 e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
778 769 e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
779 770 e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
780 771 e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
781 772 e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
782 773 ipdb> e.print_tracebacks()
783 774 [0:apply]:
784 775 Traceback (most recent call last):
785 776 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 332, in apply_request
786 777 exec code in working, working
787 778 File "<string>", line 1, in <module>
788 779 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.py", line 69, in _execute
789 780 exec code in globals()
790 781 File "<string>", line 1, in <module>
791 782 ZeroDivisionError: integer division or modulo by zero
792 783
793 784
794 785 [1:apply]:
795 786 Traceback (most recent call last):
796 787 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 332, in apply_request
797 788 exec code in working, working
798 789 File "<string>", line 1, in <module>
799 790 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.py", line 69, in _execute
800 791 exec code in globals()
801 792 File "<string>", line 1, in <module>
802 793 ZeroDivisionError: integer division or modulo by zero
803 794
804 795
805 796 [2:apply]:
806 797 Traceback (most recent call last):
807 798 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 332, in apply_request
808 799 exec code in working, working
809 800 File "<string>", line 1, in <module>
810 801 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.py", line 69, in _execute
811 802 exec code in globals()
812 803 File "<string>", line 1, in <module>
813 804 ZeroDivisionError: integer division or modulo by zero
814 805
815 806
816 807 [3:apply]:
817 808 Traceback (most recent call last):
818 809 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 332, in apply_request
819 810 exec code in working, working
820 811 File "<string>", line 1, in <module>
821 812 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.py", line 69, in _execute
822 813 exec code in globals()
823 814 File "<string>", line 1, in <module>
824 815 ZeroDivisionError: integer division or modulo by zero
825 816
826 817
827 818
828 819 All of this same error handling magic even works in non-blocking mode:
829 820
830 821 .. sourcecode:: ipython
831 822
832 823 In [83]: rc.block=False
833 824
834 825 In [84]: ar = rc.execute('1/0')
835 826
836 827 In [85]: ar.get()
837 828 ---------------------------------------------------------------------------
838 829 CompositeError Traceback (most recent call last)
839 830 /Users/minrk/<ipython-input-3-8531eb3d26fb> in <module>()
840 831 ----> 1 ar.get()
841 832
842 833 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
843 834 78 return self._result
844 835 79 else:
845 836 ---> 80 raise self._exception
846 837 81 else:
847 838 82 raise error.TimeoutError("Result not ready.")
848 839
849 840 CompositeError: one or more exceptions from call to method: _execute
850 841 [0:apply]: ZeroDivisionError: integer division or modulo by zero
851 842 [1:apply]: ZeroDivisionError: integer division or modulo by zero
852 843 [2:apply]: ZeroDivisionError: integer division or modulo by zero
853 844 [3:apply]: ZeroDivisionError: integer division or modulo by zero
854 845
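Because the exception instance exposes the same helpers outside the debugger,
you can also handle these failures programmatically. A brief hedged sketch
using the :meth:`print_tracebacks` method seen in the debugger session above:

.. sourcecode:: ipython

    In [86]: try:
       ....:     ar.get()
       ....: except client.CompositeError, e:
       ....:     e.print_tracebacks()    # one traceback per failed engine
       ....:
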
@@ -1,396 +1,405 b''
1 1 .. _paralleltask:
2 2
3 3 ==========================
4 4 The IPython task interface
5 5 ==========================
6 6
7 7 The task interface to the cluster presents the engines as a fault tolerant,
8 8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
9 9 the task interface the user has no direct access to individual engines. By
10 10 allowing the IPython scheduler to assign work, this interface is simultaneously
11 11 simpler and more powerful.
12 12
13 13 Best of all, the user can use both of these interfaces running at the same time
14 14 to take advantage of their respective strengths. When the user can break up
15 15 their work into segments that do not depend on previous execution, the
16 16 task interface is ideal. But it also has more power and flexibility, allowing
17 17 the user to guide the distribution of jobs, without having to assign tasks to
18 18 engines explicitly.
19 19
20 20 Starting the IPython controller and engines
21 21 ===========================================
22 22
23 23 To follow along with this tutorial, you will need to start the IPython
24 24 controller and four IPython engines. The simplest way of doing this is to use
25 25 the :command:`ipclusterz` command::
26 26
27 27 $ ipclusterz start -n 4
28 28
29 29 For more detailed information about starting the controller and engines, see
30 30 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
31 31
32 32 Creating a ``Client`` instance
33 33 ==============================
34 34
35 35 The first step is to import the :mod:`IPython.zmq.parallel.client`
36 36 module and create a :class:`.Client` instance. We will also be using
37 37 a :class:`LoadBalancedView`, here called `lview`:
38 38
39 39 .. sourcecode:: ipython
40 40
41 41 In [1]: from IPython.zmq.parallel import client
42 42
43 43 In [2]: rc = client.Client()
44 44
45 45 In [3]: lview = rc.view()
46 46
47 47
48 48 This form assumes that the controller was started on localhost with default
49 49 configuration. If not, the location of the controller must be given as an
50 50 argument to the constructor:
51 51
52 52 .. sourcecode:: ipython
53 53
54 54 # for a visible LAN controller listening on an external port:
55 55 In [2]: rc = client.Client('tcp://192.168.1.16:10101')
56 56 # for a remote controller at my.server.com listening on localhost:
57 57 In [3]: rc = client.Client(sshserver='my.server.com')
58 58
59 59
60 60
61 61 Quick and easy parallelism
62 62 ==========================
63 63
64 64 In many cases, you simply want to apply a Python function to a sequence of
65 65 objects, but *in parallel*. Like the multiengine interface, the task
66 66 interface provides two simple tools for this: a parallel version of
67 67 :func:`map` and the :func:`@parallel` function decorator. The exact same
68 68 tools can perform these actions in load-balanced as well as multiplexed ways.
69 69 If you specify the argument `balanced=True`, then they are dynamically load
70 70 balanced. Thus, if the execution time per item varies significantly, you
71 71 should use the versions in the task interface.
72 72
73 73 Parallel map
74 74 ------------
75 75
76 76 To load-balance :meth:`map`, simply use a LoadBalancedView:
77 77
78 78 .. sourcecode:: ipython
79 79
80 80 In [62]: lview.block = True
81 81
82 82 In [63]: serial_result = map(lambda x:x**10, range(32))
83 83
84 84 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
85 85
86 86 In [65]: serial_result==parallel_result
87 87 Out[65]: True
88 88
89 89 Parallel function decorator
90 90 ---------------------------
91 91
92 92 Parallel functions are just like normal functions, but they can be called on
93 93 sequences and *in parallel*. IPython provides a decorator
94 94 that turns any Python function into a parallel function:
95 95
96 96 .. sourcecode:: ipython
97 97
98 98 In [10]: @lview.parallel()
99 99 ....: def f(x):
100 100 ....: return 10.0*x**4
101 101 ....:
102 102
103 103 In [11]: f.map(range(32)) # this is done in parallel
104 104 Out[11]: [0.0,10.0,160.0,...]
105 105
106 106 Dependencies
107 107 ============
108 108
109 109 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
110 110 may want to associate some kind of `Dependency` that describes when, where, or whether
111 111 a task can be run. In IPython, we provide two types of dependencies:
112 112 `Functional Dependencies`_ and `Graph Dependencies`_.
113 113
114 114 .. note::
115 115
116 116 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
117 117 and you will see errors or warnings if you try to use dependencies with the pure
118 118 scheduler.
119 119
120 120 Functional Dependencies
121 121 -----------------------
122 122
123 123 Functional dependencies are used to determine whether a given engine is capable of running
124 124 a particular task. This is implemented via a special :class:`Exception` class,
125 125 :class:`UnmetDependency`, found in `IPython.zmq.parallel.error`. Its use is very simple:
126 126 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
127 127 the error up to the client like any other error, catches the error, and submits the task
128 128 to a different engine. This will repeat indefinitely, and a task will never be submitted
129 129 to a given engine a second time.
130 130
131 131 You can manually raise the :class:`UnmetDependency` yourself, as sketched below,
132 132 but IPython provides some decorators to facilitate this behavior.
133 133
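For illustration, here is a minimal hedged sketch of raising the exception by
hand (``has_gpu`` and ``run_on_gpu`` are hypothetical placeholders, not part
of IPython):

.. sourcecode:: ipython

    In [7]: from IPython.zmq.parallel.error import UnmetDependency

    In [8]: def gpu_task():
       ...:     if not has_gpu():          # hypothetical capability check
       ...:         raise UnmetDependency  # the scheduler will retry elsewhere
       ...:     return run_on_gpu()        # hypothetical workload
       ...:
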
134 134 There are two decorators and a class used for functional dependencies:
135 135
136 136 .. sourcecode:: ipython
137 137
138 138 In [9]: from IPython.zmq.parallel.dependency import depend, require, dependent
139 139
140 140 @require
141 141 ********
142 142
143 143 The simplest sort of dependency is requiring that a Python module is available. The
144 144 ``@require`` decorator lets you define a function that will only run on engines where names
145 145 you specify are importable:
146 146
147 147 .. sourcecode:: ipython
148 148
149 149 In [10]: @require('numpy', 'zmq')
150 150 ...: def myfunc():
151 151 ...: import numpy,zmq
152 152 ...: return dostuff()
153 153
154 154 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
155 155 numpy and pyzmq available.
156 156
157 157 @depend
158 158 *******
159 159
160 160 The ``@depend`` decorator lets you decorate any function with any *other* function to
161 161 evaluate the dependency. The dependency function will be called at the start of the task,
162 162 and if it returns ``False``, then the dependency will be considered unmet, and the task
163 163 will be assigned to another engine. If the dependency returns *anything other than
164 164 ``False``*, the rest of the task will continue.
165 165
166 166 .. sourcecode:: ipython
167 167
168 168 In [10]: def platform_specific(plat):
169 169 ...: import sys
170 170 ...: return sys.platform == plat
171 171
172 172 In [11]: @depend(platform_specific, 'darwin')
173 173 ...: def mactask():
174 174 ...: do_mac_stuff()
175 175
176 176 In [12]: @depend(platform_specific, 'nt')
177 177 ...: def wintask():
178 178 ...: do_windows_stuff()
179 179
180 180 In this case, any time you apply ``mactask``, it will only run on an OSX machine.
181 181 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
182 182 signature.
183 183
184 184 dependents
185 185 **********
186 186
187 187 You don't have to use the decorators on your tasks. If, for instance, you want
188 188 to run tasks with a single function but varying dependencies, you can directly construct
189 189 the :class:`dependent` object that the decorators use:
190 190
191 191 .. sourcecode:: ipython
192 192
193 193 In [13]: def mytask(*args):
194 194 ...: dostuff()
195 195
196 196 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
197 197 # this is the same as decorating the declaration of mytask with @depend
198 198 # but you can do it again:
199 199
200 200 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
201 201
202 202 # in general:
203 203 In [16]: t = dependent(f, g, *dargs, **dkwargs)
204 204
205 205 # is equivalent to:
206 206 In [17]: @depend(g, *dargs, **dkwargs)
207 207 ...: def t(a,b,c):
208 208 ...: # contents of f
209 209
210 210 Graph Dependencies
211 211 ------------------
212 212
213 213 Sometimes you want to restrict when and/or where a given task runs as a function
214 214 of when and/or where other tasks ran. This is implemented via a subclass of
215 215 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
216 216 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
217 217 has been met.
218 218
219 219 These are the switches we provide for interpreting whether a given dependency set has been met:
220 220
221 221 any|all
222 222 Whether the dependency is considered met if *any* of the dependencies are done, or
223 223 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
224 224 boolean attribute, which defaults to ``True``.
225 225
226 226 success_only
227 227 Whether to consider only tasks that did not raise an error as being fulfilled.
228 228 Sometimes you want to run a task after another, but only if that task succeeded. In
229 229 this case, ``success_only`` should be ``True``. However sometimes you may not care
230 230 whether the task succeeds, and always want the second task to run, in which case
231 231 you should use `success_only=False`. The default behavior is to only use successes.
232 232
233 233 Other interpretation switches are set at the *task* level. These are
234 234 specified via keyword arguments to the client's :meth:`apply` method.
235 235
236 236 after,follow
237 237 You may want to run a task *after* a given set of dependencies have been run and/or
238 238 run it *where* another set of dependencies are met. To support this, every task has an
239 239 `after` dependency to restrict time, and a `follow` dependency to restrict
240 240 destination.
241 241
242 242 timeout
243 243 You may also want to set a time-limit for how long the scheduler should wait before a
244 244 task's dependencies are met. This is done via a `timeout`, which defaults to 0, which
245 245 indicates that the task should never timeout. If the timeout is reached, and the
246 246 scheduler still hasn't been able to assign the task to an engine, the task will fail
247 247 with a :class:`DependencyTimeout`.
248 248
249 249 .. note::
250 250
251 251 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
252 252 task to run after a job submitted via the MUX interface.
253 253
254 254 The simplest form of Dependencies is with `all=True,success_only=True`. In these cases,
255 255 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
256 256 `follow` and `after` keywords to :meth:`client.apply`:
257 257
258 258 .. sourcecode:: ipython
259 259
260 260 In [14]: client.block=False
261 261
262 262 In [15]: ar = client.apply(f, args, kwargs, balanced=True)
263 263
264 264 In [16]: ar2 = client.apply(f2, balanced=True)
265 265
266 266 In [17]: ar3 = client.apply(f3, after=[ar,ar2], balanced=True)
267 267
268 268 In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5, balanced=True)
269 269
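When you need non-default behavior, a :class:`Dependency` can be constructed
directly, since it is just a set of msg_ids plus the switches above. A hedged
sketch (the constructor signature and the ``msg_ids`` attribute of
:class:`AsyncResult` are assumptions here, not documented API):

.. sourcecode:: ipython

    In [18]: from IPython.zmq.parallel.dependency import Dependency

    # met as soon as *either* prior task finishes, whether or not it succeeded:
    In [19]: dep = Dependency(ar.msg_ids + ar2.msg_ids, all=False, success_only=False)

    In [20]: ar5 = client.apply(f3, after=dep, balanced=True)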
270 270
271 271 .. seealso::
272 272
273 273 Some parallel workloads can be described as a `Directed Acyclic Graph
274 274 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
275 275 Dependencies <dag_dependencies>` for an example demonstrating how to map a NetworkX DAG
276 276 onto task dependencies.
277 277
278 278
279 279
280 280 Impossible Dependencies
281 281 ***********************
282 282
283 283 The schedulers do perform some analysis on graph dependencies to determine whether they
284 284 are impossible to meet. If the scheduler discovers that a dependency cannot be
285 285 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the
286 286 scheduler realizes that a task can never be run, the task won't sit indefinitely in the
287 287 scheduler, clogging the pipeline.
288 288
289 289 The basic cases that are checked:
290 290
291 291 * depending on nonexistent messages
292 292 * `follow` dependencies were run on more than one machine and `all=True`
293 293 * any dependencies failed and `all=True,success_only=True`
294 294 * all dependencies failed and `all=False,success_only=True`
295 295
296 296 .. warning::
297 297
298 298 This analysis has not been proven to be rigorous, so it is still possible for tasks
299 299 to become impossible to run in obscure situations; a timeout may be a good safeguard.
300 300
301 .. _parallel_schedulers:
302
301 303 Schedulers
302 304 ==========
303 305
304 306 There are a variety of valid ways to determine where jobs should be assigned in a
305 307 load-balancing situation. In IPython, we support several standard schemes, and
306 308 even make it easy to define your own. The scheme can be selected via the ``--scheme``
307 309 argument to :command:`ipcontrollerz`, or in the :attr:`HubFactory.scheme` attribute
308 310 of a controller config object.
309 311
310 312 The built-in routing schemes:
311 313
314 To select one of these schemes, simply do::
315
316 $ ipcontrollerz --scheme <schemename>
317
318 for instance::
319
320 $ ipcontrollerz --scheme lru
321
312 320 lru: Least Recently Used
313 321
314 322 Always assign work to the least-recently-used engine. A close relative of
315 323 round-robin, it will be fair with respect to the number of tasks, agnostic
316 324 with respect to runtime of each task.
317 325
318 326 plainrandom: Plain Random
327
319 328 Randomly picks an engine on which to run.
320 329
321 330 twobin: Two-Bin Random
322 331
323 **Depends on numpy**
332 **Requires numpy**
324 333
325 334 Pick two engines at random, and use the LRU of the two. This is known to be better
326 335 than plain random in many cases, but requires a small amount of computation.
327 336
328 337 leastload: Least Load
329 338
330 339 **This is the default scheme**
331 340
332 341 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks tie).
333 342
334 343 weighted: Weighted Two-Bin Random
335 344
336 **Depends on numpy**
345 **Requires numpy**
337 346
338 347 Pick two engines at random using the number of outstanding tasks as inverse weights,
339 348 and use the one with the lower load.
340 349
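The config-file route mentioned above might look like this (a hedged sketch;
the attribute name comes from the text above, but the surrounding config
boilerplate may differ across versions):

.. sourcecode:: python

    # in an ipcontrollerz config file (hypothetical contents)
    c = get_config()
    c.HubFactory.scheme = 'weighted'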
341 350
342 351 Pure ZMQ Scheduler
343 352 ------------------
344 353
345 354 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
346 355 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all
347 356 load-balancing. This scheduler does not support any of the advanced features of the Python
348 357 :class:`.Scheduler`.
349 358
350 359 Disabled features when using the ZMQ Scheduler:
351 360
352 361 * Engine unregistration
353 362 Task farming will be disabled if an engine unregisters.
354 363 Further, if an engine is unregistered during computation, the scheduler may not recover.
355 364 * Dependencies
356 365 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
357 366 based on message content.
358 367 * Early destination notification
359 368 The Python schedulers know which engine gets which task, and notify the Hub. This
360 369 allows graceful handling of Engines coming and going. There is no way to know
361 370 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
362 371 engine until they *finish*. This makes recovery from engine shutdown very difficult.
363
372
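If you want to try the pure scheduler anyway, it is presumably selected the
same way as the Python schemes (hedged: the scheme name ``pure`` is an
assumption here)::

    $ ipcontrollerz --scheme pure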
364 373
365 374 .. note::
366 375
367 376 TODO: performance comparisons
368 377
369 378
370 379 More details
371 380 ============
372 381
373 382 The :class:`Client` has many more powerful features that allow quite a bit
374 383 of flexibility in how tasks are defined and run. The next places to look are
375 384 in the following classes:
376 385
377 386 * :class:`IPython.zmq.parallel.client.Client`
378 387 * :class:`IPython.zmq.parallel.asyncresult.AsyncResult`
379 388 * :meth:`IPython.zmq.parallel.client.Client.apply`
380 389 * :mod:`IPython.zmq.parallel.dependency`
381 390
382 391 The following is an overview of how to use these classes together:
383 392
384 393 1. Create a :class:`Client`.
385 394 2. Define some functions to be run as tasks.
386 395 3. Submit your tasks using the :meth:`apply` method of your
387 396 :class:`Client` instance, specifying `balanced=True`. This signals
388 397 the :class:`Client` to entrust the Scheduler with assigning tasks to engines.
389 398 4. Use :meth:`Client.get_results` to get the results of the
390 399 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
391 400 for and then receive the results, as sketched below.
392 401
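A compact hedged sketch of those four steps together (``f`` is any picklable
function; the :meth:`apply` call mirrors the examples above, but the exact
signature may differ in your version):

.. sourcecode:: ipython

    In [1]: from IPython.zmq.parallel import client

    In [2]: rc = client.Client()                        # 1. create a Client

    In [3]: def f(x):                                   # 2. define a task function
       ...:     return x**2
       ...:

    In [4]: ar = rc.apply(f, args=(4,), balanced=True)  # 3. let the Scheduler assign it

    In [5]: ar.get()                                    # 4. wait for and receive the result
    Out[5]: 16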
393 402
394 403 .. seealso::
395 404
396 405 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.