##// END OF EJS Templates
add Reference object
MinRK -
Show More
@@ -1,127 +1,142 b''
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """Pickle related utilities. Perhaps this should be called 'can'."""
3 """Pickle related utilities. Perhaps this should be called 'can'."""
4
4
5 __docformat__ = "restructuredtext en"
5 __docformat__ = "restructuredtext en"
6
6
7 #-------------------------------------------------------------------------------
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
8 # Copyright (C) 2008 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 #-------------------------------------------------------------------------------
14 #-------------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 from types import FunctionType
18 from types import FunctionType
19 import copy
19 import copy
20
20
21 from IPython.zmq.parallel.dependency import dependent
21 from IPython.zmq.parallel.dependency import dependent
22
22
23 import codeutil
23 import codeutil
24
24
25 #-------------------------------------------------------------------------------
25 #-------------------------------------------------------------------------------
26 # Classes
26 # Classes
27 #-------------------------------------------------------------------------------
27 #-------------------------------------------------------------------------------
28
28
29
29
30 class CannedObject(object):
30 class CannedObject(object):
31 def __init__(self, obj, keys=[]):
31 def __init__(self, obj, keys=[]):
32 self.keys = keys
32 self.keys = keys
33 self.obj = copy.copy(obj)
33 self.obj = copy.copy(obj)
34 for key in keys:
34 for key in keys:
35 setattr(self.obj, key, can(getattr(obj, key)))
35 setattr(self.obj, key, can(getattr(obj, key)))
36
36
37
37
38 def getObject(self, g=None):
38 def getObject(self, g=None):
39 if g is None:
39 if g is None:
40 g = globals()
40 g = globals()
41 for key in self.keys:
41 for key in self.keys:
42 setattr(self.obj, key, uncan(getattr(self.obj, key), g))
42 setattr(self.obj, key, uncan(getattr(self.obj, key), g))
43 return self.obj
43 return self.obj
44
44
45
45 class Reference(CannedObject):
46 """object for wrapping a remote reference by name."""
47 def __init__(self, name):
48 if not isinstance(name, basestring):
49 raise TypeError("illegal name: %r"%name)
50 self.name = name
51
52 def __repr__(self):
53 return "<Reference: %r>"%self.name
54
55 def getObject(self, g=None):
56 if g is None:
57 g = globals()
58 try:
59 return g[self.name]
60 except KeyError:
61 raise NameError("name %r is not defined"%self.name)
62
46
63
47 class CannedFunction(CannedObject):
64 class CannedFunction(CannedObject):
48
65
49 def __init__(self, f):
66 def __init__(self, f):
50 self._checkType(f)
67 self._checkType(f)
51 self.code = f.func_code
68 self.code = f.func_code
52 self.__name__ = f.__name__
69 self.__name__ = f.__name__
53
70
54 def _checkType(self, obj):
71 def _checkType(self, obj):
55 assert isinstance(obj, FunctionType), "Not a function type"
72 assert isinstance(obj, FunctionType), "Not a function type"
56
73
57 def getFunction(self, g=None):
74 def getObject(self, g=None):
58 if g is None:
75 if g is None:
59 g = globals()
76 g = globals()
60 newFunc = FunctionType(self.code, g)
77 newFunc = FunctionType(self.code, g)
61 return newFunc
78 return newFunc
62
79
63 #-------------------------------------------------------------------------------
80 #-------------------------------------------------------------------------------
64 # Functions
81 # Functions
65 #-------------------------------------------------------------------------------
82 #-------------------------------------------------------------------------------
66
83
67
84
68 def can(obj):
85 def can(obj):
69 if isinstance(obj, FunctionType):
86 if isinstance(obj, dependent):
70 return CannedFunction(obj)
71 elif isinstance(obj, dependent):
72 keys = ('f','df')
87 keys = ('f','df')
73 return CannedObject(obj, keys=keys)
88 return CannedObject(obj, keys=keys)
89 elif isinstance(obj, FunctionType):
90 return CannedFunction(obj)
74 elif isinstance(obj,dict):
91 elif isinstance(obj,dict):
75 return canDict(obj)
92 return canDict(obj)
76 elif isinstance(obj, (list,tuple)):
93 elif isinstance(obj, (list,tuple)):
77 return canSequence(obj)
94 return canSequence(obj)
78 else:
95 else:
79 return obj
96 return obj
80
97
81 def canDict(obj):
98 def canDict(obj):
82 if isinstance(obj, dict):
99 if isinstance(obj, dict):
83 newobj = {}
100 newobj = {}
84 for k, v in obj.iteritems():
101 for k, v in obj.iteritems():
85 newobj[k] = can(v)
102 newobj[k] = can(v)
86 return newobj
103 return newobj
87 else:
104 else:
88 return obj
105 return obj
89
106
90 def canSequence(obj):
107 def canSequence(obj):
91 if isinstance(obj, (list, tuple)):
108 if isinstance(obj, (list, tuple)):
92 t = type(obj)
109 t = type(obj)
93 return t([can(i) for i in obj])
110 return t([can(i) for i in obj])
94 else:
111 else:
95 return obj
112 return obj
96
113
97 def uncan(obj, g=None):
114 def uncan(obj, g=None):
98 if isinstance(obj, CannedFunction):
115 if isinstance(obj, CannedObject):
99 return obj.getFunction(g)
100 elif isinstance(obj, CannedObject):
101 return obj.getObject(g)
116 return obj.getObject(g)
102 elif isinstance(obj,dict):
117 elif isinstance(obj,dict):
103 return uncanDict(obj, g)
118 return uncanDict(obj, g)
104 elif isinstance(obj, (list,tuple)):
119 elif isinstance(obj, (list,tuple)):
105 return uncanSequence(obj, g)
120 return uncanSequence(obj, g)
106 else:
121 else:
107 return obj
122 return obj
108
123
109 def uncanDict(obj, g=None):
124 def uncanDict(obj, g=None):
110 if isinstance(obj, dict):
125 if isinstance(obj, dict):
111 newobj = {}
126 newobj = {}
112 for k, v in obj.iteritems():
127 for k, v in obj.iteritems():
113 newobj[k] = uncan(v,g)
128 newobj[k] = uncan(v,g)
114 return newobj
129 return newobj
115 else:
130 else:
116 return obj
131 return obj
117
132
118 def uncanSequence(obj, g=None):
133 def uncanSequence(obj, g=None):
119 if isinstance(obj, (list, tuple)):
134 if isinstance(obj, (list, tuple)):
120 t = type(obj)
135 t = type(obj)
121 return t([uncan(i,g) for i in obj])
136 return t([uncan(i,g) for i in obj])
122 else:
137 else:
123 return obj
138 return obj
124
139
125
140
126 def rebindFunctionGlobals(f, glbls):
141 def rebindFunctionGlobals(f, glbls):
127 return FunctionType(f.func_code, glbls)
142 return FunctionType(f.func_code, glbls)
@@ -1,1498 +1,1499 b''
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import json
14 import json
15 import time
15 import time
16 import warnings
16 import warnings
17 from datetime import datetime
17 from datetime import datetime
18 from getpass import getpass
18 from getpass import getpass
19 from pprint import pprint
19 from pprint import pprint
20
20
21 pjoin = os.path.join
21 pjoin = os.path.join
22
22
23 import zmq
23 import zmq
24 # from zmq.eventloop import ioloop, zmqstream
24 # from zmq.eventloop import ioloop, zmqstream
25
25
26 from IPython.utils.path import get_ipython_dir
26 from IPython.utils.path import get_ipython_dir
27 from IPython.utils.pickleutil import Reference
27 from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode,
28 from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode,
28 Dict, List, Bool, Str, Set)
29 Dict, List, Bool, Str, Set)
29 from IPython.external.decorator import decorator
30 from IPython.external.decorator import decorator
30 from IPython.external.ssh import tunnel
31 from IPython.external.ssh import tunnel
31
32
32 from . import error
33 from . import error
33 from . import map as Map
34 from . import map as Map
34 from . import streamsession as ss
35 from . import streamsession as ss
35 from .asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult
36 from .asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult
36 from .clusterdir import ClusterDir, ClusterDirError
37 from .clusterdir import ClusterDir, ClusterDirError
37 from .dependency import Dependency, depend, require, dependent
38 from .dependency import Dependency, depend, require, dependent
38 from .remotefunction import remote,parallel,ParallelFunction,RemoteFunction
39 from .remotefunction import remote,parallel,ParallelFunction,RemoteFunction
39 from .util import ReverseDict, disambiguate_url, validate_url
40 from .util import ReverseDict, disambiguate_url, validate_url
40 from .view import DirectView, LoadBalancedView
41 from .view import DirectView, LoadBalancedView
41
42
42 #--------------------------------------------------------------------------
43 #--------------------------------------------------------------------------
43 # helpers for implementing old MEC API via client.apply
44 # helpers for implementing old MEC API via client.apply
44 #--------------------------------------------------------------------------
45 #--------------------------------------------------------------------------
45
46
46 def _push(ns):
47 def _push(ns):
47 """helper method for implementing `client.push` via `client.apply`"""
48 """helper method for implementing `client.push` via `client.apply`"""
48 globals().update(ns)
49 globals().update(ns)
49
50
50 def _pull(keys):
51 def _pull(keys):
51 """helper method for implementing `client.pull` via `client.apply`"""
52 """helper method for implementing `client.pull` via `client.apply`"""
52 g = globals()
53 g = globals()
53 if isinstance(keys, (list,tuple, set)):
54 if isinstance(keys, (list,tuple, set)):
54 for key in keys:
55 for key in keys:
55 if not g.has_key(key):
56 if not g.has_key(key):
56 raise NameError("name '%s' is not defined"%key)
57 raise NameError("name '%s' is not defined"%key)
57 return map(g.get, keys)
58 return map(g.get, keys)
58 else:
59 else:
59 if not g.has_key(keys):
60 if not g.has_key(keys):
60 raise NameError("name '%s' is not defined"%keys)
61 raise NameError("name '%s' is not defined"%keys)
61 return g.get(keys)
62 return g.get(keys)
62
63
63 def _clear():
64 def _clear():
64 """helper method for implementing `client.clear` via `client.apply`"""
65 """helper method for implementing `client.clear` via `client.apply`"""
65 globals().clear()
66 globals().clear()
66
67
67 def _execute(code):
68 def _execute(code):
68 """helper method for implementing `client.execute` via `client.apply`"""
69 """helper method for implementing `client.execute` via `client.apply`"""
69 exec code in globals()
70 exec code in globals()
70
71
71
72
72 #--------------------------------------------------------------------------
73 #--------------------------------------------------------------------------
73 # Decorators for Client methods
74 # Decorators for Client methods
74 #--------------------------------------------------------------------------
75 #--------------------------------------------------------------------------
75
76
76 @decorator
77 @decorator
77 def spinfirst(f, self, *args, **kwargs):
78 def spinfirst(f, self, *args, **kwargs):
78 """Call spin() to sync state prior to calling the method."""
79 """Call spin() to sync state prior to calling the method."""
79 self.spin()
80 self.spin()
80 return f(self, *args, **kwargs)
81 return f(self, *args, **kwargs)
81
82
82 @decorator
83 @decorator
83 def defaultblock(f, self, *args, **kwargs):
84 def defaultblock(f, self, *args, **kwargs):
84 """Default to self.block; preserve self.block."""
85 """Default to self.block; preserve self.block."""
85 block = kwargs.get('block',None)
86 block = kwargs.get('block',None)
86 block = self.block if block is None else block
87 block = self.block if block is None else block
87 saveblock = self.block
88 saveblock = self.block
88 self.block = block
89 self.block = block
89 try:
90 try:
90 ret = f(self, *args, **kwargs)
91 ret = f(self, *args, **kwargs)
91 finally:
92 finally:
92 self.block = saveblock
93 self.block = saveblock
93 return ret
94 return ret
94
95
95
96
96 #--------------------------------------------------------------------------
97 #--------------------------------------------------------------------------
97 # Classes
98 # Classes
98 #--------------------------------------------------------------------------
99 #--------------------------------------------------------------------------
99
100
100 class Metadata(dict):
101 class Metadata(dict):
101 """Subclass of dict for initializing metadata values.
102 """Subclass of dict for initializing metadata values.
102
103
103 Attribute access works on keys.
104 Attribute access works on keys.
104
105
105 These objects have a strict set of keys - errors will raise if you try
106 These objects have a strict set of keys - errors will raise if you try
106 to add new keys.
107 to add new keys.
107 """
108 """
108 def __init__(self, *args, **kwargs):
109 def __init__(self, *args, **kwargs):
109 dict.__init__(self)
110 dict.__init__(self)
110 md = {'msg_id' : None,
111 md = {'msg_id' : None,
111 'submitted' : None,
112 'submitted' : None,
112 'started' : None,
113 'started' : None,
113 'completed' : None,
114 'completed' : None,
114 'received' : None,
115 'received' : None,
115 'engine_uuid' : None,
116 'engine_uuid' : None,
116 'engine_id' : None,
117 'engine_id' : None,
117 'follow' : None,
118 'follow' : None,
118 'after' : None,
119 'after' : None,
119 'status' : None,
120 'status' : None,
120
121
121 'pyin' : None,
122 'pyin' : None,
122 'pyout' : None,
123 'pyout' : None,
123 'pyerr' : None,
124 'pyerr' : None,
124 'stdout' : '',
125 'stdout' : '',
125 'stderr' : '',
126 'stderr' : '',
126 }
127 }
127 self.update(md)
128 self.update(md)
128 self.update(dict(*args, **kwargs))
129 self.update(dict(*args, **kwargs))
129
130
130 def __getattr__(self, key):
131 def __getattr__(self, key):
131 """getattr aliased to getitem"""
132 """getattr aliased to getitem"""
132 if key in self.iterkeys():
133 if key in self.iterkeys():
133 return self[key]
134 return self[key]
134 else:
135 else:
135 raise AttributeError(key)
136 raise AttributeError(key)
136
137
137 def __setattr__(self, key, value):
138 def __setattr__(self, key, value):
138 """setattr aliased to setitem, with strict"""
139 """setattr aliased to setitem, with strict"""
139 if key in self.iterkeys():
140 if key in self.iterkeys():
140 self[key] = value
141 self[key] = value
141 else:
142 else:
142 raise AttributeError(key)
143 raise AttributeError(key)
143
144
144 def __setitem__(self, key, value):
145 def __setitem__(self, key, value):
145 """strict static key enforcement"""
146 """strict static key enforcement"""
146 if key in self.iterkeys():
147 if key in self.iterkeys():
147 dict.__setitem__(self, key, value)
148 dict.__setitem__(self, key, value)
148 else:
149 else:
149 raise KeyError(key)
150 raise KeyError(key)
150
151
151
152
152 class Client(HasTraits):
153 class Client(HasTraits):
153 """A semi-synchronous client to the IPython ZMQ controller
154 """A semi-synchronous client to the IPython ZMQ controller
154
155
155 Parameters
156 Parameters
156 ----------
157 ----------
157
158
158 url_or_file : bytes; zmq url or path to ipcontroller-client.json
159 url_or_file : bytes; zmq url or path to ipcontroller-client.json
159 Connection information for the Hub's registration. If a json connector
160 Connection information for the Hub's registration. If a json connector
160 file is given, then likely no further configuration is necessary.
161 file is given, then likely no further configuration is necessary.
161 [Default: use profile]
162 [Default: use profile]
162 profile : bytes
163 profile : bytes
163 The name of the Cluster profile to be used to find connector information.
164 The name of the Cluster profile to be used to find connector information.
164 [Default: 'default']
165 [Default: 'default']
165 context : zmq.Context
166 context : zmq.Context
166 Pass an existing zmq.Context instance, otherwise the client will create its own.
167 Pass an existing zmq.Context instance, otherwise the client will create its own.
167 username : bytes
168 username : bytes
168 set username to be passed to the Session object
169 set username to be passed to the Session object
169 debug : bool
170 debug : bool
170 flag for lots of message printing for debug purposes
171 flag for lots of message printing for debug purposes
171
172
172 #-------------- ssh related args ----------------
173 #-------------- ssh related args ----------------
173 # These are args for configuring the ssh tunnel to be used
174 # These are args for configuring the ssh tunnel to be used
174 # credentials are used to forward connections over ssh to the Controller
175 # credentials are used to forward connections over ssh to the Controller
175 # Note that the ip given in `addr` needs to be relative to sshserver
176 # Note that the ip given in `addr` needs to be relative to sshserver
176 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
177 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
177 # and set sshserver as the same machine the Controller is on. However,
178 # and set sshserver as the same machine the Controller is on. However,
178 # the only requirement is that sshserver is able to see the Controller
179 # the only requirement is that sshserver is able to see the Controller
179 # (i.e. is within the same trusted network).
180 # (i.e. is within the same trusted network).
180
181
181 sshserver : str
182 sshserver : str
182 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
183 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
183 If keyfile or password is specified, and this is not, it will default to
184 If keyfile or password is specified, and this is not, it will default to
184 the ip given in addr.
185 the ip given in addr.
185 sshkey : str; path to public ssh key file
186 sshkey : str; path to public ssh key file
186 This specifies a key to be used in ssh login, default None.
187 This specifies a key to be used in ssh login, default None.
187 Regular default ssh keys will be used without specifying this argument.
188 Regular default ssh keys will be used without specifying this argument.
188 password : str
189 password : str
189 Your ssh password to sshserver. Note that if this is left None,
190 Your ssh password to sshserver. Note that if this is left None,
190 you will be prompted for it if passwordless key based login is unavailable.
191 you will be prompted for it if passwordless key based login is unavailable.
191 paramiko : bool
192 paramiko : bool
192 flag for whether to use paramiko instead of shell ssh for tunneling.
193 flag for whether to use paramiko instead of shell ssh for tunneling.
193 [default: True on win32, False else]
194 [default: True on win32, False else]
194
195
195 #------- exec authentication args -------
196 #------- exec authentication args -------
196 # If even localhost is untrusted, you can have some protection against
197 # If even localhost is untrusted, you can have some protection against
197 # unauthorized execution by using a key. Messages are still sent
198 # unauthorized execution by using a key. Messages are still sent
198 # as cleartext, so if someone can snoop your loopback traffic this will
199 # as cleartext, so if someone can snoop your loopback traffic this will
199 # not help against malicious attacks.
200 # not help against malicious attacks.
200
201
201 exec_key : str
202 exec_key : str
202 an authentication key or file containing a key
203 an authentication key or file containing a key
203 default: None
204 default: None
204
205
205
206
206 Attributes
207 Attributes
207 ----------
208 ----------
208
209
209 ids : set of int engine IDs
210 ids : set of int engine IDs
210 requesting the ids attribute always synchronizes
211 requesting the ids attribute always synchronizes
211 the registration state. To request ids without synchronization,
212 the registration state. To request ids without synchronization,
212 use semi-private _ids attributes.
213 use semi-private _ids attributes.
213
214
214 history : list of msg_ids
215 history : list of msg_ids
215 a list of msg_ids, keeping track of all the execution
216 a list of msg_ids, keeping track of all the execution
216 messages you have submitted in order.
217 messages you have submitted in order.
217
218
218 outstanding : set of msg_ids
219 outstanding : set of msg_ids
219 a set of msg_ids that have been submitted, but whose
220 a set of msg_ids that have been submitted, but whose
220 results have not yet been received.
221 results have not yet been received.
221
222
222 results : dict
223 results : dict
223 a dict of all our results, keyed by msg_id
224 a dict of all our results, keyed by msg_id
224
225
225 block : bool
226 block : bool
226 determines default behavior when block not specified
227 determines default behavior when block not specified
227 in execution methods
228 in execution methods
228
229
229 Methods
230 Methods
230 -------
231 -------
231
232
232 spin
233 spin
233 flushes incoming results and registration state changes
234 flushes incoming results and registration state changes
234 control methods spin, and requesting `ids` also ensures up to date
235 control methods spin, and requesting `ids` also ensures up to date
235
236
236 barrier
237 barrier
237 wait on one or more msg_ids
238 wait on one or more msg_ids
238
239
239 execution methods
240 execution methods
240 apply
241 apply
241 legacy: execute, run
242 legacy: execute, run
242
243
243 query methods
244 query methods
244 queue_status, get_result, purge
245 queue_status, get_result, purge
245
246
246 control methods
247 control methods
247 abort, shutdown
248 abort, shutdown
248
249
249 """
250 """
250
251
251
252
252 block = Bool(False)
253 block = Bool(False)
253 outstanding=Set()
254 outstanding=Set()
254 results = Dict()
255 results = Dict()
255 metadata = Dict()
256 metadata = Dict()
256 history = List()
257 history = List()
257 debug = Bool(False)
258 debug = Bool(False)
258 profile=CUnicode('default')
259 profile=CUnicode('default')
259
260
260 _ids = List()
261 _ids = List()
261 _connected=Bool(False)
262 _connected=Bool(False)
262 _ssh=Bool(False)
263 _ssh=Bool(False)
263 _context = Instance('zmq.Context')
264 _context = Instance('zmq.Context')
264 _config = Dict()
265 _config = Dict()
265 _engines=Instance(ReverseDict, (), {})
266 _engines=Instance(ReverseDict, (), {})
266 _registration_socket=Instance('zmq.Socket')
267 _registration_socket=Instance('zmq.Socket')
267 _query_socket=Instance('zmq.Socket')
268 _query_socket=Instance('zmq.Socket')
268 _control_socket=Instance('zmq.Socket')
269 _control_socket=Instance('zmq.Socket')
269 _iopub_socket=Instance('zmq.Socket')
270 _iopub_socket=Instance('zmq.Socket')
270 _notification_socket=Instance('zmq.Socket')
271 _notification_socket=Instance('zmq.Socket')
271 _mux_socket=Instance('zmq.Socket')
272 _mux_socket=Instance('zmq.Socket')
272 _task_socket=Instance('zmq.Socket')
273 _task_socket=Instance('zmq.Socket')
273 _task_scheme=Str()
274 _task_scheme=Str()
274 _balanced_views=Dict()
275 _balanced_views=Dict()
275 _direct_views=Dict()
276 _direct_views=Dict()
276 _closed = False
277 _closed = False
277
278
278 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
279 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
279 context=None, username=None, debug=False, exec_key=None,
280 context=None, username=None, debug=False, exec_key=None,
280 sshserver=None, sshkey=None, password=None, paramiko=None,
281 sshserver=None, sshkey=None, password=None, paramiko=None,
281 ):
282 ):
282 super(Client, self).__init__(debug=debug, profile=profile)
283 super(Client, self).__init__(debug=debug, profile=profile)
283 if context is None:
284 if context is None:
284 context = zmq.Context()
285 context = zmq.Context()
285 self._context = context
286 self._context = context
286
287
287
288
288 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
289 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
289 if self._cd is not None:
290 if self._cd is not None:
290 if url_or_file is None:
291 if url_or_file is None:
291 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
292 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
292 assert url_or_file is not None, "I can't find enough information to connect to a controller!"\
293 assert url_or_file is not None, "I can't find enough information to connect to a controller!"\
293 " Please specify at least one of url_or_file or profile."
294 " Please specify at least one of url_or_file or profile."
294
295
295 try:
296 try:
296 validate_url(url_or_file)
297 validate_url(url_or_file)
297 except AssertionError:
298 except AssertionError:
298 if not os.path.exists(url_or_file):
299 if not os.path.exists(url_or_file):
299 if self._cd:
300 if self._cd:
300 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
301 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
301 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
302 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
302 with open(url_or_file) as f:
303 with open(url_or_file) as f:
303 cfg = json.loads(f.read())
304 cfg = json.loads(f.read())
304 else:
305 else:
305 cfg = {'url':url_or_file}
306 cfg = {'url':url_or_file}
306
307
307 # sync defaults from args, json:
308 # sync defaults from args, json:
308 if sshserver:
309 if sshserver:
309 cfg['ssh'] = sshserver
310 cfg['ssh'] = sshserver
310 if exec_key:
311 if exec_key:
311 cfg['exec_key'] = exec_key
312 cfg['exec_key'] = exec_key
312 exec_key = cfg['exec_key']
313 exec_key = cfg['exec_key']
313 sshserver=cfg['ssh']
314 sshserver=cfg['ssh']
314 url = cfg['url']
315 url = cfg['url']
315 location = cfg.setdefault('location', None)
316 location = cfg.setdefault('location', None)
316 cfg['url'] = disambiguate_url(cfg['url'], location)
317 cfg['url'] = disambiguate_url(cfg['url'], location)
317 url = cfg['url']
318 url = cfg['url']
318
319
319 self._config = cfg
320 self._config = cfg
320
321
321 self._ssh = bool(sshserver or sshkey or password)
322 self._ssh = bool(sshserver or sshkey or password)
322 if self._ssh and sshserver is None:
323 if self._ssh and sshserver is None:
323 # default to ssh via localhost
324 # default to ssh via localhost
324 sshserver = url.split('://')[1].split(':')[0]
325 sshserver = url.split('://')[1].split(':')[0]
325 if self._ssh and password is None:
326 if self._ssh and password is None:
326 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
327 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
327 password=False
328 password=False
328 else:
329 else:
329 password = getpass("SSH Password for %s: "%sshserver)
330 password = getpass("SSH Password for %s: "%sshserver)
330 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
331 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
331 if exec_key is not None and os.path.isfile(exec_key):
332 if exec_key is not None and os.path.isfile(exec_key):
332 arg = 'keyfile'
333 arg = 'keyfile'
333 else:
334 else:
334 arg = 'key'
335 arg = 'key'
335 key_arg = {arg:exec_key}
336 key_arg = {arg:exec_key}
336 if username is None:
337 if username is None:
337 self.session = ss.StreamSession(**key_arg)
338 self.session = ss.StreamSession(**key_arg)
338 else:
339 else:
339 self.session = ss.StreamSession(username, **key_arg)
340 self.session = ss.StreamSession(username, **key_arg)
340 self._registration_socket = self._context.socket(zmq.XREQ)
341 self._registration_socket = self._context.socket(zmq.XREQ)
341 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
342 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
342 if self._ssh:
343 if self._ssh:
343 tunnel.tunnel_connection(self._registration_socket, url, sshserver, **ssh_kwargs)
344 tunnel.tunnel_connection(self._registration_socket, url, sshserver, **ssh_kwargs)
344 else:
345 else:
345 self._registration_socket.connect(url)
346 self._registration_socket.connect(url)
346
347
347 self.session.debug = self.debug
348 self.session.debug = self.debug
348
349
349 self._notification_handlers = {'registration_notification' : self._register_engine,
350 self._notification_handlers = {'registration_notification' : self._register_engine,
350 'unregistration_notification' : self._unregister_engine,
351 'unregistration_notification' : self._unregister_engine,
351 }
352 }
352 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
353 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
353 'apply_reply' : self._handle_apply_reply}
354 'apply_reply' : self._handle_apply_reply}
354 self._connect(sshserver, ssh_kwargs)
355 self._connect(sshserver, ssh_kwargs)
355
356
356
357
357 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
358 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
358 if ipython_dir is None:
359 if ipython_dir is None:
359 ipython_dir = get_ipython_dir()
360 ipython_dir = get_ipython_dir()
360 if cluster_dir is not None:
361 if cluster_dir is not None:
361 try:
362 try:
362 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
363 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
363 return
364 return
364 except ClusterDirError:
365 except ClusterDirError:
365 pass
366 pass
366 elif profile is not None:
367 elif profile is not None:
367 try:
368 try:
368 self._cd = ClusterDir.find_cluster_dir_by_profile(
369 self._cd = ClusterDir.find_cluster_dir_by_profile(
369 ipython_dir, profile)
370 ipython_dir, profile)
370 return
371 return
371 except ClusterDirError:
372 except ClusterDirError:
372 pass
373 pass
373 self._cd = None
374 self._cd = None
374
375
375 @property
376 @property
376 def ids(self):
377 def ids(self):
377 """Always up-to-date ids property."""
378 """Always up-to-date ids property."""
378 self._flush_notifications()
379 self._flush_notifications()
379 return self._ids
380 return self._ids
380
381
381 def close(self):
382 def close(self):
382 if self._closed:
383 if self._closed:
383 return
384 return
384 snames = filter(lambda n: n.endswith('socket'), dir(self))
385 snames = filter(lambda n: n.endswith('socket'), dir(self))
385 for socket in map(lambda name: getattr(self, name), snames):
386 for socket in map(lambda name: getattr(self, name), snames):
386 socket.close()
387 socket.close()
387 self._closed = True
388 self._closed = True
388
389
389 def _update_engines(self, engines):
390 def _update_engines(self, engines):
390 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
391 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
391 for k,v in engines.iteritems():
392 for k,v in engines.iteritems():
392 eid = int(k)
393 eid = int(k)
393 self._engines[eid] = bytes(v) # force not unicode
394 self._engines[eid] = bytes(v) # force not unicode
394 self._ids.append(eid)
395 self._ids.append(eid)
395 self._ids = sorted(self._ids)
396 self._ids = sorted(self._ids)
396 if sorted(self._engines.keys()) != range(len(self._engines)) and \
397 if sorted(self._engines.keys()) != range(len(self._engines)) and \
397 self._task_scheme == 'pure' and self._task_socket:
398 self._task_scheme == 'pure' and self._task_socket:
398 self._stop_scheduling_tasks()
399 self._stop_scheduling_tasks()
399
400
400 def _stop_scheduling_tasks(self):
401 def _stop_scheduling_tasks(self):
401 """Stop scheduling tasks because an engine has been unregistered
402 """Stop scheduling tasks because an engine has been unregistered
402 from a pure ZMQ scheduler.
403 from a pure ZMQ scheduler.
403 """
404 """
404
405
405 self._task_socket.close()
406 self._task_socket.close()
406 self._task_socket = None
407 self._task_socket = None
407 msg = "An engine has been unregistered, and we are using pure " +\
408 msg = "An engine has been unregistered, and we are using pure " +\
408 "ZMQ task scheduling. Task farming will be disabled."
409 "ZMQ task scheduling. Task farming will be disabled."
409 if self.outstanding:
410 if self.outstanding:
410 msg += " If you were running tasks when this happened, " +\
411 msg += " If you were running tasks when this happened, " +\
411 "some `outstanding` msg_ids may never resolve."
412 "some `outstanding` msg_ids may never resolve."
412 warnings.warn(msg, RuntimeWarning)
413 warnings.warn(msg, RuntimeWarning)
413
414
414 def _build_targets(self, targets):
415 def _build_targets(self, targets):
415 """Turn valid target IDs or 'all' into two lists:
416 """Turn valid target IDs or 'all' into two lists:
416 (int_ids, uuids).
417 (int_ids, uuids).
417 """
418 """
418 if targets is None:
419 if targets is None:
419 targets = self._ids
420 targets = self._ids
420 elif isinstance(targets, str):
421 elif isinstance(targets, str):
421 if targets.lower() == 'all':
422 if targets.lower() == 'all':
422 targets = self._ids
423 targets = self._ids
423 else:
424 else:
424 raise TypeError("%r not valid str target, must be 'all'"%(targets))
425 raise TypeError("%r not valid str target, must be 'all'"%(targets))
425 elif isinstance(targets, int):
426 elif isinstance(targets, int):
426 targets = [targets]
427 targets = [targets]
427 return [self._engines[t] for t in targets], list(targets)
428 return [self._engines[t] for t in targets], list(targets)
428
429
429 def _connect(self, sshserver, ssh_kwargs):
430 def _connect(self, sshserver, ssh_kwargs):
430 """setup all our socket connections to the controller. This is called from
431 """setup all our socket connections to the controller. This is called from
431 __init__."""
432 __init__."""
432
433
433 # Maybe allow reconnecting?
434 # Maybe allow reconnecting?
434 if self._connected:
435 if self._connected:
435 return
436 return
436 self._connected=True
437 self._connected=True
437
438
438 def connect_socket(s, url):
439 def connect_socket(s, url):
439 url = disambiguate_url(url, self._config['location'])
440 url = disambiguate_url(url, self._config['location'])
440 if self._ssh:
441 if self._ssh:
441 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
442 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
442 else:
443 else:
443 return s.connect(url)
444 return s.connect(url)
444
445
445 self.session.send(self._registration_socket, 'connection_request')
446 self.session.send(self._registration_socket, 'connection_request')
446 idents,msg = self.session.recv(self._registration_socket,mode=0)
447 idents,msg = self.session.recv(self._registration_socket,mode=0)
447 if self.debug:
448 if self.debug:
448 pprint(msg)
449 pprint(msg)
449 msg = ss.Message(msg)
450 msg = ss.Message(msg)
450 content = msg.content
451 content = msg.content
451 self._config['registration'] = dict(content)
452 self._config['registration'] = dict(content)
452 if content.status == 'ok':
453 if content.status == 'ok':
453 if content.mux:
454 if content.mux:
454 self._mux_socket = self._context.socket(zmq.PAIR)
455 self._mux_socket = self._context.socket(zmq.PAIR)
455 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
456 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
456 connect_socket(self._mux_socket, content.mux)
457 connect_socket(self._mux_socket, content.mux)
457 if content.task:
458 if content.task:
458 self._task_scheme, task_addr = content.task
459 self._task_scheme, task_addr = content.task
459 self._task_socket = self._context.socket(zmq.PAIR)
460 self._task_socket = self._context.socket(zmq.PAIR)
460 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
461 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
461 connect_socket(self._task_socket, task_addr)
462 connect_socket(self._task_socket, task_addr)
462 if content.notification:
463 if content.notification:
463 self._notification_socket = self._context.socket(zmq.SUB)
464 self._notification_socket = self._context.socket(zmq.SUB)
464 connect_socket(self._notification_socket, content.notification)
465 connect_socket(self._notification_socket, content.notification)
465 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
466 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
466 if content.query:
467 if content.query:
467 self._query_socket = self._context.socket(zmq.PAIR)
468 self._query_socket = self._context.socket(zmq.PAIR)
468 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
469 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
469 connect_socket(self._query_socket, content.query)
470 connect_socket(self._query_socket, content.query)
470 if content.control:
471 if content.control:
471 self._control_socket = self._context.socket(zmq.PAIR)
472 self._control_socket = self._context.socket(zmq.PAIR)
472 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
473 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
473 connect_socket(self._control_socket, content.control)
474 connect_socket(self._control_socket, content.control)
474 if content.iopub:
475 if content.iopub:
475 self._iopub_socket = self._context.socket(zmq.SUB)
476 self._iopub_socket = self._context.socket(zmq.SUB)
476 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
477 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
477 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
478 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
478 connect_socket(self._iopub_socket, content.iopub)
479 connect_socket(self._iopub_socket, content.iopub)
479 self._update_engines(dict(content.engines))
480 self._update_engines(dict(content.engines))
480
481
481 else:
482 else:
482 self._connected = False
483 self._connected = False
483 raise Exception("Failed to connect!")
484 raise Exception("Failed to connect!")
484
485
485 #--------------------------------------------------------------------------
486 #--------------------------------------------------------------------------
486 # handlers and callbacks for incoming messages
487 # handlers and callbacks for incoming messages
487 #--------------------------------------------------------------------------
488 #--------------------------------------------------------------------------
488
489
489 def _unwrap_exception(self, content):
490 def _unwrap_exception(self, content):
490 """unwrap exception, and remap engineid to int."""
491 """unwrap exception, and remap engineid to int."""
491 e = ss.unwrap_exception(content)
492 e = ss.unwrap_exception(content)
492 if e.engine_info:
493 if e.engine_info:
493 e_uuid = e.engine_info['engine_uuid']
494 e_uuid = e.engine_info['engine_uuid']
494 eid = self._engines[e_uuid]
495 eid = self._engines[e_uuid]
495 e.engine_info['engine_id'] = eid
496 e.engine_info['engine_id'] = eid
496 return e
497 return e
497
498
498 def _register_engine(self, msg):
499 def _register_engine(self, msg):
499 """Register a new engine, and update our connection info."""
500 """Register a new engine, and update our connection info."""
500 content = msg['content']
501 content = msg['content']
501 eid = content['id']
502 eid = content['id']
502 d = {eid : content['queue']}
503 d = {eid : content['queue']}
503 self._update_engines(d)
504 self._update_engines(d)
504
505
505 def _unregister_engine(self, msg):
506 def _unregister_engine(self, msg):
506 """Unregister an engine that has died."""
507 """Unregister an engine that has died."""
507 content = msg['content']
508 content = msg['content']
508 eid = int(content['id'])
509 eid = int(content['id'])
509 if eid in self._ids:
510 if eid in self._ids:
510 self._ids.remove(eid)
511 self._ids.remove(eid)
511 self._engines.pop(eid)
512 self._engines.pop(eid)
512 if self._task_socket and self._task_scheme == 'pure':
513 if self._task_socket and self._task_scheme == 'pure':
513 self._stop_scheduling_tasks()
514 self._stop_scheduling_tasks()
514
515
515 def _extract_metadata(self, header, parent, content):
516 def _extract_metadata(self, header, parent, content):
516 md = {'msg_id' : parent['msg_id'],
517 md = {'msg_id' : parent['msg_id'],
517 'received' : datetime.now(),
518 'received' : datetime.now(),
518 'engine_uuid' : header.get('engine', None),
519 'engine_uuid' : header.get('engine', None),
519 'follow' : parent.get('follow', []),
520 'follow' : parent.get('follow', []),
520 'after' : parent.get('after', []),
521 'after' : parent.get('after', []),
521 'status' : content['status'],
522 'status' : content['status'],
522 }
523 }
523
524
524 if md['engine_uuid'] is not None:
525 if md['engine_uuid'] is not None:
525 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
526 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
526
527
527 if 'date' in parent:
528 if 'date' in parent:
528 md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601)
529 md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601)
529 if 'started' in header:
530 if 'started' in header:
530 md['started'] = datetime.strptime(header['started'], ss.ISO8601)
531 md['started'] = datetime.strptime(header['started'], ss.ISO8601)
531 if 'date' in header:
532 if 'date' in header:
532 md['completed'] = datetime.strptime(header['date'], ss.ISO8601)
533 md['completed'] = datetime.strptime(header['date'], ss.ISO8601)
533 return md
534 return md
534
535
535 def _handle_execute_reply(self, msg):
536 def _handle_execute_reply(self, msg):
536 """Save the reply to an execute_request into our results.
537 """Save the reply to an execute_request into our results.
537
538
538 execute messages are never actually used. apply is used instead.
539 execute messages are never actually used. apply is used instead.
539 """
540 """
540
541
541 parent = msg['parent_header']
542 parent = msg['parent_header']
542 msg_id = parent['msg_id']
543 msg_id = parent['msg_id']
543 if msg_id not in self.outstanding:
544 if msg_id not in self.outstanding:
544 if msg_id in self.history:
545 if msg_id in self.history:
545 print ("got stale result: %s"%msg_id)
546 print ("got stale result: %s"%msg_id)
546 else:
547 else:
547 print ("got unknown result: %s"%msg_id)
548 print ("got unknown result: %s"%msg_id)
548 else:
549 else:
549 self.outstanding.remove(msg_id)
550 self.outstanding.remove(msg_id)
550 self.results[msg_id] = self._unwrap_exception(msg['content'])
551 self.results[msg_id] = self._unwrap_exception(msg['content'])
551
552
552 def _handle_apply_reply(self, msg):
553 def _handle_apply_reply(self, msg):
553 """Save the reply to an apply_request into our results."""
554 """Save the reply to an apply_request into our results."""
554 parent = msg['parent_header']
555 parent = msg['parent_header']
555 msg_id = parent['msg_id']
556 msg_id = parent['msg_id']
556 if msg_id not in self.outstanding:
557 if msg_id not in self.outstanding:
557 if msg_id in self.history:
558 if msg_id in self.history:
558 print ("got stale result: %s"%msg_id)
559 print ("got stale result: %s"%msg_id)
559 print self.results[msg_id]
560 print self.results[msg_id]
560 print msg
561 print msg
561 else:
562 else:
562 print ("got unknown result: %s"%msg_id)
563 print ("got unknown result: %s"%msg_id)
563 else:
564 else:
564 self.outstanding.remove(msg_id)
565 self.outstanding.remove(msg_id)
565 content = msg['content']
566 content = msg['content']
566 header = msg['header']
567 header = msg['header']
567
568
568 # construct metadata:
569 # construct metadata:
569 md = self.metadata.setdefault(msg_id, Metadata())
570 md = self.metadata.setdefault(msg_id, Metadata())
570 md.update(self._extract_metadata(header, parent, content))
571 md.update(self._extract_metadata(header, parent, content))
571 self.metadata[msg_id] = md
572 self.metadata[msg_id] = md
572
573
573 # construct result:
574 # construct result:
574 if content['status'] == 'ok':
575 if content['status'] == 'ok':
575 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
576 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
576 elif content['status'] == 'aborted':
577 elif content['status'] == 'aborted':
577 self.results[msg_id] = error.AbortedTask(msg_id)
578 self.results[msg_id] = error.AbortedTask(msg_id)
578 elif content['status'] == 'resubmitted':
579 elif content['status'] == 'resubmitted':
579 # TODO: handle resubmission
580 # TODO: handle resubmission
580 pass
581 pass
581 else:
582 else:
582 self.results[msg_id] = self._unwrap_exception(content)
583 self.results[msg_id] = self._unwrap_exception(content)
583
584
584 def _flush_notifications(self):
585 def _flush_notifications(self):
585 """Flush notifications of engine registrations waiting
586 """Flush notifications of engine registrations waiting
586 in ZMQ queue."""
587 in ZMQ queue."""
587 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
588 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
588 while msg is not None:
589 while msg is not None:
589 if self.debug:
590 if self.debug:
590 pprint(msg)
591 pprint(msg)
591 msg = msg[-1]
592 msg = msg[-1]
592 msg_type = msg['msg_type']
593 msg_type = msg['msg_type']
593 handler = self._notification_handlers.get(msg_type, None)
594 handler = self._notification_handlers.get(msg_type, None)
594 if handler is None:
595 if handler is None:
595 raise Exception("Unhandled message type: %s"%msg.msg_type)
596 raise Exception("Unhandled message type: %s"%msg.msg_type)
596 else:
597 else:
597 handler(msg)
598 handler(msg)
598 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
599 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
599
600
600 def _flush_results(self, sock):
601 def _flush_results(self, sock):
601 """Flush task or queue results waiting in ZMQ queue."""
602 """Flush task or queue results waiting in ZMQ queue."""
602 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
603 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
603 while msg is not None:
604 while msg is not None:
604 if self.debug:
605 if self.debug:
605 pprint(msg)
606 pprint(msg)
606 msg = msg[-1]
607 msg = msg[-1]
607 msg_type = msg['msg_type']
608 msg_type = msg['msg_type']
608 handler = self._queue_handlers.get(msg_type, None)
609 handler = self._queue_handlers.get(msg_type, None)
609 if handler is None:
610 if handler is None:
610 raise Exception("Unhandled message type: %s"%msg.msg_type)
611 raise Exception("Unhandled message type: %s"%msg.msg_type)
611 else:
612 else:
612 handler(msg)
613 handler(msg)
613 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
614 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
614
615
615 def _flush_control(self, sock):
616 def _flush_control(self, sock):
616 """Flush replies from the control channel waiting
617 """Flush replies from the control channel waiting
617 in the ZMQ queue.
618 in the ZMQ queue.
618
619
619 Currently: ignore them."""
620 Currently: ignore them."""
620 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
621 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
621 while msg is not None:
622 while msg is not None:
622 if self.debug:
623 if self.debug:
623 pprint(msg)
624 pprint(msg)
624 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
625 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
625
626
626 def _flush_iopub(self, sock):
627 def _flush_iopub(self, sock):
627 """Flush replies from the iopub channel waiting
628 """Flush replies from the iopub channel waiting
628 in the ZMQ queue.
629 in the ZMQ queue.
629 """
630 """
630 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
631 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
631 while msg is not None:
632 while msg is not None:
632 if self.debug:
633 if self.debug:
633 pprint(msg)
634 pprint(msg)
634 msg = msg[-1]
635 msg = msg[-1]
635 parent = msg['parent_header']
636 parent = msg['parent_header']
636 msg_id = parent['msg_id']
637 msg_id = parent['msg_id']
637 content = msg['content']
638 content = msg['content']
638 header = msg['header']
639 header = msg['header']
639 msg_type = msg['msg_type']
640 msg_type = msg['msg_type']
640
641
641 # init metadata:
642 # init metadata:
642 md = self.metadata.setdefault(msg_id, Metadata())
643 md = self.metadata.setdefault(msg_id, Metadata())
643
644
644 if msg_type == 'stream':
645 if msg_type == 'stream':
645 name = content['name']
646 name = content['name']
646 s = md[name] or ''
647 s = md[name] or ''
647 md[name] = s + content['data']
648 md[name] = s + content['data']
648 elif msg_type == 'pyerr':
649 elif msg_type == 'pyerr':
649 md.update({'pyerr' : self._unwrap_exception(content)})
650 md.update({'pyerr' : self._unwrap_exception(content)})
650 else:
651 else:
651 md.update({msg_type : content['data']})
652 md.update({msg_type : content['data']})
652
653
653 self.metadata[msg_id] = md
654 self.metadata[msg_id] = md
654
655
655 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
656 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
656
657
657 #--------------------------------------------------------------------------
658 #--------------------------------------------------------------------------
658 # len, getitem
659 # len, getitem
659 #--------------------------------------------------------------------------
660 #--------------------------------------------------------------------------
660
661
661 def __len__(self):
662 def __len__(self):
662 """len(client) returns # of engines."""
663 """len(client) returns # of engines."""
663 return len(self.ids)
664 return len(self.ids)
664
665
665 def __getitem__(self, key):
666 def __getitem__(self, key):
666 """index access returns DirectView multiplexer objects
667 """index access returns DirectView multiplexer objects
667
668
668 Must be int, slice, or list/tuple/xrange of ints"""
669 Must be int, slice, or list/tuple/xrange of ints"""
669 if not isinstance(key, (int, slice, tuple, list, xrange)):
670 if not isinstance(key, (int, slice, tuple, list, xrange)):
670 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
671 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
671 else:
672 else:
672 return self.view(key, balanced=False)
673 return self.view(key, balanced=False)
673
674
674 #--------------------------------------------------------------------------
675 #--------------------------------------------------------------------------
675 # Begin public methods
676 # Begin public methods
676 #--------------------------------------------------------------------------
677 #--------------------------------------------------------------------------
677
678
678 def spin(self):
679 def spin(self):
679 """Flush any registration notifications and execution results
680 """Flush any registration notifications and execution results
680 waiting in the ZMQ queue.
681 waiting in the ZMQ queue.
681 """
682 """
682 if self._notification_socket:
683 if self._notification_socket:
683 self._flush_notifications()
684 self._flush_notifications()
684 if self._mux_socket:
685 if self._mux_socket:
685 self._flush_results(self._mux_socket)
686 self._flush_results(self._mux_socket)
686 if self._task_socket:
687 if self._task_socket:
687 self._flush_results(self._task_socket)
688 self._flush_results(self._task_socket)
688 if self._control_socket:
689 if self._control_socket:
689 self._flush_control(self._control_socket)
690 self._flush_control(self._control_socket)
690 if self._iopub_socket:
691 if self._iopub_socket:
691 self._flush_iopub(self._iopub_socket)
692 self._flush_iopub(self._iopub_socket)
692
693
693 def barrier(self, jobs=None, timeout=-1):
694 def barrier(self, jobs=None, timeout=-1):
694 """waits on one or more `jobs`, for up to `timeout` seconds.
695 """waits on one or more `jobs`, for up to `timeout` seconds.
695
696
696 Parameters
697 Parameters
697 ----------
698 ----------
698
699
699 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
700 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
700 ints are indices to self.history
701 ints are indices to self.history
701 strs are msg_ids
702 strs are msg_ids
702 default: wait on all outstanding messages
703 default: wait on all outstanding messages
703 timeout : float
704 timeout : float
704 a time in seconds, after which to give up.
705 a time in seconds, after which to give up.
705 default is -1, which means no timeout
706 default is -1, which means no timeout
706
707
707 Returns
708 Returns
708 -------
709 -------
709
710
710 True : when all msg_ids are done
711 True : when all msg_ids are done
711 False : timeout reached, some msg_ids still outstanding
712 False : timeout reached, some msg_ids still outstanding
712 """
713 """
713 tic = time.time()
714 tic = time.time()
714 if jobs is None:
715 if jobs is None:
715 theids = self.outstanding
716 theids = self.outstanding
716 else:
717 else:
717 if isinstance(jobs, (int, str, AsyncResult)):
718 if isinstance(jobs, (int, str, AsyncResult)):
718 jobs = [jobs]
719 jobs = [jobs]
719 theids = set()
720 theids = set()
720 for job in jobs:
721 for job in jobs:
721 if isinstance(job, int):
722 if isinstance(job, int):
722 # index access
723 # index access
723 job = self.history[job]
724 job = self.history[job]
724 elif isinstance(job, AsyncResult):
725 elif isinstance(job, AsyncResult):
725 map(theids.add, job.msg_ids)
726 map(theids.add, job.msg_ids)
726 continue
727 continue
727 theids.add(job)
728 theids.add(job)
728 if not theids.intersection(self.outstanding):
729 if not theids.intersection(self.outstanding):
729 return True
730 return True
730 self.spin()
731 self.spin()
731 while theids.intersection(self.outstanding):
732 while theids.intersection(self.outstanding):
732 if timeout >= 0 and ( time.time()-tic ) > timeout:
733 if timeout >= 0 and ( time.time()-tic ) > timeout:
733 break
734 break
734 time.sleep(1e-3)
735 time.sleep(1e-3)
735 self.spin()
736 self.spin()
736 return len(theids.intersection(self.outstanding)) == 0
737 return len(theids.intersection(self.outstanding)) == 0
737
738
738 #--------------------------------------------------------------------------
739 #--------------------------------------------------------------------------
739 # Control methods
740 # Control methods
740 #--------------------------------------------------------------------------
741 #--------------------------------------------------------------------------
741
742
742 @spinfirst
743 @spinfirst
743 @defaultblock
744 @defaultblock
744 def clear(self, targets=None, block=None):
745 def clear(self, targets=None, block=None):
745 """Clear the namespace in target(s)."""
746 """Clear the namespace in target(s)."""
746 targets = self._build_targets(targets)[0]
747 targets = self._build_targets(targets)[0]
747 for t in targets:
748 for t in targets:
748 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
749 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
749 error = False
750 error = False
750 if self.block:
751 if self.block:
751 for i in range(len(targets)):
752 for i in range(len(targets)):
752 idents,msg = self.session.recv(self._control_socket,0)
753 idents,msg = self.session.recv(self._control_socket,0)
753 if self.debug:
754 if self.debug:
754 pprint(msg)
755 pprint(msg)
755 if msg['content']['status'] != 'ok':
756 if msg['content']['status'] != 'ok':
756 error = self._unwrap_exception(msg['content'])
757 error = self._unwrap_exception(msg['content'])
757 if error:
758 if error:
758 return error
759 return error
759
760
760
761
761 @spinfirst
762 @spinfirst
762 @defaultblock
763 @defaultblock
763 def abort(self, jobs=None, targets=None, block=None):
764 def abort(self, jobs=None, targets=None, block=None):
764 """Abort specific jobs from the execution queues of target(s).
765 """Abort specific jobs from the execution queues of target(s).
765
766
766 This is a mechanism to prevent jobs that have already been submitted
767 This is a mechanism to prevent jobs that have already been submitted
767 from executing.
768 from executing.
768
769
769 Parameters
770 Parameters
770 ----------
771 ----------
771
772
772 jobs : msg_id, list of msg_ids, or AsyncResult
773 jobs : msg_id, list of msg_ids, or AsyncResult
773 The jobs to be aborted
774 The jobs to be aborted
774
775
775
776
776 """
777 """
777 targets = self._build_targets(targets)[0]
778 targets = self._build_targets(targets)[0]
778 msg_ids = []
779 msg_ids = []
779 if isinstance(jobs, (basestring,AsyncResult)):
780 if isinstance(jobs, (basestring,AsyncResult)):
780 jobs = [jobs]
781 jobs = [jobs]
781 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
782 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
782 if bad_ids:
783 if bad_ids:
783 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
784 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
784 for j in jobs:
785 for j in jobs:
785 if isinstance(j, AsyncResult):
786 if isinstance(j, AsyncResult):
786 msg_ids.extend(j.msg_ids)
787 msg_ids.extend(j.msg_ids)
787 else:
788 else:
788 msg_ids.append(j)
789 msg_ids.append(j)
789 content = dict(msg_ids=msg_ids)
790 content = dict(msg_ids=msg_ids)
790 for t in targets:
791 for t in targets:
791 self.session.send(self._control_socket, 'abort_request',
792 self.session.send(self._control_socket, 'abort_request',
792 content=content, ident=t)
793 content=content, ident=t)
793 error = False
794 error = False
794 if self.block:
795 if self.block:
795 for i in range(len(targets)):
796 for i in range(len(targets)):
796 idents,msg = self.session.recv(self._control_socket,0)
797 idents,msg = self.session.recv(self._control_socket,0)
797 if self.debug:
798 if self.debug:
798 pprint(msg)
799 pprint(msg)
799 if msg['content']['status'] != 'ok':
800 if msg['content']['status'] != 'ok':
800 error = self._unwrap_exception(msg['content'])
801 error = self._unwrap_exception(msg['content'])
801 if error:
802 if error:
802 return error
803 return error
803
804
804 @spinfirst
805 @spinfirst
805 @defaultblock
806 @defaultblock
806 def shutdown(self, targets=None, restart=False, controller=False, block=None):
807 def shutdown(self, targets=None, restart=False, controller=False, block=None):
807 """Terminates one or more engine processes, optionally including the controller."""
808 """Terminates one or more engine processes, optionally including the controller."""
808 if controller:
809 if controller:
809 targets = 'all'
810 targets = 'all'
810 targets = self._build_targets(targets)[0]
811 targets = self._build_targets(targets)[0]
811 for t in targets:
812 for t in targets:
812 self.session.send(self._control_socket, 'shutdown_request',
813 self.session.send(self._control_socket, 'shutdown_request',
813 content={'restart':restart},ident=t)
814 content={'restart':restart},ident=t)
814 error = False
815 error = False
815 if block or controller:
816 if block or controller:
816 for i in range(len(targets)):
817 for i in range(len(targets)):
817 idents,msg = self.session.recv(self._control_socket,0)
818 idents,msg = self.session.recv(self._control_socket,0)
818 if self.debug:
819 if self.debug:
819 pprint(msg)
820 pprint(msg)
820 if msg['content']['status'] != 'ok':
821 if msg['content']['status'] != 'ok':
821 error = self._unwrap_exception(msg['content'])
822 error = self._unwrap_exception(msg['content'])
822
823
823 if controller:
824 if controller:
824 time.sleep(0.25)
825 time.sleep(0.25)
825 self.session.send(self._query_socket, 'shutdown_request')
826 self.session.send(self._query_socket, 'shutdown_request')
826 idents,msg = self.session.recv(self._query_socket, 0)
827 idents,msg = self.session.recv(self._query_socket, 0)
827 if self.debug:
828 if self.debug:
828 pprint(msg)
829 pprint(msg)
829 if msg['content']['status'] != 'ok':
830 if msg['content']['status'] != 'ok':
830 error = self._unwrap_exception(msg['content'])
831 error = self._unwrap_exception(msg['content'])
831
832
832 if error:
833 if error:
833 raise error
834 raise error
834
835
835 #--------------------------------------------------------------------------
836 #--------------------------------------------------------------------------
836 # Execution methods
837 # Execution methods
837 #--------------------------------------------------------------------------
838 #--------------------------------------------------------------------------
838
839
839 @defaultblock
840 @defaultblock
840 def execute(self, code, targets='all', block=None):
841 def execute(self, code, targets='all', block=None):
841 """Executes `code` on `targets` in blocking or nonblocking manner.
842 """Executes `code` on `targets` in blocking or nonblocking manner.
842
843
843 ``execute`` is always `bound` (affects engine namespace)
844 ``execute`` is always `bound` (affects engine namespace)
844
845
845 Parameters
846 Parameters
846 ----------
847 ----------
847
848
848 code : str
849 code : str
849 the code string to be executed
850 the code string to be executed
850 targets : int/str/list of ints/strs
851 targets : int/str/list of ints/strs
851 the engines on which to execute
852 the engines on which to execute
852 default : all
853 default : all
853 block : bool
854 block : bool
854 whether or not to wait until done to return
855 whether or not to wait until done to return
855 default: self.block
856 default: self.block
856 """
857 """
857 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False)
858 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False)
858 if not block:
859 if not block:
859 return result
860 return result
860
861
861 def run(self, filename, targets='all', block=None):
862 def run(self, filename, targets='all', block=None):
862 """Execute contents of `filename` on engine(s).
863 """Execute contents of `filename` on engine(s).
863
864
864 This simply reads the contents of the file and calls `execute`.
865 This simply reads the contents of the file and calls `execute`.
865
866
866 Parameters
867 Parameters
867 ----------
868 ----------
868
869
869 filename : str
870 filename : str
870 The path to the file
871 The path to the file
871 targets : int/str/list of ints/strs
872 targets : int/str/list of ints/strs
872 the engines on which to execute
873 the engines on which to execute
873 default : all
874 default : all
874 block : bool
875 block : bool
875 whether or not to wait until done
876 whether or not to wait until done
876 default: self.block
877 default: self.block
877
878
878 """
879 """
879 with open(filename, 'rb') as f:
880 with open(filename, 'rb') as f:
880 code = f.read()
881 code = f.read()
881 return self.execute(code, targets=targets, block=block)
882 return self.execute(code, targets=targets, block=block)
882
883
883 def _maybe_raise(self, result):
884 def _maybe_raise(self, result):
884 """wrapper for maybe raising an exception if apply failed."""
885 """wrapper for maybe raising an exception if apply failed."""
885 if isinstance(result, error.RemoteError):
886 if isinstance(result, error.RemoteError):
886 raise result
887 raise result
887
888
888 return result
889 return result
889
890
890 def _build_dependency(self, dep):
891 def _build_dependency(self, dep):
891 """helper for building jsonable dependencies from various input forms"""
892 """helper for building jsonable dependencies from various input forms"""
892 if isinstance(dep, Dependency):
893 if isinstance(dep, Dependency):
893 return dep.as_dict()
894 return dep.as_dict()
894 elif isinstance(dep, AsyncResult):
895 elif isinstance(dep, AsyncResult):
895 return dep.msg_ids
896 return dep.msg_ids
896 elif dep is None:
897 elif dep is None:
897 return []
898 return []
898 else:
899 else:
899 # pass to Dependency constructor
900 # pass to Dependency constructor
900 return list(Dependency(dep))
901 return list(Dependency(dep))
901
902
902 @defaultblock
903 @defaultblock
903 def apply(self, f, args=None, kwargs=None, bound=True, block=None,
904 def apply(self, f, args=None, kwargs=None, bound=True, block=None,
904 targets=None, balanced=None,
905 targets=None, balanced=None,
905 after=None, follow=None, timeout=None):
906 after=None, follow=None, timeout=None):
906 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
907 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
907
908
908 This is the central execution command for the client.
909 This is the central execution command for the client.
909
910
910 Parameters
911 Parameters
911 ----------
912 ----------
912
913
913 f : function
914 f : function
914 The fuction to be called remotely
915 The fuction to be called remotely
915 args : tuple/list
916 args : tuple/list
916 The positional arguments passed to `f`
917 The positional arguments passed to `f`
917 kwargs : dict
918 kwargs : dict
918 The keyword arguments passed to `f`
919 The keyword arguments passed to `f`
919 bound : bool (default: True)
920 bound : bool (default: True)
920 Whether to execute in the Engine(s) namespace, or in a clean
921 Whether to execute in the Engine(s) namespace, or in a clean
921 namespace not affecting the engine.
922 namespace not affecting the engine.
922 block : bool (default: self.block)
923 block : bool (default: self.block)
923 Whether to wait for the result, or return immediately.
924 Whether to wait for the result, or return immediately.
924 False:
925 False:
925 returns AsyncResult
926 returns AsyncResult
926 True:
927 True:
927 returns actual result(s) of f(*args, **kwargs)
928 returns actual result(s) of f(*args, **kwargs)
928 if multiple targets:
929 if multiple targets:
929 list of results, matching `targets`
930 list of results, matching `targets`
930 targets : int,list of ints, 'all', None
931 targets : int,list of ints, 'all', None
931 Specify the destination of the job.
932 Specify the destination of the job.
932 if None:
933 if None:
933 Submit via Task queue for load-balancing.
934 Submit via Task queue for load-balancing.
934 if 'all':
935 if 'all':
935 Run on all active engines
936 Run on all active engines
936 if list:
937 if list:
937 Run on each specified engine
938 Run on each specified engine
938 if int:
939 if int:
939 Run on single engine
940 Run on single engine
940
941
941 balanced : bool, default None
942 balanced : bool, default None
942 whether to load-balance. This will default to True
943 whether to load-balance. This will default to True
943 if targets is unspecified, or False if targets is specified.
944 if targets is unspecified, or False if targets is specified.
944
945
945 The following arguments are only used when balanced is True:
946 The following arguments are only used when balanced is True:
946 after : Dependency or collection of msg_ids
947 after : Dependency or collection of msg_ids
947 Only for load-balanced execution (targets=None)
948 Only for load-balanced execution (targets=None)
948 Specify a list of msg_ids as a time-based dependency.
949 Specify a list of msg_ids as a time-based dependency.
949 This job will only be run *after* the dependencies
950 This job will only be run *after* the dependencies
950 have been met.
951 have been met.
951
952
952 follow : Dependency or collection of msg_ids
953 follow : Dependency or collection of msg_ids
953 Only for load-balanced execution (targets=None)
954 Only for load-balanced execution (targets=None)
954 Specify a list of msg_ids as a location-based dependency.
955 Specify a list of msg_ids as a location-based dependency.
955 This job will only be run on an engine where this dependency
956 This job will only be run on an engine where this dependency
956 is met.
957 is met.
957
958
958 timeout : float/int or None
959 timeout : float/int or None
959 Only for load-balanced execution (targets=None)
960 Only for load-balanced execution (targets=None)
960 Specify an amount of time (in seconds) for the scheduler to
961 Specify an amount of time (in seconds) for the scheduler to
961 wait for dependencies to be met before failing with a
962 wait for dependencies to be met before failing with a
962 DependencyTimeout.
963 DependencyTimeout.
963
964
964 after,follow,timeout only used if `balanced=True`.
965 after,follow,timeout only used if `balanced=True`.
965
966
966 Returns
967 Returns
967 -------
968 -------
968
969
969 if block is False:
970 if block is False:
970 return AsyncResult wrapping msg_ids
971 return AsyncResult wrapping msg_ids
971 output of AsyncResult.get() is identical to that of `apply(...block=True)`
972 output of AsyncResult.get() is identical to that of `apply(...block=True)`
972 else:
973 else:
973 if single target:
974 if single target:
974 return result of `f(*args, **kwargs)`
975 return result of `f(*args, **kwargs)`
975 else:
976 else:
976 return list of results, matching `targets`
977 return list of results, matching `targets`
977 """
978 """
978 assert not self._closed, "cannot use me anymore, I'm closed!"
979 assert not self._closed, "cannot use me anymore, I'm closed!"
979 # defaults:
980 # defaults:
980 block = block if block is not None else self.block
981 block = block if block is not None else self.block
981 args = args if args is not None else []
982 args = args if args is not None else []
982 kwargs = kwargs if kwargs is not None else {}
983 kwargs = kwargs if kwargs is not None else {}
983
984
984 if balanced is None:
985 if balanced is None:
985 if targets is None:
986 if targets is None:
986 # default to balanced if targets unspecified
987 # default to balanced if targets unspecified
987 balanced = True
988 balanced = True
988 else:
989 else:
989 # otherwise default to multiplexing
990 # otherwise default to multiplexing
990 balanced = False
991 balanced = False
991
992
992 if targets is None and balanced is False:
993 if targets is None and balanced is False:
993 # default to all if *not* balanced, and targets is unspecified
994 # default to all if *not* balanced, and targets is unspecified
994 targets = 'all'
995 targets = 'all'
995
996
996 # enforce types of f,args,kwrags
997 # enforce types of f,args,kwrags
997 if not callable(f):
998 if not callable(f):
998 raise TypeError("f must be callable, not %s"%type(f))
999 raise TypeError("f must be callable, not %s"%type(f))
999 if not isinstance(args, (tuple, list)):
1000 if not isinstance(args, (tuple, list)):
1000 raise TypeError("args must be tuple or list, not %s"%type(args))
1001 raise TypeError("args must be tuple or list, not %s"%type(args))
1001 if not isinstance(kwargs, dict):
1002 if not isinstance(kwargs, dict):
1002 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1003 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1003
1004
1004 options = dict(bound=bound, block=block, targets=targets)
1005 options = dict(bound=bound, block=block, targets=targets)
1005
1006
1006 if balanced:
1007 if balanced:
1007 return self._apply_balanced(f, args, kwargs, timeout=timeout,
1008 return self._apply_balanced(f, args, kwargs, timeout=timeout,
1008 after=after, follow=follow, **options)
1009 after=after, follow=follow, **options)
1009 elif follow or after or timeout:
1010 elif follow or after or timeout:
1010 msg = "follow, after, and timeout args are only used for"
1011 msg = "follow, after, and timeout args are only used for"
1011 msg += " load-balanced execution."
1012 msg += " load-balanced execution."
1012 raise ValueError(msg)
1013 raise ValueError(msg)
1013 else:
1014 else:
1014 return self._apply_direct(f, args, kwargs, **options)
1015 return self._apply_direct(f, args, kwargs, **options)
1015
1016
1016 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
1017 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
1017 after=None, follow=None, timeout=None):
1018 after=None, follow=None, timeout=None):
1018 """call f(*args, **kwargs) remotely in a load-balanced manner.
1019 """call f(*args, **kwargs) remotely in a load-balanced manner.
1019
1020
1020 This is a private method, see `apply` for details.
1021 This is a private method, see `apply` for details.
1021 Not to be called directly!
1022 Not to be called directly!
1022 """
1023 """
1023
1024
1024 loc = locals()
1025 loc = locals()
1025 for name in ('bound', 'block'):
1026 for name in ('bound', 'block'):
1026 assert loc[name] is not None, "kwarg %r must be specified!"%name
1027 assert loc[name] is not None, "kwarg %r must be specified!"%name
1027
1028
1028 if self._task_socket is None:
1029 if self._task_socket is None:
1029 msg = "Task farming is disabled"
1030 msg = "Task farming is disabled"
1030 if self._task_scheme == 'pure':
1031 if self._task_scheme == 'pure':
1031 msg += " because the pure ZMQ scheduler cannot handle"
1032 msg += " because the pure ZMQ scheduler cannot handle"
1032 msg += " disappearing engines."
1033 msg += " disappearing engines."
1033 raise RuntimeError(msg)
1034 raise RuntimeError(msg)
1034
1035
1035 if self._task_scheme == 'pure':
1036 if self._task_scheme == 'pure':
1036 # pure zmq scheme doesn't support dependencies
1037 # pure zmq scheme doesn't support dependencies
1037 msg = "Pure ZMQ scheduler doesn't support dependencies"
1038 msg = "Pure ZMQ scheduler doesn't support dependencies"
1038 if (follow or after):
1039 if (follow or after):
1039 # hard fail on DAG dependencies
1040 # hard fail on DAG dependencies
1040 raise RuntimeError(msg)
1041 raise RuntimeError(msg)
1041 if isinstance(f, dependent):
1042 if isinstance(f, dependent):
1042 # soft warn on functional dependencies
1043 # soft warn on functional dependencies
1043 warnings.warn(msg, RuntimeWarning)
1044 warnings.warn(msg, RuntimeWarning)
1044
1045
1045 # defaults:
1046 # defaults:
1046 args = args if args is not None else []
1047 args = args if args is not None else []
1047 kwargs = kwargs if kwargs is not None else {}
1048 kwargs = kwargs if kwargs is not None else {}
1048
1049
1049 if targets:
1050 if targets:
1050 idents,_ = self._build_targets(targets)
1051 idents,_ = self._build_targets(targets)
1051 else:
1052 else:
1052 idents = []
1053 idents = []
1053
1054
1054 after = self._build_dependency(after)
1055 after = self._build_dependency(after)
1055 follow = self._build_dependency(follow)
1056 follow = self._build_dependency(follow)
1056 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
1057 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
1057 bufs = ss.pack_apply_message(f,args,kwargs)
1058 bufs = ss.pack_apply_message(f,args,kwargs)
1058 content = dict(bound=bound)
1059 content = dict(bound=bound)
1059
1060
1060 msg = self.session.send(self._task_socket, "apply_request",
1061 msg = self.session.send(self._task_socket, "apply_request",
1061 content=content, buffers=bufs, subheader=subheader)
1062 content=content, buffers=bufs, subheader=subheader)
1062 msg_id = msg['msg_id']
1063 msg_id = msg['msg_id']
1063 self.outstanding.add(msg_id)
1064 self.outstanding.add(msg_id)
1064 self.history.append(msg_id)
1065 self.history.append(msg_id)
1065 ar = AsyncResult(self, [msg_id], fname=f.__name__)
1066 ar = AsyncResult(self, [msg_id], fname=f.__name__)
1066 if block:
1067 if block:
1067 try:
1068 try:
1068 return ar.get()
1069 return ar.get()
1069 except KeyboardInterrupt:
1070 except KeyboardInterrupt:
1070 return ar
1071 return ar
1071 else:
1072 else:
1072 return ar
1073 return ar
1073
1074
1074 def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None):
1075 def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None):
1075 """Then underlying method for applying functions to specific engines
1076 """Then underlying method for applying functions to specific engines
1076 via the MUX queue.
1077 via the MUX queue.
1077
1078
1078 This is a private method, see `apply` for details.
1079 This is a private method, see `apply` for details.
1079 Not to be called directly!
1080 Not to be called directly!
1080 """
1081 """
1081 loc = locals()
1082 loc = locals()
1082 for name in ('bound', 'block', 'targets'):
1083 for name in ('bound', 'block', 'targets'):
1083 assert loc[name] is not None, "kwarg %r must be specified!"%name
1084 assert loc[name] is not None, "kwarg %r must be specified!"%name
1084
1085
1085 idents,targets = self._build_targets(targets)
1086 idents,targets = self._build_targets(targets)
1086
1087
1087 subheader = {}
1088 subheader = {}
1088 content = dict(bound=bound)
1089 content = dict(bound=bound)
1089 bufs = ss.pack_apply_message(f,args,kwargs)
1090 bufs = ss.pack_apply_message(f,args,kwargs)
1090
1091
1091 msg_ids = []
1092 msg_ids = []
1092 for ident in idents:
1093 for ident in idents:
1093 msg = self.session.send(self._mux_socket, "apply_request",
1094 msg = self.session.send(self._mux_socket, "apply_request",
1094 content=content, buffers=bufs, ident=ident, subheader=subheader)
1095 content=content, buffers=bufs, ident=ident, subheader=subheader)
1095 msg_id = msg['msg_id']
1096 msg_id = msg['msg_id']
1096 self.outstanding.add(msg_id)
1097 self.outstanding.add(msg_id)
1097 self.history.append(msg_id)
1098 self.history.append(msg_id)
1098 msg_ids.append(msg_id)
1099 msg_ids.append(msg_id)
1099 ar = AsyncResult(self, msg_ids, fname=f.__name__)
1100 ar = AsyncResult(self, msg_ids, fname=f.__name__)
1100 if block:
1101 if block:
1101 try:
1102 try:
1102 return ar.get()
1103 return ar.get()
1103 except KeyboardInterrupt:
1104 except KeyboardInterrupt:
1104 return ar
1105 return ar
1105 else:
1106 else:
1106 return ar
1107 return ar
1107
1108
1108 #--------------------------------------------------------------------------
1109 #--------------------------------------------------------------------------
1109 # construct a View object
1110 # construct a View object
1110 #--------------------------------------------------------------------------
1111 #--------------------------------------------------------------------------
1111
1112
1112 @defaultblock
1113 @defaultblock
1113 def remote(self, bound=True, block=None, targets=None, balanced=None):
1114 def remote(self, bound=True, block=None, targets=None, balanced=None):
1114 """Decorator for making a RemoteFunction"""
1115 """Decorator for making a RemoteFunction"""
1115 return remote(self, bound=bound, targets=targets, block=block, balanced=balanced)
1116 return remote(self, bound=bound, targets=targets, block=block, balanced=balanced)
1116
1117
1117 @defaultblock
1118 @defaultblock
1118 def parallel(self, dist='b', bound=True, block=None, targets=None, balanced=None):
1119 def parallel(self, dist='b', bound=True, block=None, targets=None, balanced=None):
1119 """Decorator for making a ParallelFunction"""
1120 """Decorator for making a ParallelFunction"""
1120 return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced)
1121 return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced)
1121
1122
1122 def _cache_view(self, targets, balanced):
1123 def _cache_view(self, targets, balanced):
1123 """save views, so subsequent requests don't create new objects."""
1124 """save views, so subsequent requests don't create new objects."""
1124 if balanced:
1125 if balanced:
1125 view_class = LoadBalancedView
1126 view_class = LoadBalancedView
1126 view_cache = self._balanced_views
1127 view_cache = self._balanced_views
1127 else:
1128 else:
1128 view_class = DirectView
1129 view_class = DirectView
1129 view_cache = self._direct_views
1130 view_cache = self._direct_views
1130
1131
1131 # use str, since often targets will be a list
1132 # use str, since often targets will be a list
1132 key = str(targets)
1133 key = str(targets)
1133 if key not in view_cache:
1134 if key not in view_cache:
1134 view_cache[key] = view_class(client=self, targets=targets)
1135 view_cache[key] = view_class(client=self, targets=targets)
1135
1136
1136 return view_cache[key]
1137 return view_cache[key]
1137
1138
1138 def view(self, targets=None, balanced=None):
1139 def view(self, targets=None, balanced=None):
1139 """Method for constructing View objects.
1140 """Method for constructing View objects.
1140
1141
1141 If no arguments are specified, create a LoadBalancedView
1142 If no arguments are specified, create a LoadBalancedView
1142 using all engines. If only `targets` specified, it will
1143 using all engines. If only `targets` specified, it will
1143 be a DirectView. This method is the underlying implementation
1144 be a DirectView. This method is the underlying implementation
1144 of ``client.__getitem__``.
1145 of ``client.__getitem__``.
1145
1146
1146 Parameters
1147 Parameters
1147 ----------
1148 ----------
1148
1149
1149 targets: list,slice,int,etc. [default: use all engines]
1150 targets: list,slice,int,etc. [default: use all engines]
1150 The engines to use for the View
1151 The engines to use for the View
1151 balanced : bool [default: False if targets specified, True else]
1152 balanced : bool [default: False if targets specified, True else]
1152 whether to build a LoadBalancedView or a DirectView
1153 whether to build a LoadBalancedView or a DirectView
1153
1154
1154 """
1155 """
1155
1156
1156 balanced = (targets is None) if balanced is None else balanced
1157 balanced = (targets is None) if balanced is None else balanced
1157
1158
1158 if targets is None:
1159 if targets is None:
1159 if balanced:
1160 if balanced:
1160 return self._cache_view(None,True)
1161 return self._cache_view(None,True)
1161 else:
1162 else:
1162 targets = slice(None)
1163 targets = slice(None)
1163
1164
1164 if isinstance(targets, int):
1165 if isinstance(targets, int):
1165 if targets < 0:
1166 if targets < 0:
1166 targets = self.ids[targets]
1167 targets = self.ids[targets]
1167 if targets not in self.ids:
1168 if targets not in self.ids:
1168 raise IndexError("No such engine: %i"%targets)
1169 raise IndexError("No such engine: %i"%targets)
1169 return self._cache_view(targets, balanced)
1170 return self._cache_view(targets, balanced)
1170
1171
1171 if isinstance(targets, slice):
1172 if isinstance(targets, slice):
1172 indices = range(len(self.ids))[targets]
1173 indices = range(len(self.ids))[targets]
1173 ids = sorted(self._ids)
1174 ids = sorted(self._ids)
1174 targets = [ ids[i] for i in indices ]
1175 targets = [ ids[i] for i in indices ]
1175
1176
1176 if isinstance(targets, (tuple, list, xrange)):
1177 if isinstance(targets, (tuple, list, xrange)):
1177 _,targets = self._build_targets(list(targets))
1178 _,targets = self._build_targets(list(targets))
1178 return self._cache_view(targets, balanced)
1179 return self._cache_view(targets, balanced)
1179 else:
1180 else:
1180 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
1181 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
1181
1182
1182 #--------------------------------------------------------------------------
1183 #--------------------------------------------------------------------------
1183 # Data movement
1184 # Data movement
1184 #--------------------------------------------------------------------------
1185 #--------------------------------------------------------------------------
1185
1186
1186 @defaultblock
1187 @defaultblock
1187 def push(self, ns, targets='all', block=None):
1188 def push(self, ns, targets='all', block=None):
1188 """Push the contents of `ns` into the namespace on `target`"""
1189 """Push the contents of `ns` into the namespace on `target`"""
1189 if not isinstance(ns, dict):
1190 if not isinstance(ns, dict):
1190 raise TypeError("Must be a dict, not %s"%type(ns))
1191 raise TypeError("Must be a dict, not %s"%type(ns))
1191 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False)
1192 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False)
1192 if not block:
1193 if not block:
1193 return result
1194 return result
1194
1195
1195 @defaultblock
1196 @defaultblock
1196 def pull(self, keys, targets='all', block=None):
1197 def pull(self, keys, targets='all', block=None):
1197 """Pull objects from `target`'s namespace by `keys`"""
1198 """Pull objects from `target`'s namespace by `keys`"""
1198 if isinstance(keys, str):
1199 if isinstance(keys, str):
1199 pass
1200 pass
1200 elif isinstance(keys, (list,tuple,set)):
1201 elif isinstance(keys, (list,tuple,set)):
1201 for key in keys:
1202 for key in keys:
1202 if not isinstance(key, str):
1203 if not isinstance(key, str):
1203 raise TypeError
1204 raise TypeError
1204 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1205 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1205 return result
1206 return result
1206
1207
1207 @defaultblock
1208 @defaultblock
1208 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1209 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1209 """
1210 """
1210 Partition a Python sequence and send the partitions to a set of engines.
1211 Partition a Python sequence and send the partitions to a set of engines.
1211 """
1212 """
1212 targets = self._build_targets(targets)[-1]
1213 targets = self._build_targets(targets)[-1]
1213 mapObject = Map.dists[dist]()
1214 mapObject = Map.dists[dist]()
1214 nparts = len(targets)
1215 nparts = len(targets)
1215 msg_ids = []
1216 msg_ids = []
1216 for index, engineid in enumerate(targets):
1217 for index, engineid in enumerate(targets):
1217 partition = mapObject.getPartition(seq, index, nparts)
1218 partition = mapObject.getPartition(seq, index, nparts)
1218 if flatten and len(partition) == 1:
1219 if flatten and len(partition) == 1:
1219 r = self.push({key: partition[0]}, targets=engineid, block=False)
1220 r = self.push({key: partition[0]}, targets=engineid, block=False)
1220 else:
1221 else:
1221 r = self.push({key: partition}, targets=engineid, block=False)
1222 r = self.push({key: partition}, targets=engineid, block=False)
1222 msg_ids.extend(r.msg_ids)
1223 msg_ids.extend(r.msg_ids)
1223 r = AsyncResult(self, msg_ids, fname='scatter')
1224 r = AsyncResult(self, msg_ids, fname='scatter')
1224 if block:
1225 if block:
1225 r.get()
1226 r.get()
1226 else:
1227 else:
1227 return r
1228 return r
1228
1229
1229 @defaultblock
1230 @defaultblock
1230 def gather(self, key, dist='b', targets='all', block=None):
1231 def gather(self, key, dist='b', targets='all', block=None):
1231 """
1232 """
1232 Gather a partitioned sequence on a set of engines as a single local seq.
1233 Gather a partitioned sequence on a set of engines as a single local seq.
1233 """
1234 """
1234
1235
1235 targets = self._build_targets(targets)[-1]
1236 targets = self._build_targets(targets)[-1]
1236 mapObject = Map.dists[dist]()
1237 mapObject = Map.dists[dist]()
1237 msg_ids = []
1238 msg_ids = []
1238 for index, engineid in enumerate(targets):
1239 for index, engineid in enumerate(targets):
1239 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1240 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1240
1241
1241 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1242 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1242 if block:
1243 if block:
1243 return r.get()
1244 return r.get()
1244 else:
1245 else:
1245 return r
1246 return r
1246
1247
1247 #--------------------------------------------------------------------------
1248 #--------------------------------------------------------------------------
1248 # Query methods
1249 # Query methods
1249 #--------------------------------------------------------------------------
1250 #--------------------------------------------------------------------------
1250
1251
1251 @spinfirst
1252 @spinfirst
1252 @defaultblock
1253 @defaultblock
1253 def get_result(self, indices_or_msg_ids=None, block=None):
1254 def get_result(self, indices_or_msg_ids=None, block=None):
1254 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1255 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1255
1256
1256 If the client already has the results, no request to the Hub will be made.
1257 If the client already has the results, no request to the Hub will be made.
1257
1258
1258 This is a convenient way to construct AsyncResult objects, which are wrappers
1259 This is a convenient way to construct AsyncResult objects, which are wrappers
1259 that include metadata about execution, and allow for awaiting results that
1260 that include metadata about execution, and allow for awaiting results that
1260 were not submitted by this Client.
1261 were not submitted by this Client.
1261
1262
1262 It can also be a convenient way to retrieve the metadata associated with
1263 It can also be a convenient way to retrieve the metadata associated with
1263 blocking execution, since it always retrieves
1264 blocking execution, since it always retrieves
1264
1265
1265 Examples
1266 Examples
1266 --------
1267 --------
1267 ::
1268 ::
1268
1269
1269 In [10]: r = client.apply()
1270 In [10]: r = client.apply()
1270
1271
1271 Parameters
1272 Parameters
1272 ----------
1273 ----------
1273
1274
1274 indices_or_msg_ids : integer history index, str msg_id, or list of either
1275 indices_or_msg_ids : integer history index, str msg_id, or list of either
1275 The indices or msg_ids of indices to be retrieved
1276 The indices or msg_ids of indices to be retrieved
1276
1277
1277 block : bool
1278 block : bool
1278 Whether to wait for the result to be done
1279 Whether to wait for the result to be done
1279
1280
1280 Returns
1281 Returns
1281 -------
1282 -------
1282
1283
1283 AsyncResult
1284 AsyncResult
1284 A single AsyncResult object will always be returned.
1285 A single AsyncResult object will always be returned.
1285
1286
1286 AsyncHubResult
1287 AsyncHubResult
1287 A subclass of AsyncResult that retrieves results from the Hub
1288 A subclass of AsyncResult that retrieves results from the Hub
1288
1289
1289 """
1290 """
1290 if indices_or_msg_ids is None:
1291 if indices_or_msg_ids is None:
1291 indices_or_msg_ids = -1
1292 indices_or_msg_ids = -1
1292
1293
1293 if not isinstance(indices_or_msg_ids, (list,tuple)):
1294 if not isinstance(indices_or_msg_ids, (list,tuple)):
1294 indices_or_msg_ids = [indices_or_msg_ids]
1295 indices_or_msg_ids = [indices_or_msg_ids]
1295
1296
1296 theids = []
1297 theids = []
1297 for id in indices_or_msg_ids:
1298 for id in indices_or_msg_ids:
1298 if isinstance(id, int):
1299 if isinstance(id, int):
1299 id = self.history[id]
1300 id = self.history[id]
1300 if not isinstance(id, str):
1301 if not isinstance(id, str):
1301 raise TypeError("indices must be str or int, not %r"%id)
1302 raise TypeError("indices must be str or int, not %r"%id)
1302 theids.append(id)
1303 theids.append(id)
1303
1304
1304 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1305 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1305 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1306 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1306
1307
1307 if remote_ids:
1308 if remote_ids:
1308 ar = AsyncHubResult(self, msg_ids=theids)
1309 ar = AsyncHubResult(self, msg_ids=theids)
1309 else:
1310 else:
1310 ar = AsyncResult(self, msg_ids=theids)
1311 ar = AsyncResult(self, msg_ids=theids)
1311
1312
1312 if block:
1313 if block:
1313 ar.wait()
1314 ar.wait()
1314
1315
1315 return ar
1316 return ar
1316
1317
1317 @spinfirst
1318 @spinfirst
1318 def result_status(self, msg_ids, status_only=True):
1319 def result_status(self, msg_ids, status_only=True):
1319 """Check on the status of the result(s) of the apply request with `msg_ids`.
1320 """Check on the status of the result(s) of the apply request with `msg_ids`.
1320
1321
1321 If status_only is False, then the actual results will be retrieved, else
1322 If status_only is False, then the actual results will be retrieved, else
1322 only the status of the results will be checked.
1323 only the status of the results will be checked.
1323
1324
1324 Parameters
1325 Parameters
1325 ----------
1326 ----------
1326
1327
1327 msg_ids : list of msg_ids
1328 msg_ids : list of msg_ids
1328 if int:
1329 if int:
1329 Passed as index to self.history for convenience.
1330 Passed as index to self.history for convenience.
1330 status_only : bool (default: True)
1331 status_only : bool (default: True)
1331 if False:
1332 if False:
1332 Retrieve the actual results of completed tasks.
1333 Retrieve the actual results of completed tasks.
1333
1334
1334 Returns
1335 Returns
1335 -------
1336 -------
1336
1337
1337 results : dict
1338 results : dict
1338 There will always be the keys 'pending' and 'completed', which will
1339 There will always be the keys 'pending' and 'completed', which will
1339 be lists of msg_ids that are incomplete or complete. If `status_only`
1340 be lists of msg_ids that are incomplete or complete. If `status_only`
1340 is False, then completed results will be keyed by their `msg_id`.
1341 is False, then completed results will be keyed by their `msg_id`.
1341 """
1342 """
1342 if not isinstance(msg_ids, (list,tuple)):
1343 if not isinstance(msg_ids, (list,tuple)):
1343 indices_or_msg_ids = [msg_ids]
1344 indices_or_msg_ids = [msg_ids]
1344
1345
1345 theids = []
1346 theids = []
1346 for msg_id in msg_ids:
1347 for msg_id in msg_ids:
1347 if isinstance(msg_id, int):
1348 if isinstance(msg_id, int):
1348 msg_id = self.history[msg_id]
1349 msg_id = self.history[msg_id]
1349 if not isinstance(msg_id, basestring):
1350 if not isinstance(msg_id, basestring):
1350 raise TypeError("msg_ids must be str, not %r"%msg_id)
1351 raise TypeError("msg_ids must be str, not %r"%msg_id)
1351 theids.append(msg_id)
1352 theids.append(msg_id)
1352
1353
1353 completed = []
1354 completed = []
1354 local_results = {}
1355 local_results = {}
1355
1356
1356 # comment this block out to temporarily disable local shortcut:
1357 # comment this block out to temporarily disable local shortcut:
1357 for msg_id in theids:
1358 for msg_id in theids:
1358 if msg_id in self.results:
1359 if msg_id in self.results:
1359 completed.append(msg_id)
1360 completed.append(msg_id)
1360 local_results[msg_id] = self.results[msg_id]
1361 local_results[msg_id] = self.results[msg_id]
1361 theids.remove(msg_id)
1362 theids.remove(msg_id)
1362
1363
1363 if theids: # some not locally cached
1364 if theids: # some not locally cached
1364 content = dict(msg_ids=theids, status_only=status_only)
1365 content = dict(msg_ids=theids, status_only=status_only)
1365 msg = self.session.send(self._query_socket, "result_request", content=content)
1366 msg = self.session.send(self._query_socket, "result_request", content=content)
1366 zmq.select([self._query_socket], [], [])
1367 zmq.select([self._query_socket], [], [])
1367 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1368 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1368 if self.debug:
1369 if self.debug:
1369 pprint(msg)
1370 pprint(msg)
1370 content = msg['content']
1371 content = msg['content']
1371 if content['status'] != 'ok':
1372 if content['status'] != 'ok':
1372 raise self._unwrap_exception(content)
1373 raise self._unwrap_exception(content)
1373 buffers = msg['buffers']
1374 buffers = msg['buffers']
1374 else:
1375 else:
1375 content = dict(completed=[],pending=[])
1376 content = dict(completed=[],pending=[])
1376
1377
1377 content['completed'].extend(completed)
1378 content['completed'].extend(completed)
1378
1379
1379 if status_only:
1380 if status_only:
1380 return content
1381 return content
1381
1382
1382 failures = []
1383 failures = []
1383 # load cached results into result:
1384 # load cached results into result:
1384 content.update(local_results)
1385 content.update(local_results)
1385 # update cache with results:
1386 # update cache with results:
1386 for msg_id in sorted(theids):
1387 for msg_id in sorted(theids):
1387 if msg_id in content['completed']:
1388 if msg_id in content['completed']:
1388 rec = content[msg_id]
1389 rec = content[msg_id]
1389 parent = rec['header']
1390 parent = rec['header']
1390 header = rec['result_header']
1391 header = rec['result_header']
1391 rcontent = rec['result_content']
1392 rcontent = rec['result_content']
1392 iodict = rec['io']
1393 iodict = rec['io']
1393 if isinstance(rcontent, str):
1394 if isinstance(rcontent, str):
1394 rcontent = self.session.unpack(rcontent)
1395 rcontent = self.session.unpack(rcontent)
1395
1396
1396 md = self.metadata.setdefault(msg_id, Metadata())
1397 md = self.metadata.setdefault(msg_id, Metadata())
1397 md.update(self._extract_metadata(header, parent, rcontent))
1398 md.update(self._extract_metadata(header, parent, rcontent))
1398 md.update(iodict)
1399 md.update(iodict)
1399
1400
1400 if rcontent['status'] == 'ok':
1401 if rcontent['status'] == 'ok':
1401 res,buffers = ss.unserialize_object(buffers)
1402 res,buffers = ss.unserialize_object(buffers)
1402 else:
1403 else:
1403 print rcontent
1404 print rcontent
1404 res = self._unwrap_exception(rcontent)
1405 res = self._unwrap_exception(rcontent)
1405 failures.append(res)
1406 failures.append(res)
1406
1407
1407 self.results[msg_id] = res
1408 self.results[msg_id] = res
1408 content[msg_id] = res
1409 content[msg_id] = res
1409
1410
1410 if len(theids) == 1 and failures:
1411 if len(theids) == 1 and failures:
1411 raise failures[0]
1412 raise failures[0]
1412
1413
1413 error.collect_exceptions(failures, "result_status")
1414 error.collect_exceptions(failures, "result_status")
1414 return content
1415 return content
1415
1416
1416 @spinfirst
1417 @spinfirst
1417 def queue_status(self, targets='all', verbose=False):
1418 def queue_status(self, targets='all', verbose=False):
1418 """Fetch the status of engine queues.
1419 """Fetch the status of engine queues.
1419
1420
1420 Parameters
1421 Parameters
1421 ----------
1422 ----------
1422
1423
1423 targets : int/str/list of ints/strs
1424 targets : int/str/list of ints/strs
1424 the engines whose states are to be queried.
1425 the engines whose states are to be queried.
1425 default : all
1426 default : all
1426 verbose : bool
1427 verbose : bool
1427 Whether to return lengths only, or lists of ids for each element
1428 Whether to return lengths only, or lists of ids for each element
1428 """
1429 """
1429 targets = self._build_targets(targets)[1]
1430 targets = self._build_targets(targets)[1]
1430 content = dict(targets=targets, verbose=verbose)
1431 content = dict(targets=targets, verbose=verbose)
1431 self.session.send(self._query_socket, "queue_request", content=content)
1432 self.session.send(self._query_socket, "queue_request", content=content)
1432 idents,msg = self.session.recv(self._query_socket, 0)
1433 idents,msg = self.session.recv(self._query_socket, 0)
1433 if self.debug:
1434 if self.debug:
1434 pprint(msg)
1435 pprint(msg)
1435 content = msg['content']
1436 content = msg['content']
1436 status = content.pop('status')
1437 status = content.pop('status')
1437 if status != 'ok':
1438 if status != 'ok':
1438 raise self._unwrap_exception(content)
1439 raise self._unwrap_exception(content)
1439 return ss.rekey(content)
1440 return ss.rekey(content)
1440
1441
1441 @spinfirst
1442 @spinfirst
1442 def purge_results(self, jobs=[], targets=[]):
1443 def purge_results(self, jobs=[], targets=[]):
1443 """Tell the controller to forget results.
1444 """Tell the controller to forget results.
1444
1445
1445 Individual results can be purged by msg_id, or the entire
1446 Individual results can be purged by msg_id, or the entire
1446 history of specific targets can be purged.
1447 history of specific targets can be purged.
1447
1448
1448 Parameters
1449 Parameters
1449 ----------
1450 ----------
1450
1451
1451 jobs : str or list of strs or AsyncResult objects
1452 jobs : str or list of strs or AsyncResult objects
1452 the msg_ids whose results should be forgotten.
1453 the msg_ids whose results should be forgotten.
1453 targets : int/str/list of ints/strs
1454 targets : int/str/list of ints/strs
1454 The targets, by uuid or int_id, whose entire history is to be purged.
1455 The targets, by uuid or int_id, whose entire history is to be purged.
1455 Use `targets='all'` to scrub everything from the controller's memory.
1456 Use `targets='all'` to scrub everything from the controller's memory.
1456
1457
1457 default : None
1458 default : None
1458 """
1459 """
1459 if not targets and not jobs:
1460 if not targets and not jobs:
1460 raise ValueError("Must specify at least one of `targets` and `jobs`")
1461 raise ValueError("Must specify at least one of `targets` and `jobs`")
1461 if targets:
1462 if targets:
1462 targets = self._build_targets(targets)[1]
1463 targets = self._build_targets(targets)[1]
1463
1464
1464 # construct msg_ids from jobs
1465 # construct msg_ids from jobs
1465 msg_ids = []
1466 msg_ids = []
1466 if isinstance(jobs, (basestring,AsyncResult)):
1467 if isinstance(jobs, (basestring,AsyncResult)):
1467 jobs = [jobs]
1468 jobs = [jobs]
1468 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1469 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1469 if bad_ids:
1470 if bad_ids:
1470 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1471 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1471 for j in jobs:
1472 for j in jobs:
1472 if isinstance(j, AsyncResult):
1473 if isinstance(j, AsyncResult):
1473 msg_ids.extend(j.msg_ids)
1474 msg_ids.extend(j.msg_ids)
1474 else:
1475 else:
1475 msg_ids.append(j)
1476 msg_ids.append(j)
1476
1477
1477 content = dict(targets=targets, msg_ids=msg_ids)
1478 content = dict(targets=targets, msg_ids=msg_ids)
1478 self.session.send(self._query_socket, "purge_request", content=content)
1479 self.session.send(self._query_socket, "purge_request", content=content)
1479 idents, msg = self.session.recv(self._query_socket, 0)
1480 idents, msg = self.session.recv(self._query_socket, 0)
1480 if self.debug:
1481 if self.debug:
1481 pprint(msg)
1482 pprint(msg)
1482 content = msg['content']
1483 content = msg['content']
1483 if content['status'] != 'ok':
1484 if content['status'] != 'ok':
1484 raise self._unwrap_exception(content)
1485 raise self._unwrap_exception(content)
1485
1486
1486
1487
1487 __all__ = [ 'Client',
1488 __all__ = [ 'Client',
1488 'depend',
1489 'depend',
1489 'require',
1490 'require',
1490 'remote',
1491 'remote',
1491 'parallel',
1492 'parallel',
1492 'RemoteFunction',
1493 'RemoteFunction',
1493 'ParallelFunction',
1494 'ParallelFunction',
1494 'DirectView',
1495 'DirectView',
1495 'LoadBalancedView',
1496 'LoadBalancedView',
1496 'AsyncResult',
1497 'AsyncResult',
1497 'AsyncMapResult'
1498 'AsyncMapResult'
1498 ]
1499 ]
General Comments 0
You need to be logged in to leave comments. Login now