##// END OF EJS Templates
API update involving map and load-balancing
MinRK -
Show More
@@ -1,165 +1,165 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.test.test_newserialized -*-
2 # -*- test-case-name: IPython.kernel.test.test_newserialized -*-
3
3
4 """Refactored serialization classes and interfaces."""
4 """Refactored serialization classes and interfaces."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 # Tell nose to skip this module
8 # Tell nose to skip this module
9 __test__ = {}
9 __test__ = {}
10
10
11 #-------------------------------------------------------------------------------
11 #-------------------------------------------------------------------------------
12 # Copyright (C) 2008 The IPython Development Team
12 # Copyright (C) 2008 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 #-------------------------------------------------------------------------------
18 #-------------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-------------------------------------------------------------------------------
20 #-------------------------------------------------------------------------------
21
21
22 import cPickle as pickle
22 import cPickle as pickle
23
23
24 try:
24 try:
25 import numpy
25 import numpy
26 except ImportError:
26 except ImportError:
27 pass
27 pass
28
28
29 class SerializationError(Exception):
29 class SerializationError(Exception):
30 pass
30 pass
31
31
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33 # Classes and functions
33 # Classes and functions
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35
35
36 class ISerialized:
36 class ISerialized:
37
37
38 def getData():
38 def getData():
39 """"""
39 """"""
40
40
41 def getDataSize(units=10.0**6):
41 def getDataSize(units=10.0**6):
42 """"""
42 """"""
43
43
44 def getTypeDescriptor():
44 def getTypeDescriptor():
45 """"""
45 """"""
46
46
47 def getMetadata():
47 def getMetadata():
48 """"""
48 """"""
49
49
50
50
51 class IUnSerialized:
51 class IUnSerialized:
52
52
53 def getObject():
53 def getObject():
54 """"""
54 """"""
55
55
56 class Serialized(object):
56 class Serialized(object):
57
57
58 # implements(ISerialized)
58 # implements(ISerialized)
59
59
60 def __init__(self, data, typeDescriptor, metadata={}):
60 def __init__(self, data, typeDescriptor, metadata={}):
61 self.data = data
61 self.data = data
62 self.typeDescriptor = typeDescriptor
62 self.typeDescriptor = typeDescriptor
63 self.metadata = metadata
63 self.metadata = metadata
64
64
65 def getData(self):
65 def getData(self):
66 return self.data
66 return self.data
67
67
68 def getDataSize(self, units=10.0**6):
68 def getDataSize(self, units=10.0**6):
69 return len(self.data)/units
69 return len(self.data)/units
70
70
71 def getTypeDescriptor(self):
71 def getTypeDescriptor(self):
72 return self.typeDescriptor
72 return self.typeDescriptor
73
73
74 def getMetadata(self):
74 def getMetadata(self):
75 return self.metadata
75 return self.metadata
76
76
77
77
78 class UnSerialized(object):
78 class UnSerialized(object):
79
79
80 # implements(IUnSerialized)
80 # implements(IUnSerialized)
81
81
82 def __init__(self, obj):
82 def __init__(self, obj):
83 self.obj = obj
83 self.obj = obj
84
84
85 def getObject(self):
85 def getObject(self):
86 return self.obj
86 return self.obj
87
87
88
88
89 class SerializeIt(object):
89 class SerializeIt(object):
90
90
91 # implements(ISerialized)
91 # implements(ISerialized)
92
92
93 def __init__(self, unSerialized):
93 def __init__(self, unSerialized):
94 self.data = None
94 self.data = None
95 self.obj = unSerialized.getObject()
95 self.obj = unSerialized.getObject()
96 if globals().has_key('numpy') and isinstance(self.obj, numpy.ndarray):
96 if globals().has_key('numpy') and isinstance(self.obj, numpy.ndarray):
97 if len(self.obj) == 0: # length 0 arrays can't be reconstructed
97 if len(self.obj) == 0: # length 0 arrays can't be reconstructed
98 raise SerializationError("You cannot send a length 0 array")
98 raise SerializationError("You cannot send a length 0 array")
99 self.obj = numpy.ascontiguousarray(self.obj, dtype=None)
99 self.obj = numpy.ascontiguousarray(self.obj, dtype=None)
100 self.typeDescriptor = 'ndarray'
100 self.typeDescriptor = 'ndarray'
101 self.metadata = {'shape':self.obj.shape,
101 self.metadata = {'shape':self.obj.shape,
102 'dtype':self.obj.dtype.str}
102 'dtype':self.obj.dtype.str}
103 elif isinstance(self.obj, str):
103 elif isinstance(self.obj, str):
104 self.typeDescriptor = 'bytes'
104 self.typeDescriptor = 'bytes'
105 self.metadata = {}
105 self.metadata = {}
106 elif isinstance(self.obj, buffer):
106 elif isinstance(self.obj, buffer):
107 self.typeDescriptor = 'buffer'
107 self.typeDescriptor = 'buffer'
108 self.metadata = {}
108 self.metadata = {}
109 else:
109 else:
110 self.typeDescriptor = 'pickle'
110 self.typeDescriptor = 'pickle'
111 self.metadata = {}
111 self.metadata = {}
112 self._generateData()
112 self._generateData()
113
113
114 def _generateData(self):
114 def _generateData(self):
115 if self.typeDescriptor == 'ndarray':
115 if self.typeDescriptor == 'ndarray':
116 self.data = numpy.getbuffer(self.obj)
116 self.data = numpy.getbuffer(self.obj)
117 elif self.typeDescriptor in ('bytes', 'buffer'):
117 elif self.typeDescriptor in ('bytes', 'buffer'):
118 self.data = self.obj
118 self.data = self.obj
119 elif self.typeDescriptor == 'pickle':
119 elif self.typeDescriptor == 'pickle':
120 self.data = pickle.dumps(self.obj, -1)
120 self.data = pickle.dumps(self.obj, -1)
121 else:
121 else:
122 raise SerializationError("Really wierd serialization error.")
122 raise SerializationError("Really wierd serialization error.")
123 del self.obj
123 del self.obj
124
124
125 def getData(self):
125 def getData(self):
126 return self.data
126 return self.data
127
127
128 def getDataSize(self, units=10.0**6):
128 def getDataSize(self, units=10.0**6):
129 return 1.0*len(self.data)/units
129 return 1.0*len(self.data)/units
130
130
131 def getTypeDescriptor(self):
131 def getTypeDescriptor(self):
132 return self.typeDescriptor
132 return self.typeDescriptor
133
133
134 def getMetadata(self):
134 def getMetadata(self):
135 return self.metadata
135 return self.metadata
136
136
137
137
138 class UnSerializeIt(UnSerialized):
138 class UnSerializeIt(UnSerialized):
139
139
140 # implements(IUnSerialized)
140 # implements(IUnSerialized)
141
141
142 def __init__(self, serialized):
142 def __init__(self, serialized):
143 self.serialized = serialized
143 self.serialized = serialized
144
144
145 def getObject(self):
145 def getObject(self):
146 typeDescriptor = self.serialized.getTypeDescriptor()
146 typeDescriptor = self.serialized.getTypeDescriptor()
147 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
147 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
148 result = numpy.frombuffer(self.serialized.getData(), dtype = self.serialized.metadata['dtype'])
148 result = numpy.frombuffer(self.serialized.getData(), dtype = self.serialized.metadata['dtype'])
149 result.shape = self.serialized.metadata['shape']
149 result.shape = self.serialized.metadata['shape']
150 # This is a hack to make the array writable. We are working with
150 # numpy arrays with frombuffer are read-only. We are working with
151 # the numpy folks to address this issue.
151 # the numpy folks to address this issue.
152 result = result.copy()
152 # result = result.copy()
153 elif typeDescriptor == 'pickle':
153 elif typeDescriptor == 'pickle':
154 result = pickle.loads(self.serialized.getData())
154 result = pickle.loads(self.serialized.getData())
155 elif typeDescriptor in ('bytes', 'buffer'):
155 elif typeDescriptor in ('bytes', 'buffer'):
156 result = self.serialized.getData()
156 result = self.serialized.getData()
157 else:
157 else:
158 raise SerializationError("Really wierd serialization error.")
158 raise SerializationError("Really wierd serialization error.")
159 return result
159 return result
160
160
161 def serialize(obj):
161 def serialize(obj):
162 return SerializeIt(UnSerialized(obj))
162 return SerializeIt(UnSerialized(obj))
163
163
164 def unserialize(serialized):
164 def unserialize(serialized):
165 return UnSerializeIt(serialized).getObject()
165 return UnSerializeIt(serialized).getObject()
@@ -1,1472 +1,1328 b''
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import json
14 import json
15 import time
15 import time
16 import warnings
16 import warnings
17 from datetime import datetime
17 from datetime import datetime
18 from getpass import getpass
18 from getpass import getpass
19 from pprint import pprint
19 from pprint import pprint
20
20
21 pjoin = os.path.join
21 pjoin = os.path.join
22
22
23 import zmq
23 import zmq
24 from zmq.eventloop import ioloop, zmqstream
24 # from zmq.eventloop import ioloop, zmqstream
25
25
26 from IPython.utils.path import get_ipython_dir
26 from IPython.utils.path import get_ipython_dir
27 from IPython.external.decorator import decorator
27 from IPython.external.decorator import decorator
28 from IPython.external.ssh import tunnel
28 from IPython.external.ssh import tunnel
29
29
30 import error
30 import error
31 import map as Map
31 import map as Map
32 import streamsession as ss
32 import streamsession as ss
33 from asyncresult import AsyncResult, AsyncMapResult
33 from asyncresult import AsyncResult, AsyncMapResult
34 from clusterdir import ClusterDir, ClusterDirError
34 from clusterdir import ClusterDir, ClusterDirError
35 from dependency import Dependency, depend, require, dependent
35 from dependency import Dependency, depend, require, dependent
36 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
36 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
37 from util import ReverseDict, disambiguate_url, validate_url
37 from util import ReverseDict, disambiguate_url, validate_url
38 from view import DirectView, LoadBalancedView
38 from view import DirectView, LoadBalancedView
39
39
40 #--------------------------------------------------------------------------
40 #--------------------------------------------------------------------------
41 # helpers for implementing old MEC API via client.apply
41 # helpers for implementing old MEC API via client.apply
42 #--------------------------------------------------------------------------
42 #--------------------------------------------------------------------------
43
43
44 def _push(ns):
44 def _push(ns):
45 """helper method for implementing `client.push` via `client.apply`"""
45 """helper method for implementing `client.push` via `client.apply`"""
46 globals().update(ns)
46 globals().update(ns)
47
47
48 def _pull(keys):
48 def _pull(keys):
49 """helper method for implementing `client.pull` via `client.apply`"""
49 """helper method for implementing `client.pull` via `client.apply`"""
50 g = globals()
50 g = globals()
51 if isinstance(keys, (list,tuple, set)):
51 if isinstance(keys, (list,tuple, set)):
52 for key in keys:
52 for key in keys:
53 if not g.has_key(key):
53 if not g.has_key(key):
54 raise NameError("name '%s' is not defined"%key)
54 raise NameError("name '%s' is not defined"%key)
55 return map(g.get, keys)
55 return map(g.get, keys)
56 else:
56 else:
57 if not g.has_key(keys):
57 if not g.has_key(keys):
58 raise NameError("name '%s' is not defined"%keys)
58 raise NameError("name '%s' is not defined"%keys)
59 return g.get(keys)
59 return g.get(keys)
60
60
61 def _clear():
61 def _clear():
62 """helper method for implementing `client.clear` via `client.apply`"""
62 """helper method for implementing `client.clear` via `client.apply`"""
63 globals().clear()
63 globals().clear()
64
64
65 def _execute(code):
65 def _execute(code):
66 """helper method for implementing `client.execute` via `client.apply`"""
66 """helper method for implementing `client.execute` via `client.apply`"""
67 exec code in globals()
67 exec code in globals()
68
68
69
69
70 #--------------------------------------------------------------------------
70 #--------------------------------------------------------------------------
71 # Decorators for Client methods
71 # Decorators for Client methods
72 #--------------------------------------------------------------------------
72 #--------------------------------------------------------------------------
73
73
74 @decorator
74 @decorator
75 def spinfirst(f, self, *args, **kwargs):
75 def spinfirst(f, self, *args, **kwargs):
76 """Call spin() to sync state prior to calling the method."""
76 """Call spin() to sync state prior to calling the method."""
77 self.spin()
77 self.spin()
78 return f(self, *args, **kwargs)
78 return f(self, *args, **kwargs)
79
79
80 @decorator
80 @decorator
81 def defaultblock(f, self, *args, **kwargs):
81 def defaultblock(f, self, *args, **kwargs):
82 """Default to self.block; preserve self.block."""
82 """Default to self.block; preserve self.block."""
83 block = kwargs.get('block',None)
83 block = kwargs.get('block',None)
84 block = self.block if block is None else block
84 block = self.block if block is None else block
85 saveblock = self.block
85 saveblock = self.block
86 self.block = block
86 self.block = block
87 try:
87 try:
88 ret = f(self, *args, **kwargs)
88 ret = f(self, *args, **kwargs)
89 finally:
89 finally:
90 self.block = saveblock
90 self.block = saveblock
91 return ret
91 return ret
92
92
93
93
94 #--------------------------------------------------------------------------
94 #--------------------------------------------------------------------------
95 # Classes
95 # Classes
96 #--------------------------------------------------------------------------
96 #--------------------------------------------------------------------------
97
97
98 class Metadata(dict):
98 class Metadata(dict):
99 """Subclass of dict for initializing metadata values.
99 """Subclass of dict for initializing metadata values.
100
100
101 Attribute access works on keys.
101 Attribute access works on keys.
102
102
103 These objects have a strict set of keys - errors will raise if you try
103 These objects have a strict set of keys - errors will raise if you try
104 to add new keys.
104 to add new keys.
105 """
105 """
106 def __init__(self, *args, **kwargs):
106 def __init__(self, *args, **kwargs):
107 dict.__init__(self)
107 dict.__init__(self)
108 md = {'msg_id' : None,
108 md = {'msg_id' : None,
109 'submitted' : None,
109 'submitted' : None,
110 'started' : None,
110 'started' : None,
111 'completed' : None,
111 'completed' : None,
112 'received' : None,
112 'received' : None,
113 'engine_uuid' : None,
113 'engine_uuid' : None,
114 'engine_id' : None,
114 'engine_id' : None,
115 'follow' : None,
115 'follow' : None,
116 'after' : None,
116 'after' : None,
117 'status' : None,
117 'status' : None,
118
118
119 'pyin' : None,
119 'pyin' : None,
120 'pyout' : None,
120 'pyout' : None,
121 'pyerr' : None,
121 'pyerr' : None,
122 'stdout' : '',
122 'stdout' : '',
123 'stderr' : '',
123 'stderr' : '',
124 }
124 }
125 self.update(md)
125 self.update(md)
126 self.update(dict(*args, **kwargs))
126 self.update(dict(*args, **kwargs))
127
127
128 def __getattr__(self, key):
128 def __getattr__(self, key):
129 """getattr aliased to getitem"""
129 """getattr aliased to getitem"""
130 if key in self.iterkeys():
130 if key in self.iterkeys():
131 return self[key]
131 return self[key]
132 else:
132 else:
133 raise AttributeError(key)
133 raise AttributeError(key)
134
134
135 def __setattr__(self, key, value):
135 def __setattr__(self, key, value):
136 """setattr aliased to setitem, with strict"""
136 """setattr aliased to setitem, with strict"""
137 if key in self.iterkeys():
137 if key in self.iterkeys():
138 self[key] = value
138 self[key] = value
139 else:
139 else:
140 raise AttributeError(key)
140 raise AttributeError(key)
141
141
142 def __setitem__(self, key, value):
142 def __setitem__(self, key, value):
143 """strict static key enforcement"""
143 """strict static key enforcement"""
144 if key in self.iterkeys():
144 if key in self.iterkeys():
145 dict.__setitem__(self, key, value)
145 dict.__setitem__(self, key, value)
146 else:
146 else:
147 raise KeyError(key)
147 raise KeyError(key)
148
148
149
149
150 class Client(object):
150 class Client(object):
151 """A semi-synchronous client to the IPython ZMQ controller
151 """A semi-synchronous client to the IPython ZMQ controller
152
152
153 Parameters
153 Parameters
154 ----------
154 ----------
155
155
156 url_or_file : bytes; zmq url or path to ipcontroller-client.json
156 url_or_file : bytes; zmq url or path to ipcontroller-client.json
157 Connection information for the Hub's registration. If a json connector
157 Connection information for the Hub's registration. If a json connector
158 file is given, then likely no further configuration is necessary.
158 file is given, then likely no further configuration is necessary.
159 [Default: use profile]
159 [Default: use profile]
160 profile : bytes
160 profile : bytes
161 The name of the Cluster profile to be used to find connector information.
161 The name of the Cluster profile to be used to find connector information.
162 [Default: 'default']
162 [Default: 'default']
163 context : zmq.Context
163 context : zmq.Context
164 Pass an existing zmq.Context instance, otherwise the client will create its own.
164 Pass an existing zmq.Context instance, otherwise the client will create its own.
165 username : bytes
165 username : bytes
166 set username to be passed to the Session object
166 set username to be passed to the Session object
167 debug : bool
167 debug : bool
168 flag for lots of message printing for debug purposes
168 flag for lots of message printing for debug purposes
169
169
170 #-------------- ssh related args ----------------
170 #-------------- ssh related args ----------------
171 # These are args for configuring the ssh tunnel to be used
171 # These are args for configuring the ssh tunnel to be used
172 # credentials are used to forward connections over ssh to the Controller
172 # credentials are used to forward connections over ssh to the Controller
173 # Note that the ip given in `addr` needs to be relative to sshserver
173 # Note that the ip given in `addr` needs to be relative to sshserver
174 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
174 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
175 # and set sshserver as the same machine the Controller is on. However,
175 # and set sshserver as the same machine the Controller is on. However,
176 # the only requirement is that sshserver is able to see the Controller
176 # the only requirement is that sshserver is able to see the Controller
177 # (i.e. is within the same trusted network).
177 # (i.e. is within the same trusted network).
178
178
179 sshserver : str
179 sshserver : str
180 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
180 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
181 If keyfile or password is specified, and this is not, it will default to
181 If keyfile or password is specified, and this is not, it will default to
182 the ip given in addr.
182 the ip given in addr.
183 sshkey : str; path to public ssh key file
183 sshkey : str; path to public ssh key file
184 This specifies a key to be used in ssh login, default None.
184 This specifies a key to be used in ssh login, default None.
185 Regular default ssh keys will be used without specifying this argument.
185 Regular default ssh keys will be used without specifying this argument.
186 password : str
186 password : str
187 Your ssh password to sshserver. Note that if this is left None,
187 Your ssh password to sshserver. Note that if this is left None,
188 you will be prompted for it if passwordless key based login is unavailable.
188 you will be prompted for it if passwordless key based login is unavailable.
189 paramiko : bool
189 paramiko : bool
190 flag for whether to use paramiko instead of shell ssh for tunneling.
190 flag for whether to use paramiko instead of shell ssh for tunneling.
191 [default: True on win32, False else]
191 [default: True on win32, False else]
192
192
193 #------- exec authentication args -------
193 #------- exec authentication args -------
194 # If even localhost is untrusted, you can have some protection against
194 # If even localhost is untrusted, you can have some protection against
195 # unauthorized execution by using a key. Messages are still sent
195 # unauthorized execution by using a key. Messages are still sent
196 # as cleartext, so if someone can snoop your loopback traffic this will
196 # as cleartext, so if someone can snoop your loopback traffic this will
197 # not help against malicious attacks.
197 # not help against malicious attacks.
198
198
199 exec_key : str
199 exec_key : str
200 an authentication key or file containing a key
200 an authentication key or file containing a key
201 default: None
201 default: None
202
202
203
203
204 Attributes
204 Attributes
205 ----------
205 ----------
206
206 ids : set of int engine IDs
207 ids : set of int engine IDs
207 requesting the ids attribute always synchronizes
208 requesting the ids attribute always synchronizes
208 the registration state. To request ids without synchronization,
209 the registration state. To request ids without synchronization,
209 use semi-private _ids attributes.
210 use semi-private _ids attributes.
210
211
211 history : list of msg_ids
212 history : list of msg_ids
212 a list of msg_ids, keeping track of all the execution
213 a list of msg_ids, keeping track of all the execution
213 messages you have submitted in order.
214 messages you have submitted in order.
214
215
215 outstanding : set of msg_ids
216 outstanding : set of msg_ids
216 a set of msg_ids that have been submitted, but whose
217 a set of msg_ids that have been submitted, but whose
217 results have not yet been received.
218 results have not yet been received.
218
219
219 results : dict
220 results : dict
220 a dict of all our results, keyed by msg_id
221 a dict of all our results, keyed by msg_id
221
222
222 block : bool
223 block : bool
223 determines default behavior when block not specified
224 determines default behavior when block not specified
224 in execution methods
225 in execution methods
225
226
226 Methods
227 Methods
227 -------
228 -------
228 spin : flushes incoming results and registration state changes
229 control methods spin, and requesting `ids` also ensures up to date
230
231 barrier : wait on one or more msg_ids
232
229
233 execution methods: apply/apply_bound/apply_to/apply_bound
230 spin
231 flushes incoming results and registration state changes
232 control methods spin, and requesting `ids` also ensures up to date
233
234 barrier
235 wait on one or more msg_ids
236
237 execution methods
238 apply
234 legacy: execute, run
239 legacy: execute, run
235
240
236 query methods: queue_status, get_result, purge
241 query methods
242 queue_status, get_result, purge
237
243
238 control methods: abort, kill
244 control methods
245 abort, shutdown
239
246
240 """
247 """
241
248
242
249
243 _connected=False
250 _connected=False
244 _ssh=False
251 _ssh=False
245 _engines=None
252 _engines=None
246 _registration_socket=None
253 _registration_socket=None
247 _query_socket=None
254 _query_socket=None
248 _control_socket=None
255 _control_socket=None
249 _iopub_socket=None
256 _iopub_socket=None
250 _notification_socket=None
257 _notification_socket=None
251 _mux_socket=None
258 _mux_socket=None
252 _task_socket=None
259 _task_socket=None
253 _task_scheme=None
260 _task_scheme=None
254 block = False
261 block = False
255 outstanding=None
262 outstanding=None
256 results = None
263 results = None
257 history = None
264 history = None
258 debug = False
265 debug = False
259 targets = None
266 targets = None
260
267
261 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
268 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
262 context=None, username=None, debug=False, exec_key=None,
269 context=None, username=None, debug=False, exec_key=None,
263 sshserver=None, sshkey=None, password=None, paramiko=None,
270 sshserver=None, sshkey=None, password=None, paramiko=None,
264 ):
271 ):
265 if context is None:
272 if context is None:
266 context = zmq.Context()
273 context = zmq.Context()
267 self.context = context
274 self.context = context
268 self.targets = 'all'
269
275
270 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
276 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
271 if self._cd is not None:
277 if self._cd is not None:
272 if url_or_file is None:
278 if url_or_file is None:
273 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
279 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
274 assert url_or_file is not None, "I can't find enough information to connect to a controller!"\
280 assert url_or_file is not None, "I can't find enough information to connect to a controller!"\
275 " Please specify at least one of url_or_file or profile."
281 " Please specify at least one of url_or_file or profile."
276
282
277 try:
283 try:
278 validate_url(url_or_file)
284 validate_url(url_or_file)
279 except AssertionError:
285 except AssertionError:
280 if not os.path.exists(url_or_file):
286 if not os.path.exists(url_or_file):
281 if self._cd:
287 if self._cd:
282 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
288 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
283 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
289 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
284 with open(url_or_file) as f:
290 with open(url_or_file) as f:
285 cfg = json.loads(f.read())
291 cfg = json.loads(f.read())
286 else:
292 else:
287 cfg = {'url':url_or_file}
293 cfg = {'url':url_or_file}
288
294
289 # sync defaults from args, json:
295 # sync defaults from args, json:
290 if sshserver:
296 if sshserver:
291 cfg['ssh'] = sshserver
297 cfg['ssh'] = sshserver
292 if exec_key:
298 if exec_key:
293 cfg['exec_key'] = exec_key
299 cfg['exec_key'] = exec_key
294 exec_key = cfg['exec_key']
300 exec_key = cfg['exec_key']
295 sshserver=cfg['ssh']
301 sshserver=cfg['ssh']
296 url = cfg['url']
302 url = cfg['url']
297 location = cfg.setdefault('location', None)
303 location = cfg.setdefault('location', None)
298 cfg['url'] = disambiguate_url(cfg['url'], location)
304 cfg['url'] = disambiguate_url(cfg['url'], location)
299 url = cfg['url']
305 url = cfg['url']
300
306
301 self._config = cfg
307 self._config = cfg
302
308
303 self._ssh = bool(sshserver or sshkey or password)
309 self._ssh = bool(sshserver or sshkey or password)
304 if self._ssh and sshserver is None:
310 if self._ssh and sshserver is None:
305 # default to ssh via localhost
311 # default to ssh via localhost
306 sshserver = url.split('://')[1].split(':')[0]
312 sshserver = url.split('://')[1].split(':')[0]
307 if self._ssh and password is None:
313 if self._ssh and password is None:
308 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
314 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
309 password=False
315 password=False
310 else:
316 else:
311 password = getpass("SSH Password for %s: "%sshserver)
317 password = getpass("SSH Password for %s: "%sshserver)
312 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
318 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
313 if exec_key is not None and os.path.isfile(exec_key):
319 if exec_key is not None and os.path.isfile(exec_key):
314 arg = 'keyfile'
320 arg = 'keyfile'
315 else:
321 else:
316 arg = 'key'
322 arg = 'key'
317 key_arg = {arg:exec_key}
323 key_arg = {arg:exec_key}
318 if username is None:
324 if username is None:
319 self.session = ss.StreamSession(**key_arg)
325 self.session = ss.StreamSession(**key_arg)
320 else:
326 else:
321 self.session = ss.StreamSession(username, **key_arg)
327 self.session = ss.StreamSession(username, **key_arg)
322 self._registration_socket = self.context.socket(zmq.XREQ)
328 self._registration_socket = self.context.socket(zmq.XREQ)
323 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
329 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
324 if self._ssh:
330 if self._ssh:
325 tunnel.tunnel_connection(self._registration_socket, url, sshserver, **ssh_kwargs)
331 tunnel.tunnel_connection(self._registration_socket, url, sshserver, **ssh_kwargs)
326 else:
332 else:
327 self._registration_socket.connect(url)
333 self._registration_socket.connect(url)
328 self._engines = ReverseDict()
334 self._engines = ReverseDict()
329 self._ids = []
335 self._ids = []
330 self.outstanding=set()
336 self.outstanding=set()
331 self.results = {}
337 self.results = {}
332 self.metadata = {}
338 self.metadata = {}
333 self.history = []
339 self.history = []
334 self.debug = debug
340 self.debug = debug
335 self.session.debug = debug
341 self.session.debug = debug
336
342
337 self._notification_handlers = {'registration_notification' : self._register_engine,
343 self._notification_handlers = {'registration_notification' : self._register_engine,
338 'unregistration_notification' : self._unregister_engine,
344 'unregistration_notification' : self._unregister_engine,
339 }
345 }
340 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
346 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
341 'apply_reply' : self._handle_apply_reply}
347 'apply_reply' : self._handle_apply_reply}
342 self._connect(sshserver, ssh_kwargs)
348 self._connect(sshserver, ssh_kwargs)
343
349
344
350
345 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
351 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
346 if ipython_dir is None:
352 if ipython_dir is None:
347 ipython_dir = get_ipython_dir()
353 ipython_dir = get_ipython_dir()
348 if cluster_dir is not None:
354 if cluster_dir is not None:
349 try:
355 try:
350 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
356 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
351 except ClusterDirError:
357 except ClusterDirError:
352 pass
358 pass
353 elif profile is not None:
359 elif profile is not None:
354 try:
360 try:
355 self._cd = ClusterDir.find_cluster_dir_by_profile(
361 self._cd = ClusterDir.find_cluster_dir_by_profile(
356 ipython_dir, profile)
362 ipython_dir, profile)
357 except ClusterDirError:
363 except ClusterDirError:
358 pass
364 pass
359 else:
365 else:
360 self._cd = None
366 self._cd = None
361
367
362 @property
368 @property
363 def ids(self):
369 def ids(self):
364 """Always up-to-date ids property."""
370 """Always up-to-date ids property."""
365 self._flush_notifications()
371 self._flush_notifications()
366 return self._ids
372 return self._ids
367
373
368 def _update_engines(self, engines):
374 def _update_engines(self, engines):
369 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
375 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
370 for k,v in engines.iteritems():
376 for k,v in engines.iteritems():
371 eid = int(k)
377 eid = int(k)
372 self._engines[eid] = bytes(v) # force not unicode
378 self._engines[eid] = bytes(v) # force not unicode
373 self._ids.append(eid)
379 self._ids.append(eid)
374 self._ids = sorted(self._ids)
380 self._ids = sorted(self._ids)
375 if sorted(self._engines.keys()) != range(len(self._engines)) and \
381 if sorted(self._engines.keys()) != range(len(self._engines)) and \
376 self._task_scheme == 'pure' and self._task_socket:
382 self._task_scheme == 'pure' and self._task_socket:
377 self._stop_scheduling_tasks()
383 self._stop_scheduling_tasks()
378
384
379 def _stop_scheduling_tasks(self):
385 def _stop_scheduling_tasks(self):
380 """Stop scheduling tasks because an engine has been unregistered
386 """Stop scheduling tasks because an engine has been unregistered
381 from a pure ZMQ scheduler.
387 from a pure ZMQ scheduler.
382 """
388 """
383
389
384 self._task_socket.close()
390 self._task_socket.close()
385 self._task_socket = None
391 self._task_socket = None
386 msg = "An engine has been unregistered, and we are using pure " +\
392 msg = "An engine has been unregistered, and we are using pure " +\
387 "ZMQ task scheduling. Task farming will be disabled."
393 "ZMQ task scheduling. Task farming will be disabled."
388 if self.outstanding:
394 if self.outstanding:
389 msg += " If you were running tasks when this happened, " +\
395 msg += " If you were running tasks when this happened, " +\
390 "some `outstanding` msg_ids may never resolve."
396 "some `outstanding` msg_ids may never resolve."
391 warnings.warn(msg, RuntimeWarning)
397 warnings.warn(msg, RuntimeWarning)
392
398
393 def _build_targets(self, targets):
399 def _build_targets(self, targets):
394 """Turn valid target IDs or 'all' into two lists:
400 """Turn valid target IDs or 'all' into two lists:
395 (int_ids, uuids).
401 (int_ids, uuids).
396 """
402 """
397 if targets is None:
403 if targets is None:
398 targets = self._ids
404 targets = self._ids
399 elif isinstance(targets, str):
405 elif isinstance(targets, str):
400 if targets.lower() == 'all':
406 if targets.lower() == 'all':
401 targets = self._ids
407 targets = self._ids
402 else:
408 else:
403 raise TypeError("%r not valid str target, must be 'all'"%(targets))
409 raise TypeError("%r not valid str target, must be 'all'"%(targets))
404 elif isinstance(targets, int):
410 elif isinstance(targets, int):
405 targets = [targets]
411 targets = [targets]
406 return [self._engines[t] for t in targets], list(targets)
412 return [self._engines[t] for t in targets], list(targets)
407
413
408 def _connect(self, sshserver, ssh_kwargs):
414 def _connect(self, sshserver, ssh_kwargs):
409 """setup all our socket connections to the controller. This is called from
415 """setup all our socket connections to the controller. This is called from
410 __init__."""
416 __init__."""
411
417
412 # Maybe allow reconnecting?
418 # Maybe allow reconnecting?
413 if self._connected:
419 if self._connected:
414 return
420 return
415 self._connected=True
421 self._connected=True
416
422
417 def connect_socket(s, url):
423 def connect_socket(s, url):
418 url = disambiguate_url(url, self._config['location'])
424 url = disambiguate_url(url, self._config['location'])
419 if self._ssh:
425 if self._ssh:
420 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
426 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
421 else:
427 else:
422 return s.connect(url)
428 return s.connect(url)
423
429
424 self.session.send(self._registration_socket, 'connection_request')
430 self.session.send(self._registration_socket, 'connection_request')
425 idents,msg = self.session.recv(self._registration_socket,mode=0)
431 idents,msg = self.session.recv(self._registration_socket,mode=0)
426 if self.debug:
432 if self.debug:
427 pprint(msg)
433 pprint(msg)
428 msg = ss.Message(msg)
434 msg = ss.Message(msg)
429 content = msg.content
435 content = msg.content
430 self._config['registration'] = dict(content)
436 self._config['registration'] = dict(content)
431 if content.status == 'ok':
437 if content.status == 'ok':
432 if content.mux:
438 if content.mux:
433 self._mux_socket = self.context.socket(zmq.PAIR)
439 self._mux_socket = self.context.socket(zmq.PAIR)
434 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
440 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
435 connect_socket(self._mux_socket, content.mux)
441 connect_socket(self._mux_socket, content.mux)
436 if content.task:
442 if content.task:
437 self._task_scheme, task_addr = content.task
443 self._task_scheme, task_addr = content.task
438 self._task_socket = self.context.socket(zmq.PAIR)
444 self._task_socket = self.context.socket(zmq.PAIR)
439 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
445 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
440 connect_socket(self._task_socket, task_addr)
446 connect_socket(self._task_socket, task_addr)
441 if content.notification:
447 if content.notification:
442 self._notification_socket = self.context.socket(zmq.SUB)
448 self._notification_socket = self.context.socket(zmq.SUB)
443 connect_socket(self._notification_socket, content.notification)
449 connect_socket(self._notification_socket, content.notification)
444 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
450 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
445 if content.query:
451 if content.query:
446 self._query_socket = self.context.socket(zmq.PAIR)
452 self._query_socket = self.context.socket(zmq.PAIR)
447 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
453 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
448 connect_socket(self._query_socket, content.query)
454 connect_socket(self._query_socket, content.query)
449 if content.control:
455 if content.control:
450 self._control_socket = self.context.socket(zmq.PAIR)
456 self._control_socket = self.context.socket(zmq.PAIR)
451 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
457 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
452 connect_socket(self._control_socket, content.control)
458 connect_socket(self._control_socket, content.control)
453 if content.iopub:
459 if content.iopub:
454 self._iopub_socket = self.context.socket(zmq.SUB)
460 self._iopub_socket = self.context.socket(zmq.SUB)
455 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
461 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
456 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
462 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
457 connect_socket(self._iopub_socket, content.iopub)
463 connect_socket(self._iopub_socket, content.iopub)
458 self._update_engines(dict(content.engines))
464 self._update_engines(dict(content.engines))
459
465
460 else:
466 else:
461 self._connected = False
467 self._connected = False
462 raise Exception("Failed to connect!")
468 raise Exception("Failed to connect!")
463
469
464 #--------------------------------------------------------------------------
470 #--------------------------------------------------------------------------
465 # handlers and callbacks for incoming messages
471 # handlers and callbacks for incoming messages
466 #--------------------------------------------------------------------------
472 #--------------------------------------------------------------------------
467
473
468 def _register_engine(self, msg):
474 def _register_engine(self, msg):
469 """Register a new engine, and update our connection info."""
475 """Register a new engine, and update our connection info."""
470 content = msg['content']
476 content = msg['content']
471 eid = content['id']
477 eid = content['id']
472 d = {eid : content['queue']}
478 d = {eid : content['queue']}
473 self._update_engines(d)
479 self._update_engines(d)
474
480
475 def _unregister_engine(self, msg):
481 def _unregister_engine(self, msg):
476 """Unregister an engine that has died."""
482 """Unregister an engine that has died."""
477 content = msg['content']
483 content = msg['content']
478 eid = int(content['id'])
484 eid = int(content['id'])
479 if eid in self._ids:
485 if eid in self._ids:
480 self._ids.remove(eid)
486 self._ids.remove(eid)
481 self._engines.pop(eid)
487 self._engines.pop(eid)
482 if self._task_socket and self._task_scheme == 'pure':
488 if self._task_socket and self._task_scheme == 'pure':
483 self._stop_scheduling_tasks()
489 self._stop_scheduling_tasks()
484
490
485 def _extract_metadata(self, header, parent, content):
491 def _extract_metadata(self, header, parent, content):
486 md = {'msg_id' : parent['msg_id'],
492 md = {'msg_id' : parent['msg_id'],
487 'received' : datetime.now(),
493 'received' : datetime.now(),
488 'engine_uuid' : header.get('engine', None),
494 'engine_uuid' : header.get('engine', None),
489 'follow' : parent.get('follow', []),
495 'follow' : parent.get('follow', []),
490 'after' : parent.get('after', []),
496 'after' : parent.get('after', []),
491 'status' : content['status'],
497 'status' : content['status'],
492 }
498 }
493
499
494 if md['engine_uuid'] is not None:
500 if md['engine_uuid'] is not None:
495 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
501 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
496
502
497 if 'date' in parent:
503 if 'date' in parent:
498 md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601)
504 md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601)
499 if 'started' in header:
505 if 'started' in header:
500 md['started'] = datetime.strptime(header['started'], ss.ISO8601)
506 md['started'] = datetime.strptime(header['started'], ss.ISO8601)
501 if 'date' in header:
507 if 'date' in header:
502 md['completed'] = datetime.strptime(header['date'], ss.ISO8601)
508 md['completed'] = datetime.strptime(header['date'], ss.ISO8601)
503 return md
509 return md
504
510
505 def _handle_execute_reply(self, msg):
511 def _handle_execute_reply(self, msg):
506 """Save the reply to an execute_request into our results.
512 """Save the reply to an execute_request into our results.
507
513
508 execute messages are never actually used. apply is used instead.
514 execute messages are never actually used. apply is used instead.
509 """
515 """
510
516
511 parent = msg['parent_header']
517 parent = msg['parent_header']
512 msg_id = parent['msg_id']
518 msg_id = parent['msg_id']
513 if msg_id not in self.outstanding:
519 if msg_id not in self.outstanding:
514 if msg_id in self.history:
520 if msg_id in self.history:
515 print ("got stale result: %s"%msg_id)
521 print ("got stale result: %s"%msg_id)
516 else:
522 else:
517 print ("got unknown result: %s"%msg_id)
523 print ("got unknown result: %s"%msg_id)
518 else:
524 else:
519 self.outstanding.remove(msg_id)
525 self.outstanding.remove(msg_id)
520 self.results[msg_id] = ss.unwrap_exception(msg['content'])
526 self.results[msg_id] = ss.unwrap_exception(msg['content'])
521
527
522 def _handle_apply_reply(self, msg):
528 def _handle_apply_reply(self, msg):
523 """Save the reply to an apply_request into our results."""
529 """Save the reply to an apply_request into our results."""
524 parent = msg['parent_header']
530 parent = msg['parent_header']
525 msg_id = parent['msg_id']
531 msg_id = parent['msg_id']
526 if msg_id not in self.outstanding:
532 if msg_id not in self.outstanding:
527 if msg_id in self.history:
533 if msg_id in self.history:
528 print ("got stale result: %s"%msg_id)
534 print ("got stale result: %s"%msg_id)
529 print self.results[msg_id]
535 print self.results[msg_id]
530 print msg
536 print msg
531 else:
537 else:
532 print ("got unknown result: %s"%msg_id)
538 print ("got unknown result: %s"%msg_id)
533 else:
539 else:
534 self.outstanding.remove(msg_id)
540 self.outstanding.remove(msg_id)
535 content = msg['content']
541 content = msg['content']
536 header = msg['header']
542 header = msg['header']
537
543
538 # construct metadata:
544 # construct metadata:
539 md = self.metadata.setdefault(msg_id, Metadata())
545 md = self.metadata.setdefault(msg_id, Metadata())
540 md.update(self._extract_metadata(header, parent, content))
546 md.update(self._extract_metadata(header, parent, content))
541 self.metadata[msg_id] = md
547 self.metadata[msg_id] = md
542
548
543 # construct result:
549 # construct result:
544 if content['status'] == 'ok':
550 if content['status'] == 'ok':
545 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
551 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
546 elif content['status'] == 'aborted':
552 elif content['status'] == 'aborted':
547 self.results[msg_id] = error.AbortedTask(msg_id)
553 self.results[msg_id] = error.AbortedTask(msg_id)
548 elif content['status'] == 'resubmitted':
554 elif content['status'] == 'resubmitted':
549 # TODO: handle resubmission
555 # TODO: handle resubmission
550 pass
556 pass
551 else:
557 else:
552 e = ss.unwrap_exception(content)
558 e = ss.unwrap_exception(content)
553 if e.engine_info:
559 if e.engine_info:
554 e_uuid = e.engine_info['engineid']
560 e_uuid = e.engine_info['engineid']
555 eid = self._engines[e_uuid]
561 eid = self._engines[e_uuid]
556 e.engine_info['engineid'] = eid
562 e.engine_info['engineid'] = eid
557 self.results[msg_id] = e
563 self.results[msg_id] = e
558
564
559 def _flush_notifications(self):
565 def _flush_notifications(self):
560 """Flush notifications of engine registrations waiting
566 """Flush notifications of engine registrations waiting
561 in ZMQ queue."""
567 in ZMQ queue."""
562 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
568 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
563 while msg is not None:
569 while msg is not None:
564 if self.debug:
570 if self.debug:
565 pprint(msg)
571 pprint(msg)
566 msg = msg[-1]
572 msg = msg[-1]
567 msg_type = msg['msg_type']
573 msg_type = msg['msg_type']
568 handler = self._notification_handlers.get(msg_type, None)
574 handler = self._notification_handlers.get(msg_type, None)
569 if handler is None:
575 if handler is None:
570 raise Exception("Unhandled message type: %s"%msg.msg_type)
576 raise Exception("Unhandled message type: %s"%msg.msg_type)
571 else:
577 else:
572 handler(msg)
578 handler(msg)
573 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
579 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
574
580
575 def _flush_results(self, sock):
581 def _flush_results(self, sock):
576 """Flush task or queue results waiting in ZMQ queue."""
582 """Flush task or queue results waiting in ZMQ queue."""
577 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
583 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
578 while msg is not None:
584 while msg is not None:
579 if self.debug:
585 if self.debug:
580 pprint(msg)
586 pprint(msg)
581 msg = msg[-1]
587 msg = msg[-1]
582 msg_type = msg['msg_type']
588 msg_type = msg['msg_type']
583 handler = self._queue_handlers.get(msg_type, None)
589 handler = self._queue_handlers.get(msg_type, None)
584 if handler is None:
590 if handler is None:
585 raise Exception("Unhandled message type: %s"%msg.msg_type)
591 raise Exception("Unhandled message type: %s"%msg.msg_type)
586 else:
592 else:
587 handler(msg)
593 handler(msg)
588 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
594 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
589
595
590 def _flush_control(self, sock):
596 def _flush_control(self, sock):
591 """Flush replies from the control channel waiting
597 """Flush replies from the control channel waiting
592 in the ZMQ queue.
598 in the ZMQ queue.
593
599
594 Currently: ignore them."""
600 Currently: ignore them."""
595 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
601 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
596 while msg is not None:
602 while msg is not None:
597 if self.debug:
603 if self.debug:
598 pprint(msg)
604 pprint(msg)
599 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
605 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
600
606
601 def _flush_iopub(self, sock):
607 def _flush_iopub(self, sock):
602 """Flush replies from the iopub channel waiting
608 """Flush replies from the iopub channel waiting
603 in the ZMQ queue.
609 in the ZMQ queue.
604 """
610 """
605 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
611 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
606 while msg is not None:
612 while msg is not None:
607 if self.debug:
613 if self.debug:
608 pprint(msg)
614 pprint(msg)
609 msg = msg[-1]
615 msg = msg[-1]
610 parent = msg['parent_header']
616 parent = msg['parent_header']
611 msg_id = parent['msg_id']
617 msg_id = parent['msg_id']
612 content = msg['content']
618 content = msg['content']
613 header = msg['header']
619 header = msg['header']
614 msg_type = msg['msg_type']
620 msg_type = msg['msg_type']
615
621
616 # init metadata:
622 # init metadata:
617 md = self.metadata.setdefault(msg_id, Metadata())
623 md = self.metadata.setdefault(msg_id, Metadata())
618
624
619 if msg_type == 'stream':
625 if msg_type == 'stream':
620 name = content['name']
626 name = content['name']
621 s = md[name] or ''
627 s = md[name] or ''
622 md[name] = s + content['data']
628 md[name] = s + content['data']
623 elif msg_type == 'pyerr':
629 elif msg_type == 'pyerr':
624 md.update({'pyerr' : ss.unwrap_exception(content)})
630 md.update({'pyerr' : ss.unwrap_exception(content)})
625 else:
631 else:
626 md.update({msg_type : content['data']})
632 md.update({msg_type : content['data']})
627
633
628 self.metadata[msg_id] = md
634 self.metadata[msg_id] = md
629
635
630 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
636 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
631
637
632 #--------------------------------------------------------------------------
638 #--------------------------------------------------------------------------
633 # getitem
639 # getitem
634 #--------------------------------------------------------------------------
640 #--------------------------------------------------------------------------
635
641
636 def __getitem__(self, key):
642 def __getitem__(self, key):
637 """Dict access returns DirectView multiplexer objects or,
643 """index access returns DirectView multiplexer objects
638 if key is None, a LoadBalancedView."""
639 if key is None:
640 return LoadBalancedView(self)
641 if isinstance(key, int):
642 if key not in self.ids:
643 raise IndexError("No such engine: %i"%key)
644 return DirectView(self, key)
645
646 if isinstance(key, slice):
647 indices = range(len(self.ids))[key]
648 ids = sorted(self._ids)
649 key = [ ids[i] for i in indices ]
650 # newkeys = sorted(self._ids)[thekeys[k]]
651
644
652 if isinstance(key, (tuple, list, xrange)):
645 Must be int, slice, or list/tuple/xrange of ints"""
653 _,targets = self._build_targets(list(key))
646 if not isinstance(key, (int, slice, tuple, list, xrange)):
654 return DirectView(self, targets)
647 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
655 else:
648 else:
656 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
649 return self.view(key, balanced=False)
657
650
658 #--------------------------------------------------------------------------
651 #--------------------------------------------------------------------------
659 # Begin public methods
652 # Begin public methods
660 #--------------------------------------------------------------------------
653 #--------------------------------------------------------------------------
661
654
662 @property
663 def remote(self):
664 """property for convenient RemoteFunction generation.
665
666 >>> @client.remote
667 ... def getpid():
668 import os
669 return os.getpid()
670 """
671 return remote(self, block=self.block)
672
673 def spin(self):
655 def spin(self):
674 """Flush any registration notifications and execution results
656 """Flush any registration notifications and execution results
675 waiting in the ZMQ queue.
657 waiting in the ZMQ queue.
676 """
658 """
677 if self._notification_socket:
659 if self._notification_socket:
678 self._flush_notifications()
660 self._flush_notifications()
679 if self._mux_socket:
661 if self._mux_socket:
680 self._flush_results(self._mux_socket)
662 self._flush_results(self._mux_socket)
681 if self._task_socket:
663 if self._task_socket:
682 self._flush_results(self._task_socket)
664 self._flush_results(self._task_socket)
683 if self._control_socket:
665 if self._control_socket:
684 self._flush_control(self._control_socket)
666 self._flush_control(self._control_socket)
685 if self._iopub_socket:
667 if self._iopub_socket:
686 self._flush_iopub(self._iopub_socket)
668 self._flush_iopub(self._iopub_socket)
687
669
688 def barrier(self, msg_ids=None, timeout=-1):
670 def barrier(self, msg_ids=None, timeout=-1):
689 """waits on one or more `msg_ids`, for up to `timeout` seconds.
671 """waits on one or more `msg_ids`, for up to `timeout` seconds.
690
672
691 Parameters
673 Parameters
692 ----------
674 ----------
675
693 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
676 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
694 ints are indices to self.history
677 ints are indices to self.history
695 strs are msg_ids
678 strs are msg_ids
696 default: wait on all outstanding messages
679 default: wait on all outstanding messages
697 timeout : float
680 timeout : float
698 a time in seconds, after which to give up.
681 a time in seconds, after which to give up.
699 default is -1, which means no timeout
682 default is -1, which means no timeout
700
683
701 Returns
684 Returns
702 -------
685 -------
686
703 True : when all msg_ids are done
687 True : when all msg_ids are done
704 False : timeout reached, some msg_ids still outstanding
688 False : timeout reached, some msg_ids still outstanding
705 """
689 """
706 tic = time.time()
690 tic = time.time()
707 if msg_ids is None:
691 if msg_ids is None:
708 theids = self.outstanding
692 theids = self.outstanding
709 else:
693 else:
710 if isinstance(msg_ids, (int, str, AsyncResult)):
694 if isinstance(msg_ids, (int, str, AsyncResult)):
711 msg_ids = [msg_ids]
695 msg_ids = [msg_ids]
712 theids = set()
696 theids = set()
713 for msg_id in msg_ids:
697 for msg_id in msg_ids:
714 if isinstance(msg_id, int):
698 if isinstance(msg_id, int):
715 msg_id = self.history[msg_id]
699 msg_id = self.history[msg_id]
716 elif isinstance(msg_id, AsyncResult):
700 elif isinstance(msg_id, AsyncResult):
717 map(theids.add, msg_id.msg_ids)
701 map(theids.add, msg_id.msg_ids)
718 continue
702 continue
719 theids.add(msg_id)
703 theids.add(msg_id)
720 if not theids.intersection(self.outstanding):
704 if not theids.intersection(self.outstanding):
721 return True
705 return True
722 self.spin()
706 self.spin()
723 while theids.intersection(self.outstanding):
707 while theids.intersection(self.outstanding):
724 if timeout >= 0 and ( time.time()-tic ) > timeout:
708 if timeout >= 0 and ( time.time()-tic ) > timeout:
725 break
709 break
726 time.sleep(1e-3)
710 time.sleep(1e-3)
727 self.spin()
711 self.spin()
728 return len(theids.intersection(self.outstanding)) == 0
712 return len(theids.intersection(self.outstanding)) == 0
729
713
730 #--------------------------------------------------------------------------
714 #--------------------------------------------------------------------------
731 # Control methods
715 # Control methods
732 #--------------------------------------------------------------------------
716 #--------------------------------------------------------------------------
733
717
734 @spinfirst
718 @spinfirst
735 @defaultblock
719 @defaultblock
736 def clear(self, targets=None, block=None):
720 def clear(self, targets=None, block=None):
737 """Clear the namespace in target(s)."""
721 """Clear the namespace in target(s)."""
738 targets = self._build_targets(targets)[0]
722 targets = self._build_targets(targets)[0]
739 for t in targets:
723 for t in targets:
740 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
724 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
741 error = False
725 error = False
742 if self.block:
726 if self.block:
743 for i in range(len(targets)):
727 for i in range(len(targets)):
744 idents,msg = self.session.recv(self._control_socket,0)
728 idents,msg = self.session.recv(self._control_socket,0)
745 if self.debug:
729 if self.debug:
746 pprint(msg)
730 pprint(msg)
747 if msg['content']['status'] != 'ok':
731 if msg['content']['status'] != 'ok':
748 error = ss.unwrap_exception(msg['content'])
732 error = ss.unwrap_exception(msg['content'])
749 if error:
733 if error:
750 return error
734 return error
751
735
752
736
753 @spinfirst
737 @spinfirst
754 @defaultblock
738 @defaultblock
755 def abort(self, msg_ids = None, targets=None, block=None):
739 def abort(self, msg_ids = None, targets=None, block=None):
756 """Abort the execution queues of target(s)."""
740 """Abort the execution queues of target(s)."""
757 targets = self._build_targets(targets)[0]
741 targets = self._build_targets(targets)[0]
758 if isinstance(msg_ids, basestring):
742 if isinstance(msg_ids, basestring):
759 msg_ids = [msg_ids]
743 msg_ids = [msg_ids]
760 content = dict(msg_ids=msg_ids)
744 content = dict(msg_ids=msg_ids)
761 for t in targets:
745 for t in targets:
762 self.session.send(self._control_socket, 'abort_request',
746 self.session.send(self._control_socket, 'abort_request',
763 content=content, ident=t)
747 content=content, ident=t)
764 error = False
748 error = False
765 if self.block:
749 if self.block:
766 for i in range(len(targets)):
750 for i in range(len(targets)):
767 idents,msg = self.session.recv(self._control_socket,0)
751 idents,msg = self.session.recv(self._control_socket,0)
768 if self.debug:
752 if self.debug:
769 pprint(msg)
753 pprint(msg)
770 if msg['content']['status'] != 'ok':
754 if msg['content']['status'] != 'ok':
771 error = ss.unwrap_exception(msg['content'])
755 error = ss.unwrap_exception(msg['content'])
772 if error:
756 if error:
773 return error
757 return error
774
758
775 @spinfirst
759 @spinfirst
776 @defaultblock
760 @defaultblock
777 def shutdown(self, targets=None, restart=False, controller=False, block=None):
761 def shutdown(self, targets=None, restart=False, controller=False, block=None):
778 """Terminates one or more engine processes, optionally including the controller."""
762 """Terminates one or more engine processes, optionally including the controller."""
779 if controller:
763 if controller:
780 targets = 'all'
764 targets = 'all'
781 targets = self._build_targets(targets)[0]
765 targets = self._build_targets(targets)[0]
782 for t in targets:
766 for t in targets:
783 self.session.send(self._control_socket, 'shutdown_request',
767 self.session.send(self._control_socket, 'shutdown_request',
784 content={'restart':restart},ident=t)
768 content={'restart':restart},ident=t)
785 error = False
769 error = False
786 if block or controller:
770 if block or controller:
787 for i in range(len(targets)):
771 for i in range(len(targets)):
788 idents,msg = self.session.recv(self._control_socket,0)
772 idents,msg = self.session.recv(self._control_socket,0)
789 if self.debug:
773 if self.debug:
790 pprint(msg)
774 pprint(msg)
791 if msg['content']['status'] != 'ok':
775 if msg['content']['status'] != 'ok':
792 error = ss.unwrap_exception(msg['content'])
776 error = ss.unwrap_exception(msg['content'])
793
777
794 if controller:
778 if controller:
795 time.sleep(0.25)
779 time.sleep(0.25)
796 self.session.send(self._query_socket, 'shutdown_request')
780 self.session.send(self._query_socket, 'shutdown_request')
797 idents,msg = self.session.recv(self._query_socket, 0)
781 idents,msg = self.session.recv(self._query_socket, 0)
798 if self.debug:
782 if self.debug:
799 pprint(msg)
783 pprint(msg)
800 if msg['content']['status'] != 'ok':
784 if msg['content']['status'] != 'ok':
801 error = ss.unwrap_exception(msg['content'])
785 error = ss.unwrap_exception(msg['content'])
802
786
803 if error:
787 if error:
804 raise error
788 raise error
805
789
806 #--------------------------------------------------------------------------
790 #--------------------------------------------------------------------------
807 # Execution methods
791 # Execution methods
808 #--------------------------------------------------------------------------
792 #--------------------------------------------------------------------------
809
793
810 @defaultblock
794 @defaultblock
811 def execute(self, code, targets='all', block=None):
795 def execute(self, code, targets='all', block=None):
812 """Executes `code` on `targets` in blocking or nonblocking manner.
796 """Executes `code` on `targets` in blocking or nonblocking manner.
813
797
814 ``execute`` is always `bound` (affects engine namespace)
798 ``execute`` is always `bound` (affects engine namespace)
815
799
816 Parameters
800 Parameters
817 ----------
801 ----------
802
818 code : str
803 code : str
819 the code string to be executed
804 the code string to be executed
820 targets : int/str/list of ints/strs
805 targets : int/str/list of ints/strs
821 the engines on which to execute
806 the engines on which to execute
822 default : all
807 default : all
823 block : bool
808 block : bool
824 whether or not to wait until done to return
809 whether or not to wait until done to return
825 default: self.block
810 default: self.block
826 """
811 """
827 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
812 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True, balanced=False)
828 return result
813 return result
829
814
830 def run(self, filename, targets='all', block=None):
815 def run(self, filename, targets='all', block=None):
831 """Execute contents of `filename` on engine(s).
816 """Execute contents of `filename` on engine(s).
832
817
833 This simply reads the contents of the file and calls `execute`.
818 This simply reads the contents of the file and calls `execute`.
834
819
835 Parameters
820 Parameters
836 ----------
821 ----------
822
837 filename : str
823 filename : str
838 The path to the file
824 The path to the file
839 targets : int/str/list of ints/strs
825 targets : int/str/list of ints/strs
840 the engines on which to execute
826 the engines on which to execute
841 default : all
827 default : all
842 block : bool
828 block : bool
843 whether or not to wait until done
829 whether or not to wait until done
844 default: self.block
830 default: self.block
845
831
846 """
832 """
847 with open(filename, 'rb') as f:
833 with open(filename, 'rb') as f:
848 code = f.read()
834 code = f.read()
849 return self.execute(code, targets=targets, block=block)
835 return self.execute(code, targets=targets, block=block)
850
836
851 def _maybe_raise(self, result):
837 def _maybe_raise(self, result):
852 """wrapper for maybe raising an exception if apply failed."""
838 """wrapper for maybe raising an exception if apply failed."""
853 if isinstance(result, error.RemoteError):
839 if isinstance(result, error.RemoteError):
854 raise result
840 raise result
855
841
856 return result
842 return result
857
843
858 def _build_dependency(self, dep):
844 def _build_dependency(self, dep):
859 """helper for building jsonable dependencies from various input forms"""
845 """helper for building jsonable dependencies from various input forms"""
860 if isinstance(dep, Dependency):
846 if isinstance(dep, Dependency):
861 return dep.as_dict()
847 return dep.as_dict()
862 elif isinstance(dep, AsyncResult):
848 elif isinstance(dep, AsyncResult):
863 return dep.msg_ids
849 return dep.msg_ids
864 elif dep is None:
850 elif dep is None:
865 return []
851 return []
866 else:
852 else:
867 # pass to Dependency constructor
853 # pass to Dependency constructor
868 return list(Dependency(dep))
854 return list(Dependency(dep))
869
855
870 @defaultblock
856 @defaultblock
871 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
857 def apply(self, f, args=None, kwargs=None, bound=True, block=None,
858 targets=None, balanced=None,
872 after=None, follow=None, timeout=None):
859 after=None, follow=None, timeout=None):
873 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
860 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
874
861
875 This is the central execution command for the client.
862 This is the central execution command for the client.
876
863
877 Parameters
864 Parameters
878 ----------
865 ----------
879
866
880 f : function
867 f : function
881 The fuction to be called remotely
868 The fuction to be called remotely
882 args : tuple/list
869 args : tuple/list
883 The positional arguments passed to `f`
870 The positional arguments passed to `f`
884 kwargs : dict
871 kwargs : dict
885 The keyword arguments passed to `f`
872 The keyword arguments passed to `f`
886 bound : bool (default: True)
873 bound : bool (default: True)
887 Whether to execute in the Engine(s) namespace, or in a clean
874 Whether to execute in the Engine(s) namespace, or in a clean
888 namespace not affecting the engine.
875 namespace not affecting the engine.
889 block : bool (default: self.block)
876 block : bool (default: self.block)
890 Whether to wait for the result, or return immediately.
877 Whether to wait for the result, or return immediately.
891 False:
878 False:
892 returns AsyncResult
879 returns AsyncResult
893 True:
880 True:
894 returns actual result(s) of f(*args, **kwargs)
881 returns actual result(s) of f(*args, **kwargs)
895 if multiple targets:
882 if multiple targets:
896 list of results, matching `targets`
883 list of results, matching `targets`
897 targets : int,list of ints, 'all', None
884 targets : int,list of ints, 'all', None
898 Specify the destination of the job.
885 Specify the destination of the job.
899 if None:
886 if None:
900 Submit via Task queue for load-balancing.
887 Submit via Task queue for load-balancing.
901 if 'all':
888 if 'all':
902 Run on all active engines
889 Run on all active engines
903 if list:
890 if list:
904 Run on each specified engine
891 Run on each specified engine
905 if int:
892 if int:
906 Run on single engine
893 Run on single engine
907
894
908 after,follow,timeout only used in `apply_balanced`. See that docstring
895 balanced : bool, default None
909 for details.
896 whether to load-balance. This will default to True
897 if targets is unspecified, or False if targets is specified.
898
899 The following arguments are only used when balanced is True:
900 after : Dependency or collection of msg_ids
901 Only for load-balanced execution (targets=None)
902 Specify a list of msg_ids as a time-based dependency.
903 This job will only be run *after* the dependencies
904 have been met.
905
906 follow : Dependency or collection of msg_ids
907 Only for load-balanced execution (targets=None)
908 Specify a list of msg_ids as a location-based dependency.
909 This job will only be run on an engine where this dependency
910 is met.
911
912 timeout : float/int or None
913 Only for load-balanced execution (targets=None)
914 Specify an amount of time (in seconds) for the scheduler to
915 wait for dependencies to be met before failing with a
916 DependencyTimeout.
917
918 after,follow,timeout only used if `balanced=True`.
910
919
911 Returns
920 Returns
912 -------
921 -------
922
913 if block is False:
923 if block is False:
914 return AsyncResult wrapping msg_ids
924 return AsyncResult wrapping msg_ids
915 output of AsyncResult.get() is identical to that of `apply(...block=True)`
925 output of AsyncResult.get() is identical to that of `apply(...block=True)`
916 else:
926 else:
917 if single target:
927 if single target:
918 return result of `f(*args, **kwargs)`
928 return result of `f(*args, **kwargs)`
919 else:
929 else:
920 return list of results, matching `targets`
930 return list of results, matching `targets`
921 """
931 """
922
932
923 # defaults:
933 # defaults:
924 block = block if block is not None else self.block
925 args = args if args is not None else []
934 args = args if args is not None else []
926 kwargs = kwargs if kwargs is not None else {}
935 kwargs = kwargs if kwargs is not None else {}
927
936
937 if balanced is None:
938 if targets is None:
939 # default to balanced if targets unspecified
940 balanced = True
941 else:
942 # otherwise default to multiplexing
943 balanced = False
944
945 if targets is None and balanced is False:
946 # default to all if *not* balanced, and targets is unspecified
947 targets = 'all'
948
928 # enforce types of f,args,kwrags
949 # enforce types of f,args,kwrags
929 if not callable(f):
950 if not callable(f):
930 raise TypeError("f must be callable, not %s"%type(f))
951 raise TypeError("f must be callable, not %s"%type(f))
931 if not isinstance(args, (tuple, list)):
952 if not isinstance(args, (tuple, list)):
932 raise TypeError("args must be tuple or list, not %s"%type(args))
953 raise TypeError("args must be tuple or list, not %s"%type(args))
933 if not isinstance(kwargs, dict):
954 if not isinstance(kwargs, dict):
934 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
955 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
935
956
936 options = dict(bound=bound, block=block, targets=targets)
957 options = dict(bound=bound, block=block, targets=targets)
937
958
938 if targets is None:
959 if balanced:
939 return self.apply_balanced(f, args, kwargs, timeout=timeout,
960 return self._apply_balanced(f, args, kwargs, timeout=timeout,
940 after=after, follow=follow, **options)
961 after=after, follow=follow, **options)
941 else:
962 elif follow or after or timeout:
942 if follow or after or timeout:
963 msg = "follow, after, and timeout args are only used for"
943 msg = "follow, after, and timeout args are only used for load-balanced"
964 msg += " load-balanced execution."
944 msg += "execution."
945 raise ValueError(msg)
965 raise ValueError(msg)
966 else:
946 return self._apply_direct(f, args, kwargs, **options)
967 return self._apply_direct(f, args, kwargs, **options)
947
968
948 @defaultblock
969 def _apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None,
949 def apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None,
950 after=None, follow=None, timeout=None):
970 after=None, follow=None, timeout=None):
951 """call f(*args, **kwargs) remotely in a load-balanced manner.
971 """call f(*args, **kwargs) remotely in a load-balanced manner.
952
972
953 Parameters
973 This is a private method, see `apply` for details.
954 ----------
974 Not to be called directly!
955
956 f : function
957 The fuction to be called remotely
958 args : tuple/list
959 The positional arguments passed to `f`
960 kwargs : dict
961 The keyword arguments passed to `f`
962 bound : bool (default: True)
963 Whether to execute in the Engine(s) namespace, or in a clean
964 namespace not affecting the engine.
965 block : bool (default: self.block)
966 Whether to wait for the result, or return immediately.
967 False:
968 returns AsyncResult
969 True:
970 returns actual result(s) of f(*args, **kwargs)
971 if multiple targets:
972 list of results, matching `targets`
973 targets : int,list of ints, 'all', None
974 Specify the destination of the job.
975 if None:
976 Submit via Task queue for load-balancing.
977 if 'all':
978 Run on all active engines
979 if list:
980 Run on each specified engine
981 if int:
982 Run on single engine
983
984 after : Dependency or collection of msg_ids
985 Only for load-balanced execution (targets=None)
986 Specify a list of msg_ids as a time-based dependency.
987 This job will only be run *after* the dependencies
988 have been met.
989
990 follow : Dependency or collection of msg_ids
991 Only for load-balanced execution (targets=None)
992 Specify a list of msg_ids as a location-based dependency.
993 This job will only be run on an engine where this dependency
994 is met.
995
996 timeout : float/int or None
997 Only for load-balanced execution (targets=None)
998 Specify an amount of time (in seconds) for the scheduler to
999 wait for dependencies to be met before failing with a
1000 DependencyTimeout.
1001
1002 Returns
1003 -------
1004 if block is False:
1005 return AsyncResult wrapping msg_id
1006 output of AsyncResult.get() is identical to that of `apply(...block=True)`
1007 else:
1008 wait for, and return actual result of `f(*args, **kwargs)`
1009
1010 """
975 """
1011
976
977 for kwarg in (bound, block, targets):
978 assert kwarg is not None, "kwarg %r must be specified!"%kwarg
979
1012 if self._task_socket is None:
980 if self._task_socket is None:
1013 msg = "Task farming is disabled"
981 msg = "Task farming is disabled"
1014 if self._task_scheme == 'pure':
982 if self._task_scheme == 'pure':
1015 msg += " because the pure ZMQ scheduler cannot handle"
983 msg += " because the pure ZMQ scheduler cannot handle"
1016 msg += " disappearing engines."
984 msg += " disappearing engines."
1017 raise RuntimeError(msg)
985 raise RuntimeError(msg)
1018
986
1019 if self._task_scheme == 'pure':
987 if self._task_scheme == 'pure':
1020 # pure zmq scheme doesn't support dependencies
988 # pure zmq scheme doesn't support dependencies
1021 msg = "Pure ZMQ scheduler doesn't support dependencies"
989 msg = "Pure ZMQ scheduler doesn't support dependencies"
1022 if (follow or after):
990 if (follow or after):
1023 # hard fail on DAG dependencies
991 # hard fail on DAG dependencies
1024 raise RuntimeError(msg)
992 raise RuntimeError(msg)
1025 if isinstance(f, dependent):
993 if isinstance(f, dependent):
1026 # soft warn on functional dependencies
994 # soft warn on functional dependencies
1027 warnings.warn(msg, RuntimeWarning)
995 warnings.warn(msg, RuntimeWarning)
1028
1029
996
1030 # defaults:
997 # defaults:
1031 args = args if args is not None else []
998 args = args if args is not None else []
1032 kwargs = kwargs if kwargs is not None else {}
999 kwargs = kwargs if kwargs is not None else {}
1033
1000
1034 if targets:
1001 if targets:
1035 idents,_ = self._build_targets(targets)
1002 idents,_ = self._build_targets(targets)
1036 else:
1003 else:
1037 idents = []
1004 idents = []
1038
1005
1039 # enforce types of f,args,kwrags
1040 if not callable(f):
1041 raise TypeError("f must be callable, not %s"%type(f))
1042 if not isinstance(args, (tuple, list)):
1043 raise TypeError("args must be tuple or list, not %s"%type(args))
1044 if not isinstance(kwargs, dict):
1045 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1046
1047 after = self._build_dependency(after)
1006 after = self._build_dependency(after)
1048 follow = self._build_dependency(follow)
1007 follow = self._build_dependency(follow)
1049 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
1008 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
1050 bufs = ss.pack_apply_message(f,args,kwargs)
1009 bufs = ss.pack_apply_message(f,args,kwargs)
1051 content = dict(bound=bound)
1010 content = dict(bound=bound)
1052
1011
1053 msg = self.session.send(self._task_socket, "apply_request",
1012 msg = self.session.send(self._task_socket, "apply_request",
1054 content=content, buffers=bufs, subheader=subheader)
1013 content=content, buffers=bufs, subheader=subheader)
1055 msg_id = msg['msg_id']
1014 msg_id = msg['msg_id']
1056 self.outstanding.add(msg_id)
1015 self.outstanding.add(msg_id)
1057 self.history.append(msg_id)
1016 self.history.append(msg_id)
1058 ar = AsyncResult(self, [msg_id], fname=f.__name__)
1017 ar = AsyncResult(self, [msg_id], fname=f.__name__)
1059 if block:
1018 if block:
1060 try:
1019 try:
1061 return ar.get()
1020 return ar.get()
1062 except KeyboardInterrupt:
1021 except KeyboardInterrupt:
1063 return ar
1022 return ar
1064 else:
1023 else:
1065 return ar
1024 return ar
1066
1025
1067 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
1026 def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None):
1068 """Then underlying method for applying functions to specific engines
1027 """Then underlying method for applying functions to specific engines
1069 via the MUX queue.
1028 via the MUX queue.
1070
1029
1030 This is a private method, see `apply` for details.
1071 Not to be called directly!
1031 Not to be called directly!
1072 """
1032 """
1073
1033
1034 for kwarg in (bound, block, targets):
1035 assert kwarg is not None, "kwarg %r must be specified!"%kwarg
1036
1074 idents,targets = self._build_targets(targets)
1037 idents,targets = self._build_targets(targets)
1075
1038
1076 subheader = {}
1039 subheader = {}
1077 content = dict(bound=bound)
1040 content = dict(bound=bound)
1078 bufs = ss.pack_apply_message(f,args,kwargs)
1041 bufs = ss.pack_apply_message(f,args,kwargs)
1079
1042
1080 msg_ids = []
1043 msg_ids = []
1081 for ident in idents:
1044 for ident in idents:
1082 msg = self.session.send(self._mux_socket, "apply_request",
1045 msg = self.session.send(self._mux_socket, "apply_request",
1083 content=content, buffers=bufs, ident=ident, subheader=subheader)
1046 content=content, buffers=bufs, ident=ident, subheader=subheader)
1084 msg_id = msg['msg_id']
1047 msg_id = msg['msg_id']
1085 self.outstanding.add(msg_id)
1048 self.outstanding.add(msg_id)
1086 self.history.append(msg_id)
1049 self.history.append(msg_id)
1087 msg_ids.append(msg_id)
1050 msg_ids.append(msg_id)
1088 ar = AsyncResult(self, msg_ids, fname=f.__name__)
1051 ar = AsyncResult(self, msg_ids, fname=f.__name__)
1089 if block:
1052 if block:
1090 try:
1053 try:
1091 return ar.get()
1054 return ar.get()
1092 except KeyboardInterrupt:
1055 except KeyboardInterrupt:
1093 return ar
1056 return ar
1094 else:
1057 else:
1095 return ar
1058 return ar
1096
1059
1097 #--------------------------------------------------------------------------
1060 #--------------------------------------------------------------------------
1098 # Map and decorators
1061 # decorators
1099 #--------------------------------------------------------------------------
1062 #--------------------------------------------------------------------------
1100
1063
1101 def map(self, f, *sequences, **kwargs):
1064 @defaultblock
1102 """Parallel version of builtin `map`, using all our engines.
1065 def parallel(self, bound=True, targets='all', block=None):
1103
1104 `block` and `targets` can be passed as keyword arguments only.
1105
1106 There will be one task per target, so work will be chunked
1107 if the sequences are longer than `targets`.
1108
1109 Results can be iterated as they are ready, but will become available in chunks.
1110
1111 Parameters
1112 ----------
1113
1114 f : callable
1115 function to be mapped
1116 *sequences: one or more sequences of matching length
1117 the sequences to be distributed and passed to `f`
1118 block : bool
1119 whether to wait for the result or not [default self.block]
1120 targets : valid targets
1121 targets to be used [default self.targets]
1122
1123 Returns
1124 -------
1125
1126 if block=False:
1127 AsyncMapResult
1128 An object like AsyncResult, but which reassembles the sequence of results
1129 into a single list. AsyncMapResults can be iterated through before all
1130 results are complete.
1131 else:
1132 the result of map(f,*sequences)
1133
1134 """
1135 block = kwargs.get('block', self.block)
1136 targets = kwargs.get('targets', self.targets)
1137 assert len(sequences) > 0, "must have some sequences to map onto!"
1138 pf = ParallelFunction(self, f, block=block,
1139 bound=True, targets=targets)
1140 return pf.map(*sequences)
1141
1142 def imap(self, f, *sequences, **kwargs):
1143 """Parallel version of builtin `itertools.imap`, load-balanced across all engines.
1144
1145 Each element will be a separate task, and will be load-balanced. This
1146 lets individual elements be ready for iteration as soon as they come.
1147
1148 Parameters
1149 ----------
1150
1151 f : callable
1152 function to be mapped
1153 *sequences: one or more sequences of matching length
1154 the sequences to be distributed and passed to `f`
1155 block : bool
1156 whether to wait for the result or not [default self.block]
1157
1158 Returns
1159 -------
1160
1161 if block=False:
1162 AsyncMapResult
1163 An object like AsyncResult, but which reassembles the sequence of results
1164 into a single list. AsyncMapResults can be iterated through before all
1165 results are complete.
1166 else:
1167 the result of map(f,*sequences)
1168
1169 """
1170
1171 block = kwargs.get('block', self.block)
1172
1173 assert len(sequences) > 0, "must have some sequences to map onto!"
1174
1175 pf = ParallelFunction(self, f, block=self.block,
1176 bound=True, targets=None)
1177 return pf.map(*sequences)
1178
1179 def parallel(self, bound=True, targets='all', block=True):
1180 """Decorator for making a ParallelFunction."""
1066 """Decorator for making a ParallelFunction."""
1181 return parallel(self, bound=bound, targets=targets, block=block)
1067 return parallel(self, bound=bound, targets=targets, block=block)
1182
1068
1183 def remote(self, bound=True, targets='all', block=True):
1069 @defaultblock
1070 def remote(self, bound=True, targets='all', block=None):
1184 """Decorator for making a RemoteFunction."""
1071 """Decorator for making a RemoteFunction."""
1185 return remote(self, bound=bound, targets=targets, block=block)
1072 return remote(self, bound=bound, targets=targets, block=block)
1186
1073
1187 def view(self, targets=None, balanced=False):
1074 def view(self, targets=None, balanced=False):
1188 """Method for constructing View objects"""
1075 """Method for constructing View objects"""
1189 if not balanced:
1076 if targets is None:
1190 if not targets:
1077 if balanced:
1078 return LoadBalancedView(client=self)
1079 else:
1191 targets = slice(None)
1080 targets = slice(None)
1192 return self[targets]
1081
1082 if balanced:
1083 view_class = LoadBalancedView
1084 else:
1085 view_class = DirectView
1086 if isinstance(targets, int):
1087 if targets not in self.ids:
1088 raise IndexError("No such engine: %i"%targets)
1089 return view_class(client=self, targets=targets)
1090
1091 if isinstance(targets, slice):
1092 indices = range(len(self.ids))[targets]
1093 ids = sorted(self._ids)
1094 targets = [ ids[i] for i in indices ]
1095
1096 if isinstance(targets, (tuple, list, xrange)):
1097 _,targets = self._build_targets(list(targets))
1098 return view_class(client=self, targets=targets)
1193 else:
1099 else:
1194 return LoadBalancedView(self, targets)
1100 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
1195
1101
1196 #--------------------------------------------------------------------------
1102 #--------------------------------------------------------------------------
1197 # Data movement
1103 # Data movement
1198 #--------------------------------------------------------------------------
1104 #--------------------------------------------------------------------------
1199
1105
1200 @defaultblock
1106 @defaultblock
1201 def push(self, ns, targets='all', block=None):
1107 def push(self, ns, targets='all', block=None):
1202 """Push the contents of `ns` into the namespace on `target`"""
1108 """Push the contents of `ns` into the namespace on `target`"""
1203 if not isinstance(ns, dict):
1109 if not isinstance(ns, dict):
1204 raise TypeError("Must be a dict, not %s"%type(ns))
1110 raise TypeError("Must be a dict, not %s"%type(ns))
1205 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
1111 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False)
1206 return result
1112 return result
1207
1113
1208 @defaultblock
1114 @defaultblock
1209 def pull(self, keys, targets='all', block=None):
1115 def pull(self, keys, targets='all', block=None):
1210 """Pull objects from `target`'s namespace by `keys`"""
1116 """Pull objects from `target`'s namespace by `keys`"""
1211 if isinstance(keys, str):
1117 if isinstance(keys, str):
1212 pass
1118 pass
1213 elif isinstance(keys, (list,tuple,set)):
1119 elif isinstance(keys, (list,tuple,set)):
1214 for key in keys:
1120 for key in keys:
1215 if not isinstance(key, str):
1121 if not isinstance(key, str):
1216 raise TypeError
1122 raise TypeError
1217 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
1123 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1218 return result
1124 return result
1219
1125
1126 @defaultblock
1220 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1127 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1221 """
1128 """
1222 Partition a Python sequence and send the partitions to a set of engines.
1129 Partition a Python sequence and send the partitions to a set of engines.
1223 """
1130 """
1224 block = block if block is not None else self.block
1225 targets = self._build_targets(targets)[-1]
1131 targets = self._build_targets(targets)[-1]
1226 mapObject = Map.dists[dist]()
1132 mapObject = Map.dists[dist]()
1227 nparts = len(targets)
1133 nparts = len(targets)
1228 msg_ids = []
1134 msg_ids = []
1229 for index, engineid in enumerate(targets):
1135 for index, engineid in enumerate(targets):
1230 partition = mapObject.getPartition(seq, index, nparts)
1136 partition = mapObject.getPartition(seq, index, nparts)
1231 if flatten and len(partition) == 1:
1137 if flatten and len(partition) == 1:
1232 r = self.push({key: partition[0]}, targets=engineid, block=False)
1138 r = self.push({key: partition[0]}, targets=engineid, block=False)
1233 else:
1139 else:
1234 r = self.push({key: partition}, targets=engineid, block=False)
1140 r = self.push({key: partition}, targets=engineid, block=False)
1235 msg_ids.extend(r.msg_ids)
1141 msg_ids.extend(r.msg_ids)
1236 r = AsyncResult(self, msg_ids, fname='scatter')
1142 r = AsyncResult(self, msg_ids, fname='scatter')
1237 if block:
1143 if block:
1238 return r.get()
1144 return r.get()
1239 else:
1145 else:
1240 return r
1146 return r
1241
1147
1148 @defaultblock
1242 def gather(self, key, dist='b', targets='all', block=None):
1149 def gather(self, key, dist='b', targets='all', block=None):
1243 """
1150 """
1244 Gather a partitioned sequence on a set of engines as a single local seq.
1151 Gather a partitioned sequence on a set of engines as a single local seq.
1245 """
1152 """
1246 block = block if block is not None else self.block
1247
1153
1248 targets = self._build_targets(targets)[-1]
1154 targets = self._build_targets(targets)[-1]
1249 mapObject = Map.dists[dist]()
1155 mapObject = Map.dists[dist]()
1250 msg_ids = []
1156 msg_ids = []
1251 for index, engineid in enumerate(targets):
1157 for index, engineid in enumerate(targets):
1252 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1158 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1253
1159
1254 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1160 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1255 if block:
1161 if block:
1256 return r.get()
1162 return r.get()
1257 else:
1163 else:
1258 return r
1164 return r
1259
1165
1260 #--------------------------------------------------------------------------
1166 #--------------------------------------------------------------------------
1261 # Query methods
1167 # Query methods
1262 #--------------------------------------------------------------------------
1168 #--------------------------------------------------------------------------
1263
1169
1264 @spinfirst
1170 @spinfirst
1265 def get_results(self, msg_ids, status_only=False):
1171 def get_results(self, msg_ids, status_only=False):
1266 """Returns the result of the execute or task request with `msg_ids`.
1172 """Returns the result of the execute or task request with `msg_ids`.
1267
1173
1268 Parameters
1174 Parameters
1269 ----------
1175 ----------
1176
1270 msg_ids : list of ints or msg_ids
1177 msg_ids : list of ints or msg_ids
1271 if int:
1178 if int:
1272 Passed as index to self.history for convenience.
1179 Passed as index to self.history for convenience.
1273 status_only : bool (default: False)
1180 status_only : bool (default: False)
1274 if False:
1181 if False:
1275 return the actual results
1182 return the actual results
1276
1183
1277 Returns
1184 Returns
1278 -------
1185 -------
1279
1186
1280 results : dict
1187 results : dict
1281 There will always be the keys 'pending' and 'completed', which will
1188 There will always be the keys 'pending' and 'completed', which will
1282 be lists of msg_ids.
1189 be lists of msg_ids.
1283 """
1190 """
1284 if not isinstance(msg_ids, (list,tuple)):
1191 if not isinstance(msg_ids, (list,tuple)):
1285 msg_ids = [msg_ids]
1192 msg_ids = [msg_ids]
1286 theids = []
1193 theids = []
1287 for msg_id in msg_ids:
1194 for msg_id in msg_ids:
1288 if isinstance(msg_id, int):
1195 if isinstance(msg_id, int):
1289 msg_id = self.history[msg_id]
1196 msg_id = self.history[msg_id]
1290 if not isinstance(msg_id, str):
1197 if not isinstance(msg_id, str):
1291 raise TypeError("msg_ids must be str, not %r"%msg_id)
1198 raise TypeError("msg_ids must be str, not %r"%msg_id)
1292 theids.append(msg_id)
1199 theids.append(msg_id)
1293
1200
1294 completed = []
1201 completed = []
1295 local_results = {}
1202 local_results = {}
1296
1203
1297 # comment this block out to temporarily disable local shortcut:
1204 # comment this block out to temporarily disable local shortcut:
1298 for msg_id in list(theids):
1205 for msg_id in list(theids):
1299 if msg_id in self.results:
1206 if msg_id in self.results:
1300 completed.append(msg_id)
1207 completed.append(msg_id)
1301 local_results[msg_id] = self.results[msg_id]
1208 local_results[msg_id] = self.results[msg_id]
1302 theids.remove(msg_id)
1209 theids.remove(msg_id)
1303
1210
1304 if theids: # some not locally cached
1211 if theids: # some not locally cached
1305 content = dict(msg_ids=theids, status_only=status_only)
1212 content = dict(msg_ids=theids, status_only=status_only)
1306 msg = self.session.send(self._query_socket, "result_request", content=content)
1213 msg = self.session.send(self._query_socket, "result_request", content=content)
1307 zmq.select([self._query_socket], [], [])
1214 zmq.select([self._query_socket], [], [])
1308 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1215 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1309 if self.debug:
1216 if self.debug:
1310 pprint(msg)
1217 pprint(msg)
1311 content = msg['content']
1218 content = msg['content']
1312 if content['status'] != 'ok':
1219 if content['status'] != 'ok':
1313 raise ss.unwrap_exception(content)
1220 raise ss.unwrap_exception(content)
1314 buffers = msg['buffers']
1221 buffers = msg['buffers']
1315 else:
1222 else:
1316 content = dict(completed=[],pending=[])
1223 content = dict(completed=[],pending=[])
1317
1224
1318 content['completed'].extend(completed)
1225 content['completed'].extend(completed)
1319
1226
1320 if status_only:
1227 if status_only:
1321 return content
1228 return content
1322
1229
1323 failures = []
1230 failures = []
1324 # load cached results into result:
1231 # load cached results into result:
1325 content.update(local_results)
1232 content.update(local_results)
1326 # update cache with results:
1233 # update cache with results:
1327 for msg_id in sorted(theids):
1234 for msg_id in sorted(theids):
1328 if msg_id in content['completed']:
1235 if msg_id in content['completed']:
1329 rec = content[msg_id]
1236 rec = content[msg_id]
1330 parent = rec['header']
1237 parent = rec['header']
1331 header = rec['result_header']
1238 header = rec['result_header']
1332 rcontent = rec['result_content']
1239 rcontent = rec['result_content']
1333 iodict = rec['io']
1240 iodict = rec['io']
1334 if isinstance(rcontent, str):
1241 if isinstance(rcontent, str):
1335 rcontent = self.session.unpack(rcontent)
1242 rcontent = self.session.unpack(rcontent)
1336
1243
1337 md = self.metadata.setdefault(msg_id, Metadata())
1244 md = self.metadata.setdefault(msg_id, Metadata())
1338 md.update(self._extract_metadata(header, parent, rcontent))
1245 md.update(self._extract_metadata(header, parent, rcontent))
1339 md.update(iodict)
1246 md.update(iodict)
1340
1247
1341 if rcontent['status'] == 'ok':
1248 if rcontent['status'] == 'ok':
1342 res,buffers = ss.unserialize_object(buffers)
1249 res,buffers = ss.unserialize_object(buffers)
1343 else:
1250 else:
1344 res = ss.unwrap_exception(rcontent)
1251 res = ss.unwrap_exception(rcontent)
1345 failures.append(res)
1252 failures.append(res)
1346
1253
1347 self.results[msg_id] = res
1254 self.results[msg_id] = res
1348 content[msg_id] = res
1255 content[msg_id] = res
1349
1256
1350 error.collect_exceptions(failures, "get_results")
1257 error.collect_exceptions(failures, "get_results")
1351 return content
1258 return content
1352
1259
1353 @spinfirst
1260 @spinfirst
1354 def queue_status(self, targets=None, verbose=False):
1261 def queue_status(self, targets='all', verbose=False):
1355 """Fetch the status of engine queues.
1262 """Fetch the status of engine queues.
1356
1263
1357 Parameters
1264 Parameters
1358 ----------
1265 ----------
1266
1359 targets : int/str/list of ints/strs
1267 targets : int/str/list of ints/strs
1360 the engines on which to execute
1268 the engines whose states are to be queried.
1361 default : all
1269 default : all
1362 verbose : bool
1270 verbose : bool
1363 Whether to return lengths only, or lists of ids for each element
1271 Whether to return lengths only, or lists of ids for each element
1364 """
1272 """
1365 targets = self._build_targets(targets)[1]
1273 targets = self._build_targets(targets)[1]
1366 content = dict(targets=targets, verbose=verbose)
1274 content = dict(targets=targets, verbose=verbose)
1367 self.session.send(self._query_socket, "queue_request", content=content)
1275 self.session.send(self._query_socket, "queue_request", content=content)
1368 idents,msg = self.session.recv(self._query_socket, 0)
1276 idents,msg = self.session.recv(self._query_socket, 0)
1369 if self.debug:
1277 if self.debug:
1370 pprint(msg)
1278 pprint(msg)
1371 content = msg['content']
1279 content = msg['content']
1372 status = content.pop('status')
1280 status = content.pop('status')
1373 if status != 'ok':
1281 if status != 'ok':
1374 raise ss.unwrap_exception(content)
1282 raise ss.unwrap_exception(content)
1375 return ss.rekey(content)
1283 return ss.rekey(content)
1376
1284
1377 @spinfirst
1285 @spinfirst
1378 def purge_results(self, msg_ids=[], targets=[]):
1286 def purge_results(self, msg_ids=[], targets=[]):
1379 """Tell the controller to forget results.
1287 """Tell the controller to forget results.
1380
1288
1381 Individual results can be purged by msg_id, or the entire
1289 Individual results can be purged by msg_id, or the entire
1382 history of specific targets can be purged.
1290 history of specific targets can be purged.
1383
1291
1384 Parameters
1292 Parameters
1385 ----------
1293 ----------
1294
1386 msg_ids : str or list of strs
1295 msg_ids : str or list of strs
1387 the msg_ids whose results should be forgotten.
1296 the msg_ids whose results should be forgotten.
1388 targets : int/str/list of ints/strs
1297 targets : int/str/list of ints/strs
1389 The targets, by uuid or int_id, whose entire history is to be purged.
1298 The targets, by uuid or int_id, whose entire history is to be purged.
1390 Use `targets='all'` to scrub everything from the controller's memory.
1299 Use `targets='all'` to scrub everything from the controller's memory.
1391
1300
1392 default : None
1301 default : None
1393 """
1302 """
1394 if not targets and not msg_ids:
1303 if not targets and not msg_ids:
1395 raise ValueError
1304 raise ValueError
1396 if targets:
1305 if targets:
1397 targets = self._build_targets(targets)[1]
1306 targets = self._build_targets(targets)[1]
1398 content = dict(targets=targets, msg_ids=msg_ids)
1307 content = dict(targets=targets, msg_ids=msg_ids)
1399 self.session.send(self._query_socket, "purge_request", content=content)
1308 self.session.send(self._query_socket, "purge_request", content=content)
1400 idents, msg = self.session.recv(self._query_socket, 0)
1309 idents, msg = self.session.recv(self._query_socket, 0)
1401 if self.debug:
1310 if self.debug:
1402 pprint(msg)
1311 pprint(msg)
1403 content = msg['content']
1312 content = msg['content']
1404 if content['status'] != 'ok':
1313 if content['status'] != 'ok':
1405 raise ss.unwrap_exception(content)
1314 raise ss.unwrap_exception(content)
1406
1315
1407 #----------------------------------------
1408 # activate for %px,%autopx magics
1409 #----------------------------------------
1410 def activate(self):
1411 """Make this `View` active for parallel magic commands.
1412
1413 IPython has a magic command syntax to work with `MultiEngineClient` objects.
1414 In a given IPython session there is a single active one. While
1415 there can be many `Views` created and used by the user,
1416 there is only one active one. The active `View` is used whenever
1417 the magic commands %px and %autopx are used.
1418
1419 The activate() method is called on a given `View` to make it
1420 active. Once this has been done, the magic commands can be used.
1421 """
1422
1423 try:
1424 # This is injected into __builtins__.
1425 ip = get_ipython()
1426 except NameError:
1427 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1428 else:
1429 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1430 if pmagic is not None:
1431 pmagic.active_multiengine_client = self
1432 else:
1433 print "You must first load the parallelmagic extension " \
1434 "by doing '%load_ext parallelmagic'"
1435
1436 class AsynClient(Client):
1437 """An Asynchronous client, using the Tornado Event Loop.
1438 !!!unfinished!!!"""
1439 io_loop = None
1440 _queue_stream = None
1441 _notifier_stream = None
1442 _task_stream = None
1443 _control_stream = None
1444
1445 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1446 Client.__init__(self, addr, context, username, debug)
1447 if io_loop is None:
1448 io_loop = ioloop.IOLoop.instance()
1449 self.io_loop = io_loop
1450
1451 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1452 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1453 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1454 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1455
1456 def spin(self):
1457 for stream in (self.queue_stream, self.notifier_stream,
1458 self.task_stream, self.control_stream):
1459 stream.flush()
1460
1316
1461 __all__ = [ 'Client',
1317 __all__ = [ 'Client',
1462 'depend',
1318 'depend',
1463 'require',
1319 'require',
1464 'remote',
1320 'remote',
1465 'parallel',
1321 'parallel',
1466 'RemoteFunction',
1322 'RemoteFunction',
1467 'ParallelFunction',
1323 'ParallelFunction',
1468 'DirectView',
1324 'DirectView',
1469 'LoadBalancedView',
1325 'LoadBalancedView',
1470 'AsyncResult',
1326 'AsyncResult',
1471 'AsyncMapResult'
1327 'AsyncMapResult'
1472 ]
1328 ]
@@ -1,145 +1,156 b''
1 """Remote Functions and decorators for the client."""
1 """Remote Functions and decorators for the client."""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import map as Map
13 import map as Map
14 from asyncresult import AsyncMapResult
14 from asyncresult import AsyncMapResult
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Decorators
17 # Decorators
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 def remote(client, bound=False, block=None, targets=None):
20 def remote(client, bound=False, block=None, targets=None, balanced=None):
21 """Turn a function into a remote function.
21 """Turn a function into a remote function.
22
22
23 This method can be used for map:
23 This method can be used for map:
24
24
25 >>> @remote(client,block=True)
25 >>> @remote(client,block=True)
26 def func(a)
26 def func(a)
27 """
27 """
28 def remote_function(f):
28 def remote_function(f):
29 return RemoteFunction(client, f, bound, block, targets)
29 return RemoteFunction(client, f, bound, block, targets, balanced)
30 return remote_function
30 return remote_function
31
31
32 def parallel(client, dist='b', bound=False, block=None, targets='all'):
32 def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None):
33 """Turn a function into a parallel remote function.
33 """Turn a function into a parallel remote function.
34
34
35 This method can be used for map:
35 This method can be used for map:
36
36
37 >>> @parallel(client,block=True)
37 >>> @parallel(client,block=True)
38 def func(a)
38 def func(a)
39 """
39 """
40 def parallel_function(f):
40 def parallel_function(f):
41 return ParallelFunction(client, f, dist, bound, block, targets)
41 return ParallelFunction(client, f, dist, bound, block, targets, balanced)
42 return parallel_function
42 return parallel_function
43
43
44 #--------------------------------------------------------------------------
44 #--------------------------------------------------------------------------
45 # Classes
45 # Classes
46 #--------------------------------------------------------------------------
46 #--------------------------------------------------------------------------
47
47
48 class RemoteFunction(object):
48 class RemoteFunction(object):
49 """Turn an existing function into a remote function.
49 """Turn an existing function into a remote function.
50
50
51 Parameters
51 Parameters
52 ----------
52 ----------
53
53
54 client : Client instance
54 client : Client instance
55 The client to be used to connect to engines
55 The client to be used to connect to engines
56 f : callable
56 f : callable
57 The function to be wrapped into a remote function
57 The function to be wrapped into a remote function
58 bound : bool [default: False]
58 bound : bool [default: False]
59 Whether the affect the remote namespace when called
59 Whether the affect the remote namespace when called
60 block : bool [default: None]
60 block : bool [default: None]
61 Whether to wait for results or not. The default behavior is
61 Whether to wait for results or not. The default behavior is
62 to use the current `block` attribute of `client`
62 to use the current `block` attribute of `client`
63 targets : valid target list [default: all]
63 targets : valid target list [default: all]
64 The targets on which to execute.
64 The targets on which to execute.
65 balanced : bool
66 Whether to load-balance with the Task scheduler or not
65 """
67 """
66
68
67 client = None # the remote connection
69 client = None # the remote connection
68 func = None # the wrapped function
70 func = None # the wrapped function
69 block = None # whether to block
71 block = None # whether to block
70 bound = None # whether to affect the namespace
72 bound = None # whether to affect the namespace
71 targets = None # where to execute
73 targets = None # where to execute
74 balanced = None # whether to load-balance
72
75
73 def __init__(self, client, f, bound=False, block=None, targets=None):
76 def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None):
74 self.client = client
77 self.client = client
75 self.func = f
78 self.func = f
76 self.block=block
79 self.block=block
77 self.bound=bound
80 self.bound=bound
78 self.targets=targets
81 self.targets=targets
82 if balanced is None:
83 if targets is None:
84 balanced = True
85 else:
86 balanced = False
87 self.balanced = balanced
79
88
80 def __call__(self, *args, **kwargs):
89 def __call__(self, *args, **kwargs):
81 return self.client.apply(self.func, args=args, kwargs=kwargs,
90 return self.client.apply(self.func, args=args, kwargs=kwargs,
82 block=self.block, targets=self.targets, bound=self.bound)
91 block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced)
83
92
84
93
85 class ParallelFunction(RemoteFunction):
94 class ParallelFunction(RemoteFunction):
86 """Class for mapping a function to sequences."""
95 """Class for mapping a function to sequences."""
87 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'):
96 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None):
88 super(ParallelFunction, self).__init__(client,f,bound,block,targets)
97 super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced)
89 mapClass = Map.dists[dist]
98 mapClass = Map.dists[dist]
90 self.mapObject = mapClass()
99 self.mapObject = mapClass()
91
100
92 def __call__(self, *sequences):
101 def __call__(self, *sequences):
93 len_0 = len(sequences[0])
102 len_0 = len(sequences[0])
94 for s in sequences:
103 for s in sequences:
95 if len(s)!=len_0:
104 if len(s)!=len_0:
96 raise ValueError('all sequences must have equal length')
105 msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s))
106 raise ValueError(msg)
97
107
98 if self.targets is None:
108 if self.balanced:
99 # load-balanced:
109 targets = [self.targets]*len_0
100 engines = [None]*len_0
101 elif isinstance(self.targets, int):
102 engines = [None]*self.targets
103 else:
110 else:
104 # multiplexed:
111 # multiplexed:
105 engines = self.client._build_targets(self.targets)[-1]
112 targets = self.client._build_targets(self.targets)[-1]
106
113
107 nparts = len(engines)
114 nparts = len(targets)
108 msg_ids = []
115 msg_ids = []
109 # my_f = lambda *a: map(self.func, *a)
116 # my_f = lambda *a: map(self.func, *a)
110 for index, engineid in enumerate(engines):
117 for index, t in enumerate(targets):
111 args = []
118 args = []
112 for seq in sequences:
119 for seq in sequences:
113 part = self.mapObject.getPartition(seq, index, nparts)
120 part = self.mapObject.getPartition(seq, index, nparts)
114 if not part:
121 if not part:
115 continue
122 continue
116 else:
123 else:
117 args.append(part)
124 args.append(part)
118 if not args:
125 if not args:
119 continue
126 continue
120
127
121 # print (args)
128 # print (args)
122 if hasattr(self, '_map'):
129 if hasattr(self, '_map'):
123 f = map
130 f = map
124 args = [self.func]+args
131 args = [self.func]+args
125 else:
132 else:
126 f=self.func
133 f=self.func
127 mid = self.client.apply(f, args=args, block=False,
134 ar = self.client.apply(f, args=args, block=False, bound=self.bound,
128 bound=self.bound,
135 targets=targets, balanced=self.balanced)
129 targets=engineid).msg_ids[0]
136
130 msg_ids.append(mid)
137 msg_ids.append(ar.msg_ids[0])
131
138
132 r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__)
139 r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__)
133 if self.block:
140 if self.block:
134 r.wait()
141 try:
135 return r.result
142 return r.get()
143 except KeyboardInterrupt:
144 return r
136 else:
145 else:
137 return r
146 return r
138
147
139 def map(self, *sequences):
148 def map(self, *sequences):
140 """call a function on each element of a sequence remotely."""
149 """call a function on each element of a sequence remotely."""
141 self._map = True
150 self._map = True
142 ret = self.__call__(*sequences)
151 try:
143 del self._map
152 ret = self.__call__(*sequences)
153 finally:
154 del self._map
144 return ret
155 return ret
145
156
@@ -1,119 +1,119 b''
1 """some generic utilities"""
1 """some generic utilities"""
2 import re
2 import re
3 import socket
3 import socket
4
4
5 class ReverseDict(dict):
5 class ReverseDict(dict):
6 """simple double-keyed subset of dict methods."""
6 """simple double-keyed subset of dict methods."""
7
7
8 def __init__(self, *args, **kwargs):
8 def __init__(self, *args, **kwargs):
9 dict.__init__(self, *args, **kwargs)
9 dict.__init__(self, *args, **kwargs)
10 self._reverse = dict()
10 self._reverse = dict()
11 for key, value in self.iteritems():
11 for key, value in self.iteritems():
12 self._reverse[value] = key
12 self._reverse[value] = key
13
13
14 def __getitem__(self, key):
14 def __getitem__(self, key):
15 try:
15 try:
16 return dict.__getitem__(self, key)
16 return dict.__getitem__(self, key)
17 except KeyError:
17 except KeyError:
18 return self._reverse[key]
18 return self._reverse[key]
19
19
20 def __setitem__(self, key, value):
20 def __setitem__(self, key, value):
21 if key in self._reverse:
21 if key in self._reverse:
22 raise KeyError("Can't have key %r on both sides!"%key)
22 raise KeyError("Can't have key %r on both sides!"%key)
23 dict.__setitem__(self, key, value)
23 dict.__setitem__(self, key, value)
24 self._reverse[value] = key
24 self._reverse[value] = key
25
25
26 def pop(self, key):
26 def pop(self, key):
27 value = dict.pop(self, key)
27 value = dict.pop(self, key)
28 self._reverse.pop(value)
28 self._reverse.pop(value)
29 return value
29 return value
30
30
31 def get(self, key, default=None):
31 def get(self, key, default=None):
32 try:
32 try:
33 return self[key]
33 return self[key]
34 except KeyError:
34 except KeyError:
35 return default
35 return default
36
36
37
37
38 def validate_url(url):
38 def validate_url(url):
39 """validate a url for zeromq"""
39 """validate a url for zeromq"""
40 if not isinstance(url, basestring):
40 if not isinstance(url, basestring):
41 raise TypeError("url must be a string, not %r"%type(url))
41 raise TypeError("url must be a string, not %r"%type(url))
42 url = url.lower()
42 url = url.lower()
43
43
44 proto_addr = url.split('://')
44 proto_addr = url.split('://')
45 assert len(proto_addr) == 2, 'Invalid url: %r'%url
45 assert len(proto_addr) == 2, 'Invalid url: %r'%url
46 proto, addr = proto_addr
46 proto, addr = proto_addr
47 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
47 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
48
48
49 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
49 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
50 # author: Remi Sabourin
50 # author: Remi Sabourin
51 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
51 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
52
52
53 if proto == 'tcp':
53 if proto == 'tcp':
54 lis = addr.split(':')
54 lis = addr.split(':')
55 assert len(lis) == 2, 'Invalid url: %r'%url
55 assert len(lis) == 2, 'Invalid url: %r'%url
56 addr,s_port = lis
56 addr,s_port = lis
57 try:
57 try:
58 port = int(s_port)
58 port = int(s_port)
59 except ValueError:
59 except ValueError:
60 raise AssertionError("Invalid port %r in url: %r"%(port, url))
60 raise AssertionError("Invalid port %r in url: %r"%(port, url))
61
61
62 assert pat.match(addr) is not None, 'Invalid url: %r'%url
62 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
63
63
64 else:
64 else:
65 # only validate tcp urls currently
65 # only validate tcp urls currently
66 pass
66 pass
67
67
68 return True
68 return True
69
69
70
70
71 def validate_url_container(container):
71 def validate_url_container(container):
72 """validate a potentially nested collection of urls."""
72 """validate a potentially nested collection of urls."""
73 if isinstance(container, basestring):
73 if isinstance(container, basestring):
74 url = container
74 url = container
75 return validate_url(url)
75 return validate_url(url)
76 elif isinstance(container, dict):
76 elif isinstance(container, dict):
77 container = container.itervalues()
77 container = container.itervalues()
78
78
79 for element in container:
79 for element in container:
80 validate_url_container(element)
80 validate_url_container(element)
81
81
82
82
83 def split_url(url):
83 def split_url(url):
84 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
84 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
85 proto_addr = url.split('://')
85 proto_addr = url.split('://')
86 assert len(proto_addr) == 2, 'Invalid url: %r'%url
86 assert len(proto_addr) == 2, 'Invalid url: %r'%url
87 proto, addr = proto_addr
87 proto, addr = proto_addr
88 lis = addr.split(':')
88 lis = addr.split(':')
89 assert len(lis) == 2, 'Invalid url: %r'%url
89 assert len(lis) == 2, 'Invalid url: %r'%url
90 addr,s_port = lis
90 addr,s_port = lis
91 return proto,addr,s_port
91 return proto,addr,s_port
92
92
93 def disambiguate_ip_address(ip, location=None):
93 def disambiguate_ip_address(ip, location=None):
94 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
94 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
95 ones, based on the location (default interpretation of location is localhost)."""
95 ones, based on the location (default interpretation of location is localhost)."""
96 if ip in ('0.0.0.0', '*'):
96 if ip in ('0.0.0.0', '*'):
97 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
97 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
98 if location is None or location in external_ips:
98 if location is None or location in external_ips:
99 ip='127.0.0.1'
99 ip='127.0.0.1'
100 elif location:
100 elif location:
101 return location
101 return location
102 return ip
102 return ip
103
103
104 def disambiguate_url(url, location=None):
104 def disambiguate_url(url, location=None):
105 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
105 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
106 ones, based on the location (default interpretation is localhost).
106 ones, based on the location (default interpretation is localhost).
107
107
108 This is for zeromq urls, such as tcp://*:10101."""
108 This is for zeromq urls, such as tcp://*:10101."""
109 try:
109 try:
110 proto,ip,port = split_url(url)
110 proto,ip,port = split_url(url)
111 except AssertionError:
111 except AssertionError:
112 # probably not tcp url; could be ipc, etc.
112 # probably not tcp url; could be ipc, etc.
113 return url
113 return url
114
114
115 ip = disambiguate_ip_address(ip,location)
115 ip = disambiguate_ip_address(ip,location)
116
116
117 return "%s://%s:%s"%(proto,ip,port)
117 return "%s://%s:%s"%(proto,ip,port)
118
118
119
119
@@ -1,383 +1,572 b''
1 """Views of remote engines"""
1 """Views of remote engines"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance
14
13 from IPython.external.decorator import decorator
15 from IPython.external.decorator import decorator
16 from IPython.zmq.parallel.asyncresult import AsyncResult
17 from IPython.zmq.parallel.dependency import Dependency
14 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
18 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
15
19
16 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
17 # Decorators
21 # Decorators
18 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
19
23
20 @decorator
24 @decorator
21 def myblock(f, self, *args, **kwargs):
25 def myblock(f, self, *args, **kwargs):
22 """override client.block with self.block during a call"""
26 """override client.block with self.block during a call"""
23 block = self.client.block
27 block = self.client.block
24 self.client.block = self.block
28 self.client.block = self.block
25 try:
29 try:
26 ret = f(self, *args, **kwargs)
30 ret = f(self, *args, **kwargs)
27 finally:
31 finally:
28 self.client.block = block
32 self.client.block = block
29 return ret
33 return ret
30
34
31 @decorator
35 @decorator
32 def save_ids(f, self, *args, **kwargs):
36 def save_ids(f, self, *args, **kwargs):
33 """Keep our history and outstanding attributes up to date after a method call."""
37 """Keep our history and outstanding attributes up to date after a method call."""
34 n_previous = len(self.client.history)
38 n_previous = len(self.client.history)
35 ret = f(self, *args, **kwargs)
39 ret = f(self, *args, **kwargs)
36 nmsgs = len(self.client.history) - n_previous
40 nmsgs = len(self.client.history) - n_previous
37 msg_ids = self.client.history[-nmsgs:]
41 msg_ids = self.client.history[-nmsgs:]
38 self.history.extend(msg_ids)
42 self.history.extend(msg_ids)
39 map(self.outstanding.add, msg_ids)
43 map(self.outstanding.add, msg_ids)
40 return ret
44 return ret
41
45
42 @decorator
46 @decorator
43 def sync_results(f, self, *args, **kwargs):
47 def sync_results(f, self, *args, **kwargs):
44 """sync relevant results from self.client to our results attribute."""
48 """sync relevant results from self.client to our results attribute."""
45 ret = f(self, *args, **kwargs)
49 ret = f(self, *args, **kwargs)
46 delta = self.outstanding.difference(self.client.outstanding)
50 delta = self.outstanding.difference(self.client.outstanding)
47 completed = self.outstanding.intersection(delta)
51 completed = self.outstanding.intersection(delta)
48 self.outstanding = self.outstanding.difference(completed)
52 self.outstanding = self.outstanding.difference(completed)
49 for msg_id in completed:
53 for msg_id in completed:
50 self.results[msg_id] = self.client.results[msg_id]
54 self.results[msg_id] = self.client.results[msg_id]
51 return ret
55 return ret
52
56
53 @decorator
57 @decorator
54 def spin_after(f, self, *args, **kwargs):
58 def spin_after(f, self, *args, **kwargs):
55 """call spin after the method."""
59 """call spin after the method."""
56 ret = f(self, *args, **kwargs)
60 ret = f(self, *args, **kwargs)
57 self.spin()
61 self.spin()
58 return ret
62 return ret
59
63
60 #-----------------------------------------------------------------------------
64 #-----------------------------------------------------------------------------
61 # Classes
65 # Classes
62 #-----------------------------------------------------------------------------
66 #-----------------------------------------------------------------------------
63
67
64 class View(object):
68 class View(HasTraits):
65 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
69 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
66
70
67 Don't use this class, use subclasses.
71 Don't use this class, use subclasses.
68 """
72 """
69 block=None
73 block=Bool(False)
70 bound=None
74 bound=Bool(False)
71 history=None
75 history=List()
72 outstanding = set()
76 outstanding = Set()
73 results = {}
77 results = Dict()
74
78 client = Instance('IPython.zmq.parallel.client.Client')
79
80 _ntargets = Int(1)
81 _balanced = Bool(False)
82 _default_names = List(['block', 'bound'])
75 _targets = None
83 _targets = None
76 _apply_name = 'apply'
77 _default_names = ['targets', 'block']
78
84
79 def __init__(self, client, targets=None):
85 def __init__(self, client=None, targets=None):
80 self.client = client
86 super(View, self).__init__(client=client)
81 self._targets = targets
87 self._targets = targets
82 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
88 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
83 self.block = client.block
89 self.block = client.block
84 self.bound=False
90
85 self.history = []
86 self.outstanding = set()
87 self.results = {}
88 for name in self._default_names:
91 for name in self._default_names:
89 setattr(self, name, getattr(self, name, None))
92 setattr(self, name, getattr(self, name, None))
90
93
91
94
92 def __repr__(self):
95 def __repr__(self):
93 strtargets = str(self._targets)
96 strtargets = str(self._targets)
94 if len(strtargets) > 16:
97 if len(strtargets) > 16:
95 strtargets = strtargets[:12]+'...]'
98 strtargets = strtargets[:12]+'...]'
96 return "<%s %s>"%(self.__class__.__name__, strtargets)
99 return "<%s %s>"%(self.__class__.__name__, strtargets)
97
100
98 @property
101 @property
99 def targets(self):
102 def targets(self):
100 return self._targets
103 return self._targets
101
104
102 @targets.setter
105 @targets.setter
103 def targets(self, value):
106 def targets(self, value):
104 self._targets = value
107 raise AttributeError("Cannot set View `targets` after construction!")
105 # raise AttributeError("Cannot set my targets argument after construction!")
106
108
107 def _defaults(self, *excludes):
109 def _defaults(self, *excludes):
108 """return dict of our default attributes, excluding names given."""
110 """return dict of our default attributes, excluding names given."""
109 d = {}
111 d = dict(balanced=self._balanced, targets=self.targets)
110 for name in self._default_names:
112 for name in self._default_names:
111 if name not in excludes:
113 if name not in excludes:
112 d[name] = getattr(self, name)
114 d[name] = getattr(self, name)
113 return d
115 return d
114
116
117 def set_flags(self, **kwargs):
118 """set my attribute flags by keyword.
119
120 A View is a wrapper for the Client's apply method, but
121 with attributes that specify keyword arguments, those attributes
122 can be set by keyword argument with this method.
123
124 Parameters
125 ----------
126
127 block : bool
128 whether to wait for results
129 bound : bool
130 whether to use the client's namespace
131 """
132 for key in kwargs:
133 if key not in self._default_names:
134 raise KeyError("Invalid name: %r"%key)
135 for name in ('block', 'bound'):
136 if name in kwargs:
137 setattr(self, name, kwargs)
138
139 #----------------------------------------------------------------
140 # wrappers for client methods:
141 #----------------------------------------------------------------
115 @sync_results
142 @sync_results
116 def spin(self):
143 def spin(self):
117 """spin the client, and sync"""
144 """spin the client, and sync"""
118 self.client.spin()
145 self.client.spin()
119
146
120 @property
121 def _apply(self):
122 return getattr(self.client, self._apply_name)
123
124 @sync_results
147 @sync_results
125 @save_ids
148 @save_ids
126 def apply(self, f, *args, **kwargs):
149 def apply(self, f, *args, **kwargs):
127 """calls f(*args, **kwargs) on remote engines, returning the result.
150 """calls f(*args, **kwargs) on remote engines, returning the result.
128
151
129 This method does not involve the engine's namespace.
152 This method does not involve the engine's namespace.
130
153
131 if self.block is False:
154 if self.block is False:
132 returns msg_id
155 returns msg_id
133 else:
156 else:
134 returns actual result of f(*args, **kwargs)
157 returns actual result of f(*args, **kwargs)
135 """
158 """
136 return self._apply(f, args, kwargs, **self._defaults())
159 return self.client.apply(f, args, kwargs, **self._defaults())
137
160
138 @save_ids
161 @save_ids
139 def apply_async(self, f, *args, **kwargs):
162 def apply_async(self, f, *args, **kwargs):
140 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
163 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
141
164
142 This method does not involve the engine's namespace.
165 This method does not involve the engine's namespace.
143
166
144 returns msg_id
167 returns msg_id
145 """
168 """
146 d = self._defaults('block', 'bound')
169 d = self._defaults('block', 'bound')
147 return self._apply(f,args,kwargs, block=False, bound=False, **d)
170 return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
148
171
149 @spin_after
172 @spin_after
150 @save_ids
173 @save_ids
151 def apply_sync(self, f, *args, **kwargs):
174 def apply_sync(self, f, *args, **kwargs):
152 """calls f(*args, **kwargs) on remote engines in a blocking manner,
175 """calls f(*args, **kwargs) on remote engines in a blocking manner,
153 returning the result.
176 returning the result.
154
177
155 This method does not involve the engine's namespace.
178 This method does not involve the engine's namespace.
156
179
157 returns: actual result of f(*args, **kwargs)
180 returns: actual result of f(*args, **kwargs)
158 """
181 """
159 d = self._defaults('block', 'bound')
182 d = self._defaults('block', 'bound')
160 return self._apply(f,args,kwargs, block=True, bound=False, **d)
183 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
161
184
162 @sync_results
185 @sync_results
163 @save_ids
186 @save_ids
164 def apply_bound(self, f, *args, **kwargs):
187 def apply_bound(self, f, *args, **kwargs):
165 """calls f(*args, **kwargs) bound to engine namespace(s).
188 """calls f(*args, **kwargs) bound to engine namespace(s).
166
189
167 if self.block is False:
190 if self.block is False:
168 returns msg_id
191 returns msg_id
169 else:
192 else:
170 returns actual result of f(*args, **kwargs)
193 returns actual result of f(*args, **kwargs)
171
194
172 This method has access to the targets' globals
195 This method has access to the targets' globals
173
196
174 """
197 """
175 d = self._defaults('bound')
198 d = self._defaults('bound')
176 return self._apply(f, args, kwargs, bound=True, **d)
199 return self.client.apply(f, args, kwargs, bound=True, **d)
177
200
178 @sync_results
201 @sync_results
179 @save_ids
202 @save_ids
180 def apply_async_bound(self, f, *args, **kwargs):
203 def apply_async_bound(self, f, *args, **kwargs):
181 """calls f(*args, **kwargs) bound to engine namespace(s)
204 """calls f(*args, **kwargs) bound to engine namespace(s)
182 in a nonblocking manner.
205 in a nonblocking manner.
183
206
184 returns: msg_id
207 returns: msg_id
185
208
186 This method has access to the targets' globals
209 This method has access to the targets' globals
187
210
188 """
211 """
189 d = self._defaults('block', 'bound')
212 d = self._defaults('block', 'bound')
190 return self._apply(f, args, kwargs, block=False, bound=True, **d)
213 return self.client.apply(f, args, kwargs, block=False, bound=True, **d)
191
214
192 @spin_after
215 @spin_after
193 @save_ids
216 @save_ids
194 def apply_sync_bound(self, f, *args, **kwargs):
217 def apply_sync_bound(self, f, *args, **kwargs):
195 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
218 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
196
219
197 returns: actual result of f(*args, **kwargs)
220 returns: actual result of f(*args, **kwargs)
198
221
199 This method has access to the targets' globals
222 This method has access to the targets' globals
200
223
201 """
224 """
202 d = self._defaults('block', 'bound')
225 d = self._defaults('block', 'bound')
203 return self._apply(f, args, kwargs, block=True, bound=True, **d)
226 return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
204
205 @spin_after
206 @save_ids
207 def map(self, f, *sequences):
208 """Parallel version of builtin `map`, using this view's engines."""
209 if isinstance(self.targets, int):
210 targets = [self.targets]
211 else:
212 targets = self.targets
213 pf = ParallelFunction(self.client, f, block=self.block,
214 bound=True, targets=targets)
215 return pf.map(*sequences)
216
217 def parallel(self, bound=True, block=True):
218 """Decorator for making a ParallelFunction"""
219 return parallel(self.client, bound=bound, targets=self.targets, block=block)
220
227
221 def abort(self, msg_ids=None, block=None):
228 def abort(self, msg_ids=None, block=None):
222 """Abort jobs on my engines.
229 """Abort jobs on my engines.
223
230
224 Parameters
231 Parameters
225 ----------
232 ----------
226
233
227 msg_ids : None, str, list of strs, optional
234 msg_ids : None, str, list of strs, optional
228 if None: abort all jobs.
235 if None: abort all jobs.
229 else: abort specific msg_id(s).
236 else: abort specific msg_id(s).
230 """
237 """
231 block = block if block is not None else self.block
238 block = block if block is not None else self.block
232 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
239 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
233
240
234 def queue_status(self, verbose=False):
241 def queue_status(self, verbose=False):
235 """Fetch the Queue status of my engines"""
242 """Fetch the Queue status of my engines"""
236 return self.client.queue_status(targets=self.targets, verbose=verbose)
243 return self.client.queue_status(targets=self.targets, verbose=verbose)
237
244
238 def purge_results(self, msg_ids=[], targets=[]):
245 def purge_results(self, msg_ids=[], targets=[]):
239 """Instruct the controller to forget specific results."""
246 """Instruct the controller to forget specific results."""
240 if targets is None or targets == 'all':
247 if targets is None or targets == 'all':
241 targets = self.targets
248 targets = self.targets
242 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
249 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
250
251 #-------------------------------------------------------------------
252 # Decorators
253 #-------------------------------------------------------------------
254 def parallel(self, bound=True, block=True):
255 """Decorator for making a ParallelFunction"""
256 return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
257
258 def remote(self, bound=True, block=True):
259 """Decorator for making a RemoteFunction"""
260 return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
243
261
244
262
245
263
246 class DirectView(View):
264 class DirectView(View):
247 """Direct Multiplexer View of one or more engines.
265 """Direct Multiplexer View of one or more engines.
248
266
249 These are created via indexed access to a client:
267 These are created via indexed access to a client:
250
268
251 >>> dv_1 = client[1]
269 >>> dv_1 = client[1]
252 >>> dv_all = client[:]
270 >>> dv_all = client[:]
253 >>> dv_even = client[::2]
271 >>> dv_even = client[::2]
254 >>> dv_some = client[1:3]
272 >>> dv_some = client[1:3]
255
273
256 This object provides dictionary access to engine namespaces:
274 This object provides dictionary access to engine namespaces:
257
275
258 # push a=5:
276 # push a=5:
259 >>> dv['a'] = 5
277 >>> dv['a'] = 5
260 # pull 'foo':
278 # pull 'foo':
261 >>> db['foo']
279 >>> db['foo']
262
280
263 """
281 """
264
282
283 def __init__(self, client=None, targets=None):
284 super(DirectView, self).__init__(client=client, targets=targets)
285 self._balanced = False
286
287 @spin_after
288 @save_ids
289 def map(self, f, *sequences, **kwargs):
290 """Parallel version of builtin `map`, using this View's `targets`.
291
292 There will be one task per target, so work will be chunked
293 if the sequences are longer than `targets`.
294
295 Results can be iterated as they are ready, but will become available in chunks.
296
297 Parameters
298 ----------
299
300 f : callable
301 function to be mapped
302 *sequences: one or more sequences of matching length
303 the sequences to be distributed and passed to `f`
304 block : bool
305 whether to wait for the result or not [default self.block]
306 bound : bool
307 whether to wait for the result or not [default self.bound]
308
309 Returns
310 -------
311
312 if block=False:
313 AsyncMapResult
314 An object like AsyncResult, but which reassembles the sequence of results
315 into a single list. AsyncMapResults can be iterated through before all
316 results are complete.
317 else:
318 the result of map(f,*sequences)
319 """
320
321 block = kwargs.get('block', self.block)
322 bound = kwargs.get('bound', self.bound)
323 for k in kwargs.keys():
324 if k not in ['block', 'bound']:
325 raise TypeError("invalid keyword arg, %r"%k)
326
327 assert len(sequences) > 0, "must have some sequences to map onto!"
328 pf = ParallelFunction(self.client, f, block=block,
329 bound=bound, targets=self.targets, balanced=False)
330 return pf.map(*sequences)
331
332 def map_async(self, f, *sequences, **kwargs):
333 """Parallel version of builtin `map`, using this view's engines."""
334 if 'block' in kwargs:
335 raise TypeError("map_async doesn't take a `block` keyword argument.")
336 kwargs['block'] = True
337 return self.map(f,*sequences,**kwargs)
338
265 @sync_results
339 @sync_results
266 @save_ids
340 @save_ids
267 def execute(self, code, block=True):
341 def execute(self, code, block=True):
268 """execute some code on my targets."""
342 """execute some code on my targets."""
269 return self.client.execute(code, block=block, targets=self.targets)
343 return self.client.execute(code, block=block, targets=self.targets)
270
344
271 def update(self, ns):
345 def update(self, ns):
272 """update remote namespace with dict `ns`"""
346 """update remote namespace with dict `ns`"""
273 return self.client.push(ns, targets=self.targets, block=self.block)
347 return self.client.push(ns, targets=self.targets, block=self.block)
274
348
275 push = update
349 push = update
276
350
277 def get(self, key_s):
351 def get(self, key_s):
278 """get object(s) by `key_s` from remote namespace
352 """get object(s) by `key_s` from remote namespace
279 will return one object if it is a key.
353 will return one object if it is a key.
280 It also takes a list of keys, and will return a list of objects."""
354 It also takes a list of keys, and will return a list of objects."""
281 # block = block if block is not None else self.block
355 # block = block if block is not None else self.block
282 return self.client.pull(key_s, block=True, targets=self.targets)
356 return self.client.pull(key_s, block=True, targets=self.targets)
283
357
284 @sync_results
358 @sync_results
285 @save_ids
359 @save_ids
286 def pull(self, key_s, block=True):
360 def pull(self, key_s, block=True):
287 """get object(s) by `key_s` from remote namespace
361 """get object(s) by `key_s` from remote namespace
288 will return one object if it is a key.
362 will return one object if it is a key.
289 It also takes a list of keys, and will return a list of objects."""
363 It also takes a list of keys, and will return a list of objects."""
290 block = block if block is not None else self.block
364 block = block if block is not None else self.block
291 return self.client.pull(key_s, block=block, targets=self.targets)
365 return self.client.pull(key_s, block=block, targets=self.targets)
292
366
293 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
367 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
294 """
368 """
295 Partition a Python sequence and send the partitions to a set of engines.
369 Partition a Python sequence and send the partitions to a set of engines.
296 """
370 """
297 block = block if block is not None else self.block
371 block = block if block is not None else self.block
298 targets = targets if targets is not None else self.targets
372 targets = targets if targets is not None else self.targets
299
373
300 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
374 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
301 targets=targets, block=block)
375 targets=targets, block=block)
302
376
303 @sync_results
377 @sync_results
304 @save_ids
378 @save_ids
305 def gather(self, key, dist='b', targets=None, block=None):
379 def gather(self, key, dist='b', targets=None, block=None):
306 """
380 """
307 Gather a partitioned sequence on a set of engines as a single local seq.
381 Gather a partitioned sequence on a set of engines as a single local seq.
308 """
382 """
309 block = block if block is not None else self.block
383 block = block if block is not None else self.block
310 targets = targets if targets is not None else self.targets
384 targets = targets if targets is not None else self.targets
311
385
312 return self.client.gather(key, dist=dist, targets=targets, block=block)
386 return self.client.gather(key, dist=dist, targets=targets, block=block)
313
387
314 def __getitem__(self, key):
388 def __getitem__(self, key):
315 return self.get(key)
389 return self.get(key)
316
390
317 def __setitem__(self,key, value):
391 def __setitem__(self,key, value):
318 self.update({key:value})
392 self.update({key:value})
319
393
320 def clear(self, block=False):
394 def clear(self, block=False):
321 """Clear the remote namespaces on my engines."""
395 """Clear the remote namespaces on my engines."""
322 block = block if block is not None else self.block
396 block = block if block is not None else self.block
323 return self.client.clear(targets=self.targets, block=block)
397 return self.client.clear(targets=self.targets, block=block)
324
398
325 def kill(self, block=True):
399 def kill(self, block=True):
326 """Kill my engines."""
400 """Kill my engines."""
327 block = block if block is not None else self.block
401 block = block if block is not None else self.block
328 return self.client.kill(targets=self.targets, block=block)
402 return self.client.kill(targets=self.targets, block=block)
329
403
330 #----------------------------------------
404 #----------------------------------------
331 # activate for %px,%autopx magics
405 # activate for %px,%autopx magics
332 #----------------------------------------
406 #----------------------------------------
333 def activate(self):
407 def activate(self):
334 """Make this `View` active for parallel magic commands.
408 """Make this `View` active for parallel magic commands.
335
409
336 IPython has a magic command syntax to work with `MultiEngineClient` objects.
410 IPython has a magic command syntax to work with `MultiEngineClient` objects.
337 In a given IPython session there is a single active one. While
411 In a given IPython session there is a single active one. While
338 there can be many `Views` created and used by the user,
412 there can be many `Views` created and used by the user,
339 there is only one active one. The active `View` is used whenever
413 there is only one active one. The active `View` is used whenever
340 the magic commands %px and %autopx are used.
414 the magic commands %px and %autopx are used.
341
415
342 The activate() method is called on a given `View` to make it
416 The activate() method is called on a given `View` to make it
343 active. Once this has been done, the magic commands can be used.
417 active. Once this has been done, the magic commands can be used.
344 """
418 """
345
419
346 try:
420 try:
347 # This is injected into __builtins__.
421 # This is injected into __builtins__.
348 ip = get_ipython()
422 ip = get_ipython()
349 except NameError:
423 except NameError:
350 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
424 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
351 else:
425 else:
352 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
426 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
353 if pmagic is not None:
427 if pmagic is not None:
354 pmagic.active_multiengine_client = self
428 pmagic.active_multiengine_client = self
355 else:
429 else:
356 print "You must first load the parallelmagic extension " \
430 print "You must first load the parallelmagic extension " \
357 "by doing '%load_ext parallelmagic'"
431 "by doing '%load_ext parallelmagic'"
358
432
359
433
360 class LoadBalancedView(View):
434 class LoadBalancedView(View):
361 """An engine-agnostic View that only executes via the Task queue.
435 """An load-balancing View that only executes via the Task scheduler.
362
436
363 Typically created via:
437 Load-balanced views can be created with the client's `view` method:
364
438
365 >>> v = client[None]
439 >>> v = client.view(balanced=True)
366 <LoadBalancedView None>
367
440
368 but can also be created with:
441 or targets can be specified, to restrict the potential destinations:
369
442
370 >>> v = client.view([1,3],balanced=True)
443 >>> v = client.view([1,3],balanced=True)
371
444
372 which would restrict loadbalancing to between engines 1 and 3.
445 which would restrict loadbalancing to between engines 1 and 3.
373
446
374 """
447 """
375
448
376 _apply_name = 'apply_balanced'
449 _apply_name = 'apply_balanced'
377 _default_names = ['targets', 'block', 'bound', 'follow', 'after', 'timeout']
450 _default_names = ['block', 'bound', 'follow', 'after', 'timeout']
378
451
379 def __init__(self, client, targets=None):
452 def __init__(self, client=None, targets=None):
380 super(LoadBalancedView, self).__init__(client, targets)
453 super(LoadBalancedView, self).__init__(client=client, targets=targets)
381 self._ntargets = 1
454 self._ntargets = 1
382 self._apply_name = 'apply_balanced'
455
456 def _validate_dependency(self, dep):
457 """validate a dependency.
458
459 For use in `set_flags`.
460 """
461 if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
462 return True
463 elif isinstance(dep, (list,set, tuple)):
464 for d in dep:
465 if not isinstance(d, str, AsyncResult):
466 return False
467 elif isinstance(dep, dict):
468 if set(dep.keys()) != set(Dependency().as_dict().keys()):
469 return False
470 if not isinstance(dep['msg_ids'], list):
471 return False
472 for d in dep['msg_ids']:
473 if not isinstance(d, str):
474 return False
475 else:
476 return False
477
478 def set_flags(self, **kwargs):
479 """set my attribute flags by keyword.
480
481 A View is a wrapper for the Client's apply method, but
482 with attributes that specify keyword arguments, those attributes
483 can be set by keyword argument with this method.
484
485 Parameters
486 ----------
487
488 block : bool
489 whether to wait for results
490 bound : bool
491 whether to use the engine's namespace
492 follow : Dependency, list, msg_id, AsyncResult
493 the location dependencies of tasks
494 after : Dependency, list, msg_id, AsyncResult
495 the time dependencies of tasks
496 timeout : int,None
497 the timeout to be used for tasks
498 """
499
500 super(LoadBalancedView, self).set_flags(**kwargs)
501 for name in ('follow', 'after'):
502 if name in kwargs:
503 value = kwargs[name]
504 if self._validate_dependency(value):
505 setattr(self, name, value)
506 else:
507 raise ValueError("Invalid dependency: %r"%value)
508 if 'timeout' in kwargs:
509 t = kwargs['timeout']
510 if not isinstance(t, (int, long, float, None)):
511 raise TypeError("Invalid type for timeout: %r"%type(t))
512 if t is not None:
513 if t < 0:
514 raise ValueError("Invalid timeout: %s"%t)
515 self.timeout = t
516
517 @spin_after
518 @save_ids
519 def map(self, f, *sequences, **kwargs):
520 """Parallel version of builtin `map`, load-balanced by this View.
521
522 Each element will be a separate task, and will be load-balanced. This
523 lets individual elements be available for iteration as soon as they arrive.
524
525 Parameters
526 ----------
527
528 f : callable
529 function to be mapped
530 *sequences: one or more sequences of matching length
531 the sequences to be distributed and passed to `f`
532 block : bool
533 whether to wait for the result or not [default self.block]
534 bound : bool
535 whether to use the engine's namespace
536
537 Returns
538 -------
539
540 if block=False:
541 AsyncMapResult
542 An object like AsyncResult, but which reassembles the sequence of results
543 into a single list. AsyncMapResults can be iterated through before all
544 results are complete.
545 else:
546 the result of map(f,*sequences)
547
548 """
549
550 block = kwargs.get('block', self.block)
551 bound = kwargs.get('bound', self.bound)
552
553 assert len(sequences) > 0, "must have some sequences to map onto!"
554
555 pf = ParallelFunction(self.client, f, block=block, bound=bound,
556 targets=self.targets, balanced=True)
557 return pf.map(*sequences)
558
559 def map_async(self, f, *sequences, **kwargs):
560 """Parallel version of builtin `map`, using this view's engines.
561
562 This is equivalent to map(...block=False)
563
564 See `map` for details.
565 """
566
567 if 'block' in kwargs:
568 raise TypeError("map_async doesn't take a `block` keyword argument.")
569 kwargs['block'] = True
570 return self.map(f,*sequences,**kwargs)
571
383
572
@@ -1,818 +1,796 b''
1 .. _parallelmultiengine:
1 .. _parallelmultiengine:
2
2
3 ==========================
3 ==========================
4 IPython's Direct interface
4 IPython's Direct interface
5 ==========================
5 ==========================
6
6
7 The direct, or multiengine, interface represents one possible way of working with a set of
7 The direct, or multiengine, interface represents one possible way of working with a set of
8 IPython engines. The basic idea behind the multiengine interface is that the
8 IPython engines. The basic idea behind the multiengine interface is that the
9 capabilities of each engine are directly and explicitly exposed to the user.
9 capabilities of each engine are directly and explicitly exposed to the user.
10 Thus, in the multiengine interface, each engine is given an id that is used to
10 Thus, in the multiengine interface, each engine is given an id that is used to
11 identify the engine and give it work to do. This interface is very intuitive
11 identify the engine and give it work to do. This interface is very intuitive
12 and is designed with interactive usage in mind, and is thus the best place for
12 and is designed with interactive usage in mind, and is thus the best place for
13 new users of IPython to begin.
13 new users of IPython to begin.
14
14
15 Starting the IPython controller and engines
15 Starting the IPython controller and engines
16 ===========================================
16 ===========================================
17
17
18 To follow along with this tutorial, you will need to start the IPython
18 To follow along with this tutorial, you will need to start the IPython
19 controller and four IPython engines. The simplest way of doing this is to use
19 controller and four IPython engines. The simplest way of doing this is to use
20 the :command:`ipclusterz` command::
20 the :command:`ipclusterz` command::
21
21
22 $ ipclusterz start -n 4
22 $ ipclusterz start -n 4
23
23
24 For more detailed information about starting the controller and engines, see
24 For more detailed information about starting the controller and engines, see
25 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
25 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
26
26
27 Creating a ``Client`` instance
27 Creating a ``Client`` instance
28 ==============================
28 ==============================
29
29
30 The first step is to import the IPython :mod:`IPython.zmq.parallel.client`
30 The first step is to import the IPython :mod:`IPython.zmq.parallel.client`
31 module and then create a :class:`.Client` instance:
31 module and then create a :class:`.Client` instance:
32
32
33 .. sourcecode:: ipython
33 .. sourcecode:: ipython
34
34
35 In [1]: from IPython.zmq.parallel import client
35 In [1]: from IPython.zmq.parallel import client
36
36
37 In [2]: rc = client.Client()
37 In [2]: rc = client.Client()
38
38
39 This form assumes that the default connection information (stored in
39 This form assumes that the default connection information (stored in
40 :file:`ipcontroller-client.json` found in `~/.ipython/clusterz_default/security`) is
40 :file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/clusterz_default/security`) is
41 accurate. If the controller was started on a remote machine, you must copy that connection
41 accurate. If the controller was started on a remote machine, you must copy that connection
42 file to the client machine, or enter its contents as arguments to the Client constructor:
42 file to the client machine, or enter its contents as arguments to the Client constructor:
43
43
44 .. sourcecode:: ipython
44 .. sourcecode:: ipython
45
45
46 # If you have copied the json connector file from the controller:
46 # If you have copied the json connector file from the controller:
47 In [2]: rc = client.Client('/path/to/ipcontroller-client.json')
47 In [2]: rc = client.Client('/path/to/ipcontroller-client.json')
48 # for a remote controller at 10.0.1.5, visible from my.server.com:
48 # or for a remote controller at 10.0.1.5, visible from my.server.com:
49 In [3]: rc = client.Client('tcp://10.0.1.5:12345', sshserver='my.server.com')
49 In [3]: rc = client.Client('tcp://10.0.1.5:12345', sshserver='my.server.com')
50
50
51
51
52 To make sure there are engines connected to the controller, use can get a list
52 To make sure there are engines connected to the controller, users can get a list
53 of engine ids:
53 of engine ids:
54
54
55 .. sourcecode:: ipython
55 .. sourcecode:: ipython
56
56
57 In [3]: rc.ids
57 In [3]: rc.ids
58 Out[3]: set([0, 1, 2, 3])
58 Out[3]: [0, 1, 2, 3]
59
59
60 Here we see that there are four engines ready to do work for us.
60 Here we see that there are four engines ready to do work for us.
61
61
62 Quick and easy parallelism
62 Quick and easy parallelism
63 ==========================
63 ==========================
64
64
65 In many cases, you simply want to apply a Python function to a sequence of
65 In many cases, you simply want to apply a Python function to a sequence of
66 objects, but *in parallel*. The client interface provides a simple way
66 objects, but *in parallel*. The client interface provides a simple way
67 of accomplishing this: using the builtin :func:`map` and the ``@remote``
67 of accomplishing this: using the builtin :func:`map` and the ``@remote``
68 function decorator, or the client's :meth:`map` method.
68 function decorator, or the client's :meth:`map` method.
69
69
70 Parallel map
70 Parallel map
71 ------------
71 ------------
72
72
73 Python's builtin :func:`map` functions allows a function to be applied to a
73 Python's builtin :func:`map` functions allows a function to be applied to a
74 sequence element-by-element. This type of code is typically trivial to
74 sequence element-by-element. This type of code is typically trivial to
75 parallelize. In fact, since IPython's interface is all about functions anyway,
75 parallelize. In fact, since IPython's interface is all about functions anyway,
76 you can just use the builtin :func:`map`, or a client's :meth:`map` method:
76 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
77 DirectView's :meth:`map` method:
77
78
78 .. sourcecode:: ipython
79 .. sourcecode:: ipython
79
80
80 In [62]: serial_result = map(lambda x:x**10, range(32))
81 In [62]: serial_result = map(lambda x:x**10, range(32))
81
82
82 In [66]: parallel_result = rc.map(lambda x: x**10, range(32))
83 In [66]: parallel_result = rc[:].map(lambda x: x**10, range(32))
83
84
84 In [67]: serial_result==parallel_result
85 In [67]: serial_result==parallel_result.get()
85 Out[67]: True
86 Out[67]: True
86
87
87
88
88 .. note::
89 .. note::
89
90
90 The client's own version of :meth:`map` or that of :class:`.DirectView` do
91 The :class:`DirectView`'s version of :meth:`map` does
91 not do any load balancing. For a load balanced version, use a
92 not do any load balancing. For a load balanced version, use a
92 :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
93 :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
93 `targets=None`.
94 `balanced=True`.
94
95
95 .. seealso::
96 .. seealso::
96
97
97 :meth:`map` is implemented via :class:`.ParallelFunction`.
98 :meth:`map` is implemented via :class:`.ParallelFunction`.
98
99
99 Remote function decorator
100 Remote function decorator
100 -------------------------
101 -------------------------
101
102
102 Remote functions are just like normal functions, but when they are called,
103 Remote functions are just like normal functions, but when they are called,
103 they execute on one or more engines, rather than locally. IPython provides
104 they execute on one or more engines, rather than locally. IPython provides
104 some decorators:
105 some decorators:
105
106
106 .. sourcecode:: ipython
107 .. sourcecode:: ipython
107
108
108 In [10]: @rc.remote(block=True)
109 In [10]: @rc.remote(block=True, targets=0)
109 ....: def f(x):
110 ....: def f(x):
110 ....: return 10.0*x**4
111 ....: return 10.0*x**4
111 ....:
112 ....:
112
113
113 In [11]: map(f, range(32)) # this is done in parallel
114 In [11]: map(f, range(32)) # this is done on engine 0
114 Out[11]: [0.0,10.0,160.0,...]
115 Out[11]: [0.0,10.0,160.0,...]
115
116
116 See the docstring for the :func:`parallel` and :func:`remote` decorators for
117 .. seealso::
117 options.
118
119 See the docstring for the :func:`parallel` and :func:`remote` decorators for
120 options.
118
121
119 Calling Python functions
122 Calling Python functions
120 ========================
123 ========================
121
124
122 The most basic type of operation that can be performed on the engines is to
125 The most basic type of operation that can be performed on the engines is to
123 execute Python code or call Python functions. Executing Python code can be
126 execute Python code or call Python functions. Executing Python code can be
124 done in blocking or non-blocking mode (non-blocking is default) using the
127 done in blocking or non-blocking mode (non-blocking is default) using the
125 :meth:`execute` method, and calling functions can be done via the
128 :meth:`execute` method, and calling functions can be done via the
126 :meth:`.View.apply` method.
129 :meth:`.View.apply` method.
127
130
128 apply
131 apply
129 -----
132 -----
130
133
131 The main method for doing remote execution (in fact, all methods that
134 The main method for doing remote execution (in fact, all methods that
132 communicate with the engines are built on top of it), is :meth:`Client.apply`.
135 communicate with the engines are built on top of it), is :meth:`Client.apply`.
133 Ideally, :meth:`apply` would have the signature ``apply(f,*args,**kwargs)``,
136 Ideally, :meth:`apply` would have the signature ``apply(f,*args,**kwargs)``,
134 which would call ``f(*args,**kwargs)`` remotely. However, since :class:`Clients`
137 which would call ``f(*args,**kwargs)`` remotely. However, since :class:`Clients`
135 require some more options, they cannot easily provide this interface.
138 require some more options, they cannot easily provide this interface.
136 Instead, they provide the signature::
139 Instead, they provide the signature::
137
140
138 c.apply(f, args=None, kwargs=None, bound=True, block=None, targets=None,
141 c.apply(f, args=None, kwargs=None, bound=True, block=None, targets=None,
139 after=None, follow=None, timeout=None)
142 after=None, follow=None, timeout=None)
140
143
141 In order to provide the nicer interface, we have :class:`View` classes, which wrap
144 In order to provide the nicer interface, we have :class:`View` classes, which wrap
142 :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
145 :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
143 the extra arguments. For instance, performing index-access on a client creates a
146 the extra arguments. For instance, performing index-access on a client creates a
144 :class:`.LoadBalancedView`.
147 :class:`.DirectView`.
145
148
146 .. sourcecode:: ipython
149 .. sourcecode:: ipython
147
150
148 In [4]: view = rc[1:3]
151 In [4]: view = rc[1:3]
149 Out[4]: <DirectView [1, 2]>
152 Out[4]: <DirectView [1, 2]>
150
153
151 In [5]: view.apply<tab>
154 In [5]: view.apply<tab>
152 view.apply view.apply_async view.apply_async_bound view.apply_bound view.apply_sync view.apply_sync_bound
155 view.apply view.apply_async view.apply_async_bound view.apply_bound view.apply_sync view.apply_sync_bound
153
156
154 A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound`
157 A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound`
155 and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x`
158 and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x`
156 methods allow specifying `bound` and `block` via the different methods.
159 methods allow specifying `bound` and `block` via the different methods.
157
160
158 ================== ========== ==========
161 ================== ========== ==========
159 method block bound
162 method block bound
160 ================== ========== ==========
163 ================== ========== ==========
161 apply self.block self.bound
164 apply self.block self.bound
162 apply_sync True False
165 apply_sync True False
163 apply_async False False
166 apply_async False False
164 apply_sync_bound True True
167 apply_sync_bound True True
165 apply_async_bound False True
168 apply_async_bound False True
166 ================== ========== ==========
169 ================== ========== ==========
167
170
168 For explanation of these values, read on.
171 For explanation of these values, read on.
169
172
170 Blocking execution
173 Blocking execution
171 ------------------
174 ------------------
172
175
173 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
176 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
174 these examples) submits the command to the controller, which places the
177 these examples) submits the command to the controller, which places the
175 command in the engines' queues for execution. The :meth:`apply` call then
178 command in the engines' queues for execution. The :meth:`apply` call then
176 blocks until the engines are done executing the command:
179 blocks until the engines are done executing the command:
177
180
178 .. sourcecode:: ipython
181 .. sourcecode:: ipython
179
182
180 In [2]: rc.block=True
183 In [2]: rc.block=True
181 In [3]: dview = rc[:] # A DirectView of all engines
184 In [3]: dview = rc[:] # A DirectView of all engines
182 In [4]: dview['a'] = 5
185 In [4]: dview['a'] = 5
183
186
184 In [5]: dview['b'] = 10
187 In [5]: dview['b'] = 10
185
188
186 In [6]: dview.apply_bound(lambda x: a+b+x, 27)
189 In [6]: dview.apply_bound(lambda x: a+b+x, 27)
187 Out[6]: [42, 42, 42, 42]
190 Out[6]: [42, 42, 42, 42]
188
191
189 Python commands can be executed on specific engines by calling execute using
192 Python commands can be executed on specific engines by calling execute using
190 the ``targets`` keyword argument, or creating a :class:`DirectView` instance
193 the ``targets`` keyword argument, or creating a :class:`DirectView` instance
191 by index-access to the client:
194 by index-access to the client:
192
195
193 .. sourcecode:: ipython
196 .. sourcecode:: ipython
194
197
195 In [6]: rc[::2].execute('c=a+b') # shorthand for rc.execute('c=a+b',targets=[0,2])
198 In [6]: rc[::2].execute('c=a+b') # shorthand for rc.execute('c=a+b',targets=[0,2])
196
199
197 In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
200 In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
198
201
199 In [8]: rc[:]['c'] # shorthand for rc.pull('c',targets='all')
202 In [8]: rc[:]['c'] # shorthand for rc.pull('c',targets='all')
200 Out[8]: [15, -5, 15, -5]
203 Out[8]: [15, -5, 15, -5]
201
204
202 .. note::
205 .. note::
203
206
204 Note that every call to ``rc.<meth>(...,targets=x)`` can be made via
207 Note that every call to ``rc.<meth>(...,targets=x)`` can be made via
205 ``rc[<x>].<meth>(...)``, which constructs a View object. The only place
208 ``rc[<x>].<meth>(...)``, which constructs a View object. The only place
206 where this differs is in :meth:`apply`. The :class:`Client` takes many
209 where this differs is in :meth:`apply`. The :class:`Client` takes many
207 arguments to apply, so it requires `args` and `kwargs` to be passed as
210 arguments to apply, so it requires `args` and `kwargs` to be passed as
208 individual arguments. Extended options such as `bound`, `targets`, and
211 individual arguments. Extended options such as `bound`, `targets`, and
209 `block` are controlled by the attributes of the :class:`View` objects, so
212 `block` are controlled by the attributes of the :class:`View` objects, so
210 they can provide the much more convenient
213 they can provide the much more convenient
211 :meth:`View.apply(f,*args,**kwargs)`, which simply calls
214 :meth:`View.apply(f,*args,**kwargs)`, which simply calls
212 ``f(*args,**kwargs)`` remotely.
215 ``f(*args,**kwargs)`` remotely.
213
216
214 This example also shows one of the most important things about the IPython
217 This example also shows one of the most important things about the IPython
215 engines: they have persistent user namespaces. The :meth:`apply` method can
218 engines: they have persistent user namespaces. The :meth:`apply` method can
216 be run in either a bound or unbound way. The default for a View is to be
219 be run in either a bound or unbound way. The default for a View is to be
217 unbound, unless called by the :meth:`apply_bound` method:
220 unbound, unless called by the :meth:`apply_bound` method:
218
221
219 .. sourcecode:: ipython
222 .. sourcecode:: ipython
220
223
221 In [9]: rc[:]['b'] = 5 # assign b to 5 everywhere
224 In [9]: dview['b'] = 5 # assign b to 5 everywhere
222
225
223 In [10]: v0 = rc[0]
226 In [10]: v0 = rc[0]
224
227
225 In [12]: v0.apply_bound(lambda : b)
228 In [12]: v0.apply_bound(lambda : b)
226 Out[12]: 5
229 Out[12]: 5
227
230
228 In [13]: v0.apply(lambda : b)
231 In [13]: v0.apply(lambda : b)
229 ---------------------------------------------------------------------------
232 ---------------------------------------------------------------------------
230 RemoteError Traceback (most recent call last)
233 RemoteError Traceback (most recent call last)
231 /home/you/<ipython-input-34-21a468eb10f0> in <module>()
234 /home/you/<ipython-input-34-21a468eb10f0> in <module>()
232 ----> 1 v0.apply(lambda : b)
235 ----> 1 v0.apply(lambda : b)
233 ...
236 ...
234 RemoteError: NameError(global name 'b' is not defined)
237 RemoteError: NameError(global name 'b' is not defined)
235 Traceback (most recent call last):
238 Traceback (most recent call last):
236 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 294, in apply_request
239 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 294, in apply_request
237 exec code in working, working
240 exec code in working, working
238 File "<string>", line 1, in <module>
241 File "<string>", line 1, in <module>
239 File "<ipython-input-34-21a468eb10f0>", line 1, in <lambda>
242 File "<ipython-input-34-21a468eb10f0>", line 1, in <lambda>
240 NameError: global name 'b' is not defined
243 NameError: global name 'b' is not defined
241
244
242
245
243 Specifically, `bound=True` specifies that the engine's namespace is to be used
246 Specifically, `bound=True` specifies that the engine's namespace is to be used
244 for execution, and `bound=False` specifies that the engine's namespace is not
247 for execution, and `bound=False` specifies that the engine's namespace is not
245 to be used (hence, 'b' is undefined during unbound execution, since the
248 to be used (hence, 'b' is undefined during unbound execution, since the
246 function is called in an empty namespace). Unbound execution is often useful
249 function is called in an empty namespace). Unbound execution is often useful
247 for large numbers of atomic tasks, which prevents bloating the engine's
250 for large numbers of atomic tasks, which prevents bloating the engine's
248 memory, while bound execution lets you build on your previous work.
251 memory, while bound execution lets you build on your previous work.
249
252
250
253
251 Non-blocking execution
254 Non-blocking execution
252 ----------------------
255 ----------------------
253
256
254 In non-blocking mode, :meth:`apply` submits the command to be executed and
257 In non-blocking mode, :meth:`apply` submits the command to be executed and
255 then returns a :class:`AsyncResult` object immediately. The
258 then returns a :class:`AsyncResult` object immediately. The
256 :class:`AsyncResult` object gives you a way of getting a result at a later
259 :class:`AsyncResult` object gives you a way of getting a result at a later
257 time through its :meth:`get` method.
260 time through its :meth:`get` method.
258
261
259 .. Note::
262 .. Note::
260
263
261 The :class:`AsyncResult` object provides a superset of the interface in
264 The :class:`AsyncResult` object provides a superset of the interface in
262 :py:class:`multiprocessing.pool.AsyncResult`. See the
265 :py:class:`multiprocessing.pool.AsyncResult`. See the
263 `official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
266 `official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
264 for more.
267 for more.
265
268
266
269
267 This allows you to quickly submit long running commands without blocking your
270 This allows you to quickly submit long running commands without blocking your
268 local Python/IPython session:
271 local Python/IPython session:
269
272
270 .. sourcecode:: ipython
273 .. sourcecode:: ipython
271
274
272 # define our function
275 # define our function
273 In [6]: def wait(t):
276 In [6]: def wait(t):
274 ...: import time
277 ...: import time
275 ...: tic = time.time()
278 ...: tic = time.time()
276 ...: time.sleep(t)
279 ...: time.sleep(t)
277 ...: return time.time()-tic
280 ...: return time.time()-tic
278
281
279 # In non-blocking mode
282 # In non-blocking mode
280 In [7]: pr = rc[:].apply_async(wait, 2)
283 In [7]: pr = dview.apply_async(wait, 2)
281
284
282 # Now block for the result
285 # Now block for the result
283 In [8]: pr.get()
286 In [8]: pr.get()
284 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
287 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
285
288
286 # Again in non-blocking mode
289 # Again in non-blocking mode
287 In [9]: pr = rc[:].apply_async(wait, 10)
290 In [9]: pr = dview.apply_async(wait, 10)
288
291
289 # Poll to see if the result is ready
292 # Poll to see if the result is ready
290 In [10]: pr.ready()
293 In [10]: pr.ready()
291 Out[10]: False
294 Out[10]: False
292
295
293 # ask for the result, but wait a maximum of 1 second:
296 # ask for the result, but wait a maximum of 1 second:
294 In [45]: pr.get(1)
297 In [45]: pr.get(1)
295 ---------------------------------------------------------------------------
298 ---------------------------------------------------------------------------
296 TimeoutError Traceback (most recent call last)
299 TimeoutError Traceback (most recent call last)
297 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
300 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
298 ----> 1 pr.get(1)
301 ----> 1 pr.get(1)
299
302
300 /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
303 /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
301 62 raise self._exception
304 62 raise self._exception
302 63 else:
305 63 else:
303 ---> 64 raise error.TimeoutError("Result not ready.")
306 ---> 64 raise error.TimeoutError("Result not ready.")
304 65
307 65
305 66 def ready(self):
308 66 def ready(self):
306
309
307 TimeoutError: Result not ready.
310 TimeoutError: Result not ready.
308
311
309 .. Note::
312 .. Note::
310
313
311 Note the import inside the function. This is a common model, to ensure
314 Note the import inside the function. This is a common model, to ensure
312 that the appropriate modules are imported where the task is run.
315 that the appropriate modules are imported where the task is run.
313
316
314 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
317 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
315 are done. For this, there is the method :meth:`barrier`. This method takes a
318 are done. For this, there is the method :meth:`barrier`. This method takes a
316 tuple of :class:`AsyncResult` objects (or `msg_ids`) and blocks until all of the
319 tuple of :class:`AsyncResult` objects (or `msg_ids`) and blocks until all of the
317 associated results are ready:
320 associated results are ready:
318
321
319 .. sourcecode:: ipython
322 .. sourcecode:: ipython
320
323
321 In [72]: rc.block=False
324 In [72]: rc.block=False
322
325
323 # A trivial list of AsyncResults objects
326 # A trivial list of AsyncResults objects
324 In [73]: pr_list = [rc[:].apply_async(wait, 3) for i in range(10)]
327 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
325
328
326 # Wait until all of them are done
329 # Wait until all of them are done
327 In [74]: rc.barrier(pr_list)
330 In [74]: rc.barrier(pr_list)
328
331
329 # Then, their results are ready using get() or the `.r` attribute
332 # Then, their results are ready using get() or the `.r` attribute
330 In [75]: pr_list[0].get()
333 In [75]: pr_list[0].get()
331 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
334 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
332
335
333
336
334
337
335 The ``block`` and ``targets`` keyword arguments and attributes
338 The ``block`` keyword argument and attributes
336 --------------------------------------------------------------
339 ---------------------------------------------
337
338 .. warning::
339
340 This is different now, I haven't updated this section.
341 -MinRK
342
340
343 Most methods (like :meth:`apply`) accept
341 Most methods (like :meth:`apply`) accept
344 ``block`` and ``targets`` as keyword arguments. As we have seen above, these
342 ``block`` as a keyword argument. As we have seen above, these
345 keyword arguments control the blocking mode and which engines the command is
343 keyword arguments control the blocking mode. The :class:`Client` class also has
346 applied to. The :class:`Client` class also has :attr:`block` and
344 a :attr:`block` attribute that controls the default behavior when the keyword
347 :attr:`targets` attributes that control the default behavior when the keyword
345 argument is not provided. Thus the following logic is used for :attr:`block`:
348 arguments are not provided. Thus the following logic is used for :attr:`block`
349 and :attr:`targets`:
350
346
351 * If no keyword argument is provided, the instance attributes are used.
347 * If no keyword argument is provided, the instance attributes are used.
352 * Keyword arguments, if provided, override the instance attributes.
348 * Keyword arguments, if provided, override the instance attributes for
349 the duration of a single call.
353
350
354 The following examples demonstrate how to use the instance attributes:
351 The following examples demonstrate how to use the instance attributes:
355
352
356 .. sourcecode:: ipython
353 .. sourcecode:: ipython
357
354
358 In [16]: rc.targets = [0,2]
359
360 In [17]: rc.block = False
355 In [17]: rc.block = False
361
356
362 In [18]: pr = rc.execute('a=5')
357 In [18]: ar = rc.apply(lambda : 10, targets=[0,2])
363
364 In [19]: pr.r
365 Out[19]:
366 <Results List>
367 [0] In [6]: a=5
368 [2] In [6]: a=5
369
358
370 # Note targets='all' means all engines
359 In [19]: ar.get()
371 In [20]: rc.targets = 'all'
360 Out[19]: [10,10]
372
361
373 In [21]: rc.block = True
362 In [21]: rc.block = True
374
363
375 In [22]: rc.execute('b=10; print b')
364 # Note targets='all' means all engines
376 Out[22]:
365 In [22]: rc.apply(lambda : 42, targets='all')
377 <Results List>
366 Out[22]: [42, 42, 42, 42]
378 [0] In [7]: b=10; print b
379 [0] Out[7]: 10
380
381 [1] In [6]: b=10; print b
382 [1] Out[6]: 10
383
384 [2] In [7]: b=10; print b
385 [2] Out[7]: 10
386
387 [3] In [6]: b=10; print b
388 [3] Out[6]: 10
389
367
390 The :attr:`block` and :attr:`targets` instance attributes also determine the
368 The :attr:`block` and :attr:`targets` instance attributes of the
391 behavior of the parallel magic commands.
369 :class:`.DirectView` also determine the behavior of the parallel magic commands.
392
370
393
371
394 Parallel magic commands
372 Parallel magic commands
395 -----------------------
373 -----------------------
396
374
397 .. warning::
375 .. warning::
398
376
399 The magics have not been changed to work with the zeromq system. ``%px``
377 The magics have not been changed to work with the zeromq system. ``%px``
400 and ``%autopx`` do work, but ``%result`` does not. %px and %autopx *do
378 and ``%autopx`` do work, but ``%result`` does not. %px and %autopx *do
401 not* print stdin/out.
379 not* print stdin/out.
402
380
403 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
381 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
404 that make it more pleasant to execute Python commands on the engines
382 that make it more pleasant to execute Python commands on the engines
405 interactively. These are simply shortcuts to :meth:`execute` and
383 interactively. These are simply shortcuts to :meth:`execute` and
406 :meth:`get_result`. The ``%px`` magic executes a single Python command on the
384 :meth:`get_result`. The ``%px`` magic executes a single Python command on the
407 engines specified by the :attr:`targets` attribute of the
385 engines specified by the :attr:`targets` attribute of the
408 :class:`MultiEngineClient` instance (by default this is ``'all'``):
386 :class:`MultiEngineClient` instance (by default this is ``'all'``):
409
387
410 .. sourcecode:: ipython
388 .. sourcecode:: ipython
411
389
412 # Create a DirectView for all targets
390 # Create a DirectView for all targets
413 In [22]: dv = rc[:]
391 In [22]: dv = rc[:]
414
392
415 # Make this DirectView active for parallel magic commands
393 # Make this DirectView active for parallel magic commands
416 In [23]: dv.activate()
394 In [23]: dv.activate()
417
395
418 In [24]: dv.block=True
396 In [24]: dv.block=True
419
397
420 In [25]: import numpy
398 In [25]: import numpy
421
399
422 In [26]: %px import numpy
400 In [26]: %px import numpy
423 Parallel execution on engines: [0, 1, 2, 3]
401 Parallel execution on engines: [0, 1, 2, 3]
424 Out[26]:[None,None,None,None]
402 Out[26]:[None,None,None,None]
425
403
426 In [27]: %px a = numpy.random.rand(2,2)
404 In [27]: %px a = numpy.random.rand(2,2)
427 Parallel execution on engines: [0, 1, 2, 3]
405 Parallel execution on engines: [0, 1, 2, 3]
428
406
429 In [28]: %px ev = numpy.linalg.eigvals(a)
407 In [28]: %px ev = numpy.linalg.eigvals(a)
430 Parallel execution on engines: [0, 1, 2, 3]
408 Parallel execution on engines: [0, 1, 2, 3]
431
409
432 In [28]: dv['ev']
410 In [28]: dv['ev']
433 Out[44]: [ array([ 1.09522024, -0.09645227]),
411 Out[44]: [ array([ 1.09522024, -0.09645227]),
434 array([ 1.21435496, -0.35546712]),
412 array([ 1.21435496, -0.35546712]),
435 array([ 0.72180653, 0.07133042]),
413 array([ 0.72180653, 0.07133042]),
436 array([ 1.46384341e+00, 1.04353244e-04])
414 array([ 1.46384341e+00, 1.04353244e-04])
437 ]
415 ]
438
416
439 .. Note::
417 .. Note::
440
418
441 ``%result`` doesn't work
419 ``%result`` doesn't work
442
420
443 The ``%result`` magic gets and prints the stdin/stdout/stderr of the last
421 The ``%result`` magic gets and prints the stdin/stdout/stderr of the last
444 command executed on each engine. It is simply a shortcut to the
422 command executed on each engine. It is simply a shortcut to the
445 :meth:`get_result` method:
423 :meth:`get_result` method:
446
424
447 .. sourcecode:: ipython
425 .. sourcecode:: ipython
448
426
449 In [29]: %result
427 In [29]: %result
450 Out[29]:
428 Out[29]:
451 <Results List>
429 <Results List>
452 [0] In [10]: print numpy.linalg.eigvals(a)
430 [0] In [10]: print numpy.linalg.eigvals(a)
453 [0] Out[10]: [ 1.28167017 0.14197338]
431 [0] Out[10]: [ 1.28167017 0.14197338]
454
432
455 [1] In [9]: print numpy.linalg.eigvals(a)
433 [1] In [9]: print numpy.linalg.eigvals(a)
456 [1] Out[9]: [-0.14093616 1.27877273]
434 [1] Out[9]: [-0.14093616 1.27877273]
457
435
458 [2] In [10]: print numpy.linalg.eigvals(a)
436 [2] In [10]: print numpy.linalg.eigvals(a)
459 [2] Out[10]: [-0.37023573 1.06779409]
437 [2] Out[10]: [-0.37023573 1.06779409]
460
438
461 [3] In [9]: print numpy.linalg.eigvals(a)
439 [3] In [9]: print numpy.linalg.eigvals(a)
462 [3] Out[9]: [ 0.83664764 -0.25602658]
440 [3] Out[9]: [ 0.83664764 -0.25602658]
463
441
464 The ``%autopx`` magic switches to a mode where everything you type is executed
442 The ``%autopx`` magic switches to a mode where everything you type is executed
465 on the engines given by the :attr:`targets` attribute:
443 on the engines given by the :attr:`targets` attribute:
466
444
467 .. sourcecode:: ipython
445 .. sourcecode:: ipython
468
446
469 In [30]: dv.block=False
447 In [30]: dv.block=False
470
448
471 In [31]: %autopx
449 In [31]: %autopx
472 Auto Parallel Enabled
450 Auto Parallel Enabled
473 Type %autopx to disable
451 Type %autopx to disable
474
452
475 In [32]: max_evals = []
453 In [32]: max_evals = []
476 <IPython.zmq.parallel.asyncresult.AsyncResult object at 0x17b8a70>
454 <IPython.zmq.parallel.asyncresult.AsyncResult object at 0x17b8a70>
477
455
478 In [33]: for i in range(100):
456 In [33]: for i in range(100):
479 ....: a = numpy.random.rand(10,10)
457 ....: a = numpy.random.rand(10,10)
480 ....: a = a+a.transpose()
458 ....: a = a+a.transpose()
481 ....: evals = numpy.linalg.eigvals(a)
459 ....: evals = numpy.linalg.eigvals(a)
482 ....: max_evals.append(evals[0].real)
460 ....: max_evals.append(evals[0].real)
483 ....:
461 ....:
484 ....:
462 ....:
485 <IPython.zmq.parallel.asyncresult.AsyncResult object at 0x17af8f0>
463 <IPython.zmq.parallel.asyncresult.AsyncResult object at 0x17af8f0>
486
464
487 In [34]: %autopx
465 In [34]: %autopx
488 Auto Parallel Disabled
466 Auto Parallel Disabled
489
467
490 In [35]: dv.block=True
468 In [35]: dv.block=True
491
469
492 In [36]: px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals))
470 In [36]: px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals))
493 Parallel execution on engines: [0, 1, 2, 3]
471 Parallel execution on engines: [0, 1, 2, 3]
494
472
495 In [37]: dv['ans']
473 In [37]: dv['ans']
496 Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
474 Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
497 'Average max eigenvalue is: 10.2076902286',
475 'Average max eigenvalue is: 10.2076902286',
498 'Average max eigenvalue is: 10.1891484655',
476 'Average max eigenvalue is: 10.1891484655',
499 'Average max eigenvalue is: 10.1158837784',]
477 'Average max eigenvalue is: 10.1158837784',]
500
478
501
479
502 .. Note::
480 .. Note::
503
481
504 Multiline ``%autopx`` gets fouled up by NameErrors, because IPython
482 Multiline ``%autopx`` gets fouled up by NameErrors, because IPython
505 currently introspects too much.
483 currently introspects too much.
506
484
507
485
508 Moving Python objects around
486 Moving Python objects around
509 ============================
487 ============================
510
488
511 In addition to calling functions and executing code on engines, you can
489 In addition to calling functions and executing code on engines, you can
512 transfer Python objects to and from your IPython session and the engines. In
490 transfer Python objects to and from your IPython session and the engines. In
513 IPython, these operations are called :meth:`push` (sending an object to the
491 IPython, these operations are called :meth:`push` (sending an object to the
514 engines) and :meth:`pull` (getting an object from the engines).
492 engines) and :meth:`pull` (getting an object from the engines).
515
493
516 Basic push and pull
494 Basic push and pull
517 -------------------
495 -------------------
518
496
519 Here are some examples of how you use :meth:`push` and :meth:`pull`:
497 Here are some examples of how you use :meth:`push` and :meth:`pull`:
520
498
521 .. sourcecode:: ipython
499 .. sourcecode:: ipython
522
500
523 In [38]: rc.push(dict(a=1.03234,b=3453))
501 In [38]: rc.push(dict(a=1.03234,b=3453))
524 Out[38]: [None,None,None,None]
502 Out[38]: [None,None,None,None]
525
503
526 In [39]: rc.pull('a')
504 In [39]: rc.pull('a')
527 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
505 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
528
506
529 In [40]: rc.pull('b',targets=0)
507 In [40]: rc.pull('b',targets=0)
530 Out[40]: 3453
508 Out[40]: 3453
531
509
532 In [41]: rc.pull(('a','b'))
510 In [41]: rc.pull(('a','b'))
533 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
511 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
534
512
535 # zmq client does not have zip_pull
513 # zmq client does not have zip_pull
536 In [42]: rc.zip_pull(('a','b'))
514 In [42]: rc.zip_pull(('a','b'))
537 Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]
515 Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]
538
516
539 In [43]: rc.push(dict(c='speed'))
517 In [43]: rc.push(dict(c='speed'))
540 Out[43]: [None,None,None,None]
518 Out[43]: [None,None,None,None]
541
519
542 In non-blocking mode :meth:`push` and :meth:`pull` also return
520 In non-blocking mode :meth:`push` and :meth:`pull` also return
543 :class:`AsyncResult` objects:
521 :class:`AsyncResult` objects:
544
522
545 .. sourcecode:: ipython
523 .. sourcecode:: ipython
546
524
547 In [47]: rc.block=False
525 In [47]: rc.block=False
548
526
549 In [48]: pr = rc.pull('a')
527 In [48]: pr = rc.pull('a')
550
528
551 In [49]: pr.get()
529 In [49]: pr.get()
552 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
530 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
553
531
554
532
555
533
556
534
557 Dictionary interface
535 Dictionary interface
558 --------------------
536 --------------------
559
537
560 Since a namespace is just a :class:`dict`, :class:`DirectView` objects provide
538 Since a namespace is just a :class:`dict`, :class:`DirectView` objects provide
561 dictionary-style access by key and methods such as :meth:`get` and
539 dictionary-style access by key and methods such as :meth:`get` and
562 :meth:`update` for convenience. This makes the remote namespaces of the engines
540 :meth:`update` for convenience. This makes the remote namespaces of the engines
563 appear as a local dictionary. Underneath, this uses :meth:`push` and
541 appear as a local dictionary. Underneath, this uses :meth:`push` and
564 :meth:`pull`:
542 :meth:`pull`:
565
543
566 .. sourcecode:: ipython
544 .. sourcecode:: ipython
567
545
568 In [50]: rc.block=True
546 In [50]: rc.block=True
569
547
570 In [51]: rc[:]['a']=['foo','bar']
548 In [51]: dview['a']=['foo','bar']
571
549
572 In [52]: rc[:]['a']
550 In [52]: dview['a']
573 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
551 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
574
552
575 Scatter and gather
553 Scatter and gather
576 ------------------
554 ------------------
577
555
578 Sometimes it is useful to partition a sequence and push the partitions to
556 Sometimes it is useful to partition a sequence and push the partitions to
579 different engines. In MPI language, this is known as scatter/gather and we
557 different engines. In MPI language, this is known as scatter/gather and we
580 follow that terminology. However, it is important to remember that in
558 follow that terminology. However, it is important to remember that in
581 IPython's :class:`Client` class, :meth:`scatter` is from the
559 IPython's :class:`Client` class, :meth:`scatter` is from the
582 interactive IPython session to the engines and :meth:`gather` is from the
560 interactive IPython session to the engines and :meth:`gather` is from the
583 engines back to the interactive IPython session. For scatter/gather operations
561 engines back to the interactive IPython session. For scatter/gather operations
584 between engines, MPI should be used:
562 between engines, MPI should be used:
585
563
586 .. sourcecode:: ipython
564 .. sourcecode:: ipython
587
565
588 In [58]: rc.scatter('a',range(16))
566 In [58]: dview.scatter('a',range(16))
589 Out[58]: [None,None,None,None]
567 Out[58]: [None,None,None,None]
590
568
591 In [59]: rc[:]['a']
569 In [59]: dview['a']
592 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
570 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
593
571
594 In [60]: rc.gather('a')
572 In [60]: dview.gather('a')
595 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
573 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
596
574
597 Other things to look at
575 Other things to look at
598 =======================
576 =======================
599
577
600 How to do parallel list comprehensions
578 How to do parallel list comprehensions
601 --------------------------------------
579 --------------------------------------
602
580
603 In many cases list comprehensions are nicer than using the map function. While
581 In many cases list comprehensions are nicer than using the map function. While
604 we don't have fully parallel list comprehensions, it is simple to get the
582 we don't have fully parallel list comprehensions, it is simple to get the
605 basic effect using :meth:`scatter` and :meth:`gather`:
583 basic effect using :meth:`scatter` and :meth:`gather`:
606
584
607 .. sourcecode:: ipython
585 .. sourcecode:: ipython
608
586
609 In [66]: rc.scatter('x',range(64))
587 In [66]: dview.scatter('x',range(64))
610 Out[66]: [None,None,None,None]
588 Out[66]: [None,None,None,None]
611
589
612 In [67]: px y = [i**10 for i in x]
590 In [67]: px y = [i**10 for i in x]
613 Parallel execution on engines: [0, 1, 2, 3]
591 Parallel execution on engines: [0, 1, 2, 3]
614 Out[67]:
592 Out[67]:
615
593
616 In [68]: y = rc.gather('y')
594 In [68]: y = dview.gather('y')
617
595
618 In [69]: print y
596 In [69]: print y
619 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
597 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
620
598
621 Parallel exceptions
599 Parallel exceptions
622 -------------------
600 -------------------
623
601
624 In the multiengine interface, parallel commands can raise Python exceptions,
602 In the multiengine interface, parallel commands can raise Python exceptions,
625 just like serial commands. But, it is a little subtle, because a single
603 just like serial commands. But, it is a little subtle, because a single
626 parallel command can actually raise multiple exceptions (one for each engine
604 parallel command can actually raise multiple exceptions (one for each engine
627 the command was run on). To express this idea, the MultiEngine interface has a
605 the command was run on). To express this idea, the MultiEngine interface has a
628 :exc:`CompositeError` exception class that will be raised in most cases. The
606 :exc:`CompositeError` exception class that will be raised in most cases. The
629 :exc:`CompositeError` class is a special type of exception that wraps one or
607 :exc:`CompositeError` class is a special type of exception that wraps one or
630 more other types of exceptions. Here is how it works:
608 more other types of exceptions. Here is how it works:
631
609
632 .. sourcecode:: ipython
610 .. sourcecode:: ipython
633
611
634 In [76]: rc.block=True
612 In [76]: rc.block=True
635
613
636 In [77]: rc.execute('1/0')
614 In [77]: rc.execute('1/0')
637 ---------------------------------------------------------------------------
615 ---------------------------------------------------------------------------
638 CompositeError Traceback (most recent call last)
616 CompositeError Traceback (most recent call last)
639
617
640 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
618 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
641
619
642 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
620 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
643 432 targets, block = self._findTargetsAndBlock(targets, block)
621 432 targets, block = self._findTargetsAndBlock(targets, block)
644 433 result = blockingCallFromThread(self.smultiengine.execute, lines,
622 433 result = blockingCallFromThread(self.smultiengine.execute, lines,
645 --> 434 targets=targets, block=block)
623 --> 434 targets=targets, block=block)
646 435 if block:
624 435 if block:
647 436 result = ResultList(result)
625 436 result = ResultList(result)
648
626
649 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
627 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
650 72 result.raiseException()
628 72 result.raiseException()
651 73 except Exception, e:
629 73 except Exception, e:
652 ---> 74 raise e
630 ---> 74 raise e
653 75 return result
631 75 return result
654 76
632 76
655
633
656 CompositeError: one or more exceptions from call to method: execute
634 CompositeError: one or more exceptions from call to method: execute
657 [0:execute]: ZeroDivisionError: integer division or modulo by zero
635 [0:execute]: ZeroDivisionError: integer division or modulo by zero
658 [1:execute]: ZeroDivisionError: integer division or modulo by zero
636 [1:execute]: ZeroDivisionError: integer division or modulo by zero
659 [2:execute]: ZeroDivisionError: integer division or modulo by zero
637 [2:execute]: ZeroDivisionError: integer division or modulo by zero
660 [3:execute]: ZeroDivisionError: integer division or modulo by zero
638 [3:execute]: ZeroDivisionError: integer division or modulo by zero
661
639
662 Notice how the error message printed when :exc:`CompositeError` is raised has
640 Notice how the error message printed when :exc:`CompositeError` is raised has
663 information about the individual exceptions that were raised on each engine.
641 information about the individual exceptions that were raised on each engine.
664 If you want, you can even raise one of these original exceptions:
642 If you want, you can even raise one of these original exceptions:
665
643
666 .. sourcecode:: ipython
644 .. sourcecode:: ipython
667
645
668 In [80]: try:
646 In [80]: try:
669 ....: rc.execute('1/0')
647 ....: rc.execute('1/0')
670 ....: except client.CompositeError, e:
648 ....: except client.CompositeError, e:
671 ....: e.raise_exception()
649 ....: e.raise_exception()
672 ....:
650 ....:
673 ....:
651 ....:
674 ---------------------------------------------------------------------------
652 ---------------------------------------------------------------------------
675 ZeroDivisionError Traceback (most recent call last)
653 ZeroDivisionError Traceback (most recent call last)
676
654
677 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
655 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
678
656
679 /ipython1-client-r3021/ipython1/kernel/error.pyc in raise_exception(self, excid)
657 /ipython1-client-r3021/ipython1/kernel/error.pyc in raise_exception(self, excid)
680 156 raise IndexError("an exception with index %i does not exist"%excid)
658 156 raise IndexError("an exception with index %i does not exist"%excid)
681 157 else:
659 157 else:
682 --> 158 raise et, ev, etb
660 --> 158 raise et, ev, etb
683 159
661 159
684 160 def collect_exceptions(rlist, method):
662 160 def collect_exceptions(rlist, method):
685
663
686 ZeroDivisionError: integer division or modulo by zero
664 ZeroDivisionError: integer division or modulo by zero
687
665
688 If you are working in IPython, you can simply type ``%debug`` after one of
666 If you are working in IPython, you can simply type ``%debug`` after one of
689 these :exc:`CompositeError` exceptions is raised, and inspect the exception
667 these :exc:`CompositeError` exceptions is raised, and inspect the exception
690 instance:
668 instance:
691
669
692 .. sourcecode:: ipython
670 .. sourcecode:: ipython
693
671
694 In [81]: rc.execute('1/0')
672 In [81]: rc.execute('1/0')
695 ---------------------------------------------------------------------------
673 ---------------------------------------------------------------------------
696 CompositeError Traceback (most recent call last)
674 CompositeError Traceback (most recent call last)
697
675
698 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
676 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
699
677
700 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
678 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
701 432 targets, block = self._findTargetsAndBlock(targets, block)
679 432 targets, block = self._findTargetsAndBlock(targets, block)
702 433 result = blockingCallFromThread(self.smultiengine.execute, lines,
680 433 result = blockingCallFromThread(self.smultiengine.execute, lines,
703 --> 434 targets=targets, block=block)
681 --> 434 targets=targets, block=block)
704 435 if block:
682 435 if block:
705 436 result = ResultList(result)
683 436 result = ResultList(result)
706
684
707 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
685 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
708 72 result.raiseException()
686 72 result.raiseException()
709 73 except Exception, e:
687 73 except Exception, e:
710 ---> 74 raise e
688 ---> 74 raise e
711 75 return result
689 75 return result
712 76
690 76
713
691
714 CompositeError: one or more exceptions from call to method: execute
692 CompositeError: one or more exceptions from call to method: execute
715 [0:execute]: ZeroDivisionError: integer division or modulo by zero
693 [0:execute]: ZeroDivisionError: integer division or modulo by zero
716 [1:execute]: ZeroDivisionError: integer division or modulo by zero
694 [1:execute]: ZeroDivisionError: integer division or modulo by zero
717 [2:execute]: ZeroDivisionError: integer division or modulo by zero
695 [2:execute]: ZeroDivisionError: integer division or modulo by zero
718 [3:execute]: ZeroDivisionError: integer division or modulo by zero
696 [3:execute]: ZeroDivisionError: integer division or modulo by zero
719
697
720 In [82]: %debug
698 In [82]: %debug
721 >
699 >
722
700
723 /ipython1-client-r3021/ipython1/kernel/twistedutil.py(74)blockingCallFromThread()
701 /ipython1-client-r3021/ipython1/kernel/twistedutil.py(74)blockingCallFromThread()
724 73 except Exception, e:
702 73 except Exception, e:
725 ---> 74 raise e
703 ---> 74 raise e
726 75 return result
704 75 return result
727
705
728 # With the debugger running, e is the exception instance. We can tab complete
706 # With the debugger running, e is the exception instance. We can tab complete
729 # on it and see the extra methods that are available.
707 # on it and see the extra methods that are available.
730 ipdb> e.
708 ipdb> e.
731 e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
709 e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
732 e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
710 e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
733 e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
711 e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
734 e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
712 e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
735 e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
713 e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
736 ipdb> e.print_tracebacks()
714 ipdb> e.print_tracebacks()
737 [0:execute]:
715 [0:execute]:
738 ---------------------------------------------------------------------------
716 ---------------------------------------------------------------------------
739 ZeroDivisionError Traceback (most recent call last)
717 ZeroDivisionError Traceback (most recent call last)
740
718
741 /ipython1-client-r3021/docs/examples/<string> in <module>()
719 /ipython1-client-r3021/docs/examples/<string> in <module>()
742
720
743 ZeroDivisionError: integer division or modulo by zero
721 ZeroDivisionError: integer division or modulo by zero
744
722
745 [1:execute]:
723 [1:execute]:
746 ---------------------------------------------------------------------------
724 ---------------------------------------------------------------------------
747 ZeroDivisionError Traceback (most recent call last)
725 ZeroDivisionError Traceback (most recent call last)
748
726
749 /ipython1-client-r3021/docs/examples/<string> in <module>()
727 /ipython1-client-r3021/docs/examples/<string> in <module>()
750
728
751 ZeroDivisionError: integer division or modulo by zero
729 ZeroDivisionError: integer division or modulo by zero
752
730
753 [2:execute]:
731 [2:execute]:
754 ---------------------------------------------------------------------------
732 ---------------------------------------------------------------------------
755 ZeroDivisionError Traceback (most recent call last)
733 ZeroDivisionError Traceback (most recent call last)
756
734
757 /ipython1-client-r3021/docs/examples/<string> in <module>()
735 /ipython1-client-r3021/docs/examples/<string> in <module>()
758
736
759 ZeroDivisionError: integer division or modulo by zero
737 ZeroDivisionError: integer division or modulo by zero
760
738
761 [3:execute]:
739 [3:execute]:
762 ---------------------------------------------------------------------------
740 ---------------------------------------------------------------------------
763 ZeroDivisionError Traceback (most recent call last)
741 ZeroDivisionError Traceback (most recent call last)
764
742
765 /ipython1-client-r3021/docs/examples/<string> in <module>()
743 /ipython1-client-r3021/docs/examples/<string> in <module>()
766
744
767 ZeroDivisionError: integer division or modulo by zero
745 ZeroDivisionError: integer division or modulo by zero
768
746
769
747
770 All of this same error handling magic even works in non-blocking mode:
748 All of this same error handling magic even works in non-blocking mode:
771
749
772 .. sourcecode:: ipython
750 .. sourcecode:: ipython
773
751
774 In [83]: rc.block=False
752 In [83]: rc.block=False
775
753
776 In [84]: pr = rc.execute('1/0')
754 In [84]: pr = rc.execute('1/0')
777
755
778 In [85]: pr.get()
756 In [85]: pr.get()
779 ---------------------------------------------------------------------------
757 ---------------------------------------------------------------------------
780 CompositeError Traceback (most recent call last)
758 CompositeError Traceback (most recent call last)
781
759
782 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
760 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
783
761
784 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in _get_r(self)
762 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in _get_r(self)
785 170
763 170
786 171 def _get_r(self):
764 171 def _get_r(self):
787 --> 172 return self.get_result(block=True)
765 --> 172 return self.get_result(block=True)
788 173
766 173
789 174 r = property(_get_r)
767 174 r = property(_get_r)
790
768
791 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_result(self, default, block)
769 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_result(self, default, block)
792 131 return self.result
770 131 return self.result
793 132 try:
771 132 try:
794 --> 133 result = self.client.get_pending_deferred(self.result_id, block)
772 --> 133 result = self.client.get_pending_deferred(self.result_id, block)
795 134 except error.ResultNotCompleted:
773 134 except error.ResultNotCompleted:
796 135 return default
774 135 return default
797
775
798 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_pending_deferred(self, deferredID, block)
776 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_pending_deferred(self, deferredID, block)
799 385
777 385
800 386 def get_pending_deferred(self, deferredID, block):
778 386 def get_pending_deferred(self, deferredID, block):
801 --> 387 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
779 --> 387 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
802 388
780 388
803 389 def barrier(self, pendingResults):
781 389 def barrier(self, pendingResults):
804
782
805 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
783 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
806 72 result.raiseException()
784 72 result.raiseException()
807 73 except Exception, e:
785 73 except Exception, e:
808 ---> 74 raise e
786 ---> 74 raise e
809 75 return result
787 75 return result
810 76
788 76
811
789
812 CompositeError: one or more exceptions from call to method: execute
790 CompositeError: one or more exceptions from call to method: execute
813 [0:execute]: ZeroDivisionError: integer division or modulo by zero
791 [0:execute]: ZeroDivisionError: integer division or modulo by zero
814 [1:execute]: ZeroDivisionError: integer division or modulo by zero
792 [1:execute]: ZeroDivisionError: integer division or modulo by zero
815 [2:execute]: ZeroDivisionError: integer division or modulo by zero
793 [2:execute]: ZeroDivisionError: integer division or modulo by zero
816 [3:execute]: ZeroDivisionError: integer division or modulo by zero
794 [3:execute]: ZeroDivisionError: integer division or modulo by zero
817
795
818
796
@@ -1,395 +1,395 b''
1 .. _paralleltask:
1 .. _paralleltask:
2
2
3 ==========================
3 ==========================
4 The IPython task interface
4 The IPython task interface
5 ==========================
5 ==========================
6
6
7 The task interface to the cluster presents the engines as a fault tolerant,
7 The task interface to the cluster presents the engines as a fault tolerant,
8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
9 the task interface the user has no direct access to individual engines. By
9 the task interface the user has no direct access to individual engines. By
10 allowing the IPython scheduler to assign work, this interface is simultaneously
10 allowing the IPython scheduler to assign work, this interface is simultaneously
11 simpler and more powerful.
11 simpler and more powerful.
12
12
13 Best of all, the user can use both of these interfaces running at the same time
13 Best of all, the user can use both of these interfaces running at the same time
14 to take advantage of their respective strengths. When the user can break up
14 to take advantage of their respective strengths. When the user can break up
15 their work into segments that do not depend on previous execution, the
15 their work into segments that do not depend on previous execution, the
16 task interface is ideal. But it also has more power and flexibility, allowing
16 task interface is ideal. But it also has more power and flexibility, allowing
17 the user to guide the distribution of jobs, without having to assign tasks to
17 the user to guide the distribution of jobs, without having to assign tasks to
18 engines explicitly.
18 engines explicitly.
19
19
20 Starting the IPython controller and engines
20 Starting the IPython controller and engines
21 ===========================================
21 ===========================================
22
22
23 To follow along with this tutorial, you will need to start the IPython
23 To follow along with this tutorial, you will need to start the IPython
24 controller and four IPython engines. The simplest way of doing this is to use
24 controller and four IPython engines. The simplest way of doing this is to use
25 the :command:`ipclusterz` command::
25 the :command:`ipclusterz` command::
26
26
27 $ ipclusterz start -n 4
27 $ ipclusterz start -n 4
28
28
29 For more detailed information about starting the controller and engines, see
29 For more detailed information about starting the controller and engines, see
30 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
30 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
31
31
32 Creating a ``Client`` instance
32 Creating a ``Client`` instance
33 ==============================
33 ==============================
34
34
35 The first step is to import the IPython :mod:`IPython.zmq.parallel.client`
35 The first step is to import the IPython :mod:`IPython.zmq.parallel.client`
36 module and then create a :class:`.Client` instance:
36 module and then create a :class:`.Client` instance:
37
37
38 .. sourcecode:: ipython
38 .. sourcecode:: ipython
39
39
40 In [1]: from IPython.zmq.parallel import client
40 In [1]: from IPython.zmq.parallel import client
41
41
42 In [2]: rc = client.Client()
42 In [2]: rc = client.Client()
43
43
44 In [3]: lview = rc[None]
44 In [3]: lview = rc.view(balanced=True)
45 Out[3]: <LoadBalancedView tcp://127.0.0.1:10101>
45 Out[3]: <LoadBalancedView None>
46
46
47
47
48 This form assumes that the controller was started on localhost with default
48 This form assumes that the controller was started on localhost with default
49 configuration. If not, the location of the controller must be given as an
49 configuration. If not, the location of the controller must be given as an
50 argument to the constructor:
50 argument to the constructor:
51
51
52 .. sourcecode:: ipython
52 .. sourcecode:: ipython
53
53
54 # for a visible LAN controller listening on an external port:
54 # for a visible LAN controller listening on an external port:
55 In [2]: rc = client.Client('tcp://192.168.1.16:10101')
55 In [2]: rc = client.Client('tcp://192.168.1.16:10101')
56 # for a remote controller at my.server.com listening on localhost:
56 # for a remote controller at my.server.com listening on localhost:
57 In [3]: rc = client.Client(sshserver='my.server.com')
57 In [3]: rc = client.Client(sshserver='my.server.com')
58
58
59
59
60
60
61 Quick and easy parallelism
61 Quick and easy parallelism
62 ==========================
62 ==========================
63
63
64 In many cases, you simply want to apply a Python function to a sequence of
64 In many cases, you simply want to apply a Python function to a sequence of
65 objects, but *in parallel*. Like the multiengine interface, these can be
65 objects, but *in parallel*. Like the multiengine interface, these can be
66 implemented via the task interface. The exact same tools can perform these
66 implemented via the task interface. The exact same tools can perform these
67 actions in load-balanced ways as well as multiplexed ways: a parallel version
67 actions in load-balanced ways as well as multiplexed ways: a parallel version
68 of :func:`map` and :func:`@parallel` function decorator. If one specifies the
68 of :func:`map` and :func:`@parallel` function decorator. If one specifies the
69 argument `targets=None`, then they are dynamically load balanced. Thus, if the
69 argument `balanced=True`, then they are dynamically load balanced. Thus, if the
70 execution time per item varies significantly, you should use the versions in
70 execution time per item varies significantly, you should use the versions in
71 the task interface.
71 the task interface.
72
72
73 Parallel map
73 Parallel map
74 ------------
74 ------------
75
75
76 To load-balance :meth:`map`, simply use a LoadBalancedView, created with
76 To load-balance :meth:`map`, simply use a LoadBalancedView, created with
77 ``rc.view(balanced=True)``:
77 ``rc.view(balanced=True)``:
78
78
79 .. sourcecode:: ipython
79 .. sourcecode:: ipython
80
80
81 In [63]: serial_result = map(lambda x:x**10, range(32))
81 In [63]: serial_result = map(lambda x:x**10, range(32))
82
82
83 In [64]: parallel_result = tc[None].map(lambda x:x**10, range(32))
83 In [64]: parallel_result = lview.map(lambda x:x**10, range(32), block=True)
84
84
85 In [65]: serial_result==parallel_result
85 In [65]: serial_result==parallel_result
86 Out[65]: True
86 Out[65]: True
87
87
88 Parallel function decorator
88 Parallel function decorator
89 ---------------------------
89 ---------------------------
90
90
91 Parallel functions are just like normal functions, but they can be called on
91 Parallel functions are just like normal functions, but they can be called on
92 sequences and *in parallel*. The multiengine interface provides a decorator
92 sequences and *in parallel*. The multiengine interface provides a decorator
93 that turns any Python function into a parallel function:
93 that turns any Python function into a parallel function:
94
94
95 .. sourcecode:: ipython
95 .. sourcecode:: ipython
96
96
97 In [10]: @lview.parallel()
97 In [10]: @lview.parallel()
98 ....: def f(x):
98 ....: def f(x):
99 ....: return 10.0*x**4
99 ....: return 10.0*x**4
100 ....:
100 ....:
101
101
102 In [11]: f.map(range(32)) # this is done in parallel
102 In [11]: f.map(range(32)) # this is done in parallel
103 Out[11]: [0.0,10.0,160.0,...]
103 Out[11]: [0.0,10.0,160.0,...]
104
104
105 Dependencies
105 Dependencies
106 ============
106 ============
107
107
108 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
108 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
109 may want to associate some kind of `Dependency` that describes when, where, or whether
109 may want to associate some kind of `Dependency` that describes when, where, or whether
110 a task can be run. In IPython, we provide two types of dependencies:
110 a task can be run. In IPython, we provide two types of dependencies:
111 `Functional Dependencies`_ and `Graph Dependencies`_
111 `Functional Dependencies`_ and `Graph Dependencies`_
112
112
113 .. note::
113 .. note::
114
114
115 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
115 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
116 and you will see errors or warnings if you try to use dependencies with the pure
116 and you will see errors or warnings if you try to use dependencies with the pure
117 scheduler.
117 scheduler.
118
118
119 Functional Dependencies
119 Functional Dependencies
120 -----------------------
120 -----------------------
121
121
122 Functional dependencies are used to determine whether a given engine is capable of running
122 Functional dependencies are used to determine whether a given engine is capable of running
123 a particular task. This is implemented via a special :class:`Exception` class,
123 a particular task. This is implemented via a special :class:`Exception` class,
124 :class:`UnmetDependency`, found in `IPython.zmq.parallel.error`. Its use is very simple:
124 :class:`UnmetDependency`, found in `IPython.zmq.parallel.error`. Its use is very simple:
125 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
125 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
126 the error up to the client like any other error, catches the error, and submits the task
126 the error up to the client like any other error, catches the error, and submits the task
127 to a different engine. This will repeat indefinitely, and a task will never be submitted
127 to a different engine. This will repeat indefinitely, and a task will never be submitted
128 to a given engine a second time.
128 to a given engine a second time.
129
129
130 You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided
130 You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided
131 some decorators for facilitating this behavior.
131 some decorators for facilitating this behavior.
132
132
133 There are two decorators and a class used for functional dependencies:
133 There are two decorators and a class used for functional dependencies:
134
134
135 .. sourcecode:: ipython
135 .. sourcecode:: ipython
136
136
137 In [9]: from IPython.zmq.parallel.dependency import depend, require, dependent
137 In [9]: from IPython.zmq.parallel.dependency import depend, require, dependent
138
138
139 @require
139 @require
140 ********
140 ********
141
141
142 The simplest sort of dependency is requiring that a Python module is available. The
142 The simplest sort of dependency is requiring that a Python module is available. The
143 ``@require`` decorator lets you define a function that will only run on engines where names
143 ``@require`` decorator lets you define a function that will only run on engines where names
144 you specify are importable:
144 you specify are importable:
145
145
146 .. sourcecode:: ipython
146 .. sourcecode:: ipython
147
147
148 In [10]: @require('numpy', 'zmq')
148 In [10]: @require('numpy', 'zmq')
149 ...: def myfunc():
149 ...: def myfunc():
150 ...: import numpy,zmq
150 ...: import numpy,zmq
151 ...: return dostuff()
151 ...: return dostuff()
152
152
153 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
153 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
154 numpy and pyzmq available.
154 numpy and pyzmq available.
155
155
156 @depend
156 @depend
157 *******
157 *******
158
158
159 The ``@depend`` decorator lets you decorate any function with any *other* function to
159 The ``@depend`` decorator lets you decorate any function with any *other* function to
160 evaluate the dependency. The dependency function will be called at the start of the task,
160 evaluate the dependency. The dependency function will be called at the start of the task,
161 and if it returns ``False``, then the dependency will be considered unmet, and the task
161 and if it returns ``False``, then the dependency will be considered unmet, and the task
162 will be assigned to another engine. If the dependency returns *anything other than
162 will be assigned to another engine. If the dependency returns *anything other than
163 ``False``*, the rest of the task will continue.
163 ``False``*, the rest of the task will continue.
164
164
165 .. sourcecode:: ipython
165 .. sourcecode:: ipython
166
166
167 In [10]: def platform_specific(plat):
167 In [10]: def platform_specific(plat):
168 ...: import sys
168 ...: import sys
169 ...: return sys.platform == plat
169 ...: return sys.platform == plat
170
170
171 In [11]: @depend(platform_specific, 'darwin')
171 In [11]: @depend(platform_specific, 'darwin')
172 ...: def mactask():
172 ...: def mactask():
173 ...: do_mac_stuff()
173 ...: do_mac_stuff()
174
174
175 In [12]: @depend(platform_specific, 'nt')
175 In [12]: @depend(platform_specific, 'nt')
176 ...: def wintask():
176 ...: def wintask():
177 ...: do_windows_stuff()
177 ...: do_windows_stuff()
178
178
179 In this case, any time you apply ``mytask``, it will only run on an OSX machine.
179 In this case, any time you apply ``mytask``, it will only run on an OSX machine.
180 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
180 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
181 signature.
181 signature.
182
182
183 dependents
183 dependents
184 **********
184 **********
185
185
186 You don't have to use the decorators on your tasks, if for instance you may want
186 You don't have to use the decorators on your tasks, if for instance you may want
187 to run tasks with a single function but varying dependencies, you can directly construct
187 to run tasks with a single function but varying dependencies, you can directly construct
188 the :class:`dependent` object that the decorators use:
188 the :class:`dependent` object that the decorators use:
189
189
190 .. sourcecode::ipython
190 .. sourcecode::ipython
191
191
192 In [13]: def mytask(*args):
192 In [13]: def mytask(*args):
193 ...: dostuff()
193 ...: dostuff()
194
194
195 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
195 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
196 # this is the same as decorating the declaration of mytask with @depend
196 # this is the same as decorating the declaration of mytask with @depend
197 # but you can do it again:
197 # but you can do it again:
198
198
199 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
199 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
200
200
201 # in general:
201 # in general:
202 In [16]: t = dependent(f, g, *dargs, **dkwargs)
202 In [16]: t = dependent(f, g, *dargs, **dkwargs)
203
203
204 # is equivalent to:
204 # is equivalent to:
205 In [17]: @depend(g, *dargs, **dkwargs)
205 In [17]: @depend(g, *dargs, **dkwargs)
206 ...: def t(a,b,c):
206 ...: def t(a,b,c):
207 ...: # contents of f
207 ...: # contents of f
208
208
209 Graph Dependencies
209 Graph Dependencies
210 ------------------
210 ------------------
211
211
212 Sometimes you want to restrict the time and/or location to run a given task as a function
212 Sometimes you want to restrict the time and/or location to run a given task as a function
213 of the time and/or location of other tasks. This is implemented via a subclass of
213 of the time and/or location of other tasks. This is implemented via a subclass of
214 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
214 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
215 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
215 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
216 has been met.
216 has been met.
217
217
218 The switches we provide for interpreting whether a given dependency set has been met:
218 The switches we provide for interpreting whether a given dependency set has been met:
219
219
220 any|all
220 any|all
221 Whether the dependency is considered met if *any* of the dependencies are done, or
221 Whether the dependency is considered met if *any* of the dependencies are done, or
222 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
222 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
223 boolean attribute, which defaults to ``True``.
223 boolean attribute, which defaults to ``True``.
224
224
225 success_only
225 success_only
226 Whether to consider only tasks that did not raise an error as being fulfilled.
226 Whether to consider only tasks that did not raise an error as being fulfilled.
227 Sometimes you want to run a task after another, but only if that task succeeded. In
227 Sometimes you want to run a task after another, but only if that task succeeded. In
228 this case, ``success_only`` should be ``True``. However sometimes you may not care
228 this case, ``success_only`` should be ``True``. However sometimes you may not care
229 whether the task succeeds, and always want the second task to run, in which case
229 whether the task succeeds, and always want the second task to run, in which case
230 you should use `success_only=False`. The default behavior is to only use successes.
230 you should use `success_only=False`. The default behavior is to only use successes.
231
231
232 There are other switches for interpretation that are made at the *task* level. These are
232 There are other switches for interpretation that are made at the *task* level. These are
233 specified via keyword arguments to the client's :meth:`apply` method.
233 specified via keyword arguments to the client's :meth:`apply` method.
234
234
235 after,follow
235 after,follow
236 You may want to run a task *after* a given set of dependencies have been run and/or
236 You may want to run a task *after* a given set of dependencies have been run and/or
237 run it *where* another set of dependencies are met. To support this, every task has an
237 run it *where* another set of dependencies are met. To support this, every task has an
238 `after` dependency to restrict time, and a `follow` dependency to restrict
238 `after` dependency to restrict time, and a `follow` dependency to restrict
239 destination.
239 destination.
240
240
241 timeout
241 timeout
242 You may also want to set a time-limit for how long the scheduler should wait for a
242 You may also want to set a time-limit for how long the scheduler should wait for a
243 task's dependencies to be met. This is done via a `timeout`, which defaults to 0, which
243 task's dependencies to be met. This is done via a `timeout`, which defaults to 0, which
244 indicates that the task should never timeout. If the timeout is reached, and the
244 indicates that the task should never timeout. If the timeout is reached, and the
245 scheduler still hasn't been able to assign the task to an engine, the task will fail
245 scheduler still hasn't been able to assign the task to an engine, the task will fail
246 with a :class:`DependencyTimeout`.
246 with a :class:`DependencyTimeout`.
247
247
248 .. note::
248 .. note::
249
249
250 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
250 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
251 task to run after a job submitted via the MUX interface.
251 task to run after a job submitted via the MUX interface.
252
252
253 The simplest form of Dependencies is with `all=True,success_only=True`. In these cases,
253 The simplest form of Dependencies is with `all=True,success_only=True`. In these cases,
254 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
254 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
255 `follow` and `after` keywords to :meth:`client.apply`:
255 `follow` and `after` keywords to :meth:`client.apply`:
256
256
257 .. sourcecode:: ipython
257 .. sourcecode:: ipython
258
258
259 In [14]: client.block=False
259 In [14]: client.block=False
260
260
261 In [15]: ar = client.apply(f, args, kwargs, targets=None)
261 In [15]: ar = client.apply(f, args, kwargs, balanced=True)
262
262
263 In [16]: ar2 = client.apply(f2, targets=None)
263 In [16]: ar2 = client.apply(f2, balanced=True)
264
264
265 In [17]: ar3 = client.apply(f3, after=[ar,ar2])
265 In [17]: ar3 = client.apply(f3, after=[ar,ar2], balanced=True)
266
266
267 In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5)
267 In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5, balanced=True)
268
268
269
269
270 .. seealso::
270 .. seealso::
271
271
272 Some parallel workloads can be described as a `Directed Acyclic Graph
272 Some parallel workloads can be described as a `Directed Acyclic Graph
273 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
273 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
274 Dependencies <dag_dependencies>` for an example demonstrating how to map a NetworkX DAG
274 Dependencies <dag_dependencies>` for an example demonstrating how to map a NetworkX DAG
275 onto task dependencies.
275 onto task dependencies.
276
276
277
277
278
278
279 Impossible Dependencies
279 Impossible Dependencies
280 ***********************
280 ***********************
281
281
282 The schedulers do perform some analysis on graph dependencies to determine whether they
282 The schedulers do perform some analysis on graph dependencies to determine whether they
283 are not possible to be met. If the scheduler does discover that a dependency cannot be
283 are not possible to be met. If the scheduler does discover that a dependency cannot be
284 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the
284 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the
285 scheduler realizes that a task can never be run, it won't sit indefinitely in the
285 scheduler realizes that a task can never be run, it won't sit indefinitely in the
286 scheduler clogging the pipeline.
286 scheduler clogging the pipeline.
287
287
288 The basic cases that are checked:
288 The basic cases that are checked:
289
289
290 * depending on nonexistent messages
290 * depending on nonexistent messages
291 * `follow` dependencies were run on more than one machine and `all=True`
291 * `follow` dependencies were run on more than one machine and `all=True`
292 * any dependencies failed and `all=True,success_only=True`
292 * any dependencies failed and `all=True,success_only=True`
293 * all dependencies failed and `all=False,success_only=True`
293 * all dependencies failed and `all=False,success_only=True`
294
294
295 .. warning::
295 .. warning::
296
296
297 This analysis has not been proven to be rigorous, so it is likely possible for tasks
297 This analysis has not been proven to be rigorous, so it is likely possible for tasks
298 to become impossible to run in obscure situations, so a timeout may be a good choice.
298 to become impossible to run in obscure situations, so a timeout may be a good choice.
299
299
300 Schedulers
300 Schedulers
301 ==========
301 ==========
302
302
303 There are a variety of valid ways to determine where jobs should be assigned in a
303 There are a variety of valid ways to determine where jobs should be assigned in a
304 load-balancing situation. In IPython, we support several standard schemes, and
304 load-balancing situation. In IPython, we support several standard schemes, and
305 even make it easy to define your own. The scheme can be selected via the ``--scheme``
305 even make it easy to define your own. The scheme can be selected via the ``--scheme``
306 argument to :command:`ipcontrollerz`, or in the :attr:`HubFactory.scheme` attribute
306 argument to :command:`ipcontrollerz`, or in the :attr:`HubFactory.scheme` attribute
307 of a controller config object.
307 of a controller config object.
308
308
309 The built-in routing schemes:
309 The built-in routing schemes:
310
310
311 lru: Least Recently Used
311 lru: Least Recently Used
312
312
313 Always assign work to the least-recently-used engine. A close relative of
313 Always assign work to the least-recently-used engine. A close relative of
314 round-robin, it will be fair with respect to the number of tasks, agnostic
314 round-robin, it will be fair with respect to the number of tasks, agnostic
315 with respect to runtime of each task.
315 with respect to runtime of each task.
316
316
317 plainrandom: Plain Random
317 plainrandom: Plain Random
318 Randomly picks an engine on which to run.
318 Randomly picks an engine on which to run.
319
319
320 twobin: Two-Bin Random
320 twobin: Two-Bin Random
321
321
322 **Depends on numpy**
322 **Depends on numpy**
323
323
324 Pick two engines at random, and use the LRU of the two. This is known to be better
324 Pick two engines at random, and use the LRU of the two. This is known to be better
325 than plain random in many cases, but requires a small amount of computation.
325 than plain random in many cases, but requires a small amount of computation.
326
326
327 leastload: Least Load
327 leastload: Least Load
328
328
329 **This is the default scheme**
329 **This is the default scheme**
330
330
331 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks tie).
331 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks tie).
332
332
333 weighted: Weighted Two-Bin Random
333 weighted: Weighted Two-Bin Random
334
334
335 **Depends on numpy**
335 **Depends on numpy**
336
336
337 Pick two engines at random using the number of outstanding tasks as inverse weights,
337 Pick two engines at random using the number of outstanding tasks as inverse weights,
338 and use the one with the lower load.
338 and use the one with the lower load.
339
339
340
340
341 Pure ZMQ Scheduler
341 Pure ZMQ Scheduler
342 ------------------
342 ------------------
343
343
344 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
344 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
345 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all
345 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all
346 load-balancing. This scheduler does not support any of the advanced features of the Python
346 load-balancing. This scheduler does not support any of the advanced features of the Python
347 :class:`.Scheduler`.
347 :class:`.Scheduler`.
348
348
349 Disabled features when using the ZMQ Scheduler:
349 Disabled features when using the ZMQ Scheduler:
350
350
351 * Engine unregistration
351 * Engine unregistration
352 Task farming will be disabled if an engine unregisters.
352 Task farming will be disabled if an engine unregisters.
353 Further, if an engine is unregistered during computation, the scheduler may not recover.
353 Further, if an engine is unregistered during computation, the scheduler may not recover.
354 * Dependencies
354 * Dependencies
355 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
355 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
356 based on message content.
356 based on message content.
357 * Early destination notification
357 * Early destination notification
358 The Python schedulers know which engine gets which task, and notify the Hub. This
358 The Python schedulers know which engine gets which task, and notify the Hub. This
359 allows graceful handling of Engines coming and going. There is no way to know
359 allows graceful handling of Engines coming and going. There is no way to know
360 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
360 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
361 engine until they *finish*. This makes recovery from engine shutdown very difficult.
361 engine until they *finish*. This makes recovery from engine shutdown very difficult.
362
362
363
363
364 .. note::
364 .. note::
365
365
366 TODO: performance comparisons
366 TODO: performance comparisons
367
367
368
368
369 More details
369 More details
370 ============
370 ============
371
371
372 The :class:`Client` has many more powerful features that allow quite a bit
372 The :class:`Client` has many more powerful features that allow quite a bit
373 of flexibility in how tasks are defined and run. The next places to look are
373 of flexibility in how tasks are defined and run. The next places to look are
374 in the following classes:
374 in the following classes:
375
375
376 * :class:`IPython.zmq.parallel.client.Client`
376 * :class:`IPython.zmq.parallel.client.Client`
377 * :class:`IPython.zmq.parallel.client.AsyncResult`
377 * :class:`IPython.zmq.parallel.client.AsyncResult`
378 * :meth:`IPython.zmq.parallel.client.Client.apply`
378 * :meth:`IPython.zmq.parallel.client.Client.apply`
379 * :mod:`IPython.zmq.parallel.dependency`
379 * :mod:`IPython.zmq.parallel.dependency`
380
380
381 The following is an overview of how to use these classes together:
381 The following is an overview of how to use these classes together:
382
382
383 1. Create a :class:`Client`.
383 1. Create a :class:`Client`.
384 2. Define some functions to be run as tasks
384 2. Define some functions to be run as tasks
385 3. Submit your tasks using the :meth:`apply` method of your
385 3. Submit your tasks using the :meth:`apply` method of your
386 :class:`Client` instance, specifying `targets=None`. This signals
386 :class:`Client` instance, specifying `balanced=True`. This signals
387 the :class:`Client` to entrust the Scheduler with assigning tasks to engines.
387 the :class:`Client` to entrust the Scheduler with assigning tasks to engines.
388 4. Use :meth:`Client.get_results` to get the results of the
388 4. Use :meth:`Client.get_results` to get the results of the
389 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
389 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
390 for and then receive the results.
390 for and then receive the results.
391
391
392
392
393 .. seealso::
393 .. seealso::
394
394
395 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
395 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
General Comments 0
You need to be logged in to leave comments. Login now