##// END OF EJS Templates
Client -> HasTraits, update examples with API tweaks
MinRK -
Show More
@@ -1,1328 +1,1378 b''
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import json
14 import json
15 import time
15 import time
16 import warnings
16 import warnings
17 from datetime import datetime
17 from datetime import datetime
18 from getpass import getpass
18 from getpass import getpass
19 from pprint import pprint
19 from pprint import pprint
20
20
21 pjoin = os.path.join
21 pjoin = os.path.join
22
22
23 import zmq
23 import zmq
24 # from zmq.eventloop import ioloop, zmqstream
24 # from zmq.eventloop import ioloop, zmqstream
25
25
26 from IPython.utils.path import get_ipython_dir
26 from IPython.utils.path import get_ipython_dir
27 from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode,
28 Dict, List, Bool, Str, Set)
27 from IPython.external.decorator import decorator
29 from IPython.external.decorator import decorator
28 from IPython.external.ssh import tunnel
30 from IPython.external.ssh import tunnel
29
31
30 import error
32 import error
31 import map as Map
33 import map as Map
32 import streamsession as ss
34 import streamsession as ss
33 from asyncresult import AsyncResult, AsyncMapResult
35 from asyncresult import AsyncResult, AsyncMapResult
34 from clusterdir import ClusterDir, ClusterDirError
36 from clusterdir import ClusterDir, ClusterDirError
35 from dependency import Dependency, depend, require, dependent
37 from dependency import Dependency, depend, require, dependent
36 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
38 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
37 from util import ReverseDict, disambiguate_url, validate_url
39 from util import ReverseDict, disambiguate_url, validate_url
38 from view import DirectView, LoadBalancedView
40 from view import DirectView, LoadBalancedView
39
41
40 #--------------------------------------------------------------------------
42 #--------------------------------------------------------------------------
41 # helpers for implementing old MEC API via client.apply
43 # helpers for implementing old MEC API via client.apply
42 #--------------------------------------------------------------------------
44 #--------------------------------------------------------------------------
43
45
44 def _push(ns):
46 def _push(ns):
45 """helper method for implementing `client.push` via `client.apply`"""
47 """helper method for implementing `client.push` via `client.apply`"""
46 globals().update(ns)
48 globals().update(ns)
47
49
48 def _pull(keys):
50 def _pull(keys):
49 """helper method for implementing `client.pull` via `client.apply`"""
51 """helper method for implementing `client.pull` via `client.apply`"""
50 g = globals()
52 g = globals()
51 if isinstance(keys, (list,tuple, set)):
53 if isinstance(keys, (list,tuple, set)):
52 for key in keys:
54 for key in keys:
53 if not g.has_key(key):
55 if not g.has_key(key):
54 raise NameError("name '%s' is not defined"%key)
56 raise NameError("name '%s' is not defined"%key)
55 return map(g.get, keys)
57 return map(g.get, keys)
56 else:
58 else:
57 if not g.has_key(keys):
59 if not g.has_key(keys):
58 raise NameError("name '%s' is not defined"%keys)
60 raise NameError("name '%s' is not defined"%keys)
59 return g.get(keys)
61 return g.get(keys)
60
62
61 def _clear():
63 def _clear():
62 """helper method for implementing `client.clear` via `client.apply`"""
64 """helper method for implementing `client.clear` via `client.apply`"""
63 globals().clear()
65 globals().clear()
64
66
65 def _execute(code):
67 def _execute(code):
66 """helper method for implementing `client.execute` via `client.apply`"""
68 """helper method for implementing `client.execute` via `client.apply`"""
67 exec code in globals()
69 exec code in globals()
68
70
69
71
70 #--------------------------------------------------------------------------
72 #--------------------------------------------------------------------------
71 # Decorators for Client methods
73 # Decorators for Client methods
72 #--------------------------------------------------------------------------
74 #--------------------------------------------------------------------------
73
75
@decorator
def spinfirst(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method."""
    # flush incoming results/notifications so `f` sees current state
    self.spin()
    return f(self, *args, **kwargs)
79
81
@decorator
def defaultblock(f, self, *args, **kwargs):
    """Default to self.block; preserve self.block."""
    # honor an explicit block=... kwarg, otherwise fall back to the
    # client's current default
    requested = kwargs.get('block',None)
    if requested is None:
        requested = self.block
    # temporarily install the requested blocking mode, restoring the
    # original even if `f` raises
    previous = self.block
    self.block = requested
    try:
        result = f(self, *args, **kwargs)
    finally:
        self.block = previous
    return result
92
94
93
95
94 #--------------------------------------------------------------------------
96 #--------------------------------------------------------------------------
95 # Classes
97 # Classes
96 #--------------------------------------------------------------------------
98 #--------------------------------------------------------------------------
97
99
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        """Seed the fixed schema, then apply caller-supplied overrides.

        Overrides are routed through __setitem__, so an unknown key in
        *args/**kwargs raises KeyError.  (The previous implementation used
        dict.update, which silently bypassed the strict check and allowed
        arbitrary keys in, contradicting the class contract.)
        """
        dict.__init__(self)
        # the fixed key schema: these keys define exactly what is allowed
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
            }
        # seed directly (bypassing __setitem__ is fine here: these ARE
        # the allowed keys)
        dict.update(self, md)
        # caller overrides go through __setitem__ for strict enforcement
        for key, value in dict(*args, **kwargs).items():
            self[key] = value

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # `key in self` replaces py2-only `key in self.iterkeys()`:
        # same semantics, O(1), and portable
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict key enforcement"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
148
150
149
151
150 class Client(object):
152 class Client(HasTraits):
151 """A semi-synchronous client to the IPython ZMQ controller
153 """A semi-synchronous client to the IPython ZMQ controller
152
154
153 Parameters
155 Parameters
154 ----------
156 ----------
155
157
156 url_or_file : bytes; zmq url or path to ipcontroller-client.json
158 url_or_file : bytes; zmq url or path to ipcontroller-client.json
157 Connection information for the Hub's registration. If a json connector
159 Connection information for the Hub's registration. If a json connector
158 file is given, then likely no further configuration is necessary.
160 file is given, then likely no further configuration is necessary.
159 [Default: use profile]
161 [Default: use profile]
160 profile : bytes
162 profile : bytes
161 The name of the Cluster profile to be used to find connector information.
163 The name of the Cluster profile to be used to find connector information.
162 [Default: 'default']
164 [Default: 'default']
163 context : zmq.Context
165 context : zmq.Context
164 Pass an existing zmq.Context instance, otherwise the client will create its own.
166 Pass an existing zmq.Context instance, otherwise the client will create its own.
165 username : bytes
167 username : bytes
166 set username to be passed to the Session object
168 set username to be passed to the Session object
167 debug : bool
169 debug : bool
168 flag for lots of message printing for debug purposes
170 flag for lots of message printing for debug purposes
169
171
170 #-------------- ssh related args ----------------
172 #-------------- ssh related args ----------------
171 # These are args for configuring the ssh tunnel to be used
173 # These are args for configuring the ssh tunnel to be used
172 # credentials are used to forward connections over ssh to the Controller
174 # credentials are used to forward connections over ssh to the Controller
173 # Note that the ip given in `addr` needs to be relative to sshserver
175 # Note that the ip given in `addr` needs to be relative to sshserver
174 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
176 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
175 # and set sshserver as the same machine the Controller is on. However,
177 # and set sshserver as the same machine the Controller is on. However,
176 # the only requirement is that sshserver is able to see the Controller
178 # the only requirement is that sshserver is able to see the Controller
177 # (i.e. is within the same trusted network).
179 # (i.e. is within the same trusted network).
178
180
179 sshserver : str
181 sshserver : str
180 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
182 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
181 If keyfile or password is specified, and this is not, it will default to
183 If keyfile or password is specified, and this is not, it will default to
182 the ip given in addr.
184 the ip given in addr.
183 sshkey : str; path to public ssh key file
185 sshkey : str; path to public ssh key file
184 This specifies a key to be used in ssh login, default None.
186 This specifies a key to be used in ssh login, default None.
185 Regular default ssh keys will be used without specifying this argument.
187 Regular default ssh keys will be used without specifying this argument.
186 password : str
188 password : str
187 Your ssh password to sshserver. Note that if this is left None,
189 Your ssh password to sshserver. Note that if this is left None,
188 you will be prompted for it if passwordless key based login is unavailable.
190 you will be prompted for it if passwordless key based login is unavailable.
189 paramiko : bool
191 paramiko : bool
190 flag for whether to use paramiko instead of shell ssh for tunneling.
192 flag for whether to use paramiko instead of shell ssh for tunneling.
191 [default: True on win32, False else]
193 [default: True on win32, False else]
192
194
193 #------- exec authentication args -------
195 #------- exec authentication args -------
194 # If even localhost is untrusted, you can have some protection against
196 # If even localhost is untrusted, you can have some protection against
195 # unauthorized execution by using a key. Messages are still sent
197 # unauthorized execution by using a key. Messages are still sent
196 # as cleartext, so if someone can snoop your loopback traffic this will
198 # as cleartext, so if someone can snoop your loopback traffic this will
197 # not help against malicious attacks.
199 # not help against malicious attacks.
198
200
199 exec_key : str
201 exec_key : str
200 an authentication key or file containing a key
202 an authentication key or file containing a key
201 default: None
203 default: None
202
204
203
205
204 Attributes
206 Attributes
205 ----------
207 ----------
206
208
207 ids : set of int engine IDs
209 ids : set of int engine IDs
208 requesting the ids attribute always synchronizes
210 requesting the ids attribute always synchronizes
209 the registration state. To request ids without synchronization,
211 the registration state. To request ids without synchronization,
210 use semi-private _ids attributes.
212 use semi-private _ids attributes.
211
213
212 history : list of msg_ids
214 history : list of msg_ids
213 a list of msg_ids, keeping track of all the execution
215 a list of msg_ids, keeping track of all the execution
214 messages you have submitted in order.
216 messages you have submitted in order.
215
217
216 outstanding : set of msg_ids
218 outstanding : set of msg_ids
217 a set of msg_ids that have been submitted, but whose
219 a set of msg_ids that have been submitted, but whose
218 results have not yet been received.
220 results have not yet been received.
219
221
220 results : dict
222 results : dict
221 a dict of all our results, keyed by msg_id
223 a dict of all our results, keyed by msg_id
222
224
223 block : bool
225 block : bool
224 determines default behavior when block not specified
226 determines default behavior when block not specified
225 in execution methods
227 in execution methods
226
228
227 Methods
229 Methods
228 -------
230 -------
229
231
230 spin
232 spin
231 flushes incoming results and registration state changes
233 flushes incoming results and registration state changes
232 control methods spin, and requesting `ids` also ensures up to date
234 control methods spin, and requesting `ids` also ensures up to date
233
235
234 barrier
236 barrier
235 wait on one or more msg_ids
237 wait on one or more msg_ids
236
238
237 execution methods
239 execution methods
238 apply
240 apply
239 legacy: execute, run
241 legacy: execute, run
240
242
241 query methods
243 query methods
242 queue_status, get_result, purge
244 queue_status, get_result, purge
243
245
244 control methods
246 control methods
245 abort, shutdown
247 abort, shutdown
246
248
247 """
249 """
248
250
249
251
    # Public traits ------------------------------------------------------
    block = Bool(False)            # default blocking behavior for execution methods
    outstanding = Set()            # msg_ids submitted but not yet resolved
    results = Dict()               # completed results, keyed by msg_id
    metadata = Dict()              # Metadata objects, keyed by msg_id
    history = List()               # all submitted msg_ids, in order
    debug = Bool(False)            # verbose message printing for debugging
    profile = CUnicode('default')  # cluster profile used to find connection info

    # Private traits / internal state ------------------------------------
    _ids = List()                  # registered engine ids (see `ids` property)
    _connected = Bool(False)
    _ssh = Bool(False)
    _context = Instance('zmq.Context')
    _config = Dict()               # connection config (from args / json file)
    _engines = Instance(ReverseDict, (), {})  # engine id <-> uuid mapping
    _registration_socket = Instance('zmq.Socket')
    _query_socket = Instance('zmq.Socket')
    _control_socket = Instance('zmq.Socket')
    _iopub_socket = Instance('zmq.Socket')
    _notification_socket = Instance('zmq.Socket')
    _mux_socket = Instance('zmq.Socket')
    _task_socket = Instance('zmq.Socket')
    _task_scheme = Str()           # task scheduler scheme, e.g. 'pure'
    _balanced_views = Dict()       # cached LoadBalancedView instances
    _direct_views = Dict()         # cached DirectView instances
    _closed = False                # set by close(); plain attr, not a trait
267
277
    def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
            context=None, username=None, debug=False, exec_key=None,
            sshserver=None, sshkey=None, password=None, paramiko=None,
            ):
        """Resolve connection info and connect to the controller.

        See the class docstring for parameter details.
        """
        # HasTraits init: installs the `debug` and `profile` traits
        super(Client, self).__init__(debug=debug, profile=profile)
        if context is None:
            context = zmq.Context()
        self._context = context

        # locate the cluster dir so a bare profile name suffices to connect
        self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
        if self._cd is not None:
            if url_or_file is None:
                url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
        assert url_or_file is not None, "I can't find enough information to connect to a controller!"\
            " Please specify at least one of url_or_file or profile."

        # url_or_file may be a zmq url or a path to a json connector file
        try:
            validate_url(url_or_file)
        except AssertionError:
            # not a url: treat it as a connector file, possibly relative
            # to the cluster dir's security directory
            if not os.path.exists(url_or_file):
                if self._cd:
                    url_or_file = os.path.join(self._cd.security_dir, url_or_file)
                assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
            with open(url_or_file) as f:
                cfg = json.loads(f.read())
        else:
            cfg = {'url':url_or_file}

        # sync defaults from args, json:
        # explicit arguments take precedence over json-file values
        if sshserver:
            cfg['ssh'] = sshserver
        if exec_key:
            cfg['exec_key'] = exec_key
        exec_key = cfg['exec_key']
        sshserver=cfg['ssh']
        url = cfg['url']
        location = cfg.setdefault('location', None)
        cfg['url'] = disambiguate_url(cfg['url'], location)
        url = cfg['url']

        self._config = cfg

        # any ssh credential implies tunneling
        self._ssh = bool(sshserver or sshkey or password)
        if self._ssh and sshserver is None:
            # default to ssh via localhost
            sshserver = url.split('://')[1].split(':')[0]
        if self._ssh and password is None:
            if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
                password=False
            else:
                password = getpass("SSH Password for %s: "%sshserver)
        ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
        # exec_key may be a path to a keyfile, or the key value itself
        if exec_key is not None and os.path.isfile(exec_key):
            arg = 'keyfile'
        else:
            arg = 'key'
        key_arg = {arg:exec_key}
        if username is None:
            self.session = ss.StreamSession(**key_arg)
        else:
            self.session = ss.StreamSession(username, **key_arg)
        self._registration_socket = self._context.socket(zmq.XREQ)
        # the session id doubles as our zmq socket identity
        self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
        if self._ssh:
            tunnel.tunnel_connection(self._registration_socket, url, sshserver, **ssh_kwargs)
        else:
            self._registration_socket.connect(url)

        self.session.debug = self.debug

        # dispatch tables for messages arriving on the notification and
        # queue channels
        self._notification_handlers = {'registration_notification' : self._register_engine,
                                    'unregistration_notification' : self._unregister_engine,
                                    }
        self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                                'apply_reply' : self._handle_apply_reply}
        self._connect(sshserver, ssh_kwargs)
349
355
350
356
351 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
357 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
352 if ipython_dir is None:
358 if ipython_dir is None:
353 ipython_dir = get_ipython_dir()
359 ipython_dir = get_ipython_dir()
354 if cluster_dir is not None:
360 if cluster_dir is not None:
355 try:
361 try:
356 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
362 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
357 except ClusterDirError:
363 except ClusterDirError:
358 pass
364 pass
359 elif profile is not None:
365 elif profile is not None:
360 try:
366 try:
361 self._cd = ClusterDir.find_cluster_dir_by_profile(
367 self._cd = ClusterDir.find_cluster_dir_by_profile(
362 ipython_dir, profile)
368 ipython_dir, profile)
363 except ClusterDirError:
369 except ClusterDirError:
364 pass
370 pass
365 else:
371 else:
366 self._cd = None
372 self._cd = None
367
373
    @property
    def ids(self):
        """Always up-to-date ids property."""
        # drain pending (un)registration notifications first, so the
        # returned list reflects the controller's current engine set
        self._flush_notifications()
        return self._ids
379
380 def close(self):
381 if self._closed:
382 return
383 snames = filter(lambda n: n.endswith('socket'), dir(self))
384 for socket in map(lambda name: getattr(self, name), snames):
385 socket.close()
386 self._closed = True
373
387
    def _update_engines(self, engines):
        """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
        for k,v in engines.iteritems():
            eid = int(k)
            self._engines[eid] = bytes(v) # force not unicode
            self._ids.append(eid)
        # reassignment (not in-place sort) so the List trait sees the change
        self._ids = sorted(self._ids)
        # a gap in the engine ids means some engine unregistered; the pure
        # ZMQ scheduler cannot survive that, so disable task scheduling
        if sorted(self._engines.keys()) != range(len(self._engines)) and \
                        self._task_scheme == 'pure' and self._task_socket:
            self._stop_scheduling_tasks()
384
398
385 def _stop_scheduling_tasks(self):
399 def _stop_scheduling_tasks(self):
386 """Stop scheduling tasks because an engine has been unregistered
400 """Stop scheduling tasks because an engine has been unregistered
387 from a pure ZMQ scheduler.
401 from a pure ZMQ scheduler.
388 """
402 """
389
403
390 self._task_socket.close()
404 self._task_socket.close()
391 self._task_socket = None
405 self._task_socket = None
392 msg = "An engine has been unregistered, and we are using pure " +\
406 msg = "An engine has been unregistered, and we are using pure " +\
393 "ZMQ task scheduling. Task farming will be disabled."
407 "ZMQ task scheduling. Task farming will be disabled."
394 if self.outstanding:
408 if self.outstanding:
395 msg += " If you were running tasks when this happened, " +\
409 msg += " If you were running tasks when this happened, " +\
396 "some `outstanding` msg_ids may never resolve."
410 "some `outstanding` msg_ids may never resolve."
397 warnings.warn(msg, RuntimeWarning)
411 warnings.warn(msg, RuntimeWarning)
398
412
399 def _build_targets(self, targets):
413 def _build_targets(self, targets):
400 """Turn valid target IDs or 'all' into two lists:
414 """Turn valid target IDs or 'all' into two lists:
401 (int_ids, uuids).
415 (int_ids, uuids).
402 """
416 """
403 if targets is None:
417 if targets is None:
404 targets = self._ids
418 targets = self._ids
405 elif isinstance(targets, str):
419 elif isinstance(targets, str):
406 if targets.lower() == 'all':
420 if targets.lower() == 'all':
407 targets = self._ids
421 targets = self._ids
408 else:
422 else:
409 raise TypeError("%r not valid str target, must be 'all'"%(targets))
423 raise TypeError("%r not valid str target, must be 'all'"%(targets))
410 elif isinstance(targets, int):
424 elif isinstance(targets, int):
411 targets = [targets]
425 targets = [targets]
412 return [self._engines[t] for t in targets], list(targets)
426 return [self._engines[t] for t in targets], list(targets)
413
427
    def _connect(self, sshserver, ssh_kwargs):
        """setup all our socket connections to the controller. This is called from
        __init__."""

        # Maybe allow reconnecting?
        if self._connected:
            return
        self._connected=True

        def connect_socket(s, url):
            # resolve e.g. 0.0.0.0-style urls relative to the controller's
            # advertised location before connecting (possibly via ssh tunnel)
            url = disambiguate_url(url, self._config['location'])
            if self._ssh:
                return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
            else:
                return s.connect(url)

        # handshake: ask the Hub for the addresses of all its channels
        self.session.send(self._registration_socket, 'connection_request')
        idents,msg = self.session.recv(self._registration_socket,mode=0)
        if self.debug:
            pprint(msg)
        msg = ss.Message(msg)
        content = msg.content
        self._config['registration'] = dict(content)
        if content.status == 'ok':
            # connect one socket per channel the controller advertises
            if content.mux:
                self._mux_socket = self._context.socket(zmq.PAIR)
                self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._mux_socket, content.mux)
            if content.task:
                # content.task carries (scheme, address)
                self._task_scheme, task_addr = content.task
                self._task_socket = self._context.socket(zmq.PAIR)
                self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._task_socket, task_addr)
            if content.notification:
                # SUB socket subscribed to everything: engine (un)registration
                self._notification_socket = self._context.socket(zmq.SUB)
                connect_socket(self._notification_socket, content.notification)
                self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
            if content.query:
                self._query_socket = self._context.socket(zmq.PAIR)
                self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._query_socket, content.query)
            if content.control:
                self._control_socket = self._context.socket(zmq.PAIR)
                self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._control_socket, content.control)
            if content.iopub:
                # SUB socket subscribed to everything: engine stdout/stderr etc.
                self._iopub_socket = self._context.socket(zmq.SUB)
                self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
                self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._iopub_socket, content.iopub)
            # seed our engine table from the registration reply
            self._update_engines(dict(content.engines))

        else:
            self._connected = False
            raise Exception("Failed to connect!")
469
483
470 #--------------------------------------------------------------------------
484 #--------------------------------------------------------------------------
471 # handlers and callbacks for incoming messages
485 # handlers and callbacks for incoming messages
472 #--------------------------------------------------------------------------
486 #--------------------------------------------------------------------------
473
487
def _register_engine(self, msg):
    """Handle an engine-registration notification.

    Pulls the new engine's id and queue identity out of the message
    content and folds them into our engine table.
    """
    info = msg['content']
    self._update_engines({info['id']: info['queue']})
480
494
def _unregister_engine(self, msg):
    """Handle an engine-death notification: drop the engine locally.

    If the pure ZMQ task scheduler is in use, task farming is also
    stopped, because that scheduler cannot cope with engines
    disappearing.
    """
    eid = int(msg['content']['id'])
    if eid not in self._ids:
        return
    self._ids.remove(eid)
    self._engines.pop(eid)
    if self._task_socket and self._task_scheme == 'pure':
        self._stop_scheduling_tasks()
490
504
def _extract_metadata(self, header, parent, content):
    """Build the metadata dict for a reply from its header/parent/content.

    Timestamp fields arrive as ISO8601 strings and are parsed into
    datetimes; optional fields are added only when present in the
    message.
    """
    engine_uuid = header.get('engine', None)
    md = dict(
        msg_id=parent['msg_id'],
        received=datetime.now(),
        engine_uuid=engine_uuid,
        follow=parent.get('follow', []),
        after=parent.get('after', []),
        status=content['status'],
    )

    if engine_uuid is not None:
        # map the engine's zmq identity back to its integer id, if known
        md['engine_id'] = self._engines.get(engine_uuid, None)

    # parse the optional ISO8601 timestamps into datetime objects
    for dest_key, src, src_key in (('submitted', parent, 'date'),
                                   ('started', header, 'started'),
                                   ('completed', header, 'date')):
        if src_key in src:
            md[dest_key] = datetime.strptime(src[src_key], ss.ISO8601)
    return md
510
524
def _handle_execute_reply(self, msg):
    """Record the reply to an execute_request in self.results.

    Note: execute replies are never actually used; `apply` replies do
    the real work.
    """
    msg_id = msg['parent_header']['msg_id']
    if msg_id in self.outstanding:
        self.outstanding.remove(msg_id)
        self.results[msg_id] = ss.unwrap_exception(msg['content'])
    elif msg_id in self.history:
        print ("got stale result: %s"%msg_id)
    else:
        print ("got unknown result: %s"%msg_id)
527
541
def _handle_apply_reply(self, msg):
    """Save the reply to an apply_request into our results.

    Updates ``self.metadata[msg_id]`` with timing/engine information,
    and stores either the deserialized result, an AbortedTask marker,
    or the remote exception in ``self.results[msg_id]``.
    """
    parent = msg['parent_header']
    msg_id = parent['msg_id']
    if msg_id not in self.outstanding:
        if msg_id in self.history:
            # fix: these were bare Python-2 `print` statements; use the
            # parenthesized form for consistency with the rest of the file
            print ("got stale result: %s"%msg_id)
            print (self.results[msg_id])
            print (msg)
        else:
            print ("got unknown result: %s"%msg_id)
    else:
        self.outstanding.remove(msg_id)
        content = msg['content']
        header = msg['header']

        # construct metadata:
        md = self.metadata.setdefault(msg_id, Metadata())
        md.update(self._extract_metadata(header, parent, content))
        self.metadata[msg_id] = md

        # construct result:
        if content['status'] == 'ok':
            self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
        elif content['status'] == 'aborted':
            self.results[msg_id] = error.AbortedTask(msg_id)
        elif content['status'] == 'resubmitted':
            # TODO: handle resubmission
            pass
        else:
            # remote error: translate the engine uuid into its integer id
            # for a friendlier report, then store the exception object
            e = ss.unwrap_exception(content)
            if e.engine_info:
                e_uuid = e.engine_info['engineid']
                eid = self._engines[e_uuid]
                e.engine_info['engineid'] = eid
            self.results[msg_id] = e
564
578
def _flush_notifications(self):
    """Flush notifications of engine registrations waiting
    in ZMQ queue, dispatching each to its registered handler.

    Raises
    ------
    Exception
        if a notification arrives with no registered handler.
    """
    msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        msg = msg[-1]
        msg_type = msg['msg_type']
        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            # BUG FIX: `msg` is a dict here, so `msg.msg_type` raised
            # AttributeError instead of the intended message; report the
            # already-extracted msg_type.
            raise Exception("Unhandled message type: %s"%msg_type)
        else:
            handler(msg)
        msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
580
594
def _flush_results(self, sock):
    """Flush task or queue results waiting in ZMQ queue,
    dispatching each reply to its registered handler.

    Raises
    ------
    Exception
        if a reply arrives with no registered handler.
    """
    msg = self.session.recv(sock, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        msg = msg[-1]
        msg_type = msg['msg_type']
        handler = self._queue_handlers.get(msg_type, None)
        if handler is None:
            # BUG FIX: `msg` is a dict here, so `msg.msg_type` raised
            # AttributeError instead of the intended message; report the
            # already-extracted msg_type.
            raise Exception("Unhandled message type: %s"%msg_type)
        else:
            handler(msg)
        msg = self.session.recv(sock, mode=zmq.NOBLOCK)
595
609
def _flush_control(self, sock):
    """Drain pending replies on the control channel.

    The replies are currently discarded; in debug mode each one is
    printed before being dropped.
    """
    while True:
        reply = self.session.recv(sock, mode=zmq.NOBLOCK)
        if reply is None:
            break
        if self.debug:
            pprint(reply)
606
620
def _flush_iopub(self, sock):
    """Drain the iopub channel, folding stream/pyerr/other output
    into the metadata entry of the originating request.
    """
    while True:
        raw = self.session.recv(sock, mode=zmq.NOBLOCK)
        if raw is None:
            break
        if self.debug:
            pprint(raw)
        message = raw[-1]
        msg_id = message['parent_header']['msg_id']
        content = message['content']
        msg_type = message['msg_type']

        # fetch (or create) the metadata record for this request
        md = self.metadata.setdefault(msg_id, Metadata())

        if msg_type == 'stream':
            # append stream data (e.g. stdout/stderr) to what we have
            name = content['name']
            md[name] = (md[name] or '') + content['data']
        elif msg_type == 'pyerr':
            md.update({'pyerr' : ss.unwrap_exception(content)})
        else:
            md.update({msg_type : content['data']})

        self.metadata[msg_id] = md
637
651
638 #--------------------------------------------------------------------------
652 #--------------------------------------------------------------------------
639 # getitem
653 # len, getitem
640 #--------------------------------------------------------------------------
654 #--------------------------------------------------------------------------
641
655
def __len__(self):
    """Return the number of currently-registered engines."""
    engine_ids = self.ids
    return len(engine_ids)
659
def __getitem__(self, key):
    """Index access returns DirectView multiplexer objects.

    `key` must be an int, slice, or list/tuple/xrange of ints.
    """
    if isinstance(key, (int, slice, tuple, list, xrange)):
        return self.view(key, balanced=False)
    raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
650
668
651 #--------------------------------------------------------------------------
669 #--------------------------------------------------------------------------
652 # Begin public methods
670 # Begin public methods
653 #--------------------------------------------------------------------------
671 #--------------------------------------------------------------------------
654
672
def spin(self):
    """Flush any registration notifications and execution results
    waiting in the ZMQ queue.
    """
    if self._notification_socket:
        self._flush_notifications()
    # the mux and task channels share one result-flushing routine;
    # order (mux first) matches the original flush sequence
    for sock in (self._mux_socket, self._task_socket):
        if sock:
            self._flush_results(sock)
    if self._control_socket:
        self._flush_control(self._control_socket)
    if self._iopub_socket:
        self._flush_iopub(self._iopub_socket)
669
687
def barrier(self, msg_ids=None, timeout=-1):
    """waits on one or more `msg_ids`, for up to `timeout` seconds.

    Parameters
    ----------

    msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
        ints are indices to self.history
        strs are msg_ids
        default: wait on all outstanding messages
    timeout : float
        a time in seconds, after which to give up.
        default is -1, which means no timeout

    Returns
    -------

    True : when all msg_ids are done
    False : timeout reached, some msg_ids still outstanding
    """
    tic = time.time()
    if msg_ids is None:
        theids = self.outstanding
    else:
        if isinstance(msg_ids, (int, str, AsyncResult)):
            msg_ids = [msg_ids]
        theids = set()
        for msg_id in msg_ids:
            if isinstance(msg_id, int):
                # int: index into submission history
                msg_id = self.history[msg_id]
            elif isinstance(msg_id, AsyncResult):
                # BUG FIX: map() was used only for its side effect, which
                # is a no-op under a lazy map (Python 3); set.update is
                # explicit and equivalent.
                theids.update(msg_id.msg_ids)
                continue
            theids.add(msg_id)
    if not theids.intersection(self.outstanding):
        # nothing to wait for
        return True
    self.spin()
    # poll (with a short sleep) until done or timed out
    while theids.intersection(self.outstanding):
        if timeout >= 0 and ( time.time()-tic ) > timeout:
            break
        time.sleep(1e-3)
        self.spin()
    return len(theids.intersection(self.outstanding)) == 0
713
731
714 #--------------------------------------------------------------------------
732 #--------------------------------------------------------------------------
715 # Control methods
733 # Control methods
716 #--------------------------------------------------------------------------
734 #--------------------------------------------------------------------------
717
735
@spinfirst
@defaultblock
def clear(self, targets=None, block=None):
    """Clear the namespace in target(s)."""
    engine_idents = self._build_targets(targets)[0]
    for ident in engine_idents:
        self.session.send(self._control_socket, 'clear_request', content={}, ident=ident)
    failure = False
    if self.block:
        # collect one reply per engine; remember the last non-ok reply
        for _ in engine_idents:
            idents, msg = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                failure = ss.unwrap_exception(msg['content'])
    if failure:
        return failure
735
753
736
754
@spinfirst
@defaultblock
def abort(self, msg_ids=None, targets=None, block=None):
    """Abort the execution queues of target(s)."""
    engine_idents = self._build_targets(targets)[0]
    # a single msg_id may be passed bare; normalize to a list
    if isinstance(msg_ids, basestring):
        msg_ids = [msg_ids]
    request = dict(msg_ids=msg_ids)
    for ident in engine_idents:
        self.session.send(self._control_socket, 'abort_request',
                content=request, ident=ident)
    failure = False
    if self.block:
        # collect one reply per engine; remember the last non-ok reply
        for _ in engine_idents:
            idents, msg = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                failure = ss.unwrap_exception(msg['content'])
    if failure:
        return failure
758
776
@spinfirst
@defaultblock
def shutdown(self, targets=None, restart=False, controller=False, block=None):
    """Terminates one or more engine processes, optionally including the controller."""
    # shutting down the controller implies shutting down all engines
    if controller:
        targets = 'all'
    engine_idents = self._build_targets(targets)[0]
    for ident in engine_idents:
        self.session.send(self._control_socket, 'shutdown_request',
                    content={'restart':restart}, ident=ident)
    failure = False
    if block or controller:
        # collect one reply per engine; remember the last non-ok reply
        for _ in engine_idents:
            idents, msg = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                failure = ss.unwrap_exception(msg['content'])

    if controller:
        # brief pause so the engines can go down before the controller
        time.sleep(0.25)
        self.session.send(self._query_socket, 'shutdown_request')
        idents, msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        if msg['content']['status'] != 'ok':
            failure = ss.unwrap_exception(msg['content'])

    if failure:
        raise failure
789
807
790 #--------------------------------------------------------------------------
808 #--------------------------------------------------------------------------
791 # Execution methods
809 # Execution methods
792 #--------------------------------------------------------------------------
810 #--------------------------------------------------------------------------
793
811
@defaultblock
def execute(self, code, targets='all', block=None):
    """Executes `code` on `targets` in blocking or nonblocking manner.

    ``execute`` is always `bound` (affects engine namespace)

    Parameters
    ----------

    code : str
        the code string to be executed
    targets : int/str/list of ints/strs
        the engines on which to execute
        default : all
    block : bool
        whether or not to wait until done to return
        default: self.block
    """
    # thin wrapper: ship the code string through apply() as a bound,
    # multiplexed (non-balanced) call.  NOTE(review): the `block`
    # argument appears to be folded into self.block by @defaultblock,
    # which is why self.block is passed here — confirm against the
    # decorator's definition.
    return self.apply(_execute, (code,), targets=targets,
                      block=self.block, bound=True, balanced=False)
814
832
def run(self, filename, targets='all', block=None):
    """Execute contents of `filename` on engine(s).

    This simply reads the contents of the file and calls `execute`.

    Parameters
    ----------

    filename : str
        The path to the file
    targets : int/str/list of ints/strs
        the engines on which to execute
        default : all
    block : bool
        whether or not to wait until done
        default: self.block

    """
    # read in binary mode and delegate all execution semantics to execute()
    with open(filename, 'rb') as f:
        source = f.read()
    return self.execute(source, targets=targets, block=block)
836
854
def _maybe_raise(self, result):
    """Re-raise `result` if it is a RemoteError; otherwise pass it through."""
    if not isinstance(result, error.RemoteError):
        return result
    raise result
843
861
def _build_dependency(self, dep):
    """Normalize a dependency spec into a JSON-able form.

    Accepts a Dependency, an AsyncResult, None, or anything the
    Dependency constructor understands (e.g. a collection of msg_ids).
    """
    if dep is None:
        return []
    if isinstance(dep, Dependency):
        return dep.as_dict()
    if isinstance(dep, AsyncResult):
        return dep.msg_ids
    # anything else: let the Dependency constructor interpret it
    return list(Dependency(dep))
855
873
@defaultblock
def apply(self, f, args=None, kwargs=None, bound=True, block=None,
                targets=None, balanced=None,
                after=None, follow=None, timeout=None):
    """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.

    This is the central execution command for the client.

    Parameters
    ----------

    f : function
        The fuction to be called remotely
    args : tuple/list
        The positional arguments passed to `f`
    kwargs : dict
        The keyword arguments passed to `f`
    bound : bool (default: True)
        Whether to execute in the Engine(s) namespace, or in a clean
        namespace not affecting the engine.
    block : bool (default: self.block)
        Whether to wait for the result, or return immediately.
        False:
            returns AsyncResult
        True:
            returns actual result(s) of f(*args, **kwargs)
            if multiple targets:
                list of results, matching `targets`
    targets : int,list of ints, 'all', None
        Specify the destination of the job.
        if None:
            Submit via Task queue for load-balancing.
        if 'all':
            Run on all active engines
        if list:
            Run on each specified engine
        if int:
            Run on single engine

    balanced : bool, default None
        whether to load-balance.  This will default to True
        if targets is unspecified, or False if targets is specified.

    The following arguments are only used when balanced is True:
    after : Dependency or collection of msg_ids
        Only for load-balanced execution (targets=None)
        Specify a list of msg_ids as a time-based dependency.
        This job will only be run *after* the dependencies
        have been met.

    follow : Dependency or collection of msg_ids
        Only for load-balanced execution (targets=None)
        Specify a list of msg_ids as a location-based dependency.
        This job will only be run on an engine where this dependency
        is met.

    timeout : float/int or None
        Only for load-balanced execution (targets=None)
        Specify an amount of time (in seconds) for the scheduler to
        wait for dependencies to be met before failing with a
        DependencyTimeout.

    after,follow,timeout only used if `balanced=True`.

    Raises
    ------
    TypeError
        if f is not callable, args is not a tuple/list, or kwargs is
        not a dict.
    ValueError
        if after/follow/timeout are given for a non-balanced call.

    Returns
    -------

    if block is False:
        return AsyncResult wrapping msg_ids
        output of AsyncResult.get() is identical to that of `apply(...block=True)`
    else:
        if single target:
            return result of `f(*args, **kwargs)`
        else:
            return list of results, matching `targets`
    """
    # a closed client must not submit new work
    assert not self._closed, "cannot use me anymore, I'm closed!"
    # defaults:
    block = block if block is not None else self.block
    args = args if args is not None else []
    kwargs = kwargs if kwargs is not None else {}

    # resolve balanced from targets when the caller did not choose:
    if balanced is None:
        if targets is None:
            # default to balanced if targets unspecified
            balanced = True
        else:
            # otherwise default to multiplexing
            balanced = False

    if targets is None and balanced is False:
        # default to all if *not* balanced, and targets is unspecified
        targets = 'all'

    # enforce types of f, args, kwargs before serializing anything
    if not callable(f):
        raise TypeError("f must be callable, not %s"%type(f))
    if not isinstance(args, (tuple, list)):
        raise TypeError("args must be tuple or list, not %s"%type(args))
    if not isinstance(kwargs, dict):
        raise TypeError("kwargs must be dict, not %s"%type(kwargs))

    # options common to both submission paths
    options = dict(bound=bound, block=block, targets=targets)

    # dispatch to the load-balanced or direct (multiplexed) path;
    # dependency arguments are only meaningful when load-balancing
    if balanced:
        return self._apply_balanced(f, args, kwargs, timeout=timeout,
                                    after=after, follow=follow, **options)
    elif follow or after or timeout:
        msg = "follow, after, and timeout args are only used for"
        msg += " load-balanced execution."
        raise ValueError(msg)
    else:
        return self._apply_direct(f, args, kwargs, **options)
968
987
969 def _apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None,
988 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
970 after=None, follow=None, timeout=None):
989 after=None, follow=None, timeout=None):
971 """call f(*args, **kwargs) remotely in a load-balanced manner.
990 """call f(*args, **kwargs) remotely in a load-balanced manner.
972
991
973 This is a private method, see `apply` for details.
992 This is a private method, see `apply` for details.
974 Not to be called directly!
993 Not to be called directly!
975 """
994 """
976
995
977 for kwarg in (bound, block, targets):
996 loc = locals()
978 assert kwarg is not None, "kwarg %r must be specified!"%kwarg
997 for name in ('bound', 'block'):
998 assert loc[name] is not None, "kwarg %r must be specified!"%name
979
999
980 if self._task_socket is None:
1000 if self._task_socket is None:
981 msg = "Task farming is disabled"
1001 msg = "Task farming is disabled"
982 if self._task_scheme == 'pure':
1002 if self._task_scheme == 'pure':
983 msg += " because the pure ZMQ scheduler cannot handle"
1003 msg += " because the pure ZMQ scheduler cannot handle"
984 msg += " disappearing engines."
1004 msg += " disappearing engines."
985 raise RuntimeError(msg)
1005 raise RuntimeError(msg)
986
1006
987 if self._task_scheme == 'pure':
1007 if self._task_scheme == 'pure':
988 # pure zmq scheme doesn't support dependencies
1008 # pure zmq scheme doesn't support dependencies
989 msg = "Pure ZMQ scheduler doesn't support dependencies"
1009 msg = "Pure ZMQ scheduler doesn't support dependencies"
990 if (follow or after):
1010 if (follow or after):
991 # hard fail on DAG dependencies
1011 # hard fail on DAG dependencies
992 raise RuntimeError(msg)
1012 raise RuntimeError(msg)
993 if isinstance(f, dependent):
1013 if isinstance(f, dependent):
994 # soft warn on functional dependencies
1014 # soft warn on functional dependencies
995 warnings.warn(msg, RuntimeWarning)
1015 warnings.warn(msg, RuntimeWarning)
996
1016
997 # defaults:
1017 # defaults:
998 args = args if args is not None else []
1018 args = args if args is not None else []
999 kwargs = kwargs if kwargs is not None else {}
1019 kwargs = kwargs if kwargs is not None else {}
1000
1020
1001 if targets:
1021 if targets:
1002 idents,_ = self._build_targets(targets)
1022 idents,_ = self._build_targets(targets)
1003 else:
1023 else:
1004 idents = []
1024 idents = []
1005
1025
1006 after = self._build_dependency(after)
1026 after = self._build_dependency(after)
1007 follow = self._build_dependency(follow)
1027 follow = self._build_dependency(follow)
1008 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
1028 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
1009 bufs = ss.pack_apply_message(f,args,kwargs)
1029 bufs = ss.pack_apply_message(f,args,kwargs)
1010 content = dict(bound=bound)
1030 content = dict(bound=bound)
1011
1031
1012 msg = self.session.send(self._task_socket, "apply_request",
1032 msg = self.session.send(self._task_socket, "apply_request",
1013 content=content, buffers=bufs, subheader=subheader)
1033 content=content, buffers=bufs, subheader=subheader)
1014 msg_id = msg['msg_id']
1034 msg_id = msg['msg_id']
1015 self.outstanding.add(msg_id)
1035 self.outstanding.add(msg_id)
1016 self.history.append(msg_id)
1036 self.history.append(msg_id)
1017 ar = AsyncResult(self, [msg_id], fname=f.__name__)
1037 ar = AsyncResult(self, [msg_id], fname=f.__name__)
1018 if block:
1038 if block:
1019 try:
1039 try:
1020 return ar.get()
1040 return ar.get()
1021 except KeyboardInterrupt:
1041 except KeyboardInterrupt:
1022 return ar
1042 return ar
1023 else:
1043 else:
1024 return ar
1044 return ar
1025
1045
1026 def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None):
1046 def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None):
1027 """Then underlying method for applying functions to specific engines
1047 """Then underlying method for applying functions to specific engines
1028 via the MUX queue.
1048 via the MUX queue.
1029
1049
1030 This is a private method, see `apply` for details.
1050 This is a private method, see `apply` for details.
1031 Not to be called directly!
1051 Not to be called directly!
1032 """
1052 """
1033
1053 loc = locals()
1034 for kwarg in (bound, block, targets):
1054 for name in ('bound', 'block', 'targets'):
1035 assert kwarg is not None, "kwarg %r must be specified!"%kwarg
1055 assert loc[name] is not None, "kwarg %r must be specified!"%name
1036
1056
1037 idents,targets = self._build_targets(targets)
1057 idents,targets = self._build_targets(targets)
1038
1058
1039 subheader = {}
1059 subheader = {}
1040 content = dict(bound=bound)
1060 content = dict(bound=bound)
1041 bufs = ss.pack_apply_message(f,args,kwargs)
1061 bufs = ss.pack_apply_message(f,args,kwargs)
1042
1062
1043 msg_ids = []
1063 msg_ids = []
1044 for ident in idents:
1064 for ident in idents:
1045 msg = self.session.send(self._mux_socket, "apply_request",
1065 msg = self.session.send(self._mux_socket, "apply_request",
1046 content=content, buffers=bufs, ident=ident, subheader=subheader)
1066 content=content, buffers=bufs, ident=ident, subheader=subheader)
1047 msg_id = msg['msg_id']
1067 msg_id = msg['msg_id']
1048 self.outstanding.add(msg_id)
1068 self.outstanding.add(msg_id)
1049 self.history.append(msg_id)
1069 self.history.append(msg_id)
1050 msg_ids.append(msg_id)
1070 msg_ids.append(msg_id)
1051 ar = AsyncResult(self, msg_ids, fname=f.__name__)
1071 ar = AsyncResult(self, msg_ids, fname=f.__name__)
1052 if block:
1072 if block:
1053 try:
1073 try:
1054 return ar.get()
1074 return ar.get()
1055 except KeyboardInterrupt:
1075 except KeyboardInterrupt:
1056 return ar
1076 return ar
1057 else:
1077 else:
1058 return ar
1078 return ar
1059
1079
1060 #--------------------------------------------------------------------------
1080 #--------------------------------------------------------------------------
1061 # decorators
1081 # construct a View object
1062 #--------------------------------------------------------------------------
1082 #--------------------------------------------------------------------------
1063
1083
1064 @defaultblock
1084 @defaultblock
1065 def parallel(self, bound=True, targets='all', block=None):
1085 def remote(self, bound=True, block=None, targets=None, balanced=None):
1066 """Decorator for making a ParallelFunction."""
1086 """Decorator for making a RemoteFunction"""
1067 return parallel(self, bound=bound, targets=targets, block=block)
1087 return remote(self, bound=bound, targets=targets, block=block, balanced=balanced)
1068
1088
1069 @defaultblock
1089 @defaultblock
1070 def remote(self, bound=True, targets='all', block=None):
1090 def parallel(self, dist='b', bound=True, block=None, targets=None, balanced=None):
1071 """Decorator for making a RemoteFunction."""
1091 """Decorator for making a ParallelFunction"""
1072 return remote(self, bound=bound, targets=targets, block=block)
1092 return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced)
1073
1093
1074 def view(self, targets=None, balanced=False):
1094 def _cache_view(self, targets, balanced):
1075 """Method for constructing View objects"""
1095 """save views, so subsequent requests don't create new objects."""
1096 if balanced:
1097 view_class = LoadBalancedView
1098 view_cache = self._balanced_views
1099 else:
1100 view_class = DirectView
1101 view_cache = self._direct_views
1102
1103 # use str, since often targets will be a list
1104 key = str(targets)
1105 if key not in view_cache:
1106 view_cache[key] = view_class(client=self, targets=targets)
1107
1108 return view_cache[key]
1109
1110 def view(self, targets=None, balanced=None):
1111 """Method for constructing View objects.
1112
1113 If no arguments are specified, create a LoadBalancedView
1114 using all engines. If only `targets` specified, it will
1115 be a DirectView. This method is the underlying implementation
1116 of ``client.__getitem__``.
1117
1118 Parameters
1119 ----------
1120
1121 targets: list,slice,int,etc. [default: use all engines]
1122 The engines to use for the View
1123 balanced : bool [default: False if targets specified, True else]
1124 whether to build a LoadBalancedView or a DirectView
1125
1126 """
1127
1128 balanced = (targets is None) if balanced is None else balanced
1129
1076 if targets is None:
1130 if targets is None:
1077 if balanced:
1131 if balanced:
1078 return LoadBalancedView(client=self)
1132 return self._cache_view(None,True)
1079 else:
1133 else:
1080 targets = slice(None)
1134 targets = slice(None)
1081
1135
1082 if balanced:
1083 view_class = LoadBalancedView
1084 else:
1085 view_class = DirectView
1086 if isinstance(targets, int):
1136 if isinstance(targets, int):
1087 if targets not in self.ids:
1137 if targets not in self.ids:
1088 raise IndexError("No such engine: %i"%targets)
1138 raise IndexError("No such engine: %i"%targets)
1089 return view_class(client=self, targets=targets)
1139 return self._cache_view(targets, balanced)
1090
1140
1091 if isinstance(targets, slice):
1141 if isinstance(targets, slice):
1092 indices = range(len(self.ids))[targets]
1142 indices = range(len(self.ids))[targets]
1093 ids = sorted(self._ids)
1143 ids = sorted(self._ids)
1094 targets = [ ids[i] for i in indices ]
1144 targets = [ ids[i] for i in indices ]
1095
1145
1096 if isinstance(targets, (tuple, list, xrange)):
1146 if isinstance(targets, (tuple, list, xrange)):
1097 _,targets = self._build_targets(list(targets))
1147 _,targets = self._build_targets(list(targets))
1098 return view_class(client=self, targets=targets)
1148 return self._cache_view(targets, balanced)
1099 else:
1149 else:
1100 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
1150 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
1101
1151
1102 #--------------------------------------------------------------------------
1152 #--------------------------------------------------------------------------
1103 # Data movement
1153 # Data movement
1104 #--------------------------------------------------------------------------
1154 #--------------------------------------------------------------------------
1105
1155
1106 @defaultblock
1156 @defaultblock
1107 def push(self, ns, targets='all', block=None):
1157 def push(self, ns, targets='all', block=None):
1108 """Push the contents of `ns` into the namespace on `target`"""
1158 """Push the contents of `ns` into the namespace on `target`"""
1109 if not isinstance(ns, dict):
1159 if not isinstance(ns, dict):
1110 raise TypeError("Must be a dict, not %s"%type(ns))
1160 raise TypeError("Must be a dict, not %s"%type(ns))
1111 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False)
1161 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False)
1112 return result
1162 return result
1113
1163
1114 @defaultblock
1164 @defaultblock
1115 def pull(self, keys, targets='all', block=None):
1165 def pull(self, keys, targets='all', block=None):
1116 """Pull objects from `target`'s namespace by `keys`"""
1166 """Pull objects from `target`'s namespace by `keys`"""
1117 if isinstance(keys, str):
1167 if isinstance(keys, str):
1118 pass
1168 pass
1119 elif isinstance(keys, (list,tuple,set)):
1169 elif isinstance(keys, (list,tuple,set)):
1120 for key in keys:
1170 for key in keys:
1121 if not isinstance(key, str):
1171 if not isinstance(key, str):
1122 raise TypeError
1172 raise TypeError
1123 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1173 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1124 return result
1174 return result
1125
1175
1126 @defaultblock
1176 @defaultblock
1127 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1177 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1128 """
1178 """
1129 Partition a Python sequence and send the partitions to a set of engines.
1179 Partition a Python sequence and send the partitions to a set of engines.
1130 """
1180 """
1131 targets = self._build_targets(targets)[-1]
1181 targets = self._build_targets(targets)[-1]
1132 mapObject = Map.dists[dist]()
1182 mapObject = Map.dists[dist]()
1133 nparts = len(targets)
1183 nparts = len(targets)
1134 msg_ids = []
1184 msg_ids = []
1135 for index, engineid in enumerate(targets):
1185 for index, engineid in enumerate(targets):
1136 partition = mapObject.getPartition(seq, index, nparts)
1186 partition = mapObject.getPartition(seq, index, nparts)
1137 if flatten and len(partition) == 1:
1187 if flatten and len(partition) == 1:
1138 r = self.push({key: partition[0]}, targets=engineid, block=False)
1188 r = self.push({key: partition[0]}, targets=engineid, block=False)
1139 else:
1189 else:
1140 r = self.push({key: partition}, targets=engineid, block=False)
1190 r = self.push({key: partition}, targets=engineid, block=False)
1141 msg_ids.extend(r.msg_ids)
1191 msg_ids.extend(r.msg_ids)
1142 r = AsyncResult(self, msg_ids, fname='scatter')
1192 r = AsyncResult(self, msg_ids, fname='scatter')
1143 if block:
1193 if block:
1144 return r.get()
1194 return r.get()
1145 else:
1195 else:
1146 return r
1196 return r
1147
1197
1148 @defaultblock
1198 @defaultblock
1149 def gather(self, key, dist='b', targets='all', block=None):
1199 def gather(self, key, dist='b', targets='all', block=None):
1150 """
1200 """
1151 Gather a partitioned sequence on a set of engines as a single local seq.
1201 Gather a partitioned sequence on a set of engines as a single local seq.
1152 """
1202 """
1153
1203
1154 targets = self._build_targets(targets)[-1]
1204 targets = self._build_targets(targets)[-1]
1155 mapObject = Map.dists[dist]()
1205 mapObject = Map.dists[dist]()
1156 msg_ids = []
1206 msg_ids = []
1157 for index, engineid in enumerate(targets):
1207 for index, engineid in enumerate(targets):
1158 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1208 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1159
1209
1160 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1210 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1161 if block:
1211 if block:
1162 return r.get()
1212 return r.get()
1163 else:
1213 else:
1164 return r
1214 return r
1165
1215
1166 #--------------------------------------------------------------------------
1216 #--------------------------------------------------------------------------
1167 # Query methods
1217 # Query methods
1168 #--------------------------------------------------------------------------
1218 #--------------------------------------------------------------------------
1169
1219
1170 @spinfirst
1220 @spinfirst
1171 def get_results(self, msg_ids, status_only=False):
1221 def get_results(self, msg_ids, status_only=False):
1172 """Returns the result of the execute or task request with `msg_ids`.
1222 """Returns the result of the execute or task request with `msg_ids`.
1173
1223
1174 Parameters
1224 Parameters
1175 ----------
1225 ----------
1176
1226
1177 msg_ids : list of ints or msg_ids
1227 msg_ids : list of ints or msg_ids
1178 if int:
1228 if int:
1179 Passed as index to self.history for convenience.
1229 Passed as index to self.history for convenience.
1180 status_only : bool (default: False)
1230 status_only : bool (default: False)
1181 if False:
1231 if False:
1182 return the actual results
1232 return the actual results
1183
1233
1184 Returns
1234 Returns
1185 -------
1235 -------
1186
1236
1187 results : dict
1237 results : dict
1188 There will always be the keys 'pending' and 'completed', which will
1238 There will always be the keys 'pending' and 'completed', which will
1189 be lists of msg_ids.
1239 be lists of msg_ids.
1190 """
1240 """
1191 if not isinstance(msg_ids, (list,tuple)):
1241 if not isinstance(msg_ids, (list,tuple)):
1192 msg_ids = [msg_ids]
1242 msg_ids = [msg_ids]
1193 theids = []
1243 theids = []
1194 for msg_id in msg_ids:
1244 for msg_id in msg_ids:
1195 if isinstance(msg_id, int):
1245 if isinstance(msg_id, int):
1196 msg_id = self.history[msg_id]
1246 msg_id = self.history[msg_id]
1197 if not isinstance(msg_id, str):
1247 if not isinstance(msg_id, str):
1198 raise TypeError("msg_ids must be str, not %r"%msg_id)
1248 raise TypeError("msg_ids must be str, not %r"%msg_id)
1199 theids.append(msg_id)
1249 theids.append(msg_id)
1200
1250
1201 completed = []
1251 completed = []
1202 local_results = {}
1252 local_results = {}
1203
1253
1204 # comment this block out to temporarily disable local shortcut:
1254 # comment this block out to temporarily disable local shortcut:
1205 for msg_id in list(theids):
1255 for msg_id in list(theids):
1206 if msg_id in self.results:
1256 if msg_id in self.results:
1207 completed.append(msg_id)
1257 completed.append(msg_id)
1208 local_results[msg_id] = self.results[msg_id]
1258 local_results[msg_id] = self.results[msg_id]
1209 theids.remove(msg_id)
1259 theids.remove(msg_id)
1210
1260
1211 if theids: # some not locally cached
1261 if theids: # some not locally cached
1212 content = dict(msg_ids=theids, status_only=status_only)
1262 content = dict(msg_ids=theids, status_only=status_only)
1213 msg = self.session.send(self._query_socket, "result_request", content=content)
1263 msg = self.session.send(self._query_socket, "result_request", content=content)
1214 zmq.select([self._query_socket], [], [])
1264 zmq.select([self._query_socket], [], [])
1215 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1265 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1216 if self.debug:
1266 if self.debug:
1217 pprint(msg)
1267 pprint(msg)
1218 content = msg['content']
1268 content = msg['content']
1219 if content['status'] != 'ok':
1269 if content['status'] != 'ok':
1220 raise ss.unwrap_exception(content)
1270 raise ss.unwrap_exception(content)
1221 buffers = msg['buffers']
1271 buffers = msg['buffers']
1222 else:
1272 else:
1223 content = dict(completed=[],pending=[])
1273 content = dict(completed=[],pending=[])
1224
1274
1225 content['completed'].extend(completed)
1275 content['completed'].extend(completed)
1226
1276
1227 if status_only:
1277 if status_only:
1228 return content
1278 return content
1229
1279
1230 failures = []
1280 failures = []
1231 # load cached results into result:
1281 # load cached results into result:
1232 content.update(local_results)
1282 content.update(local_results)
1233 # update cache with results:
1283 # update cache with results:
1234 for msg_id in sorted(theids):
1284 for msg_id in sorted(theids):
1235 if msg_id in content['completed']:
1285 if msg_id in content['completed']:
1236 rec = content[msg_id]
1286 rec = content[msg_id]
1237 parent = rec['header']
1287 parent = rec['header']
1238 header = rec['result_header']
1288 header = rec['result_header']
1239 rcontent = rec['result_content']
1289 rcontent = rec['result_content']
1240 iodict = rec['io']
1290 iodict = rec['io']
1241 if isinstance(rcontent, str):
1291 if isinstance(rcontent, str):
1242 rcontent = self.session.unpack(rcontent)
1292 rcontent = self.session.unpack(rcontent)
1243
1293
1244 md = self.metadata.setdefault(msg_id, Metadata())
1294 md = self.metadata.setdefault(msg_id, Metadata())
1245 md.update(self._extract_metadata(header, parent, rcontent))
1295 md.update(self._extract_metadata(header, parent, rcontent))
1246 md.update(iodict)
1296 md.update(iodict)
1247
1297
1248 if rcontent['status'] == 'ok':
1298 if rcontent['status'] == 'ok':
1249 res,buffers = ss.unserialize_object(buffers)
1299 res,buffers = ss.unserialize_object(buffers)
1250 else:
1300 else:
1251 res = ss.unwrap_exception(rcontent)
1301 res = ss.unwrap_exception(rcontent)
1252 failures.append(res)
1302 failures.append(res)
1253
1303
1254 self.results[msg_id] = res
1304 self.results[msg_id] = res
1255 content[msg_id] = res
1305 content[msg_id] = res
1256
1306
1257 error.collect_exceptions(failures, "get_results")
1307 error.collect_exceptions(failures, "get_results")
1258 return content
1308 return content
1259
1309
1260 @spinfirst
1310 @spinfirst
1261 def queue_status(self, targets='all', verbose=False):
1311 def queue_status(self, targets='all', verbose=False):
1262 """Fetch the status of engine queues.
1312 """Fetch the status of engine queues.
1263
1313
1264 Parameters
1314 Parameters
1265 ----------
1315 ----------
1266
1316
1267 targets : int/str/list of ints/strs
1317 targets : int/str/list of ints/strs
1268 the engines whose states are to be queried.
1318 the engines whose states are to be queried.
1269 default : all
1319 default : all
1270 verbose : bool
1320 verbose : bool
1271 Whether to return lengths only, or lists of ids for each element
1321 Whether to return lengths only, or lists of ids for each element
1272 """
1322 """
1273 targets = self._build_targets(targets)[1]
1323 targets = self._build_targets(targets)[1]
1274 content = dict(targets=targets, verbose=verbose)
1324 content = dict(targets=targets, verbose=verbose)
1275 self.session.send(self._query_socket, "queue_request", content=content)
1325 self.session.send(self._query_socket, "queue_request", content=content)
1276 idents,msg = self.session.recv(self._query_socket, 0)
1326 idents,msg = self.session.recv(self._query_socket, 0)
1277 if self.debug:
1327 if self.debug:
1278 pprint(msg)
1328 pprint(msg)
1279 content = msg['content']
1329 content = msg['content']
1280 status = content.pop('status')
1330 status = content.pop('status')
1281 if status != 'ok':
1331 if status != 'ok':
1282 raise ss.unwrap_exception(content)
1332 raise ss.unwrap_exception(content)
1283 return ss.rekey(content)
1333 return ss.rekey(content)
1284
1334
1285 @spinfirst
1335 @spinfirst
1286 def purge_results(self, msg_ids=[], targets=[]):
1336 def purge_results(self, msg_ids=[], targets=[]):
1287 """Tell the controller to forget results.
1337 """Tell the controller to forget results.
1288
1338
1289 Individual results can be purged by msg_id, or the entire
1339 Individual results can be purged by msg_id, or the entire
1290 history of specific targets can be purged.
1340 history of specific targets can be purged.
1291
1341
1292 Parameters
1342 Parameters
1293 ----------
1343 ----------
1294
1344
1295 msg_ids : str or list of strs
1345 msg_ids : str or list of strs
1296 the msg_ids whose results should be forgotten.
1346 the msg_ids whose results should be forgotten.
1297 targets : int/str/list of ints/strs
1347 targets : int/str/list of ints/strs
1298 The targets, by uuid or int_id, whose entire history is to be purged.
1348 The targets, by uuid or int_id, whose entire history is to be purged.
1299 Use `targets='all'` to scrub everything from the controller's memory.
1349 Use `targets='all'` to scrub everything from the controller's memory.
1300
1350
1301 default : None
1351 default : None
1302 """
1352 """
1303 if not targets and not msg_ids:
1353 if not targets and not msg_ids:
1304 raise ValueError
1354 raise ValueError
1305 if targets:
1355 if targets:
1306 targets = self._build_targets(targets)[1]
1356 targets = self._build_targets(targets)[1]
1307 content = dict(targets=targets, msg_ids=msg_ids)
1357 content = dict(targets=targets, msg_ids=msg_ids)
1308 self.session.send(self._query_socket, "purge_request", content=content)
1358 self.session.send(self._query_socket, "purge_request", content=content)
1309 idents, msg = self.session.recv(self._query_socket, 0)
1359 idents, msg = self.session.recv(self._query_socket, 0)
1310 if self.debug:
1360 if self.debug:
1311 pprint(msg)
1361 pprint(msg)
1312 content = msg['content']
1362 content = msg['content']
1313 if content['status'] != 'ok':
1363 if content['status'] != 'ok':
1314 raise ss.unwrap_exception(content)
1364 raise ss.unwrap_exception(content)
1315
1365
1316
1366
1317 __all__ = [ 'Client',
1367 __all__ = [ 'Client',
1318 'depend',
1368 'depend',
1319 'require',
1369 'require',
1320 'remote',
1370 'remote',
1321 'parallel',
1371 'parallel',
1322 'RemoteFunction',
1372 'RemoteFunction',
1323 'ParallelFunction',
1373 'ParallelFunction',
1324 'DirectView',
1374 'DirectView',
1325 'LoadBalancedView',
1375 'LoadBalancedView',
1326 'AsyncResult',
1376 'AsyncResult',
1327 'AsyncMapResult'
1377 'AsyncMapResult'
1328 ]
1378 ]
@@ -1,156 +1,166 b''
1 """Remote Functions and decorators for the client."""
1 """Remote Functions and decorators for the client."""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import warnings
14
13 import map as Map
15 import map as Map
14 from asyncresult import AsyncMapResult
16 from asyncresult import AsyncMapResult
15
17
16 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
17 # Decorators
19 # Decorators
18 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
19
21
20 def remote(client, bound=False, block=None, targets=None, balanced=None):
22 def remote(client, bound=True, block=None, targets=None, balanced=None):
21 """Turn a function into a remote function.
23 """Turn a function into a remote function.
22
24
23 This method can be used for map:
25 This method can be used for map:
24
26
25 >>> @remote(client,block=True)
27 >>> @remote(client,block=True)
26 def func(a)
28 def func(a)
27 """
29 """
28 def remote_function(f):
30 def remote_function(f):
29 return RemoteFunction(client, f, bound, block, targets, balanced)
31 return RemoteFunction(client, f, bound, block, targets, balanced)
30 return remote_function
32 return remote_function
31
33
32 def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None):
34 def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None):
33 """Turn a function into a parallel remote function.
35 """Turn a function into a parallel remote function.
34
36
35 This method can be used for map:
37 This method can be used for map:
36
38
37 >>> @parallel(client,block=True)
39 >>> @parallel(client,block=True)
38 def func(a)
40 def func(a)
39 """
41 """
40 def parallel_function(f):
42 def parallel_function(f):
41 return ParallelFunction(client, f, dist, bound, block, targets, balanced)
43 return ParallelFunction(client, f, dist, bound, block, targets, balanced)
42 return parallel_function
44 return parallel_function
43
45
44 #--------------------------------------------------------------------------
46 #--------------------------------------------------------------------------
45 # Classes
47 # Classes
46 #--------------------------------------------------------------------------
48 #--------------------------------------------------------------------------
47
49
48 class RemoteFunction(object):
50 class RemoteFunction(object):
49 """Turn an existing function into a remote function.
51 """Turn an existing function into a remote function.
50
52
51 Parameters
53 Parameters
52 ----------
54 ----------
53
55
54 client : Client instance
56 client : Client instance
55 The client to be used to connect to engines
57 The client to be used to connect to engines
56 f : callable
58 f : callable
57 The function to be wrapped into a remote function
59 The function to be wrapped into a remote function
58 bound : bool [default: False]
60 bound : bool [default: False]
59 Whether the affect the remote namespace when called
61 Whether the affect the remote namespace when called
60 block : bool [default: None]
62 block : bool [default: None]
61 Whether to wait for results or not. The default behavior is
63 Whether to wait for results or not. The default behavior is
62 to use the current `block` attribute of `client`
64 to use the current `block` attribute of `client`
63 targets : valid target list [default: all]
65 targets : valid target list [default: all]
64 The targets on which to execute.
66 The targets on which to execute.
65 balanced : bool
67 balanced : bool
66 Whether to load-balance with the Task scheduler or not
68 Whether to load-balance with the Task scheduler or not
67 """
69 """
68
70
69 client = None # the remote connection
71 client = None # the remote connection
70 func = None # the wrapped function
72 func = None # the wrapped function
71 block = None # whether to block
73 block = None # whether to block
72 bound = None # whether to affect the namespace
74 bound = None # whether to affect the namespace
73 targets = None # where to execute
75 targets = None # where to execute
74 balanced = None # whether to load-balance
76 balanced = None # whether to load-balance
75
77
76 def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None):
78 def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None):
77 self.client = client
79 self.client = client
78 self.func = f
80 self.func = f
79 self.block=block
81 self.block=block
80 self.bound=bound
82 self.bound=bound
81 self.targets=targets
83 self.targets=targets
82 if balanced is None:
84 if balanced is None:
83 if targets is None:
85 if targets is None:
84 balanced = True
86 balanced = True
85 else:
87 else:
86 balanced = False
88 balanced = False
87 self.balanced = balanced
89 self.balanced = balanced
88
90
89 def __call__(self, *args, **kwargs):
91 def __call__(self, *args, **kwargs):
90 return self.client.apply(self.func, args=args, kwargs=kwargs,
92 return self.client.apply(self.func, args=args, kwargs=kwargs,
91 block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced)
93 block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced)
92
94
93
95
94 class ParallelFunction(RemoteFunction):
96 class ParallelFunction(RemoteFunction):
95 """Class for mapping a function to sequences."""
97 """Class for mapping a function to sequences."""
96 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None):
98 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None, chunk_size=None):
97 super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced)
99 super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced)
100 self.chunk_size = chunk_size
101
98 mapClass = Map.dists[dist]
102 mapClass = Map.dists[dist]
99 self.mapObject = mapClass()
103 self.mapObject = mapClass()
100
104
101 def __call__(self, *sequences):
105 def __call__(self, *sequences):
102 len_0 = len(sequences[0])
106 len_0 = len(sequences[0])
103 for s in sequences:
107 for s in sequences:
104 if len(s)!=len_0:
108 if len(s)!=len_0:
105 msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s))
109 msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s))
106 raise ValueError(msg)
110 raise ValueError(msg)
107
111
108 if self.balanced:
112 if self.balanced:
109 targets = [self.targets]*len_0
113 if self.chunk_size:
114 nparts = len_0/self.chunk_size + int(len_0%self.chunk_size > 0)
115 else:
116 nparts = len_0
117 targets = [self.targets]*nparts
110 else:
118 else:
119 if self.chunk_size:
120 warnings.warn("`chunk_size` is ignored when `balanced=False", UserWarning)
111 # multiplexed:
121 # multiplexed:
112 targets = self.client._build_targets(self.targets)[-1]
122 targets = self.client._build_targets(self.targets)[-1]
123 nparts = len(targets)
113
124
114 nparts = len(targets)
115 msg_ids = []
125 msg_ids = []
116 # my_f = lambda *a: map(self.func, *a)
126 # my_f = lambda *a: map(self.func, *a)
117 for index, t in enumerate(targets):
127 for index, t in enumerate(targets):
118 args = []
128 args = []
119 for seq in sequences:
129 for seq in sequences:
120 part = self.mapObject.getPartition(seq, index, nparts)
130 part = self.mapObject.getPartition(seq, index, nparts)
121 if not part:
131 if not part:
122 continue
132 continue
123 else:
133 else:
124 args.append(part)
134 args.append(part)
125 if not args:
135 if not args:
126 continue
136 continue
127
137
128 # print (args)
138 # print (args)
129 if hasattr(self, '_map'):
139 if hasattr(self, '_map'):
130 f = map
140 f = map
131 args = [self.func]+args
141 args = [self.func]+args
132 else:
142 else:
133 f=self.func
143 f=self.func
134 ar = self.client.apply(f, args=args, block=False, bound=self.bound,
144 ar = self.client.apply(f, args=args, block=False, bound=self.bound,
135 targets=targets, balanced=self.balanced)
145 targets=t, balanced=self.balanced)
136
146
137 msg_ids.append(ar.msg_ids[0])
147 msg_ids.append(ar.msg_ids[0])
138
148
139 r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__)
149 r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__)
140 if self.block:
150 if self.block:
141 try:
151 try:
142 return r.get()
152 return r.get()
143 except KeyboardInterrupt:
153 except KeyboardInterrupt:
144 return r
154 return r
145 else:
155 else:
146 return r
156 return r
147
157
148 def map(self, *sequences):
158 def map(self, *sequences):
149 """call a function on each element of a sequence remotely."""
159 """call a function on each element of a sequence remotely."""
150 self._map = True
160 self._map = True
151 try:
161 try:
152 ret = self.__call__(*sequences)
162 ret = self.__call__(*sequences)
153 finally:
163 finally:
154 del self._map
164 del self._map
155 return ret
165 return ret
156
166
@@ -1,572 +1,592 b''
1 """Views of remote engines"""
1 """Views of remote engines"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance
13 from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance
14
14
15 from IPython.external.decorator import decorator
15 from IPython.external.decorator import decorator
16 from IPython.zmq.parallel.asyncresult import AsyncResult
16 from IPython.zmq.parallel.asyncresult import AsyncResult
17 from IPython.zmq.parallel.dependency import Dependency
17 from IPython.zmq.parallel.dependency import Dependency
18 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
18 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Decorators
21 # Decorators
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 @decorator
24 @decorator
25 def myblock(f, self, *args, **kwargs):
25 def myblock(f, self, *args, **kwargs):
26 """override client.block with self.block during a call"""
26 """override client.block with self.block during a call"""
27 block = self.client.block
27 block = self.client.block
28 self.client.block = self.block
28 self.client.block = self.block
29 try:
29 try:
30 ret = f(self, *args, **kwargs)
30 ret = f(self, *args, **kwargs)
31 finally:
31 finally:
32 self.client.block = block
32 self.client.block = block
33 return ret
33 return ret
34
34
35 @decorator
35 @decorator
36 def save_ids(f, self, *args, **kwargs):
36 def save_ids(f, self, *args, **kwargs):
37 """Keep our history and outstanding attributes up to date after a method call."""
37 """Keep our history and outstanding attributes up to date after a method call."""
38 n_previous = len(self.client.history)
38 n_previous = len(self.client.history)
39 ret = f(self, *args, **kwargs)
39 ret = f(self, *args, **kwargs)
40 nmsgs = len(self.client.history) - n_previous
40 nmsgs = len(self.client.history) - n_previous
41 msg_ids = self.client.history[-nmsgs:]
41 msg_ids = self.client.history[-nmsgs:]
42 self.history.extend(msg_ids)
42 self.history.extend(msg_ids)
43 map(self.outstanding.add, msg_ids)
43 map(self.outstanding.add, msg_ids)
44 return ret
44 return ret
45
45
46 @decorator
46 @decorator
47 def sync_results(f, self, *args, **kwargs):
47 def sync_results(f, self, *args, **kwargs):
48 """sync relevant results from self.client to our results attribute."""
48 """sync relevant results from self.client to our results attribute."""
49 ret = f(self, *args, **kwargs)
49 ret = f(self, *args, **kwargs)
50 delta = self.outstanding.difference(self.client.outstanding)
50 delta = self.outstanding.difference(self.client.outstanding)
51 completed = self.outstanding.intersection(delta)
51 completed = self.outstanding.intersection(delta)
52 self.outstanding = self.outstanding.difference(completed)
52 self.outstanding = self.outstanding.difference(completed)
53 for msg_id in completed:
53 for msg_id in completed:
54 self.results[msg_id] = self.client.results[msg_id]
54 self.results[msg_id] = self.client.results[msg_id]
55 return ret
55 return ret
56
56
57 @decorator
57 @decorator
58 def spin_after(f, self, *args, **kwargs):
58 def spin_after(f, self, *args, **kwargs):
59 """call spin after the method."""
59 """call spin after the method."""
60 ret = f(self, *args, **kwargs)
60 ret = f(self, *args, **kwargs)
61 self.spin()
61 self.spin()
62 return ret
62 return ret
63
63
64 #-----------------------------------------------------------------------------
64 #-----------------------------------------------------------------------------
65 # Classes
65 # Classes
66 #-----------------------------------------------------------------------------
66 #-----------------------------------------------------------------------------
67
67
68 class View(HasTraits):
68 class View(HasTraits):
69 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
69 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
70
70
71 Don't use this class, use subclasses.
71 Don't use this class, use subclasses.
72 """
72 """
73 block=Bool(False)
73 block=Bool(False)
74 bound=Bool(False)
74 bound=Bool(False)
75 history=List()
75 history=List()
76 outstanding = Set()
76 outstanding = Set()
77 results = Dict()
77 results = Dict()
78 client = Instance('IPython.zmq.parallel.client.Client')
78 client = Instance('IPython.zmq.parallel.client.Client')
79
79
80 _ntargets = Int(1)
80 _ntargets = Int(1)
81 _balanced = Bool(False)
81 _balanced = Bool(False)
82 _default_names = List(['block', 'bound'])
82 _default_names = List(['block', 'bound'])
83 _targets = None
83 _targets = None
84
84
85 def __init__(self, client=None, targets=None):
85 def __init__(self, client=None, targets=None):
86 super(View, self).__init__(client=client)
86 super(View, self).__init__(client=client)
87 self._targets = targets
87 self._targets = targets
88 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
88 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
89 self.block = client.block
89 self.block = client.block
90
90
91 for name in self._default_names:
91 for name in self._default_names:
92 setattr(self, name, getattr(self, name, None))
92 setattr(self, name, getattr(self, name, None))
93
93
94
94
95 def __repr__(self):
95 def __repr__(self):
96 strtargets = str(self._targets)
96 strtargets = str(self._targets)
97 if len(strtargets) > 16:
97 if len(strtargets) > 16:
98 strtargets = strtargets[:12]+'...]'
98 strtargets = strtargets[:12]+'...]'
99 return "<%s %s>"%(self.__class__.__name__, strtargets)
99 return "<%s %s>"%(self.__class__.__name__, strtargets)
100
100
101 @property
101 @property
102 def targets(self):
102 def targets(self):
103 return self._targets
103 return self._targets
104
104
105 @targets.setter
105 @targets.setter
106 def targets(self, value):
106 def targets(self, value):
107 raise AttributeError("Cannot set View `targets` after construction!")
107 raise AttributeError("Cannot set View `targets` after construction!")
108
108
109 def _defaults(self, *excludes):
109 def _defaults(self, *excludes):
110 """return dict of our default attributes, excluding names given."""
110 """return dict of our default attributes, excluding names given."""
111 d = dict(balanced=self._balanced, targets=self.targets)
111 d = dict(balanced=self._balanced, targets=self.targets)
112 for name in self._default_names:
112 for name in self._default_names:
113 if name not in excludes:
113 if name not in excludes:
114 d[name] = getattr(self, name)
114 d[name] = getattr(self, name)
115 return d
115 return d
116
116
117 def set_flags(self, **kwargs):
117 def set_flags(self, **kwargs):
118 """set my attribute flags by keyword.
118 """set my attribute flags by keyword.
119
119
120 A View is a wrapper for the Client's apply method, but
120 A View is a wrapper for the Client's apply method, but
121 with attributes that specify keyword arguments, those attributes
121 with attributes that specify keyword arguments, those attributes
122 can be set by keyword argument with this method.
122 can be set by keyword argument with this method.
123
123
124 Parameters
124 Parameters
125 ----------
125 ----------
126
126
127 block : bool
127 block : bool
128 whether to wait for results
128 whether to wait for results
129 bound : bool
129 bound : bool
130 whether to use the client's namespace
130 whether to use the client's namespace
131 """
131 """
132 for key in kwargs:
132 for key in kwargs:
133 if key not in self._default_names:
133 if key not in self._default_names:
134 raise KeyError("Invalid name: %r"%key)
134 raise KeyError("Invalid name: %r"%key)
135 for name in ('block', 'bound'):
135 for name in ('block', 'bound'):
136 if name in kwargs:
136 if name in kwargs:
137 setattr(self, name, kwargs)
137 setattr(self, name, kwargs[name])
138
138
139 #----------------------------------------------------------------
139 #----------------------------------------------------------------
140 # wrappers for client methods:
140 # wrappers for client methods:
141 #----------------------------------------------------------------
141 #----------------------------------------------------------------
142 @sync_results
142 @sync_results
143 def spin(self):
143 def spin(self):
144 """spin the client, and sync"""
144 """spin the client, and sync"""
145 self.client.spin()
145 self.client.spin()
146
146
147 @sync_results
147 @sync_results
148 @save_ids
148 @save_ids
149 def apply(self, f, *args, **kwargs):
149 def apply(self, f, *args, **kwargs):
150 """calls f(*args, **kwargs) on remote engines, returning the result.
150 """calls f(*args, **kwargs) on remote engines, returning the result.
151
151
152 This method does not involve the engine's namespace.
152 This method does not involve the engine's namespace.
153
153
154 if self.block is False:
154 if self.block is False:
155 returns msg_id
155 returns msg_id
156 else:
156 else:
157 returns actual result of f(*args, **kwargs)
157 returns actual result of f(*args, **kwargs)
158 """
158 """
159 return self.client.apply(f, args, kwargs, **self._defaults())
159 return self.client.apply(f, args, kwargs, **self._defaults())
160
160
161 @save_ids
161 @save_ids
162 def apply_async(self, f, *args, **kwargs):
162 def apply_async(self, f, *args, **kwargs):
163 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
163 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
164
164
165 This method does not involve the engine's namespace.
165 This method does not involve the engine's namespace.
166
166
167 returns msg_id
167 returns msg_id
168 """
168 """
169 d = self._defaults('block', 'bound')
169 d = self._defaults('block', 'bound')
170 return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
170 return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
171
171
172 @spin_after
172 @spin_after
173 @save_ids
173 @save_ids
174 def apply_sync(self, f, *args, **kwargs):
174 def apply_sync(self, f, *args, **kwargs):
175 """calls f(*args, **kwargs) on remote engines in a blocking manner,
175 """calls f(*args, **kwargs) on remote engines in a blocking manner,
176 returning the result.
176 returning the result.
177
177
178 This method does not involve the engine's namespace.
178 This method does not involve the engine's namespace.
179
179
180 returns: actual result of f(*args, **kwargs)
180 returns: actual result of f(*args, **kwargs)
181 """
181 """
182 d = self._defaults('block', 'bound')
182 d = self._defaults('block', 'bound')
183 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
183 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
184
184
185 @sync_results
185 @sync_results
186 @save_ids
186 @save_ids
187 def apply_bound(self, f, *args, **kwargs):
187 def apply_bound(self, f, *args, **kwargs):
188 """calls f(*args, **kwargs) bound to engine namespace(s).
188 """calls f(*args, **kwargs) bound to engine namespace(s).
189
189
190 if self.block is False:
190 if self.block is False:
191 returns msg_id
191 returns msg_id
192 else:
192 else:
193 returns actual result of f(*args, **kwargs)
193 returns actual result of f(*args, **kwargs)
194
194
195 This method has access to the targets' globals
195 This method has access to the targets' globals
196
196
197 """
197 """
198 d = self._defaults('bound')
198 d = self._defaults('bound')
199 return self.client.apply(f, args, kwargs, bound=True, **d)
199 return self.client.apply(f, args, kwargs, bound=True, **d)
200
200
201 @sync_results
201 @sync_results
202 @save_ids
202 @save_ids
203 def apply_async_bound(self, f, *args, **kwargs):
203 def apply_async_bound(self, f, *args, **kwargs):
204 """calls f(*args, **kwargs) bound to engine namespace(s)
204 """calls f(*args, **kwargs) bound to engine namespace(s)
205 in a nonblocking manner.
205 in a nonblocking manner.
206
206
207 returns: msg_id
207 returns: msg_id
208
208
209 This method has access to the targets' globals
209 This method has access to the targets' globals
210
210
211 """
211 """
212 d = self._defaults('block', 'bound')
212 d = self._defaults('block', 'bound')
213 return self.client.apply(f, args, kwargs, block=False, bound=True, **d)
213 return self.client.apply(f, args, kwargs, block=False, bound=True, **d)
214
214
215 @spin_after
215 @spin_after
216 @save_ids
216 @save_ids
217 def apply_sync_bound(self, f, *args, **kwargs):
217 def apply_sync_bound(self, f, *args, **kwargs):
218 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
218 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
219
219
220 returns: actual result of f(*args, **kwargs)
220 returns: actual result of f(*args, **kwargs)
221
221
222 This method has access to the targets' globals
222 This method has access to the targets' globals
223
223
224 """
224 """
225 d = self._defaults('block', 'bound')
225 d = self._defaults('block', 'bound')
226 return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
226 return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
227
227
228 def abort(self, msg_ids=None, block=None):
228 def abort(self, msg_ids=None, block=None):
229 """Abort jobs on my engines.
229 """Abort jobs on my engines.
230
230
231 Parameters
231 Parameters
232 ----------
232 ----------
233
233
234 msg_ids : None, str, list of strs, optional
234 msg_ids : None, str, list of strs, optional
235 if None: abort all jobs.
235 if None: abort all jobs.
236 else: abort specific msg_id(s).
236 else: abort specific msg_id(s).
237 """
237 """
238 block = block if block is not None else self.block
238 block = block if block is not None else self.block
239 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
239 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
240
240
241 def queue_status(self, verbose=False):
241 def queue_status(self, verbose=False):
242 """Fetch the Queue status of my engines"""
242 """Fetch the Queue status of my engines"""
243 return self.client.queue_status(targets=self.targets, verbose=verbose)
243 return self.client.queue_status(targets=self.targets, verbose=verbose)
244
244
245 def purge_results(self, msg_ids=[], targets=[]):
245 def purge_results(self, msg_ids=[], targets=[]):
246 """Instruct the controller to forget specific results."""
246 """Instruct the controller to forget specific results."""
247 if targets is None or targets == 'all':
247 if targets is None or targets == 'all':
248 targets = self.targets
248 targets = self.targets
249 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
249 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
250
250
251 #-------------------------------------------------------------------
251 #-------------------------------------------------------------------
252 # Map
253 #-------------------------------------------------------------------
254
255 def map(self, f, *sequences, **kwargs):
256 """override in subclasses"""
257 raise NotImplementedError
258
259 def map_async(self, f, *sequences, **kwargs):
260 """Parallel version of builtin `map`, using this view's engines.
261
262 This is equivalent to map(...block=False)
263
264 See `map` for details.
265 """
266 if 'block' in kwargs:
267 raise TypeError("map_async doesn't take a `block` keyword argument.")
268 kwargs['block'] = False
269 return self.map(f,*sequences,**kwargs)
270
271 def map_sync(self, f, *sequences, **kwargs):
272 """Parallel version of builtin `map`, using this view's engines.
273
274 This is equivalent to map(...block=True)
275
276 See `map` for details.
277 """
278 if 'block' in kwargs:
279 raise TypeError("map_sync doesn't take a `block` keyword argument.")
280 kwargs['block'] = True
281 return self.map(f,*sequences,**kwargs)
282
283 #-------------------------------------------------------------------
252 # Decorators
284 # Decorators
253 #-------------------------------------------------------------------
285 #-------------------------------------------------------------------
254 def parallel(self, bound=True, block=True):
255 """Decorator for making a ParallelFunction"""
256 return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
257
286
258 def remote(self, bound=True, block=True):
287 def remote(self, bound=True, block=True):
259 """Decorator for making a RemoteFunction"""
288 """Decorator for making a RemoteFunction"""
260 return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
289 return remote(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
261
290
291 def parallel(self, dist='b', bound=True, block=None):
292 """Decorator for making a ParallelFunction"""
293 block = self.block if block is None else block
294 return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
262
295
263
296
264 class DirectView(View):
297 class DirectView(View):
265 """Direct Multiplexer View of one or more engines.
298 """Direct Multiplexer View of one or more engines.
266
299
267 These are created via indexed access to a client:
300 These are created via indexed access to a client:
268
301
269 >>> dv_1 = client[1]
302 >>> dv_1 = client[1]
270 >>> dv_all = client[:]
303 >>> dv_all = client[:]
271 >>> dv_even = client[::2]
304 >>> dv_even = client[::2]
272 >>> dv_some = client[1:3]
305 >>> dv_some = client[1:3]
273
306
274 This object provides dictionary access to engine namespaces:
307 This object provides dictionary access to engine namespaces:
275
308
276 # push a=5:
309 # push a=5:
277 >>> dv['a'] = 5
310 >>> dv['a'] = 5
278 # pull 'foo':
311 # pull 'foo':
279 >>> db['foo']
312 >>> db['foo']
280
313
281 """
314 """
282
315
283 def __init__(self, client=None, targets=None):
316 def __init__(self, client=None, targets=None):
284 super(DirectView, self).__init__(client=client, targets=targets)
317 super(DirectView, self).__init__(client=client, targets=targets)
285 self._balanced = False
318 self._balanced = False
286
319
287 @spin_after
320 @spin_after
288 @save_ids
321 @save_ids
289 def map(self, f, *sequences, **kwargs):
322 def map(self, f, *sequences, **kwargs):
290 """Parallel version of builtin `map`, using this View's `targets`.
323 """Parallel version of builtin `map`, using this View's `targets`.
291
324
292 There will be one task per target, so work will be chunked
325 There will be one task per target, so work will be chunked
293 if the sequences are longer than `targets`.
326 if the sequences are longer than `targets`.
294
327
295 Results can be iterated as they are ready, but will become available in chunks.
328 Results can be iterated as they are ready, but will become available in chunks.
296
329
297 Parameters
330 Parameters
298 ----------
331 ----------
299
332
300 f : callable
333 f : callable
301 function to be mapped
334 function to be mapped
302 *sequences: one or more sequences of matching length
335 *sequences: one or more sequences of matching length
303 the sequences to be distributed and passed to `f`
336 the sequences to be distributed and passed to `f`
304 block : bool
337 block : bool
305 whether to wait for the result or not [default self.block]
338 whether to wait for the result or not [default self.block]
306 bound : bool
339 bound : bool
307 whether to wait for the result or not [default self.bound]
340 whether to wait for the result or not [default self.bound]
308
341
309 Returns
342 Returns
310 -------
343 -------
311
344
312 if block=False:
345 if block=False:
313 AsyncMapResult
346 AsyncMapResult
314 An object like AsyncResult, but which reassembles the sequence of results
347 An object like AsyncResult, but which reassembles the sequence of results
315 into a single list. AsyncMapResults can be iterated through before all
348 into a single list. AsyncMapResults can be iterated through before all
316 results are complete.
349 results are complete.
317 else:
350 else:
318 the result of map(f,*sequences)
351 the result of map(f,*sequences)
319 """
352 """
320
353
321 block = kwargs.get('block', self.block)
354 block = kwargs.get('block', self.block)
322 bound = kwargs.get('bound', self.bound)
355 bound = kwargs.get('bound', self.bound)
323 for k in kwargs.keys():
356 for k in kwargs.keys():
324 if k not in ['block', 'bound']:
357 if k not in ['block', 'bound']:
325 raise TypeError("invalid keyword arg, %r"%k)
358 raise TypeError("invalid keyword arg, %r"%k)
326
359
327 assert len(sequences) > 0, "must have some sequences to map onto!"
360 assert len(sequences) > 0, "must have some sequences to map onto!"
328 pf = ParallelFunction(self.client, f, block=block,
361 pf = ParallelFunction(self.client, f, block=block, bound=bound,
329 bound=bound, targets=self.targets, balanced=False)
362 targets=self.targets, balanced=False)
330 return pf.map(*sequences)
363 return pf.map(*sequences)
331
364
332 def map_async(self, f, *sequences, **kwargs):
333 """Parallel version of builtin `map`, using this view's engines."""
334 if 'block' in kwargs:
335 raise TypeError("map_async doesn't take a `block` keyword argument.")
336 kwargs['block'] = True
337 return self.map(f,*sequences,**kwargs)
338
339 @sync_results
365 @sync_results
340 @save_ids
366 @save_ids
341 def execute(self, code, block=True):
367 def execute(self, code, block=True):
342 """execute some code on my targets."""
368 """execute some code on my targets."""
343 return self.client.execute(code, block=block, targets=self.targets)
369 return self.client.execute(code, block=block, targets=self.targets)
344
370
345 def update(self, ns):
371 def update(self, ns):
346 """update remote namespace with dict `ns`"""
372 """update remote namespace with dict `ns`"""
347 return self.client.push(ns, targets=self.targets, block=self.block)
373 return self.client.push(ns, targets=self.targets, block=self.block)
348
374
349 push = update
375 push = update
350
376
351 def get(self, key_s):
377 def get(self, key_s):
352 """get object(s) by `key_s` from remote namespace
378 """get object(s) by `key_s` from remote namespace
353 will return one object if it is a key.
379 will return one object if it is a key.
354 It also takes a list of keys, and will return a list of objects."""
380 It also takes a list of keys, and will return a list of objects."""
355 # block = block if block is not None else self.block
381 # block = block if block is not None else self.block
356 return self.client.pull(key_s, block=True, targets=self.targets)
382 return self.client.pull(key_s, block=True, targets=self.targets)
357
383
358 @sync_results
384 @sync_results
359 @save_ids
385 @save_ids
360 def pull(self, key_s, block=True):
386 def pull(self, key_s, block=True):
361 """get object(s) by `key_s` from remote namespace
387 """get object(s) by `key_s` from remote namespace
362 will return one object if it is a key.
388 will return one object if it is a key.
363 It also takes a list of keys, and will return a list of objects."""
389 It also takes a list of keys, and will return a list of objects."""
364 block = block if block is not None else self.block
390 block = block if block is not None else self.block
365 return self.client.pull(key_s, block=block, targets=self.targets)
391 return self.client.pull(key_s, block=block, targets=self.targets)
366
392
367 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
393 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
368 """
394 """
369 Partition a Python sequence and send the partitions to a set of engines.
395 Partition a Python sequence and send the partitions to a set of engines.
370 """
396 """
371 block = block if block is not None else self.block
397 block = block if block is not None else self.block
372 targets = targets if targets is not None else self.targets
398 targets = targets if targets is not None else self.targets
373
399
374 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
400 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
375 targets=targets, block=block)
401 targets=targets, block=block)
376
402
377 @sync_results
403 @sync_results
378 @save_ids
404 @save_ids
379 def gather(self, key, dist='b', targets=None, block=None):
405 def gather(self, key, dist='b', targets=None, block=None):
380 """
406 """
381 Gather a partitioned sequence on a set of engines as a single local seq.
407 Gather a partitioned sequence on a set of engines as a single local seq.
382 """
408 """
383 block = block if block is not None else self.block
409 block = block if block is not None else self.block
384 targets = targets if targets is not None else self.targets
410 targets = targets if targets is not None else self.targets
385
411
386 return self.client.gather(key, dist=dist, targets=targets, block=block)
412 return self.client.gather(key, dist=dist, targets=targets, block=block)
387
413
388 def __getitem__(self, key):
414 def __getitem__(self, key):
389 return self.get(key)
415 return self.get(key)
390
416
391 def __setitem__(self,key, value):
417 def __setitem__(self,key, value):
392 self.update({key:value})
418 self.update({key:value})
393
419
394 def clear(self, block=False):
420 def clear(self, block=False):
395 """Clear the remote namespaces on my engines."""
421 """Clear the remote namespaces on my engines."""
396 block = block if block is not None else self.block
422 block = block if block is not None else self.block
397 return self.client.clear(targets=self.targets, block=block)
423 return self.client.clear(targets=self.targets, block=block)
398
424
399 def kill(self, block=True):
425 def kill(self, block=True):
400 """Kill my engines."""
426 """Kill my engines."""
401 block = block if block is not None else self.block
427 block = block if block is not None else self.block
402 return self.client.kill(targets=self.targets, block=block)
428 return self.client.kill(targets=self.targets, block=block)
403
429
404 #----------------------------------------
430 #----------------------------------------
405 # activate for %px,%autopx magics
431 # activate for %px,%autopx magics
406 #----------------------------------------
432 #----------------------------------------
407 def activate(self):
433 def activate(self):
408 """Make this `View` active for parallel magic commands.
434 """Make this `View` active for parallel magic commands.
409
435
410 IPython has a magic command syntax to work with `MultiEngineClient` objects.
436 IPython has a magic command syntax to work with `MultiEngineClient` objects.
411 In a given IPython session there is a single active one. While
437 In a given IPython session there is a single active one. While
412 there can be many `Views` created and used by the user,
438 there can be many `Views` created and used by the user,
413 there is only one active one. The active `View` is used whenever
439 there is only one active one. The active `View` is used whenever
414 the magic commands %px and %autopx are used.
440 the magic commands %px and %autopx are used.
415
441
416 The activate() method is called on a given `View` to make it
442 The activate() method is called on a given `View` to make it
417 active. Once this has been done, the magic commands can be used.
443 active. Once this has been done, the magic commands can be used.
418 """
444 """
419
445
420 try:
446 try:
421 # This is injected into __builtins__.
447 # This is injected into __builtins__.
422 ip = get_ipython()
448 ip = get_ipython()
423 except NameError:
449 except NameError:
424 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
450 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
425 else:
451 else:
426 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
452 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
427 if pmagic is not None:
453 if pmagic is not None:
428 pmagic.active_multiengine_client = self
454 pmagic.active_multiengine_client = self
429 else:
455 else:
430 print "You must first load the parallelmagic extension " \
456 print "You must first load the parallelmagic extension " \
431 "by doing '%load_ext parallelmagic'"
457 "by doing '%load_ext parallelmagic'"
432
458
433
459
434 class LoadBalancedView(View):
460 class LoadBalancedView(View):
435 """An load-balancing View that only executes via the Task scheduler.
461 """An load-balancing View that only executes via the Task scheduler.
436
462
437 Load-balanced views can be created with the client's `view` method:
463 Load-balanced views can be created with the client's `view` method:
438
464
439 >>> v = client.view(balanced=True)
465 >>> v = client.view(balanced=True)
440
466
441 or targets can be specified, to restrict the potential destinations:
467 or targets can be specified, to restrict the potential destinations:
442
468
443 >>> v = client.view([1,3],balanced=True)
469 >>> v = client.view([1,3],balanced=True)
444
470
445 which would restrict loadbalancing to between engines 1 and 3.
471 which would restrict loadbalancing to between engines 1 and 3.
446
472
447 """
473 """
448
474
449 _apply_name = 'apply_balanced'
450 _default_names = ['block', 'bound', 'follow', 'after', 'timeout']
475 _default_names = ['block', 'bound', 'follow', 'after', 'timeout']
451
476
452 def __init__(self, client=None, targets=None):
477 def __init__(self, client=None, targets=None):
453 super(LoadBalancedView, self).__init__(client=client, targets=targets)
478 super(LoadBalancedView, self).__init__(client=client, targets=targets)
454 self._ntargets = 1
479 self._ntargets = 1
480 self._balanced = True
455
481
456 def _validate_dependency(self, dep):
482 def _validate_dependency(self, dep):
457 """validate a dependency.
483 """validate a dependency.
458
484
459 For use in `set_flags`.
485 For use in `set_flags`.
460 """
486 """
461 if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
487 if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
462 return True
488 return True
463 elif isinstance(dep, (list,set, tuple)):
489 elif isinstance(dep, (list,set, tuple)):
464 for d in dep:
490 for d in dep:
465 if not isinstance(d, str, AsyncResult):
491 if not isinstance(d, str, AsyncResult):
466 return False
492 return False
467 elif isinstance(dep, dict):
493 elif isinstance(dep, dict):
468 if set(dep.keys()) != set(Dependency().as_dict().keys()):
494 if set(dep.keys()) != set(Dependency().as_dict().keys()):
469 return False
495 return False
470 if not isinstance(dep['msg_ids'], list):
496 if not isinstance(dep['msg_ids'], list):
471 return False
497 return False
472 for d in dep['msg_ids']:
498 for d in dep['msg_ids']:
473 if not isinstance(d, str):
499 if not isinstance(d, str):
474 return False
500 return False
475 else:
501 else:
476 return False
502 return False
477
503
478 def set_flags(self, **kwargs):
504 def set_flags(self, **kwargs):
479 """set my attribute flags by keyword.
505 """set my attribute flags by keyword.
480
506
481 A View is a wrapper for the Client's apply method, but
507 A View is a wrapper for the Client's apply method, but
482 with attributes that specify keyword arguments, those attributes
508 with attributes that specify keyword arguments, those attributes
483 can be set by keyword argument with this method.
509 can be set by keyword argument with this method.
484
510
485 Parameters
511 Parameters
486 ----------
512 ----------
487
513
488 block : bool
514 block : bool
489 whether to wait for results
515 whether to wait for results
490 bound : bool
516 bound : bool
491 whether to use the engine's namespace
517 whether to use the engine's namespace
492 follow : Dependency, list, msg_id, AsyncResult
518 follow : Dependency, list, msg_id, AsyncResult
493 the location dependencies of tasks
519 the location dependencies of tasks
494 after : Dependency, list, msg_id, AsyncResult
520 after : Dependency, list, msg_id, AsyncResult
495 the time dependencies of tasks
521 the time dependencies of tasks
496 timeout : int,None
522 timeout : int,None
497 the timeout to be used for tasks
523 the timeout to be used for tasks
498 """
524 """
499
525
500 super(LoadBalancedView, self).set_flags(**kwargs)
526 super(LoadBalancedView, self).set_flags(**kwargs)
501 for name in ('follow', 'after'):
527 for name in ('follow', 'after'):
502 if name in kwargs:
528 if name in kwargs:
503 value = kwargs[name]
529 value = kwargs[name]
504 if self._validate_dependency(value):
530 if self._validate_dependency(value):
505 setattr(self, name, value)
531 setattr(self, name, value)
506 else:
532 else:
507 raise ValueError("Invalid dependency: %r"%value)
533 raise ValueError("Invalid dependency: %r"%value)
508 if 'timeout' in kwargs:
534 if 'timeout' in kwargs:
509 t = kwargs['timeout']
535 t = kwargs['timeout']
510 if not isinstance(t, (int, long, float, None)):
536 if not isinstance(t, (int, long, float, None)):
511 raise TypeError("Invalid type for timeout: %r"%type(t))
537 raise TypeError("Invalid type for timeout: %r"%type(t))
512 if t is not None:
538 if t is not None:
513 if t < 0:
539 if t < 0:
514 raise ValueError("Invalid timeout: %s"%t)
540 raise ValueError("Invalid timeout: %s"%t)
515 self.timeout = t
541 self.timeout = t
516
542
517 @spin_after
543 @spin_after
518 @save_ids
544 @save_ids
519 def map(self, f, *sequences, **kwargs):
545 def map(self, f, *sequences, **kwargs):
520 """Parallel version of builtin `map`, load-balanced by this View.
546 """Parallel version of builtin `map`, load-balanced by this View.
521
547
522 Each element will be a separate task, and will be load-balanced. This
548 Each element will be a separate task, and will be load-balanced. This
523 lets individual elements be available for iteration as soon as they arrive.
549 lets individual elements be available for iteration as soon as they arrive.
524
550
525 Parameters
551 Parameters
526 ----------
552 ----------
527
553
528 f : callable
554 f : callable
529 function to be mapped
555 function to be mapped
530 *sequences: one or more sequences of matching length
556 *sequences: one or more sequences of matching length
531 the sequences to be distributed and passed to `f`
557 the sequences to be distributed and passed to `f`
532 block : bool
558 block : bool
533 whether to wait for the result or not [default self.block]
559 whether to wait for the result or not [default self.block]
534 bound : bool
560 bound : bool
535 whether to use the engine's namespace
561 whether to use the engine's namespace
536
562
537 Returns
563 Returns
538 -------
564 -------
539
565
540 if block=False:
566 if block=False:
541 AsyncMapResult
567 AsyncMapResult
542 An object like AsyncResult, but which reassembles the sequence of results
568 An object like AsyncResult, but which reassembles the sequence of results
543 into a single list. AsyncMapResults can be iterated through before all
569 into a single list. AsyncMapResults can be iterated through before all
544 results are complete.
570 results are complete.
545 else:
571 else:
546 the result of map(f,*sequences)
572 the result of map(f,*sequences)
547
573
548 """
574 """
549
575
576 # default
550 block = kwargs.get('block', self.block)
577 block = kwargs.get('block', self.block)
551 bound = kwargs.get('bound', self.bound)
578 bound = kwargs.get('bound', self.bound)
579 chunk_size = kwargs.get('chunk_size', 1)
580
581 keyset = set(kwargs.keys())
582 extra_keys = keyset.difference_update(set(['block', 'bound', 'chunk_size']))
583 if extra_keys:
584 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
552
585
553 assert len(sequences) > 0, "must have some sequences to map onto!"
586 assert len(sequences) > 0, "must have some sequences to map onto!"
554
587
555 pf = ParallelFunction(self.client, f, block=block, bound=bound,
588 pf = ParallelFunction(self.client, f, block=block, bound=bound,
556 targets=self.targets, balanced=True)
589 targets=self.targets, balanced=True,
590 chunk_size=chunk_size)
557 return pf.map(*sequences)
591 return pf.map(*sequences)
558
592
559 def map_async(self, f, *sequences, **kwargs):
560 """Parallel version of builtin `map`, using this view's engines.
561
562 This is equivalent to map(...block=False)
563
564 See `map` for details.
565 """
566
567 if 'block' in kwargs:
568 raise TypeError("map_async doesn't take a `block` keyword argument.")
569 kwargs['block'] = True
570 return self.map(f,*sequences,**kwargs)
571
572
@@ -1,110 +1,119 b''
1 """Example for generating an arbitrary DAG as a dependency map.
1 """Example for generating an arbitrary DAG as a dependency map.
2
2
3 This demo uses networkx to generate the graph.
3 This demo uses networkx to generate the graph.
4
4
5 Authors
5 Authors
6 -------
6 -------
7 * MinRK
7 * MinRK
8 """
8 """
9 import networkx as nx
9 import networkx as nx
10 from random import randint, random
10 from random import randint, random
11 from IPython.zmq.parallel import client as cmod
11 from IPython.zmq.parallel import client as cmod
12
12
13 def randomwait():
13 def randomwait():
14 import time
14 import time
15 from random import random
15 from random import random
16 time.sleep(random())
16 time.sleep(random())
17 return time.time()
17 return time.time()
18
18
19
19
20 def random_dag(nodes, edges):
20 def random_dag(nodes, edges):
21 """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges."""
21 """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges."""
22 G = nx.DiGraph()
22 G = nx.DiGraph()
23 for i in range(nodes):
23 for i in range(nodes):
24 G.add_node(i)
24 G.add_node(i)
25 while edges > 0:
25 while edges > 0:
26 a = randint(0,nodes-1)
26 a = randint(0,nodes-1)
27 b=a
27 b=a
28 while b==a:
28 while b==a:
29 b = randint(0,nodes-1)
29 b = randint(0,nodes-1)
30 G.add_edge(a,b)
30 G.add_edge(a,b)
31 if nx.is_directed_acyclic_graph(G):
31 if nx.is_directed_acyclic_graph(G):
32 edges -= 1
32 edges -= 1
33 else:
33 else:
34 # we closed a loop!
34 # we closed a loop!
35 G.remove_edge(a,b)
35 G.remove_edge(a,b)
36 return G
36 return G
37
37
38 def add_children(G, parent, level, n=2):
38 def add_children(G, parent, level, n=2):
39 """Add children recursively to a binary tree."""
39 """Add children recursively to a binary tree."""
40 if level == 0:
40 if level == 0:
41 return
41 return
42 for i in range(n):
42 for i in range(n):
43 child = parent+str(i)
43 child = parent+str(i)
44 G.add_node(child)
44 G.add_node(child)
45 G.add_edge(parent,child)
45 G.add_edge(parent,child)
46 add_children(G, child, level-1, n)
46 add_children(G, child, level-1, n)
47
47
48 def make_bintree(levels):
48 def make_bintree(levels):
49 """Make a symmetrical binary tree with @levels"""
49 """Make a symmetrical binary tree with @levels"""
50 G = nx.DiGraph()
50 G = nx.DiGraph()
51 root = '0'
51 root = '0'
52 G.add_node(root)
52 G.add_node(root)
53 add_children(G, root, levels, 2)
53 add_children(G, root, levels, 2)
54 return G
54 return G
55
55
56 def submit_jobs(client, G, jobs):
56 def submit_jobs(client, G, jobs):
57 """Submit jobs via client where G describes the time dependencies."""
57 """Submit jobs via client where G describes the time dependencies."""
58 results = {}
58 results = {}
59 for node in nx.topological_sort(G):
59 for node in nx.topological_sort(G):
60 deps = [ results[n] for n in G.predecessors(node) ]
60 deps = [ results[n] for n in G.predecessors(node) ]
61 results[node] = client.apply(jobs[node], after=deps)
61 results[node] = client.apply(jobs[node], after=deps)
62 return results
62 return results
63
63
64 def validate_tree(G, results):
64 def validate_tree(G, results):
65 """Validate that jobs executed after their dependencies."""
65 """Validate that jobs executed after their dependencies."""
66 for node in G:
66 for node in G:
67 started = results[node].metadata.started
67 started = results[node].metadata.started
68 for parent in G.predecessors(node):
68 for parent in G.predecessors(node):
69 finished = results[parent].metadata.completed
69 finished = results[parent].metadata.completed
70 assert started > finished, "%s should have happened after %s"%(node, parent)
70 assert started > finished, "%s should have happened after %s"%(node, parent)
71
71
72 def main(nodes, edges):
72 def main(nodes, edges):
73 """Generate a random graph, submit jobs, then validate that the
73 """Generate a random graph, submit jobs, then validate that the
74 dependency order was enforced.
74 dependency order was enforced.
75 Finally, plot the graph, with time on the x-axis, and
75 Finally, plot the graph, with time on the x-axis, and
76 in-degree on the y (just for spread). All arrows must
76 in-degree on the y (just for spread). All arrows must
77 point at least slightly to the right if the graph is valid.
77 point at least slightly to the right if the graph is valid.
78 """
78 """
79 import pylab
79 from matplotlib.dates import date2num
80 from matplotlib.dates import date2num
80 from matplotlib.cm import gist_rainbow
81 from matplotlib.cm import gist_rainbow
81 print "building DAG"
82 print "building DAG"
82 G = random_dag(nodes, edges)
83 G = random_dag(nodes, edges)
83 jobs = {}
84 jobs = {}
84 pos = {}
85 pos = {}
85 colors = {}
86 colors = {}
86 for node in G:
87 for node in G:
87 jobs[node] = randomwait
88 jobs[node] = randomwait
88
89
89 client = cmod.Client()
90 client = cmod.Client()
90 print "submitting %i tasks with %i dependencies"%(nodes,edges)
91 print "submitting %i tasks with %i dependencies"%(nodes,edges)
91 results = submit_jobs(client, G, jobs)
92 results = submit_jobs(client, G, jobs)
92 print "waiting for results"
93 print "waiting for results"
93 client.barrier()
94 client.barrier()
94 print "done"
95 print "done"
95 for node in G:
96 for node in G:
96 md = results[node].metadata
97 md = results[node].metadata
97 start = date2num(md.started)
98 start = date2num(md.started)
98 runtime = date2num(md.completed) - start
99 runtime = date2num(md.completed) - start
99 pos[node] = (start, runtime)
100 pos[node] = (start, runtime)
100 colors[node] = md.engine_id
101 colors[node] = md.engine_id
101 validate_tree(G, results)
102 validate_tree(G, results)
102 nx.draw(G, pos, node_list = colors.keys(), node_color=colors.values(), cmap=gist_rainbow)
103 nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(), cmap=gist_rainbow,
104 with_labels=False)
105 x,y = zip(*pos.values())
106 xmin,ymin = map(min, (x,y))
107 xmax,ymax = map(max, (x,y))
108 xscale = xmax-xmin
109 yscale = ymax-ymin
110 pylab.xlim(xmin-xscale*.1,xmax+xscale*.1)
111 pylab.ylim(ymin-yscale*.1,ymax+yscale*.1)
103 return G,results
112 return G,results
104
113
105 if __name__ == '__main__':
114 if __name__ == '__main__':
106 import pylab
115 import pylab
107 # main(5,10)
116 # main(5,10)
108 main(32,96)
117 main(32,96)
109 pylab.show()
118 pylab.show()
110 No newline at end of file
119
@@ -1,144 +1,144 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Run a Monte-Carlo options pricer in parallel."""
2 """Run a Monte-Carlo options pricer in parallel."""
3
3
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Imports
5 # Imports
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7
7
8 import sys
8 import sys
9 import time
9 import time
10 from IPython.zmq.parallel import client
10 from IPython.zmq.parallel import client
11 import numpy as np
11 import numpy as np
12 from mcpricer import price_options
12 from mcpricer import price_options
13 from matplotlib import pyplot as plt
13 from matplotlib import pyplot as plt
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Setup parameters for the run
16 # Setup parameters for the run
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 def ask_question(text, the_type, default):
19 def ask_question(text, the_type, default):
20 s = '%s [%r]: ' % (text, the_type(default))
20 s = '%s [%r]: ' % (text, the_type(default))
21 result = raw_input(s)
21 result = raw_input(s)
22 if result:
22 if result:
23 return the_type(result)
23 return the_type(result)
24 else:
24 else:
25 return the_type(default)
25 return the_type(default)
26
26
27 cluster_profile = ask_question("Cluster profile", str, "default")
27 cluster_profile = ask_question("Cluster profile", str, "default")
28 price = ask_question("Initial price", float, 100.0)
28 price = ask_question("Initial price", float, 100.0)
29 rate = ask_question("Interest rate", float, 0.05)
29 rate = ask_question("Interest rate", float, 0.05)
30 days = ask_question("Days to expiration", int, 260)
30 days = ask_question("Days to expiration", int, 260)
31 paths = ask_question("Number of MC paths", int, 10000)
31 paths = ask_question("Number of MC paths", int, 10000)
32 n_strikes = ask_question("Number of strike values", int, 5)
32 n_strikes = ask_question("Number of strike values", int, 5)
33 min_strike = ask_question("Min strike price", float, 90.0)
33 min_strike = ask_question("Min strike price", float, 90.0)
34 max_strike = ask_question("Max strike price", float, 110.0)
34 max_strike = ask_question("Max strike price", float, 110.0)
35 n_sigmas = ask_question("Number of volatility values", int, 5)
35 n_sigmas = ask_question("Number of volatility values", int, 5)
36 min_sigma = ask_question("Min volatility", float, 0.1)
36 min_sigma = ask_question("Min volatility", float, 0.1)
37 max_sigma = ask_question("Max volatility", float, 0.4)
37 max_sigma = ask_question("Max volatility", float, 0.4)
38
38
39 strike_vals = np.linspace(min_strike, max_strike, n_strikes)
39 strike_vals = np.linspace(min_strike, max_strike, n_strikes)
40 sigma_vals = np.linspace(min_sigma, max_sigma, n_sigmas)
40 sigma_vals = np.linspace(min_sigma, max_sigma, n_sigmas)
41
41
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43 # Setup for parallel calculation
43 # Setup for parallel calculation
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45
45
46 # The Client is used to setup the calculation and works with all
46 # The Client is used to setup the calculation and works with all
47 # engines.
47 # engines.
48 c = client.Client(profile=cluster_profile)
48 c = client.Client(profile=cluster_profile)
49
49
50 # A LoadBalancedView is an interface to the engines that provides dynamic load
50 # A LoadBalancedView is an interface to the engines that provides dynamic load
51 # balancing at the expense of not knowing which engine will execute the code.
51 # balancing at the expense of not knowing which engine will execute the code.
52 view = c[None]
52 view = c.view()
53
53
54 # Initialize the common code on the engines. This Python module has the
54 # Initialize the common code on the engines. This Python module has the
55 # price_options function that prices the options.
55 # price_options function that prices the options.
56
56
57 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
58 # Perform parallel calculation
58 # Perform parallel calculation
59 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
60
60
61 print "Running parallel calculation over strike prices and volatilities..."
61 print "Running parallel calculation over strike prices and volatilities..."
62 print "Strike prices: ", strike_vals
62 print "Strike prices: ", strike_vals
63 print "Volatilities: ", sigma_vals
63 print "Volatilities: ", sigma_vals
64 sys.stdout.flush()
64 sys.stdout.flush()
65
65
66 # Submit tasks to the TaskClient for each (strike, sigma) pair as a MapTask.
66 # Submit tasks to the TaskClient for each (strike, sigma) pair as a MapTask.
67 t1 = time.time()
67 t1 = time.time()
68 async_results = []
68 async_results = []
69 for strike in strike_vals:
69 for strike in strike_vals:
70 for sigma in sigma_vals:
70 for sigma in sigma_vals:
71 ar = view.apply_async(price_options, price, strike, sigma, rate, days, paths)
71 ar = view.apply_async(price_options, price, strike, sigma, rate, days, paths)
72 async_results.append(ar)
72 async_results.append(ar)
73
73
74 print "Submitted tasks: ", len(async_results)
74 print "Submitted tasks: ", len(async_results)
75 sys.stdout.flush()
75 sys.stdout.flush()
76
76
77 # Block until all tasks are completed.
77 # Block until all tasks are completed.
78 c.barrier(async_results)
78 c.barrier(async_results)
79 t2 = time.time()
79 t2 = time.time()
80 t = t2-t1
80 t = t2-t1
81
81
82 print "Parallel calculation completed, time = %s s" % t
82 print "Parallel calculation completed, time = %s s" % t
83 print "Collecting results..."
83 print "Collecting results..."
84
84
85 # Get the results using TaskClient.get_task_result.
85 # Get the results using TaskClient.get_task_result.
86 results = [ar.get() for ar in async_results]
86 results = [ar.get() for ar in async_results]
87
87
88 # Assemble the result into a structured NumPy array.
88 # Assemble the result into a structured NumPy array.
89 prices = np.empty(n_strikes*n_sigmas,
89 prices = np.empty(n_strikes*n_sigmas,
90 dtype=[('ecall',float),('eput',float),('acall',float),('aput',float)]
90 dtype=[('ecall',float),('eput',float),('acall',float),('aput',float)]
91 )
91 )
92
92
93 for i, price in enumerate(results):
93 for i, price in enumerate(results):
94 prices[i] = tuple(price)
94 prices[i] = tuple(price)
95
95
96 prices.shape = (n_strikes, n_sigmas)
96 prices.shape = (n_strikes, n_sigmas)
97 strike_mesh, sigma_mesh = np.meshgrid(strike_vals, sigma_vals)
97 strike_mesh, sigma_mesh = np.meshgrid(strike_vals, sigma_vals)
98
98
99 print "Results are available: strike_mesh, sigma_mesh, prices"
99 print "Results are available: strike_mesh, sigma_mesh, prices"
100 print "To plot results type 'plot_options(sigma_mesh, strike_mesh, prices)'"
100 print "To plot results type 'plot_options(sigma_mesh, strike_mesh, prices)'"
101
101
102 #-----------------------------------------------------------------------------
102 #-----------------------------------------------------------------------------
103 # Utilities
103 # Utilities
104 #-----------------------------------------------------------------------------
104 #-----------------------------------------------------------------------------
105
105
106 def plot_options(sigma_mesh, strike_mesh, prices):
106 def plot_options(sigma_mesh, strike_mesh, prices):
107 """
107 """
108 Make a contour plot of the option price in (sigma, strike) space.
108 Make a contour plot of the option price in (sigma, strike) space.
109 """
109 """
110 plt.figure(1)
110 plt.figure(1)
111
111
112 plt.subplot(221)
112 plt.subplot(221)
113 plt.contourf(sigma_mesh, strike_mesh, prices['ecall'])
113 plt.contourf(sigma_mesh, strike_mesh, prices['ecall'])
114 plt.axis('tight')
114 plt.axis('tight')
115 plt.colorbar()
115 plt.colorbar()
116 plt.title('European Call')
116 plt.title('European Call')
117 plt.ylabel("Strike Price")
117 plt.ylabel("Strike Price")
118
118
119 plt.subplot(222)
119 plt.subplot(222)
120 plt.contourf(sigma_mesh, strike_mesh, prices['acall'])
120 plt.contourf(sigma_mesh, strike_mesh, prices['acall'])
121 plt.axis('tight')
121 plt.axis('tight')
122 plt.colorbar()
122 plt.colorbar()
123 plt.title("Asian Call")
123 plt.title("Asian Call")
124
124
125 plt.subplot(223)
125 plt.subplot(223)
126 plt.contourf(sigma_mesh, strike_mesh, prices['eput'])
126 plt.contourf(sigma_mesh, strike_mesh, prices['eput'])
127 plt.axis('tight')
127 plt.axis('tight')
128 plt.colorbar()
128 plt.colorbar()
129 plt.title("European Put")
129 plt.title("European Put")
130 plt.xlabel("Volatility")
130 plt.xlabel("Volatility")
131 plt.ylabel("Strike Price")
131 plt.ylabel("Strike Price")
132
132
133 plt.subplot(224)
133 plt.subplot(224)
134 plt.contourf(sigma_mesh, strike_mesh, prices['aput'])
134 plt.contourf(sigma_mesh, strike_mesh, prices['aput'])
135 plt.axis('tight')
135 plt.axis('tight')
136 plt.colorbar()
136 plt.colorbar()
137 plt.title("Asian Put")
137 plt.title("Asian Put")
138 plt.xlabel("Volatility")
138 plt.xlabel("Volatility")
139
139
140
140
141
141
142
142
143
143
144
144
@@ -1,63 +1,64 b''
1 """Calculate statistics on the digits of pi in parallel.
1 """Calculate statistics on the digits of pi in parallel.
2
2
3 This program uses the functions in :file:`pidigits.py` to calculate
3 This program uses the functions in :file:`pidigits.py` to calculate
4 the frequencies of 2 digit sequences in the digits of pi. The
4 the frequencies of 2 digit sequences in the digits of pi. The
5 results are plotted using matplotlib.
5 results are plotted using matplotlib.
6
6
7 To run, text files from http://www.super-computing.org/
7 To run, text files from http://www.super-computing.org/
8 must be installed in the working directory of the IPython engines.
8 must be installed in the working directory of the IPython engines.
9 The actual filenames to be used can be set with the ``filestring``
9 The actual filenames to be used can be set with the ``filestring``
10 variable below.
10 variable below.
11
11
12 The dataset we have been using for this is the 200 million digit one here:
12 The dataset we have been using for this is the 200 million digit one here:
13 ftp://pi.super-computing.org/.2/pi200m/
13 ftp://pi.super-computing.org/.2/pi200m/
14
14
15 and the files used will be downloaded if they are not in the working directory
15 and the files used will be downloaded if they are not in the working directory
16 of the IPython engines.
16 of the IPython engines.
17 """
17 """
18
18
19 from IPython.zmq.parallel import client
19 from IPython.zmq.parallel import client
20 from matplotlib import pyplot as plt
20 from matplotlib import pyplot as plt
21 import numpy as np
21 import numpy as np
22 from pidigits import *
22 from pidigits import *
23 from timeit import default_timer as clock
23 from timeit import default_timer as clock
24
24
25 # Files with digits of pi (10m digits each)
25 # Files with digits of pi (10m digits each)
26 filestring = 'pi200m.ascii.%(i)02dof20'
26 filestring = 'pi200m.ascii.%(i)02dof20'
27 files = [filestring % {'i':i} for i in range(1,16)]
27 files = [filestring % {'i':i} for i in range(1,16)]
28
28
29 # Connect to the IPython cluster
29 # Connect to the IPython cluster
30 c = client.Client()
30 c = client.Client(profile='edison')
31 c.run('pidigits.py')
31 c.run('pidigits.py')
32
32
33 # the number of engines
33 # the number of engines
34 n = len(c.ids)
34 n = len(c)
35 id0 = list(c.ids)[0]
35 id0 = c.ids[0]
36 v = c[:]
37 v.set_flags(bound=True,block=True)
36 # fetch the pi-files
38 # fetch the pi-files
37 print "downloading %i files of pi"%n
39 print "downloading %i files of pi"%n
38 c.map(fetch_pi_file, files[:n])
40 v.map(fetch_pi_file, files[:n])
39 print "done"
41 print "done"
40
42
41 # Run 10m digits on 1 engine
43 # Run 10m digits on 1 engine
42 t1 = clock()
44 t1 = clock()
43 freqs10m = c[id0].apply_sync_bound(compute_two_digit_freqs, files[0])
45 freqs10m = c[id0].apply_sync_bound(compute_two_digit_freqs, files[0])
44 t2 = clock()
46 t2 = clock()
45 digits_per_second1 = 10.0e6/(t2-t1)
47 digits_per_second1 = 10.0e6/(t2-t1)
46 print "Digits per second (1 core, 10m digits): ", digits_per_second1
48 print "Digits per second (1 core, 10m digits): ", digits_per_second1
47
49
48
50
49 # Run n*10m digits on all engines
51 # Run n*10m digits on all engines
50 t1 = clock()
52 t1 = clock()
51 c.block=True
53 freqs_all = v.map(compute_two_digit_freqs, files[:n])
52 freqs_all = c.map(compute_two_digit_freqs, files[:n])
53 freqs150m = reduce_freqs(freqs_all)
54 freqs150m = reduce_freqs(freqs_all)
54 t2 = clock()
55 t2 = clock()
55 digits_per_second8 = n*10.0e6/(t2-t1)
56 digits_per_second8 = n*10.0e6/(t2-t1)
56 print "Digits per second (%i engines, %i0m digits): "%(n,n), digits_per_second8
57 print "Digits per second (%i engines, %i0m digits): "%(n,n), digits_per_second8
57
58
58 print "Speedup: ", digits_per_second8/digits_per_second1
59 print "Speedup: ", digits_per_second8/digits_per_second1
59
60
60 plot_two_digit_freqs(freqs150m)
61 plot_two_digit_freqs(freqs150m)
61 plt.title("2 digit sequences in %i0m digits of pi"%n)
62 plt.title("2 digit sequences in %i0m digits of pi"%n)
62 plt.show()
63 plt.show()
63
64
@@ -1,159 +1,157 b''
1 """Compute statistics on the digits of pi.
1 """Compute statistics on the digits of pi.
2
2
3 This uses precomputed digits of pi from the website
3 This uses precomputed digits of pi from the website
4 of Professor Yasumasa Kanada at the University of
4 of Professor Yasumasa Kanada at the University of
5 Tokoyo: http://www.super-computing.org/
5 Tokoyo: http://www.super-computing.org/
6
6
7 Currently, there are only functions to read the
7 Currently, there are only functions to read the
8 .txt (non-compressed, non-binary) files, but adding
8 .txt (non-compressed, non-binary) files, but adding
9 support for compression and binary files would be
9 support for compression and binary files would be
10 straightforward.
10 straightforward.
11
11
12 This focuses on computing the number of times that
12 This focuses on computing the number of times that
13 all 1, 2, n digits sequences occur in the digits of pi.
13 all 1, 2, n digits sequences occur in the digits of pi.
14 If the digits of pi are truly random, these frequencies
14 If the digits of pi are truly random, these frequencies
15 should be equal.
15 should be equal.
16 """
16 """
17
17
18 # Import statements
18 # Import statements
19 from __future__ import division, with_statement
19 from __future__ import division, with_statement
20
20
21 import os
22 import urllib
23
24 import numpy as np
21 import numpy as np
25 from matplotlib import pyplot as plt
22 from matplotlib import pyplot as plt
26
23
27 # Top-level functions
24 # Top-level functions
28
25
29 def fetch_pi_file(filename):
26 def fetch_pi_file(filename):
30 """This will download a segment of pi from super-computing.org
27 """This will download a segment of pi from super-computing.org
31 if the file is not already present.
28 if the file is not already present.
32 """
29 """
30 import os, urllib
33 ftpdir="ftp://pi.super-computing.org/.2/pi200m/"
31 ftpdir="ftp://pi.super-computing.org/.2/pi200m/"
34 if os.path.exists(filename):
32 if os.path.exists(filename):
35 # we already have it
33 # we already have it
36 return
34 return
37 else:
35 else:
38 # download it
36 # download it
39 urllib.urlretrieve(ftpdir+filename,filename)
37 urllib.urlretrieve(ftpdir+filename,filename)
40
38
41 def compute_one_digit_freqs(filename):
39 def compute_one_digit_freqs(filename):
42 """
40 """
43 Read digits of pi from a file and compute the 1 digit frequencies.
41 Read digits of pi from a file and compute the 1 digit frequencies.
44 """
42 """
45 d = txt_file_to_digits(filename)
43 d = txt_file_to_digits(filename)
46 freqs = one_digit_freqs(d)
44 freqs = one_digit_freqs(d)
47 return freqs
45 return freqs
48
46
49 def compute_two_digit_freqs(filename):
47 def compute_two_digit_freqs(filename):
50 """
48 """
51 Read digits of pi from a file and compute the 2 digit frequencies.
49 Read digits of pi from a file and compute the 2 digit frequencies.
52 """
50 """
53 d = txt_file_to_digits(filename)
51 d = txt_file_to_digits(filename)
54 freqs = two_digit_freqs(d)
52 freqs = two_digit_freqs(d)
55 return freqs
53 return freqs
56
54
57 def reduce_freqs(freqlist):
55 def reduce_freqs(freqlist):
58 """
56 """
59 Add up a list of freq counts to get the total counts.
57 Add up a list of freq counts to get the total counts.
60 """
58 """
61 allfreqs = np.zeros_like(freqlist[0])
59 allfreqs = np.zeros_like(freqlist[0])
62 for f in freqlist:
60 for f in freqlist:
63 allfreqs += f
61 allfreqs += f
64 return allfreqs
62 return allfreqs
65
63
66 def compute_n_digit_freqs(filename, n):
64 def compute_n_digit_freqs(filename, n):
67 """
65 """
68 Read digits of pi from a file and compute the n digit frequencies.
66 Read digits of pi from a file and compute the n digit frequencies.
69 """
67 """
70 d = txt_file_to_digits(filename)
68 d = txt_file_to_digits(filename)
71 freqs = n_digit_freqs(d, n)
69 freqs = n_digit_freqs(d, n)
72 return freqs
70 return freqs
73
71
74 # Read digits from a txt file
72 # Read digits from a txt file
75
73
76 def txt_file_to_digits(filename, the_type=str):
74 def txt_file_to_digits(filename, the_type=str):
77 """
75 """
78 Yield the digits of pi read from a .txt file.
76 Yield the digits of pi read from a .txt file.
79 """
77 """
80 with open(filename, 'r') as f:
78 with open(filename, 'r') as f:
81 for line in f.readlines():
79 for line in f.readlines():
82 for c in line:
80 for c in line:
83 if c != '\n' and c!= ' ':
81 if c != '\n' and c!= ' ':
84 yield the_type(c)
82 yield the_type(c)
85
83
86 # Actual counting functions
84 # Actual counting functions
87
85
88 def one_digit_freqs(digits, normalize=False):
86 def one_digit_freqs(digits, normalize=False):
89 """
87 """
90 Consume digits of pi and compute 1 digit freq. counts.
88 Consume digits of pi and compute 1 digit freq. counts.
91 """
89 """
92 freqs = np.zeros(10, dtype='i4')
90 freqs = np.zeros(10, dtype='i4')
93 for d in digits:
91 for d in digits:
94 freqs[int(d)] += 1
92 freqs[int(d)] += 1
95 if normalize:
93 if normalize:
96 freqs = freqs/freqs.sum()
94 freqs = freqs/freqs.sum()
97 return freqs
95 return freqs
98
96
99 def two_digit_freqs(digits, normalize=False):
97 def two_digit_freqs(digits, normalize=False):
100 """
98 """
101 Consume digits of pi and compute 2 digits freq. counts.
99 Consume digits of pi and compute 2 digits freq. counts.
102 """
100 """
103 freqs = np.zeros(100, dtype='i4')
101 freqs = np.zeros(100, dtype='i4')
104 last = digits.next()
102 last = digits.next()
105 this = digits.next()
103 this = digits.next()
106 for d in digits:
104 for d in digits:
107 index = int(last + this)
105 index = int(last + this)
108 freqs[index] += 1
106 freqs[index] += 1
109 last = this
107 last = this
110 this = d
108 this = d
111 if normalize:
109 if normalize:
112 freqs = freqs/freqs.sum()
110 freqs = freqs/freqs.sum()
113 return freqs
111 return freqs
114
112
115 def n_digit_freqs(digits, n, normalize=False):
113 def n_digit_freqs(digits, n, normalize=False):
116 """
114 """
117 Consume digits of pi and compute n digits freq. counts.
115 Consume digits of pi and compute n digits freq. counts.
118
116
119 This should only be used for 1-6 digits.
117 This should only be used for 1-6 digits.
120 """
118 """
121 freqs = np.zeros(pow(10,n), dtype='i4')
119 freqs = np.zeros(pow(10,n), dtype='i4')
122 current = np.zeros(n, dtype=int)
120 current = np.zeros(n, dtype=int)
123 for i in range(n):
121 for i in range(n):
124 current[i] = digits.next()
122 current[i] = digits.next()
125 for d in digits:
123 for d in digits:
126 index = int(''.join(map(str, current)))
124 index = int(''.join(map(str, current)))
127 freqs[index] += 1
125 freqs[index] += 1
128 current[0:-1] = current[1:]
126 current[0:-1] = current[1:]
129 current[-1] = d
127 current[-1] = d
130 if normalize:
128 if normalize:
131 freqs = freqs/freqs.sum()
129 freqs = freqs/freqs.sum()
132 return freqs
130 return freqs
133
131
134 # Plotting functions
132 # Plotting functions
135
133
136 def plot_two_digit_freqs(f2):
134 def plot_two_digit_freqs(f2):
137 """
135 """
138 Plot two digits frequency counts using matplotlib.
136 Plot two digits frequency counts using matplotlib.
139 """
137 """
140 f2_copy = f2.copy()
138 f2_copy = f2.copy()
141 f2_copy.shape = (10,10)
139 f2_copy.shape = (10,10)
142 ax = plt.matshow(f2_copy)
140 ax = plt.matshow(f2_copy)
143 plt.colorbar()
141 plt.colorbar()
144 for i in range(10):
142 for i in range(10):
145 for j in range(10):
143 for j in range(10):
146 plt.text(i-0.2, j+0.2, str(j)+str(i))
144 plt.text(i-0.2, j+0.2, str(j)+str(i))
147 plt.ylabel('First digit')
145 plt.ylabel('First digit')
148 plt.xlabel('Second digit')
146 plt.xlabel('Second digit')
149 return ax
147 return ax
150
148
151 def plot_one_digit_freqs(f1):
149 def plot_one_digit_freqs(f1):
152 """
150 """
153 Plot one digit frequency counts using matplotlib.
151 Plot one digit frequency counts using matplotlib.
154 """
152 """
155 ax = plt.plot(f1,'bo-')
153 ax = plt.plot(f1,'bo-')
156 plt.title('Single digit counts in pi')
154 plt.title('Single digit counts in pi')
157 plt.xlabel('Digit')
155 plt.xlabel('Digit')
158 plt.ylabel('Count')
156 plt.ylabel('Count')
159 return ax
157 return ax
@@ -1,283 +1,284 b''
1 =================
1 =================
2 Parallel examples
2 Parallel examples
3 =================
3 =================
4
4
5 .. note::
5 .. note::
6
6
7 Performance numbers from ``IPython.kernel``, not newparallel.
7 Performance numbers from ``IPython.kernel``, not newparallel.
8
8
9 In this section we describe two more involved examples of using an IPython
9 In this section we describe two more involved examples of using an IPython
10 cluster to perform a parallel computation. In these examples, we will be using
10 cluster to perform a parallel computation. In these examples, we will be using
11 IPython's "pylab" mode, which enables interactive plotting using the
11 IPython's "pylab" mode, which enables interactive plotting using the
12 Matplotlib package. IPython can be started in this mode by typing::
12 Matplotlib package. IPython can be started in this mode by typing::
13
13
14 ipython --pylab
14 ipython --pylab
15
15
16 at the system command line.
16 at the system command line.
17
17
18 150 million digits of pi
18 150 million digits of pi
19 ========================
19 ========================
20
20
21 In this example we would like to study the distribution of digits in the
21 In this example we would like to study the distribution of digits in the
22 number pi (in base 10). While it is not known if pi is a normal number (a
22 number pi (in base 10). While it is not known if pi is a normal number (a
23 number is normal in base 10 if 0-9 occur with equal likelihood) numerical
23 number is normal in base 10 if 0-9 occur with equal likelihood) numerical
24 investigations suggest that it is. We will begin with a serial calculation on
24 investigations suggest that it is. We will begin with a serial calculation on
25 10,000 digits of pi and then perform a parallel calculation involving 150
25 10,000 digits of pi and then perform a parallel calculation involving 150
26 million digits.
26 million digits.
27
27
28 In both the serial and parallel calculation we will be using functions defined
28 In both the serial and parallel calculation we will be using functions defined
29 in the :file:`pidigits.py` file, which is available in the
29 in the :file:`pidigits.py` file, which is available in the
30 :file:`docs/examples/newparallel` directory of the IPython source distribution.
30 :file:`docs/examples/newparallel` directory of the IPython source distribution.
31 These functions provide basic facilities for working with the digits of pi and
31 These functions provide basic facilities for working with the digits of pi and
32 can be loaded into IPython by putting :file:`pidigits.py` in your current
32 can be loaded into IPython by putting :file:`pidigits.py` in your current
33 working directory and then doing:
33 working directory and then doing:
34
34
35 .. sourcecode:: ipython
35 .. sourcecode:: ipython
36
36
37 In [1]: run pidigits.py
37 In [1]: run pidigits.py
38
38
39 Serial calculation
39 Serial calculation
40 ------------------
40 ------------------
41
41
42 For the serial calculation, we will use `SymPy <http://www.sympy.org>`_ to
42 For the serial calculation, we will use `SymPy <http://www.sympy.org>`_ to
43 calculate 10,000 digits of pi and then look at the frequencies of the digits
43 calculate 10,000 digits of pi and then look at the frequencies of the digits
44 0-9. Out of 10,000 digits, we expect each digit to occur 1,000 times. While
44 0-9. Out of 10,000 digits, we expect each digit to occur 1,000 times. While
45 SymPy is capable of calculating many more digits of pi, our purpose here is to
45 SymPy is capable of calculating many more digits of pi, our purpose here is to
46 set the stage for the much larger parallel calculation.
46 set the stage for the much larger parallel calculation.
47
47
48 In this example, we use two functions from :file:`pidigits.py`:
48 In this example, we use two functions from :file:`pidigits.py`:
49 :func:`one_digit_freqs` (which calculates how many times each digit occurs)
49 :func:`one_digit_freqs` (which calculates how many times each digit occurs)
50 and :func:`plot_one_digit_freqs` (which uses Matplotlib to plot the result).
50 and :func:`plot_one_digit_freqs` (which uses Matplotlib to plot the result).
51 Here is an interactive IPython session that uses these functions with
51 Here is an interactive IPython session that uses these functions with
52 SymPy:
52 SymPy:
53
53
54 .. sourcecode:: ipython
54 .. sourcecode:: ipython
55
55
56 In [7]: import sympy
56 In [7]: import sympy
57
57
58 In [8]: pi = sympy.pi.evalf(40)
58 In [8]: pi = sympy.pi.evalf(40)
59
59
60 In [9]: pi
60 In [9]: pi
61 Out[9]: 3.141592653589793238462643383279502884197
61 Out[9]: 3.141592653589793238462643383279502884197
62
62
63 In [10]: pi = sympy.pi.evalf(10000)
63 In [10]: pi = sympy.pi.evalf(10000)
64
64
65 In [11]: digits = (d for d in str(pi)[2:]) # create a sequence of digits
65 In [11]: digits = (d for d in str(pi)[2:]) # create a sequence of digits
66
66
67 In [12]: run pidigits.py # load one_digit_freqs/plot_one_digit_freqs
67 In [12]: run pidigits.py # load one_digit_freqs/plot_one_digit_freqs
68
68
69 In [13]: freqs = one_digit_freqs(digits)
69 In [13]: freqs = one_digit_freqs(digits)
70
70
71 In [14]: plot_one_digit_freqs(freqs)
71 In [14]: plot_one_digit_freqs(freqs)
72 Out[14]: [<matplotlib.lines.Line2D object at 0x18a55290>]
72 Out[14]: [<matplotlib.lines.Line2D object at 0x18a55290>]
73
73
74 The resulting plot of the single digit counts shows that each digit occurs
74 The resulting plot of the single digit counts shows that each digit occurs
75 approximately 1,000 times, but that with only 10,000 digits the
75 approximately 1,000 times, but that with only 10,000 digits the
76 statistical fluctuations are still rather large:
76 statistical fluctuations are still rather large:
77
77
78 .. image:: ../parallel/single_digits.*
78 .. image:: ../parallel/single_digits.*
79
79
80 It is clear that to reduce the relative fluctuations in the counts, we need
80 It is clear that to reduce the relative fluctuations in the counts, we need
81 to look at many more digits of pi. That brings us to the parallel calculation.
81 to look at many more digits of pi. That brings us to the parallel calculation.
82
82
83 Parallel calculation
83 Parallel calculation
84 --------------------
84 --------------------
85
85
86 Calculating many digits of pi is a challenging computational problem in itself.
86 Calculating many digits of pi is a challenging computational problem in itself.
87 Because we want to focus on the distribution of digits in this example, we
87 Because we want to focus on the distribution of digits in this example, we
88 will use pre-computed digits of pi from the website of Professor Yasumasa
88 will use pre-computed digits of pi from the website of Professor Yasumasa
89 Kanada at the University of Tokyo (http://www.super-computing.org). These
89 Kanada at the University of Tokyo (http://www.super-computing.org). These
90 digits come in a set of text files (ftp://pi.super-computing.org/.2/pi200m/)
90 digits come in a set of text files (ftp://pi.super-computing.org/.2/pi200m/)
91 that each have 10 million digits of pi.
91 that each have 10 million digits of pi.
92
92
93 For the parallel calculation, we have copied these files to the local hard
93 For the parallel calculation, we have copied these files to the local hard
94 drives of the compute nodes. A total of 15 of these files will be used, for a
94 drives of the compute nodes. A total of 15 of these files will be used, for a
95 total of 150 million digits of pi. To make things a little more interesting we
95 total of 150 million digits of pi. To make things a little more interesting we
96 will calculate the frequencies of all 2-digit sequences (00-99) and then plot
96 will calculate the frequencies of all 2-digit sequences (00-99) and then plot
97 the result using a 2D matrix in Matplotlib.
97 the result using a 2D matrix in Matplotlib.
98
98
99 The overall idea of the calculation is simple: each IPython engine will
99 The overall idea of the calculation is simple: each IPython engine will
100 compute the two digit counts for the digits in a single file. Then in a final
100 compute the two digit counts for the digits in a single file. Then in a final
101 step the counts from each engine will be added up. To perform this
101 step the counts from each engine will be added up. To perform this
102 calculation, we will need two top-level functions from :file:`pidigits.py`:
102 calculation, we will need two top-level functions from :file:`pidigits.py`:
103
103
104 .. literalinclude:: ../../examples/newparallel/pidigits.py
104 .. literalinclude:: ../../examples/newparallel/pidigits.py
105 :language: python
105 :language: python
106 :lines: 41-56
106 :lines: 41-56
107
107
108 We will also use the :func:`plot_two_digit_freqs` function to plot the
108 We will also use the :func:`plot_two_digit_freqs` function to plot the
109 results. The code to run this calculation in parallel is contained in
109 results. The code to run this calculation in parallel is contained in
110 :file:`docs/examples/newparallel/parallelpi.py`. This code can be run in parallel
110 :file:`docs/examples/newparallel/parallelpi.py`. This code can be run in parallel
111 using IPython by following these steps:
111 using IPython by following these steps:
112
112
113 1. Use :command:`ipclusterz` to start 15 engines. We used an 8 core (2 quad
113 1. Use :command:`ipclusterz` to start 15 engines. We used an 8 core (2 quad
114 core CPUs) cluster with hyperthreading enabled which makes the 8 cores
114 core CPUs) cluster with hyperthreading enabled which makes the 8 cores
115 look like 16 (1 controller + 15 engines) in the OS. However, the maximum
115 look like 16 (1 controller + 15 engines) in the OS. However, the maximum
116 speedup we can observe is still only 8x.
116 speedup we can observe is still only 8x.
117 2. With the file :file:`parallelpi.py` in your current working directory, open
117 2. With the file :file:`parallelpi.py` in your current working directory, open
118 up IPython in pylab mode and type ``run parallelpi.py``. This will download
118 up IPython in pylab mode and type ``run parallelpi.py``. This will download
119 the pi files via ftp the first time you run it, if they are not
119 the pi files via ftp the first time you run it, if they are not
120 present in the Engines' working directory.
120 present in the Engines' working directory.
121
121
122 When run on our 8 core cluster, we observe a speedup of 7.7x. This is slightly
122 When run on our 8 core cluster, we observe a speedup of 7.7x. This is slightly
123 less than linear scaling (8x) because the controller is also running on one of
123 less than linear scaling (8x) because the controller is also running on one of
124 the cores.
124 the cores.
125
125
126 To emphasize the interactive nature of IPython, we now show how the
126 To emphasize the interactive nature of IPython, we now show how the
127 calculation can also be run by simply typing the commands from
127 calculation can also be run by simply typing the commands from
128 :file:`parallelpi.py` interactively into IPython:
128 :file:`parallelpi.py` interactively into IPython:
129
129
130 .. sourcecode:: ipython
130 .. sourcecode:: ipython
131
131
132 In [1]: from IPython.zmq.parallel import client
132 In [1]: from IPython.zmq.parallel import client
133
133
134 # The Client allows us to use the engines interactively.
134 # The Client allows us to use the engines interactively.
135 # We simply pass Client the name of the cluster profile we
135 # We simply pass Client the name of the cluster profile we
136 # are using.
136 # are using.
137 In [2]: c = client.Client(profile='mycluster')
137 In [2]: c = client.Client(profile='mycluster')
138 In [3]: view = c.view(balanced=True)
138
139
139 In [3]: c.ids
140 In [3]: c.ids
140 Out[3]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
141 Out[3]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
141
142
142 In [4]: run pidigits.py
143 In [4]: run pidigits.py
143
144
144 In [5]: filestring = 'pi200m.ascii.%(i)02dof20'
145 In [5]: filestring = 'pi200m.ascii.%(i)02dof20'
145
146
146 # Create the list of files to process.
147 # Create the list of files to process.
147 In [6]: files = [filestring % {'i':i} for i in range(1,16)]
148 In [6]: files = [filestring % {'i':i} for i in range(1,16)]
148
149
149 In [7]: files
150 In [7]: files
150 Out[7]:
151 Out[7]:
151 ['pi200m.ascii.01of20',
152 ['pi200m.ascii.01of20',
152 'pi200m.ascii.02of20',
153 'pi200m.ascii.02of20',
153 'pi200m.ascii.03of20',
154 'pi200m.ascii.03of20',
154 'pi200m.ascii.04of20',
155 'pi200m.ascii.04of20',
155 'pi200m.ascii.05of20',
156 'pi200m.ascii.05of20',
156 'pi200m.ascii.06of20',
157 'pi200m.ascii.06of20',
157 'pi200m.ascii.07of20',
158 'pi200m.ascii.07of20',
158 'pi200m.ascii.08of20',
159 'pi200m.ascii.08of20',
159 'pi200m.ascii.09of20',
160 'pi200m.ascii.09of20',
160 'pi200m.ascii.10of20',
161 'pi200m.ascii.10of20',
161 'pi200m.ascii.11of20',
162 'pi200m.ascii.11of20',
162 'pi200m.ascii.12of20',
163 'pi200m.ascii.12of20',
163 'pi200m.ascii.13of20',
164 'pi200m.ascii.13of20',
164 'pi200m.ascii.14of20',
165 'pi200m.ascii.14of20',
165 'pi200m.ascii.15of20']
166 'pi200m.ascii.15of20']
166
167
167 # download the data files if they don't already exist:
168 # download the data files if they don't already exist:
168 In [8]: c.map(fetch_pi_file, files)
169 In [8]: c.map(fetch_pi_file, files)
169
170
170 # This is the parallel calculation using the Client.map method
171 # This is the parallel calculation using the Client.map method
171 # which applies compute_two_digit_freqs to each file in files in parallel.
172 # which applies compute_two_digit_freqs to each file in files in parallel.
172 In [9]: freqs_all = c.map(compute_two_digit_freqs, files)
173 In [9]: freqs_all = c.map(compute_two_digit_freqs, files)
173
174
174 # Add up the frequencies from each engine.
175 # Add up the frequencies from each engine.
175 In [10]: freqs = reduce_freqs(freqs_all)
176 In [10]: freqs = reduce_freqs(freqs_all)
176
177
177 In [11]: plot_two_digit_freqs(freqs)
178 In [11]: plot_two_digit_freqs(freqs)
178 Out[11]: <matplotlib.image.AxesImage object at 0x18beb110>
179 Out[11]: <matplotlib.image.AxesImage object at 0x18beb110>
179
180
180 In [12]: plt.title('2 digit counts of 150m digits of pi')
181 In [12]: plt.title('2 digit counts of 150m digits of pi')
181 Out[12]: <matplotlib.text.Text object at 0x18d1f9b0>
182 Out[12]: <matplotlib.text.Text object at 0x18d1f9b0>
182
183
183 The resulting plot generated by Matplotlib is shown below. The colors indicate
184 The resulting plot generated by Matplotlib is shown below. The colors indicate
184 which two digit sequences are more (red) or less (blue) likely to occur in the
185 which two digit sequences are more (red) or less (blue) likely to occur in the
185 first 150 million digits of pi. We clearly see that the sequence "41" is
186 first 150 million digits of pi. We clearly see that the sequence "41" is
186 most likely and that "06" and "07" are least likely. Further analysis would
187 most likely and that "06" and "07" are least likely. Further analysis would
187 show that the relative size of the statistical fluctuations has decreased
188 show that the relative size of the statistical fluctuations has decreased
188 compared to the 10,000 digit calculation.
189 compared to the 10,000 digit calculation.
189
190
190 .. image:: ../parallel/two_digit_counts.*
191 .. image:: ../parallel/two_digit_counts.*
191
192
192
193
193 Parallel options pricing
194 Parallel options pricing
194 ========================
195 ========================
195
196
196 An option is a financial contract that gives the buyer of the contract the
197 An option is a financial contract that gives the buyer of the contract the
197 right to buy (a "call") or sell (a "put") a secondary asset (a stock for
198 right to buy (a "call") or sell (a "put") a secondary asset (a stock for
198 example) at a particular date in the future (the expiration date) for a
199 example) at a particular date in the future (the expiration date) for a
199 pre-agreed upon price (the strike price). For this right, the buyer pays the
200 pre-agreed upon price (the strike price). For this right, the buyer pays the
200 seller a premium (the option price). There are a wide variety of flavors of
201 seller a premium (the option price). There are a wide variety of flavors of
201 options (American, European, Asian, etc.) that are useful for different
202 options (American, European, Asian, etc.) that are useful for different
202 purposes: hedging against risk, speculation, etc.
203 purposes: hedging against risk, speculation, etc.
203
204
204 Much of modern finance is driven by the need to price these contracts
205 Much of modern finance is driven by the need to price these contracts
205 accurately based on what is known about the properties (such as volatility) of
206 accurately based on what is known about the properties (such as volatility) of
206 the underlying asset. One method of pricing options is to use a Monte Carlo
207 the underlying asset. One method of pricing options is to use a Monte Carlo
207 simulation of the underlying asset price. In this example we use this approach
208 simulation of the underlying asset price. In this example we use this approach
208 to price both European and Asian (path dependent) options for various strike
209 to price both European and Asian (path dependent) options for various strike
209 prices and volatilities.
210 prices and volatilities.
210
211
211 The code for this example can be found in the :file:`docs/examples/newparallel`
212 The code for this example can be found in the :file:`docs/examples/newparallel`
212 directory of the IPython source. The function :func:`price_options` in
213 directory of the IPython source. The function :func:`price_options` in
213 :file:`mcpricer.py` implements the basic Monte Carlo pricing algorithm using
214 :file:`mcpricer.py` implements the basic Monte Carlo pricing algorithm using
214 the NumPy package and is shown here:
215 the NumPy package and is shown here:
215
216
216 .. literalinclude:: ../../examples/newparallel/mcpricer.py
217 .. literalinclude:: ../../examples/newparallel/mcpricer.py
217 :language: python
218 :language: python
218
219
219 To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class,
220 To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class,
220 which distributes work to the engines using dynamic load balancing. This
221 which distributes work to the engines using dynamic load balancing. This
221 view is a wrapper of the :class:`Client` class shown in
222 view is a wrapper of the :class:`Client` class shown in
222 the previous example. The parallel calculation using :class:`LoadBalancedView` can
223 the previous example. The parallel calculation using :class:`LoadBalancedView` can
223 be found in the file :file:`mcpricer.py`. The code in this file creates a
224 be found in the file :file:`mcpricer.py`. The code in this file creates a
224 :class:`TaskClient` instance and then submits a set of tasks using
225 :class:`TaskClient` instance and then submits a set of tasks using
225 :meth:`TaskClient.run` that calculate the option prices for different
226 :meth:`TaskClient.run` that calculate the option prices for different
226 volatilities and strike prices. The results are then plotted as a 2D contour
227 volatilities and strike prices. The results are then plotted as a 2D contour
227 plot using Matplotlib.
228 plot using Matplotlib.
228
229
229 .. literalinclude:: ../../examples/newparallel/mcdriver.py
230 .. literalinclude:: ../../examples/newparallel/mcdriver.py
230 :language: python
231 :language: python
231
232
232 To use this code, start an IPython cluster using :command:`ipclusterz`, open
233 To use this code, start an IPython cluster using :command:`ipclusterz`, open
233 IPython in the pylab mode with the file :file:`mcdriver.py` in your current
234 IPython in the pylab mode with the file :file:`mcdriver.py` in your current
234 working directory and then type:
235 working directory and then type:
235
236
236 .. sourcecode:: ipython
237 .. sourcecode:: ipython
237
238
238 In [7]: run mcdriver.py
239 In [7]: run mcdriver.py
239 Submitted tasks: [0, 1, 2, ...]
240 Submitted tasks: [0, 1, 2, ...]
240
241
241 Once all the tasks have finished, the results can be plotted using the
242 Once all the tasks have finished, the results can be plotted using the
242 :func:`plot_options` function. Here we make contour plots of the Asian
243 :func:`plot_options` function. Here we make contour plots of the Asian
243 call and Asian put options as a function of the volatility and strike price:
244 call and Asian put options as a function of the volatility and strike price:
244
245
245 .. sourcecode:: ipython
246 .. sourcecode:: ipython
246
247
247 In [8]: plot_options(sigma_vals, K_vals, prices['acall'])
248 In [8]: plot_options(sigma_vals, K_vals, prices['acall'])
248
249
249 In [9]: plt.figure()
250 In [9]: plt.figure()
250 Out[9]: <matplotlib.figure.Figure object at 0x18c178d0>
251 Out[9]: <matplotlib.figure.Figure object at 0x18c178d0>
251
252
252 In [10]: plot_options(sigma_vals, K_vals, prices['aput'])
253 In [10]: plot_options(sigma_vals, K_vals, prices['aput'])
253
254
254 These results are shown in the two figures below. On an 8 core cluster the
255 These results are shown in the two figures below. On an 8 core cluster the
255 entire calculation (10 strike prices, 10 volatilities, 100,000 paths for each)
256 entire calculation (10 strike prices, 10 volatilities, 100,000 paths for each)
256 took 30 seconds in parallel, giving a speedup of 7.7x, which is comparable
257 took 30 seconds in parallel, giving a speedup of 7.7x, which is comparable
257 to the speedup observed in our previous example.
258 to the speedup observed in our previous example.
258
259
259 .. image:: ../parallel/asian_call.*
260 .. image:: ../parallel/asian_call.*
260
261
261 .. image:: ../parallel/asian_put.*
262 .. image:: ../parallel/asian_put.*
262
263
263 Conclusion
264 Conclusion
264 ==========
265 ==========
265
266
266 To conclude these examples, we summarize the key features of IPython's
267 To conclude these examples, we summarize the key features of IPython's
267 parallel architecture that have been demonstrated:
268 parallel architecture that have been demonstrated:
268
269
269 * Serial code can often be parallelized with only a few extra lines of code.
270 * Serial code can often be parallelized with only a few extra lines of code.
270 We have used the :class:`DirectView` and :class:`LoadBalancedView` classes
271 We have used the :class:`DirectView` and :class:`LoadBalancedView` classes
271 for this purpose.
272 for this purpose.
272 * The resulting parallel code can be run without ever leaving IPython's
273 * The resulting parallel code can be run without ever leaving IPython's
273 interactive shell.
274 interactive shell.
274 * Any data computed in parallel can be explored interactively through
275 * Any data computed in parallel can be explored interactively through
275 visualization or further numerical calculations.
276 visualization or further numerical calculations.
276 * We have run these examples on a cluster running Windows HPC Server 2008.
277 * We have run these examples on a cluster running Windows HPC Server 2008.
277 IPython's built in support for the Windows HPC job scheduler makes it
278 IPython's built in support for the Windows HPC job scheduler makes it
278 easy to get started with IPython's parallel capabilities.
279 easy to get started with IPython's parallel capabilities.
279
280
280 .. note::
281 .. note::
281
282
282 The newparallel code has never been run on Windows HPC Server, so the last
283 The newparallel code has never been run on Windows HPC Server, so the last
283 conclusion is untested.
284 conclusion is untested.
General Comments 0
You need to be logged in to leave comments. Login now