##// END OF EJS Templates
client profile defaults to running IPython profile...
MinRK -
Show More
@@ -1,1374 +1,1397 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import time
20 import time
21 import warnings
21 import warnings
22 from datetime import datetime
22 from datetime import datetime
23 from getpass import getpass
23 from getpass import getpass
24 from pprint import pprint
24 from pprint import pprint
25
25
26 pjoin = os.path.join
26 pjoin = os.path.join
27
27
28 import zmq
28 import zmq
29 # from zmq.eventloop import ioloop, zmqstream
29 # from zmq.eventloop import ioloop, zmqstream
30
30
31 from IPython.config.configurable import MultipleInstanceError
32 from IPython.core.application import BaseIPythonApplication
33
31 from IPython.utils.jsonutil import rekey
34 from IPython.utils.jsonutil import rekey
32 from IPython.utils.path import get_ipython_dir
35 from IPython.utils.path import get_ipython_dir
33 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
36 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
34 Dict, List, Bool, Set)
37 Dict, List, Bool, Set)
35 from IPython.external.decorator import decorator
38 from IPython.external.decorator import decorator
36 from IPython.external.ssh import tunnel
39 from IPython.external.ssh import tunnel
37
40
38 from IPython.parallel import error
41 from IPython.parallel import error
39 from IPython.parallel import util
42 from IPython.parallel import util
40
43
41 from IPython.zmq.session import Session, Message
44 from IPython.zmq.session import Session, Message
42
45
43 from .asyncresult import AsyncResult, AsyncHubResult
46 from .asyncresult import AsyncResult, AsyncHubResult
44 from IPython.core.profiledir import ProfileDir, ProfileDirError
47 from IPython.core.profiledir import ProfileDir, ProfileDirError
45 from .view import DirectView, LoadBalancedView
48 from .view import DirectView, LoadBalancedView
46
49
47 #--------------------------------------------------------------------------
50 #--------------------------------------------------------------------------
48 # Decorators for Client methods
51 # Decorators for Client methods
49 #--------------------------------------------------------------------------
52 #--------------------------------------------------------------------------
50
53
51 @decorator
54 @decorator
52 def spin_first(f, self, *args, **kwargs):
55 def spin_first(f, self, *args, **kwargs):
53 """Call spin() to sync state prior to calling the method."""
56 """Call spin() to sync state prior to calling the method."""
54 self.spin()
57 self.spin()
55 return f(self, *args, **kwargs)
58 return f(self, *args, **kwargs)
56
59
57
60
58 #--------------------------------------------------------------------------
61 #--------------------------------------------------------------------------
59 # Classes
62 # Classes
60 #--------------------------------------------------------------------------
63 #--------------------------------------------------------------------------
61
64
62 class Metadata(dict):
65 class Metadata(dict):
63 """Subclass of dict for initializing metadata values.
66 """Subclass of dict for initializing metadata values.
64
67
65 Attribute access works on keys.
68 Attribute access works on keys.
66
69
67 These objects have a strict set of keys - errors will raise if you try
70 These objects have a strict set of keys - errors will raise if you try
68 to add new keys.
71 to add new keys.
69 """
72 """
70 def __init__(self, *args, **kwargs):
73 def __init__(self, *args, **kwargs):
71 dict.__init__(self)
74 dict.__init__(self)
72 md = {'msg_id' : None,
75 md = {'msg_id' : None,
73 'submitted' : None,
76 'submitted' : None,
74 'started' : None,
77 'started' : None,
75 'completed' : None,
78 'completed' : None,
76 'received' : None,
79 'received' : None,
77 'engine_uuid' : None,
80 'engine_uuid' : None,
78 'engine_id' : None,
81 'engine_id' : None,
79 'follow' : None,
82 'follow' : None,
80 'after' : None,
83 'after' : None,
81 'status' : None,
84 'status' : None,
82
85
83 'pyin' : None,
86 'pyin' : None,
84 'pyout' : None,
87 'pyout' : None,
85 'pyerr' : None,
88 'pyerr' : None,
86 'stdout' : '',
89 'stdout' : '',
87 'stderr' : '',
90 'stderr' : '',
88 }
91 }
89 self.update(md)
92 self.update(md)
90 self.update(dict(*args, **kwargs))
93 self.update(dict(*args, **kwargs))
91
94
92 def __getattr__(self, key):
95 def __getattr__(self, key):
93 """getattr aliased to getitem"""
96 """getattr aliased to getitem"""
94 if key in self.iterkeys():
97 if key in self.iterkeys():
95 return self[key]
98 return self[key]
96 else:
99 else:
97 raise AttributeError(key)
100 raise AttributeError(key)
98
101
99 def __setattr__(self, key, value):
102 def __setattr__(self, key, value):
100 """setattr aliased to setitem, with strict"""
103 """setattr aliased to setitem, with strict"""
101 if key in self.iterkeys():
104 if key in self.iterkeys():
102 self[key] = value
105 self[key] = value
103 else:
106 else:
104 raise AttributeError(key)
107 raise AttributeError(key)
105
108
106 def __setitem__(self, key, value):
109 def __setitem__(self, key, value):
107 """strict static key enforcement"""
110 """strict static key enforcement"""
108 if key in self.iterkeys():
111 if key in self.iterkeys():
109 dict.__setitem__(self, key, value)
112 dict.__setitem__(self, key, value)
110 else:
113 else:
111 raise KeyError(key)
114 raise KeyError(key)
112
115
113
116
114 class Client(HasTraits):
117 class Client(HasTraits):
115 """A semi-synchronous client to the IPython ZMQ cluster
118 """A semi-synchronous client to the IPython ZMQ cluster
116
119
117 Parameters
120 Parameters
118 ----------
121 ----------
119
122
120 url_or_file : bytes; zmq url or path to ipcontroller-client.json
123 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
121 Connection information for the Hub's registration. If a json connector
124 Connection information for the Hub's registration. If a json connector
122 file is given, then likely no further configuration is necessary.
125 file is given, then likely no further configuration is necessary.
123 [Default: use profile]
126 [Default: use profile]
124 profile : bytes
127 profile : bytes
125 The name of the Cluster profile to be used to find connector information.
128 The name of the Cluster profile to be used to find connector information.
126 [Default: 'default']
129 If run from an IPython application, the default profile will be the same
130 as the running application, otherwise it will be 'default'.
127 context : zmq.Context
131 context : zmq.Context
128 Pass an existing zmq.Context instance, otherwise the client will create its own.
132 Pass an existing zmq.Context instance, otherwise the client will create its own.
129 debug : bool
133 debug : bool
130 flag for lots of message printing for debug purposes
134 flag for lots of message printing for debug purposes
131 timeout : int/float
135 timeout : int/float
132 time (in seconds) to wait for connection replies from the Hub
136 time (in seconds) to wait for connection replies from the Hub
133 [Default: 10]
137 [Default: 10]
134
138
135 #-------------- session related args ----------------
139 #-------------- session related args ----------------
136
140
137 config : Config object
141 config : Config object
138 If specified, this will be relayed to the Session for configuration
142 If specified, this will be relayed to the Session for configuration
139 username : str
143 username : str
140 set username for the session object
144 set username for the session object
141 packer : str (import_string) or callable
145 packer : str (import_string) or callable
142 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
146 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
143 function to serialize messages. Must support same input as
147 function to serialize messages. Must support same input as
144 JSON, and output must be bytes.
148 JSON, and output must be bytes.
145 You can pass a callable directly as `pack`
149 You can pass a callable directly as `pack`
146 unpacker : str (import_string) or callable
150 unpacker : str (import_string) or callable
147 The inverse of packer. Only necessary if packer is specified as *not* one
151 The inverse of packer. Only necessary if packer is specified as *not* one
148 of 'json' or 'pickle'.
152 of 'json' or 'pickle'.
149
153
150 #-------------- ssh related args ----------------
154 #-------------- ssh related args ----------------
151 # These are args for configuring the ssh tunnel to be used
155 # These are args for configuring the ssh tunnel to be used
152 # credentials are used to forward connections over ssh to the Controller
156 # credentials are used to forward connections over ssh to the Controller
153 # Note that the ip given in `addr` needs to be relative to sshserver
157 # Note that the ip given in `addr` needs to be relative to sshserver
154 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
158 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
155 # and set sshserver as the same machine the Controller is on. However,
159 # and set sshserver as the same machine the Controller is on. However,
156 # the only requirement is that sshserver is able to see the Controller
160 # the only requirement is that sshserver is able to see the Controller
157 # (i.e. is within the same trusted network).
161 # (i.e. is within the same trusted network).
158
162
159 sshserver : str
163 sshserver : str
160 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
164 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
161 If keyfile or password is specified, and this is not, it will default to
165 If keyfile or password is specified, and this is not, it will default to
162 the ip given in addr.
166 the ip given in addr.
163 sshkey : str; path to public ssh key file
167 sshkey : str; path to public ssh key file
164 This specifies a key to be used in ssh login, default None.
168 This specifies a key to be used in ssh login, default None.
165 Regular default ssh keys will be used without specifying this argument.
169 Regular default ssh keys will be used without specifying this argument.
166 password : str
170 password : str
167 Your ssh password to sshserver. Note that if this is left None,
171 Your ssh password to sshserver. Note that if this is left None,
168 you will be prompted for it if passwordless key based login is unavailable.
172 you will be prompted for it if passwordless key based login is unavailable.
169 paramiko : bool
173 paramiko : bool
170 flag for whether to use paramiko instead of shell ssh for tunneling.
174 flag for whether to use paramiko instead of shell ssh for tunneling.
171 [default: True on win32, False else]
175 [default: True on win32, False else]
172
176
173 ------- exec authentication args -------
177 ------- exec authentication args -------
174 If even localhost is untrusted, you can have some protection against
178 If even localhost is untrusted, you can have some protection against
175 unauthorized execution by signing messages with HMAC digests.
179 unauthorized execution by signing messages with HMAC digests.
176 Messages are still sent as cleartext, so if someone can snoop your
180 Messages are still sent as cleartext, so if someone can snoop your
177 loopback traffic this will not protect your privacy, but will prevent
181 loopback traffic this will not protect your privacy, but will prevent
178 unauthorized execution.
182 unauthorized execution.
179
183
180 exec_key : str
184 exec_key : str
181 an authentication key or file containing a key
185 an authentication key or file containing a key
182 default: None
186 default: None
183
187
184
188
185 Attributes
189 Attributes
186 ----------
190 ----------
187
191
188 ids : list of int engine IDs
192 ids : list of int engine IDs
189 requesting the ids attribute always synchronizes
193 requesting the ids attribute always synchronizes
190 the registration state. To request ids without synchronization,
194 the registration state. To request ids without synchronization,
191 use semi-private _ids attributes.
195 use semi-private _ids attributes.
192
196
193 history : list of msg_ids
197 history : list of msg_ids
194 a list of msg_ids, keeping track of all the execution
198 a list of msg_ids, keeping track of all the execution
195 messages you have submitted in order.
199 messages you have submitted in order.
196
200
197 outstanding : set of msg_ids
201 outstanding : set of msg_ids
198 a set of msg_ids that have been submitted, but whose
202 a set of msg_ids that have been submitted, but whose
199 results have not yet been received.
203 results have not yet been received.
200
204
201 results : dict
205 results : dict
202 a dict of all our results, keyed by msg_id
206 a dict of all our results, keyed by msg_id
203
207
204 block : bool
208 block : bool
205 determines default behavior when block not specified
209 determines default behavior when block not specified
206 in execution methods
210 in execution methods
207
211
208 Methods
212 Methods
209 -------
213 -------
210
214
211 spin
215 spin
212 flushes incoming results and registration state changes
216 flushes incoming results and registration state changes
213 control methods spin, and requesting `ids` also ensures up to date
217 control methods spin, and requesting `ids` also ensures up to date
214
218
215 wait
219 wait
216 wait on one or more msg_ids
220 wait on one or more msg_ids
217
221
218 execution methods
222 execution methods
219 apply
223 apply
220 legacy: execute, run
224 legacy: execute, run
221
225
222 data movement
226 data movement
223 push, pull, scatter, gather
227 push, pull, scatter, gather
224
228
225 query methods
229 query methods
226 queue_status, get_result, purge, result_status
230 queue_status, get_result, purge, result_status
227
231
228 control methods
232 control methods
229 abort, shutdown
233 abort, shutdown
230
234
231 """
235 """
232
236
233
237
234 block = Bool(False)
238 block = Bool(False)
235 outstanding = Set()
239 outstanding = Set()
236 results = Instance('collections.defaultdict', (dict,))
240 results = Instance('collections.defaultdict', (dict,))
237 metadata = Instance('collections.defaultdict', (Metadata,))
241 metadata = Instance('collections.defaultdict', (Metadata,))
238 history = List()
242 history = List()
239 debug = Bool(False)
243 debug = Bool(False)
240 profile=Unicode('default')
244
245 profile=Unicode()
246 def _profile_default(self):
247 if BaseIPythonApplication.initialized():
248 # an IPython app *might* be running, try to get its profile
249 try:
250 return BaseIPythonApplication.instance().profile
251 except (AttributeError, MultipleInstanceError):
252 # could be a *different* subclass of config.Application,
253 # which would raise one of these two errors.
254 return u'default'
255 else:
256 return u'default'
257
241
258
242 _outstanding_dict = Instance('collections.defaultdict', (set,))
259 _outstanding_dict = Instance('collections.defaultdict', (set,))
243 _ids = List()
260 _ids = List()
244 _connected=Bool(False)
261 _connected=Bool(False)
245 _ssh=Bool(False)
262 _ssh=Bool(False)
246 _context = Instance('zmq.Context')
263 _context = Instance('zmq.Context')
247 _config = Dict()
264 _config = Dict()
248 _engines=Instance(util.ReverseDict, (), {})
265 _engines=Instance(util.ReverseDict, (), {})
249 # _hub_socket=Instance('zmq.Socket')
266 # _hub_socket=Instance('zmq.Socket')
250 _query_socket=Instance('zmq.Socket')
267 _query_socket=Instance('zmq.Socket')
251 _control_socket=Instance('zmq.Socket')
268 _control_socket=Instance('zmq.Socket')
252 _iopub_socket=Instance('zmq.Socket')
269 _iopub_socket=Instance('zmq.Socket')
253 _notification_socket=Instance('zmq.Socket')
270 _notification_socket=Instance('zmq.Socket')
254 _mux_socket=Instance('zmq.Socket')
271 _mux_socket=Instance('zmq.Socket')
255 _task_socket=Instance('zmq.Socket')
272 _task_socket=Instance('zmq.Socket')
256 _task_scheme=Unicode()
273 _task_scheme=Unicode()
257 _closed = False
274 _closed = False
258 _ignored_control_replies=Int(0)
275 _ignored_control_replies=Int(0)
259 _ignored_hub_replies=Int(0)
276 _ignored_hub_replies=Int(0)
260
277
261 def __init__(self, url_or_file=None, profile='default', profile_dir=None, ipython_dir=None,
278 def __new__(self, *args, **kw):
279 # don't raise on positional args
280 return HasTraits.__new__(self, **kw)
281
282 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
262 context=None, debug=False, exec_key=None,
283 context=None, debug=False, exec_key=None,
263 sshserver=None, sshkey=None, password=None, paramiko=None,
284 sshserver=None, sshkey=None, password=None, paramiko=None,
264 timeout=10, **extra_args
285 timeout=10, **extra_args
265 ):
286 ):
266 super(Client, self).__init__(debug=debug, profile=profile)
287 if profile:
288 super(Client, self).__init__(debug=debug, profile=profile)
289 else:
290 super(Client, self).__init__(debug=debug)
267 if context is None:
291 if context is None:
268 context = zmq.Context.instance()
292 context = zmq.Context.instance()
269 self._context = context
293 self._context = context
270
271
294
272 self._setup_profile_dir(profile, profile_dir, ipython_dir)
295 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
273 if self._cd is not None:
296 if self._cd is not None:
274 if url_or_file is None:
297 if url_or_file is None:
275 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
298 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
276 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
299 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
277 " Please specify at least one of url_or_file or profile."
300 " Please specify at least one of url_or_file or profile."
278
301
279 try:
302 try:
280 util.validate_url(url_or_file)
303 util.validate_url(url_or_file)
281 except AssertionError:
304 except AssertionError:
282 if not os.path.exists(url_or_file):
305 if not os.path.exists(url_or_file):
283 if self._cd:
306 if self._cd:
284 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
307 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
285 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
308 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
286 with open(url_or_file) as f:
309 with open(url_or_file) as f:
287 cfg = json.loads(f.read())
310 cfg = json.loads(f.read())
288 else:
311 else:
289 cfg = {'url':url_or_file}
312 cfg = {'url':url_or_file}
290
313
291 # sync defaults from args, json:
314 # sync defaults from args, json:
292 if sshserver:
315 if sshserver:
293 cfg['ssh'] = sshserver
316 cfg['ssh'] = sshserver
294 if exec_key:
317 if exec_key:
295 cfg['exec_key'] = exec_key
318 cfg['exec_key'] = exec_key
296 exec_key = cfg['exec_key']
319 exec_key = cfg['exec_key']
297 sshserver=cfg['ssh']
320 sshserver=cfg['ssh']
298 url = cfg['url']
321 url = cfg['url']
299 location = cfg.setdefault('location', None)
322 location = cfg.setdefault('location', None)
300 cfg['url'] = util.disambiguate_url(cfg['url'], location)
323 cfg['url'] = util.disambiguate_url(cfg['url'], location)
301 url = cfg['url']
324 url = cfg['url']
302
325
303 self._config = cfg
326 self._config = cfg
304
327
305 self._ssh = bool(sshserver or sshkey or password)
328 self._ssh = bool(sshserver or sshkey or password)
306 if self._ssh and sshserver is None:
329 if self._ssh and sshserver is None:
307 # default to ssh via localhost
330 # default to ssh via localhost
308 sshserver = url.split('://')[1].split(':')[0]
331 sshserver = url.split('://')[1].split(':')[0]
309 if self._ssh and password is None:
332 if self._ssh and password is None:
310 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
333 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
311 password=False
334 password=False
312 else:
335 else:
313 password = getpass("SSH Password for %s: "%sshserver)
336 password = getpass("SSH Password for %s: "%sshserver)
314 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
337 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
315
338
316 # configure and construct the session
339 # configure and construct the session
317 if exec_key is not None:
340 if exec_key is not None:
318 if os.path.isfile(exec_key):
341 if os.path.isfile(exec_key):
319 extra_args['keyfile'] = exec_key
342 extra_args['keyfile'] = exec_key
320 else:
343 else:
321 extra_args['key'] = exec_key
344 extra_args['key'] = exec_key
322 self.session = Session(**extra_args)
345 self.session = Session(**extra_args)
323
346
324 self._query_socket = self._context.socket(zmq.XREQ)
347 self._query_socket = self._context.socket(zmq.XREQ)
325 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
348 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
326 if self._ssh:
349 if self._ssh:
327 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
350 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
328 else:
351 else:
329 self._query_socket.connect(url)
352 self._query_socket.connect(url)
330
353
331 self.session.debug = self.debug
354 self.session.debug = self.debug
332
355
333 self._notification_handlers = {'registration_notification' : self._register_engine,
356 self._notification_handlers = {'registration_notification' : self._register_engine,
334 'unregistration_notification' : self._unregister_engine,
357 'unregistration_notification' : self._unregister_engine,
335 'shutdown_notification' : lambda msg: self.close(),
358 'shutdown_notification' : lambda msg: self.close(),
336 }
359 }
337 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
360 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
338 'apply_reply' : self._handle_apply_reply}
361 'apply_reply' : self._handle_apply_reply}
339 self._connect(sshserver, ssh_kwargs, timeout)
362 self._connect(sshserver, ssh_kwargs, timeout)
340
363
341 def __del__(self):
364 def __del__(self):
342 """cleanup sockets, but _not_ context."""
365 """cleanup sockets, but _not_ context."""
343 self.close()
366 self.close()
344
367
345 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
368 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
346 if ipython_dir is None:
369 if ipython_dir is None:
347 ipython_dir = get_ipython_dir()
370 ipython_dir = get_ipython_dir()
348 if profile_dir is not None:
371 if profile_dir is not None:
349 try:
372 try:
350 self._cd = ProfileDir.find_profile_dir(profile_dir)
373 self._cd = ProfileDir.find_profile_dir(profile_dir)
351 return
374 return
352 except ProfileDirError:
375 except ProfileDirError:
353 pass
376 pass
354 elif profile is not None:
377 elif profile is not None:
355 try:
378 try:
356 self._cd = ProfileDir.find_profile_dir_by_name(
379 self._cd = ProfileDir.find_profile_dir_by_name(
357 ipython_dir, profile)
380 ipython_dir, profile)
358 return
381 return
359 except ProfileDirError:
382 except ProfileDirError:
360 pass
383 pass
361 self._cd = None
384 self._cd = None
362
385
363 def _update_engines(self, engines):
386 def _update_engines(self, engines):
364 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
387 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
365 for k,v in engines.iteritems():
388 for k,v in engines.iteritems():
366 eid = int(k)
389 eid = int(k)
367 self._engines[eid] = bytes(v) # force not unicode
390 self._engines[eid] = bytes(v) # force not unicode
368 self._ids.append(eid)
391 self._ids.append(eid)
369 self._ids = sorted(self._ids)
392 self._ids = sorted(self._ids)
370 if sorted(self._engines.keys()) != range(len(self._engines)) and \
393 if sorted(self._engines.keys()) != range(len(self._engines)) and \
371 self._task_scheme == 'pure' and self._task_socket:
394 self._task_scheme == 'pure' and self._task_socket:
372 self._stop_scheduling_tasks()
395 self._stop_scheduling_tasks()
373
396
374 def _stop_scheduling_tasks(self):
397 def _stop_scheduling_tasks(self):
375 """Stop scheduling tasks because an engine has been unregistered
398 """Stop scheduling tasks because an engine has been unregistered
376 from a pure ZMQ scheduler.
399 from a pure ZMQ scheduler.
377 """
400 """
378 self._task_socket.close()
401 self._task_socket.close()
379 self._task_socket = None
402 self._task_socket = None
380 msg = "An engine has been unregistered, and we are using pure " +\
403 msg = "An engine has been unregistered, and we are using pure " +\
381 "ZMQ task scheduling. Task farming will be disabled."
404 "ZMQ task scheduling. Task farming will be disabled."
382 if self.outstanding:
405 if self.outstanding:
383 msg += " If you were running tasks when this happened, " +\
406 msg += " If you were running tasks when this happened, " +\
384 "some `outstanding` msg_ids may never resolve."
407 "some `outstanding` msg_ids may never resolve."
385 warnings.warn(msg, RuntimeWarning)
408 warnings.warn(msg, RuntimeWarning)
386
409
387 def _build_targets(self, targets):
410 def _build_targets(self, targets):
388 """Turn valid target IDs or 'all' into two lists:
411 """Turn valid target IDs or 'all' into two lists:
389 (int_ids, uuids).
412 (int_ids, uuids).
390 """
413 """
391 if not self._ids:
414 if not self._ids:
392 # flush notification socket if no engines yet, just in case
415 # flush notification socket if no engines yet, just in case
393 if not self.ids:
416 if not self.ids:
394 raise error.NoEnginesRegistered("Can't build targets without any engines")
417 raise error.NoEnginesRegistered("Can't build targets without any engines")
395
418
396 if targets is None:
419 if targets is None:
397 targets = self._ids
420 targets = self._ids
398 elif isinstance(targets, str):
421 elif isinstance(targets, str):
399 if targets.lower() == 'all':
422 if targets.lower() == 'all':
400 targets = self._ids
423 targets = self._ids
401 else:
424 else:
402 raise TypeError("%r not valid str target, must be 'all'"%(targets))
425 raise TypeError("%r not valid str target, must be 'all'"%(targets))
403 elif isinstance(targets, int):
426 elif isinstance(targets, int):
404 if targets < 0:
427 if targets < 0:
405 targets = self.ids[targets]
428 targets = self.ids[targets]
406 if targets not in self._ids:
429 if targets not in self._ids:
407 raise IndexError("No such engine: %i"%targets)
430 raise IndexError("No such engine: %i"%targets)
408 targets = [targets]
431 targets = [targets]
409
432
410 if isinstance(targets, slice):
433 if isinstance(targets, slice):
411 indices = range(len(self._ids))[targets]
434 indices = range(len(self._ids))[targets]
412 ids = self.ids
435 ids = self.ids
413 targets = [ ids[i] for i in indices ]
436 targets = [ ids[i] for i in indices ]
414
437
415 if not isinstance(targets, (tuple, list, xrange)):
438 if not isinstance(targets, (tuple, list, xrange)):
416 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
439 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
417
440
418 return [self._engines[t] for t in targets], list(targets)
441 return [self._engines[t] for t in targets], list(targets)
419
442
420 def _connect(self, sshserver, ssh_kwargs, timeout):
443 def _connect(self, sshserver, ssh_kwargs, timeout):
421 """setup all our socket connections to the cluster. This is called from
444 """setup all our socket connections to the cluster. This is called from
422 __init__."""
445 __init__."""
423
446
424 # Maybe allow reconnecting?
447 # Maybe allow reconnecting?
425 if self._connected:
448 if self._connected:
426 return
449 return
427 self._connected=True
450 self._connected=True
428
451
429 def connect_socket(s, url):
452 def connect_socket(s, url):
430 url = util.disambiguate_url(url, self._config['location'])
453 url = util.disambiguate_url(url, self._config['location'])
431 if self._ssh:
454 if self._ssh:
432 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
455 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
433 else:
456 else:
434 return s.connect(url)
457 return s.connect(url)
435
458
436 self.session.send(self._query_socket, 'connection_request')
459 self.session.send(self._query_socket, 'connection_request')
437 r,w,x = zmq.select([self._query_socket],[],[], timeout)
460 r,w,x = zmq.select([self._query_socket],[],[], timeout)
438 if not r:
461 if not r:
439 raise error.TimeoutError("Hub connection request timed out")
462 raise error.TimeoutError("Hub connection request timed out")
440 idents,msg = self.session.recv(self._query_socket,mode=0)
463 idents,msg = self.session.recv(self._query_socket,mode=0)
441 if self.debug:
464 if self.debug:
442 pprint(msg)
465 pprint(msg)
443 msg = Message(msg)
466 msg = Message(msg)
444 content = msg.content
467 content = msg.content
445 self._config['registration'] = dict(content)
468 self._config['registration'] = dict(content)
446 if content.status == 'ok':
469 if content.status == 'ok':
447 if content.mux:
470 if content.mux:
448 self._mux_socket = self._context.socket(zmq.XREQ)
471 self._mux_socket = self._context.socket(zmq.XREQ)
449 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
472 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
450 connect_socket(self._mux_socket, content.mux)
473 connect_socket(self._mux_socket, content.mux)
451 if content.task:
474 if content.task:
452 self._task_scheme, task_addr = content.task
475 self._task_scheme, task_addr = content.task
453 self._task_socket = self._context.socket(zmq.XREQ)
476 self._task_socket = self._context.socket(zmq.XREQ)
454 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
477 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
455 connect_socket(self._task_socket, task_addr)
478 connect_socket(self._task_socket, task_addr)
456 if content.notification:
479 if content.notification:
457 self._notification_socket = self._context.socket(zmq.SUB)
480 self._notification_socket = self._context.socket(zmq.SUB)
458 connect_socket(self._notification_socket, content.notification)
481 connect_socket(self._notification_socket, content.notification)
459 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
482 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
460 # if content.query:
483 # if content.query:
461 # self._query_socket = self._context.socket(zmq.XREQ)
484 # self._query_socket = self._context.socket(zmq.XREQ)
462 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
485 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
463 # connect_socket(self._query_socket, content.query)
486 # connect_socket(self._query_socket, content.query)
464 if content.control:
487 if content.control:
465 self._control_socket = self._context.socket(zmq.XREQ)
488 self._control_socket = self._context.socket(zmq.XREQ)
466 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
489 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
467 connect_socket(self._control_socket, content.control)
490 connect_socket(self._control_socket, content.control)
468 if content.iopub:
491 if content.iopub:
469 self._iopub_socket = self._context.socket(zmq.SUB)
492 self._iopub_socket = self._context.socket(zmq.SUB)
470 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
493 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
471 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
494 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
472 connect_socket(self._iopub_socket, content.iopub)
495 connect_socket(self._iopub_socket, content.iopub)
473 self._update_engines(dict(content.engines))
496 self._update_engines(dict(content.engines))
474 else:
497 else:
475 self._connected = False
498 self._connected = False
476 raise Exception("Failed to connect!")
499 raise Exception("Failed to connect!")
477
500
478 #--------------------------------------------------------------------------
501 #--------------------------------------------------------------------------
479 # handlers and callbacks for incoming messages
502 # handlers and callbacks for incoming messages
480 #--------------------------------------------------------------------------
503 #--------------------------------------------------------------------------
481
504
482 def _unwrap_exception(self, content):
505 def _unwrap_exception(self, content):
483 """unwrap exception, and remap engine_id to int."""
506 """unwrap exception, and remap engine_id to int."""
484 e = error.unwrap_exception(content)
507 e = error.unwrap_exception(content)
485 # print e.traceback
508 # print e.traceback
486 if e.engine_info:
509 if e.engine_info:
487 e_uuid = e.engine_info['engine_uuid']
510 e_uuid = e.engine_info['engine_uuid']
488 eid = self._engines[e_uuid]
511 eid = self._engines[e_uuid]
489 e.engine_info['engine_id'] = eid
512 e.engine_info['engine_id'] = eid
490 return e
513 return e
491
514
492 def _extract_metadata(self, header, parent, content):
515 def _extract_metadata(self, header, parent, content):
493 md = {'msg_id' : parent['msg_id'],
516 md = {'msg_id' : parent['msg_id'],
494 'received' : datetime.now(),
517 'received' : datetime.now(),
495 'engine_uuid' : header.get('engine', None),
518 'engine_uuid' : header.get('engine', None),
496 'follow' : parent.get('follow', []),
519 'follow' : parent.get('follow', []),
497 'after' : parent.get('after', []),
520 'after' : parent.get('after', []),
498 'status' : content['status'],
521 'status' : content['status'],
499 }
522 }
500
523
501 if md['engine_uuid'] is not None:
524 if md['engine_uuid'] is not None:
502 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
525 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
503
526
504 if 'date' in parent:
527 if 'date' in parent:
505 md['submitted'] = parent['date']
528 md['submitted'] = parent['date']
506 if 'started' in header:
529 if 'started' in header:
507 md['started'] = header['started']
530 md['started'] = header['started']
508 if 'date' in header:
531 if 'date' in header:
509 md['completed'] = header['date']
532 md['completed'] = header['date']
510 return md
533 return md
511
534
512 def _register_engine(self, msg):
535 def _register_engine(self, msg):
513 """Register a new engine, and update our connection info."""
536 """Register a new engine, and update our connection info."""
514 content = msg['content']
537 content = msg['content']
515 eid = content['id']
538 eid = content['id']
516 d = {eid : content['queue']}
539 d = {eid : content['queue']}
517 self._update_engines(d)
540 self._update_engines(d)
518
541
519 def _unregister_engine(self, msg):
542 def _unregister_engine(self, msg):
520 """Unregister an engine that has died."""
543 """Unregister an engine that has died."""
521 content = msg['content']
544 content = msg['content']
522 eid = int(content['id'])
545 eid = int(content['id'])
523 if eid in self._ids:
546 if eid in self._ids:
524 self._ids.remove(eid)
547 self._ids.remove(eid)
525 uuid = self._engines.pop(eid)
548 uuid = self._engines.pop(eid)
526
549
527 self._handle_stranded_msgs(eid, uuid)
550 self._handle_stranded_msgs(eid, uuid)
528
551
529 if self._task_socket and self._task_scheme == 'pure':
552 if self._task_socket and self._task_scheme == 'pure':
530 self._stop_scheduling_tasks()
553 self._stop_scheduling_tasks()
531
554
532 def _handle_stranded_msgs(self, eid, uuid):
555 def _handle_stranded_msgs(self, eid, uuid):
533 """Handle messages known to be on an engine when the engine unregisters.
556 """Handle messages known to be on an engine when the engine unregisters.
534
557
535 It is possible that this will fire prematurely - that is, an engine will
558 It is possible that this will fire prematurely - that is, an engine will
536 go down after completing a result, and the client will be notified
559 go down after completing a result, and the client will be notified
537 of the unregistration and later receive the successful result.
560 of the unregistration and later receive the successful result.
538 """
561 """
539
562
540 outstanding = self._outstanding_dict[uuid]
563 outstanding = self._outstanding_dict[uuid]
541
564
542 for msg_id in list(outstanding):
565 for msg_id in list(outstanding):
543 if msg_id in self.results:
566 if msg_id in self.results:
544 # we already
567 # we already
545 continue
568 continue
546 try:
569 try:
547 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
570 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
548 except:
571 except:
549 content = error.wrap_exception()
572 content = error.wrap_exception()
550 # build a fake message:
573 # build a fake message:
551 parent = {}
574 parent = {}
552 header = {}
575 header = {}
553 parent['msg_id'] = msg_id
576 parent['msg_id'] = msg_id
554 header['engine'] = uuid
577 header['engine'] = uuid
555 header['date'] = datetime.now()
578 header['date'] = datetime.now()
556 msg = dict(parent_header=parent, header=header, content=content)
579 msg = dict(parent_header=parent, header=header, content=content)
557 self._handle_apply_reply(msg)
580 self._handle_apply_reply(msg)
558
581
559 def _handle_execute_reply(self, msg):
582 def _handle_execute_reply(self, msg):
560 """Save the reply to an execute_request into our results.
583 """Save the reply to an execute_request into our results.
561
584
562 execute messages are never actually used. apply is used instead.
585 execute messages are never actually used. apply is used instead.
563 """
586 """
564
587
565 parent = msg['parent_header']
588 parent = msg['parent_header']
566 msg_id = parent['msg_id']
589 msg_id = parent['msg_id']
567 if msg_id not in self.outstanding:
590 if msg_id not in self.outstanding:
568 if msg_id in self.history:
591 if msg_id in self.history:
569 print ("got stale result: %s"%msg_id)
592 print ("got stale result: %s"%msg_id)
570 else:
593 else:
571 print ("got unknown result: %s"%msg_id)
594 print ("got unknown result: %s"%msg_id)
572 else:
595 else:
573 self.outstanding.remove(msg_id)
596 self.outstanding.remove(msg_id)
574 self.results[msg_id] = self._unwrap_exception(msg['content'])
597 self.results[msg_id] = self._unwrap_exception(msg['content'])
575
598
576 def _handle_apply_reply(self, msg):
599 def _handle_apply_reply(self, msg):
577 """Save the reply to an apply_request into our results."""
600 """Save the reply to an apply_request into our results."""
578 parent = msg['parent_header']
601 parent = msg['parent_header']
579 msg_id = parent['msg_id']
602 msg_id = parent['msg_id']
580 if msg_id not in self.outstanding:
603 if msg_id not in self.outstanding:
581 if msg_id in self.history:
604 if msg_id in self.history:
582 print ("got stale result: %s"%msg_id)
605 print ("got stale result: %s"%msg_id)
583 print self.results[msg_id]
606 print self.results[msg_id]
584 print msg
607 print msg
585 else:
608 else:
586 print ("got unknown result: %s"%msg_id)
609 print ("got unknown result: %s"%msg_id)
587 else:
610 else:
588 self.outstanding.remove(msg_id)
611 self.outstanding.remove(msg_id)
589 content = msg['content']
612 content = msg['content']
590 header = msg['header']
613 header = msg['header']
591
614
592 # construct metadata:
615 # construct metadata:
593 md = self.metadata[msg_id]
616 md = self.metadata[msg_id]
594 md.update(self._extract_metadata(header, parent, content))
617 md.update(self._extract_metadata(header, parent, content))
595 # is this redundant?
618 # is this redundant?
596 self.metadata[msg_id] = md
619 self.metadata[msg_id] = md
597
620
598 e_outstanding = self._outstanding_dict[md['engine_uuid']]
621 e_outstanding = self._outstanding_dict[md['engine_uuid']]
599 if msg_id in e_outstanding:
622 if msg_id in e_outstanding:
600 e_outstanding.remove(msg_id)
623 e_outstanding.remove(msg_id)
601
624
602 # construct result:
625 # construct result:
603 if content['status'] == 'ok':
626 if content['status'] == 'ok':
604 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
627 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
605 elif content['status'] == 'aborted':
628 elif content['status'] == 'aborted':
606 self.results[msg_id] = error.TaskAborted(msg_id)
629 self.results[msg_id] = error.TaskAborted(msg_id)
607 elif content['status'] == 'resubmitted':
630 elif content['status'] == 'resubmitted':
608 # TODO: handle resubmission
631 # TODO: handle resubmission
609 pass
632 pass
610 else:
633 else:
611 self.results[msg_id] = self._unwrap_exception(content)
634 self.results[msg_id] = self._unwrap_exception(content)
612
635
613 def _flush_notifications(self):
636 def _flush_notifications(self):
614 """Flush notifications of engine registrations waiting
637 """Flush notifications of engine registrations waiting
615 in ZMQ queue."""
638 in ZMQ queue."""
616 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
639 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
617 while msg is not None:
640 while msg is not None:
618 if self.debug:
641 if self.debug:
619 pprint(msg)
642 pprint(msg)
620 msg_type = msg['msg_type']
643 msg_type = msg['msg_type']
621 handler = self._notification_handlers.get(msg_type, None)
644 handler = self._notification_handlers.get(msg_type, None)
622 if handler is None:
645 if handler is None:
623 raise Exception("Unhandled message type: %s"%msg.msg_type)
646 raise Exception("Unhandled message type: %s"%msg.msg_type)
624 else:
647 else:
625 handler(msg)
648 handler(msg)
626 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
649 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
627
650
628 def _flush_results(self, sock):
651 def _flush_results(self, sock):
629 """Flush task or queue results waiting in ZMQ queue."""
652 """Flush task or queue results waiting in ZMQ queue."""
630 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
653 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
631 while msg is not None:
654 while msg is not None:
632 if self.debug:
655 if self.debug:
633 pprint(msg)
656 pprint(msg)
634 msg_type = msg['msg_type']
657 msg_type = msg['msg_type']
635 handler = self._queue_handlers.get(msg_type, None)
658 handler = self._queue_handlers.get(msg_type, None)
636 if handler is None:
659 if handler is None:
637 raise Exception("Unhandled message type: %s"%msg.msg_type)
660 raise Exception("Unhandled message type: %s"%msg.msg_type)
638 else:
661 else:
639 handler(msg)
662 handler(msg)
640 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
663 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
641
664
642 def _flush_control(self, sock):
665 def _flush_control(self, sock):
643 """Flush replies from the control channel waiting
666 """Flush replies from the control channel waiting
644 in the ZMQ queue.
667 in the ZMQ queue.
645
668
646 Currently: ignore them."""
669 Currently: ignore them."""
647 if self._ignored_control_replies <= 0:
670 if self._ignored_control_replies <= 0:
648 return
671 return
649 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
672 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
650 while msg is not None:
673 while msg is not None:
651 self._ignored_control_replies -= 1
674 self._ignored_control_replies -= 1
652 if self.debug:
675 if self.debug:
653 pprint(msg)
676 pprint(msg)
654 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
677 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
655
678
656 def _flush_ignored_control(self):
679 def _flush_ignored_control(self):
657 """flush ignored control replies"""
680 """flush ignored control replies"""
658 while self._ignored_control_replies > 0:
681 while self._ignored_control_replies > 0:
659 self.session.recv(self._control_socket)
682 self.session.recv(self._control_socket)
660 self._ignored_control_replies -= 1
683 self._ignored_control_replies -= 1
661
684
662 def _flush_ignored_hub_replies(self):
685 def _flush_ignored_hub_replies(self):
663 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
686 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
664 while msg is not None:
687 while msg is not None:
665 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
688 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
666
689
667 def _flush_iopub(self, sock):
690 def _flush_iopub(self, sock):
668 """Flush replies from the iopub channel waiting
691 """Flush replies from the iopub channel waiting
669 in the ZMQ queue.
692 in the ZMQ queue.
670 """
693 """
671 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
694 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
672 while msg is not None:
695 while msg is not None:
673 if self.debug:
696 if self.debug:
674 pprint(msg)
697 pprint(msg)
675 parent = msg['parent_header']
698 parent = msg['parent_header']
676 msg_id = parent['msg_id']
699 msg_id = parent['msg_id']
677 content = msg['content']
700 content = msg['content']
678 header = msg['header']
701 header = msg['header']
679 msg_type = msg['msg_type']
702 msg_type = msg['msg_type']
680
703
681 # init metadata:
704 # init metadata:
682 md = self.metadata[msg_id]
705 md = self.metadata[msg_id]
683
706
684 if msg_type == 'stream':
707 if msg_type == 'stream':
685 name = content['name']
708 name = content['name']
686 s = md[name] or ''
709 s = md[name] or ''
687 md[name] = s + content['data']
710 md[name] = s + content['data']
688 elif msg_type == 'pyerr':
711 elif msg_type == 'pyerr':
689 md.update({'pyerr' : self._unwrap_exception(content)})
712 md.update({'pyerr' : self._unwrap_exception(content)})
690 elif msg_type == 'pyin':
713 elif msg_type == 'pyin':
691 md.update({'pyin' : content['code']})
714 md.update({'pyin' : content['code']})
692 else:
715 else:
693 md.update({msg_type : content.get('data', '')})
716 md.update({msg_type : content.get('data', '')})
694
717
695 # reduntant?
718 # reduntant?
696 self.metadata[msg_id] = md
719 self.metadata[msg_id] = md
697
720
698 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
721 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
699
722
700 #--------------------------------------------------------------------------
723 #--------------------------------------------------------------------------
701 # len, getitem
724 # len, getitem
702 #--------------------------------------------------------------------------
725 #--------------------------------------------------------------------------
703
726
704 def __len__(self):
727 def __len__(self):
705 """len(client) returns # of engines."""
728 """len(client) returns # of engines."""
706 return len(self.ids)
729 return len(self.ids)
707
730
708 def __getitem__(self, key):
731 def __getitem__(self, key):
709 """index access returns DirectView multiplexer objects
732 """index access returns DirectView multiplexer objects
710
733
711 Must be int, slice, or list/tuple/xrange of ints"""
734 Must be int, slice, or list/tuple/xrange of ints"""
712 if not isinstance(key, (int, slice, tuple, list, xrange)):
735 if not isinstance(key, (int, slice, tuple, list, xrange)):
713 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
736 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
714 else:
737 else:
715 return self.direct_view(key)
738 return self.direct_view(key)
716
739
717 #--------------------------------------------------------------------------
740 #--------------------------------------------------------------------------
718 # Begin public methods
741 # Begin public methods
719 #--------------------------------------------------------------------------
742 #--------------------------------------------------------------------------
720
743
721 @property
744 @property
722 def ids(self):
745 def ids(self):
723 """Always up-to-date ids property."""
746 """Always up-to-date ids property."""
724 self._flush_notifications()
747 self._flush_notifications()
725 # always copy:
748 # always copy:
726 return list(self._ids)
749 return list(self._ids)
727
750
728 def close(self):
751 def close(self):
729 if self._closed:
752 if self._closed:
730 return
753 return
731 snames = filter(lambda n: n.endswith('socket'), dir(self))
754 snames = filter(lambda n: n.endswith('socket'), dir(self))
732 for socket in map(lambda name: getattr(self, name), snames):
755 for socket in map(lambda name: getattr(self, name), snames):
733 if isinstance(socket, zmq.Socket) and not socket.closed:
756 if isinstance(socket, zmq.Socket) and not socket.closed:
734 socket.close()
757 socket.close()
735 self._closed = True
758 self._closed = True
736
759
737 def spin(self):
760 def spin(self):
738 """Flush any registration notifications and execution results
761 """Flush any registration notifications and execution results
739 waiting in the ZMQ queue.
762 waiting in the ZMQ queue.
740 """
763 """
741 if self._notification_socket:
764 if self._notification_socket:
742 self._flush_notifications()
765 self._flush_notifications()
743 if self._mux_socket:
766 if self._mux_socket:
744 self._flush_results(self._mux_socket)
767 self._flush_results(self._mux_socket)
745 if self._task_socket:
768 if self._task_socket:
746 self._flush_results(self._task_socket)
769 self._flush_results(self._task_socket)
747 if self._control_socket:
770 if self._control_socket:
748 self._flush_control(self._control_socket)
771 self._flush_control(self._control_socket)
749 if self._iopub_socket:
772 if self._iopub_socket:
750 self._flush_iopub(self._iopub_socket)
773 self._flush_iopub(self._iopub_socket)
751 if self._query_socket:
774 if self._query_socket:
752 self._flush_ignored_hub_replies()
775 self._flush_ignored_hub_replies()
753
776
754 def wait(self, jobs=None, timeout=-1):
777 def wait(self, jobs=None, timeout=-1):
755 """waits on one or more `jobs`, for up to `timeout` seconds.
778 """waits on one or more `jobs`, for up to `timeout` seconds.
756
779
757 Parameters
780 Parameters
758 ----------
781 ----------
759
782
760 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
783 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
761 ints are indices to self.history
784 ints are indices to self.history
762 strs are msg_ids
785 strs are msg_ids
763 default: wait on all outstanding messages
786 default: wait on all outstanding messages
764 timeout : float
787 timeout : float
765 a time in seconds, after which to give up.
788 a time in seconds, after which to give up.
766 default is -1, which means no timeout
789 default is -1, which means no timeout
767
790
768 Returns
791 Returns
769 -------
792 -------
770
793
771 True : when all msg_ids are done
794 True : when all msg_ids are done
772 False : timeout reached, some msg_ids still outstanding
795 False : timeout reached, some msg_ids still outstanding
773 """
796 """
774 tic = time.time()
797 tic = time.time()
775 if jobs is None:
798 if jobs is None:
776 theids = self.outstanding
799 theids = self.outstanding
777 else:
800 else:
778 if isinstance(jobs, (int, str, AsyncResult)):
801 if isinstance(jobs, (int, str, AsyncResult)):
779 jobs = [jobs]
802 jobs = [jobs]
780 theids = set()
803 theids = set()
781 for job in jobs:
804 for job in jobs:
782 if isinstance(job, int):
805 if isinstance(job, int):
783 # index access
806 # index access
784 job = self.history[job]
807 job = self.history[job]
785 elif isinstance(job, AsyncResult):
808 elif isinstance(job, AsyncResult):
786 map(theids.add, job.msg_ids)
809 map(theids.add, job.msg_ids)
787 continue
810 continue
788 theids.add(job)
811 theids.add(job)
789 if not theids.intersection(self.outstanding):
812 if not theids.intersection(self.outstanding):
790 return True
813 return True
791 self.spin()
814 self.spin()
792 while theids.intersection(self.outstanding):
815 while theids.intersection(self.outstanding):
793 if timeout >= 0 and ( time.time()-tic ) > timeout:
816 if timeout >= 0 and ( time.time()-tic ) > timeout:
794 break
817 break
795 time.sleep(1e-3)
818 time.sleep(1e-3)
796 self.spin()
819 self.spin()
797 return len(theids.intersection(self.outstanding)) == 0
820 return len(theids.intersection(self.outstanding)) == 0
798
821
799 #--------------------------------------------------------------------------
822 #--------------------------------------------------------------------------
800 # Control methods
823 # Control methods
801 #--------------------------------------------------------------------------
824 #--------------------------------------------------------------------------
802
825
803 @spin_first
826 @spin_first
804 def clear(self, targets=None, block=None):
827 def clear(self, targets=None, block=None):
805 """Clear the namespace in target(s)."""
828 """Clear the namespace in target(s)."""
806 block = self.block if block is None else block
829 block = self.block if block is None else block
807 targets = self._build_targets(targets)[0]
830 targets = self._build_targets(targets)[0]
808 for t in targets:
831 for t in targets:
809 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
832 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
810 error = False
833 error = False
811 if block:
834 if block:
812 self._flush_ignored_control()
835 self._flush_ignored_control()
813 for i in range(len(targets)):
836 for i in range(len(targets)):
814 idents,msg = self.session.recv(self._control_socket,0)
837 idents,msg = self.session.recv(self._control_socket,0)
815 if self.debug:
838 if self.debug:
816 pprint(msg)
839 pprint(msg)
817 if msg['content']['status'] != 'ok':
840 if msg['content']['status'] != 'ok':
818 error = self._unwrap_exception(msg['content'])
841 error = self._unwrap_exception(msg['content'])
819 else:
842 else:
820 self._ignored_control_replies += len(targets)
843 self._ignored_control_replies += len(targets)
821 if error:
844 if error:
822 raise error
845 raise error
823
846
824
847
825 @spin_first
848 @spin_first
826 def abort(self, jobs=None, targets=None, block=None):
849 def abort(self, jobs=None, targets=None, block=None):
827 """Abort specific jobs from the execution queues of target(s).
850 """Abort specific jobs from the execution queues of target(s).
828
851
829 This is a mechanism to prevent jobs that have already been submitted
852 This is a mechanism to prevent jobs that have already been submitted
830 from executing.
853 from executing.
831
854
832 Parameters
855 Parameters
833 ----------
856 ----------
834
857
835 jobs : msg_id, list of msg_ids, or AsyncResult
858 jobs : msg_id, list of msg_ids, or AsyncResult
836 The jobs to be aborted
859 The jobs to be aborted
837
860
838
861
839 """
862 """
840 block = self.block if block is None else block
863 block = self.block if block is None else block
841 targets = self._build_targets(targets)[0]
864 targets = self._build_targets(targets)[0]
842 msg_ids = []
865 msg_ids = []
843 if isinstance(jobs, (basestring,AsyncResult)):
866 if isinstance(jobs, (basestring,AsyncResult)):
844 jobs = [jobs]
867 jobs = [jobs]
845 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
868 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
846 if bad_ids:
869 if bad_ids:
847 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
870 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
848 for j in jobs:
871 for j in jobs:
849 if isinstance(j, AsyncResult):
872 if isinstance(j, AsyncResult):
850 msg_ids.extend(j.msg_ids)
873 msg_ids.extend(j.msg_ids)
851 else:
874 else:
852 msg_ids.append(j)
875 msg_ids.append(j)
853 content = dict(msg_ids=msg_ids)
876 content = dict(msg_ids=msg_ids)
854 for t in targets:
877 for t in targets:
855 self.session.send(self._control_socket, 'abort_request',
878 self.session.send(self._control_socket, 'abort_request',
856 content=content, ident=t)
879 content=content, ident=t)
857 error = False
880 error = False
858 if block:
881 if block:
859 self._flush_ignored_control()
882 self._flush_ignored_control()
860 for i in range(len(targets)):
883 for i in range(len(targets)):
861 idents,msg = self.session.recv(self._control_socket,0)
884 idents,msg = self.session.recv(self._control_socket,0)
862 if self.debug:
885 if self.debug:
863 pprint(msg)
886 pprint(msg)
864 if msg['content']['status'] != 'ok':
887 if msg['content']['status'] != 'ok':
865 error = self._unwrap_exception(msg['content'])
888 error = self._unwrap_exception(msg['content'])
866 else:
889 else:
867 self._ignored_control_replies += len(targets)
890 self._ignored_control_replies += len(targets)
868 if error:
891 if error:
869 raise error
892 raise error
870
893
871 @spin_first
894 @spin_first
872 def shutdown(self, targets=None, restart=False, hub=False, block=None):
895 def shutdown(self, targets=None, restart=False, hub=False, block=None):
873 """Terminates one or more engine processes, optionally including the hub."""
896 """Terminates one or more engine processes, optionally including the hub."""
874 block = self.block if block is None else block
897 block = self.block if block is None else block
875 if hub:
898 if hub:
876 targets = 'all'
899 targets = 'all'
877 targets = self._build_targets(targets)[0]
900 targets = self._build_targets(targets)[0]
878 for t in targets:
901 for t in targets:
879 self.session.send(self._control_socket, 'shutdown_request',
902 self.session.send(self._control_socket, 'shutdown_request',
880 content={'restart':restart},ident=t)
903 content={'restart':restart},ident=t)
881 error = False
904 error = False
882 if block or hub:
905 if block or hub:
883 self._flush_ignored_control()
906 self._flush_ignored_control()
884 for i in range(len(targets)):
907 for i in range(len(targets)):
885 idents,msg = self.session.recv(self._control_socket, 0)
908 idents,msg = self.session.recv(self._control_socket, 0)
886 if self.debug:
909 if self.debug:
887 pprint(msg)
910 pprint(msg)
888 if msg['content']['status'] != 'ok':
911 if msg['content']['status'] != 'ok':
889 error = self._unwrap_exception(msg['content'])
912 error = self._unwrap_exception(msg['content'])
890 else:
913 else:
891 self._ignored_control_replies += len(targets)
914 self._ignored_control_replies += len(targets)
892
915
893 if hub:
916 if hub:
894 time.sleep(0.25)
917 time.sleep(0.25)
895 self.session.send(self._query_socket, 'shutdown_request')
918 self.session.send(self._query_socket, 'shutdown_request')
896 idents,msg = self.session.recv(self._query_socket, 0)
919 idents,msg = self.session.recv(self._query_socket, 0)
897 if self.debug:
920 if self.debug:
898 pprint(msg)
921 pprint(msg)
899 if msg['content']['status'] != 'ok':
922 if msg['content']['status'] != 'ok':
900 error = self._unwrap_exception(msg['content'])
923 error = self._unwrap_exception(msg['content'])
901
924
902 if error:
925 if error:
903 raise error
926 raise error
904
927
905 #--------------------------------------------------------------------------
928 #--------------------------------------------------------------------------
906 # Execution related methods
929 # Execution related methods
907 #--------------------------------------------------------------------------
930 #--------------------------------------------------------------------------
908
931
909 def _maybe_raise(self, result):
932 def _maybe_raise(self, result):
910 """wrapper for maybe raising an exception if apply failed."""
933 """wrapper for maybe raising an exception if apply failed."""
911 if isinstance(result, error.RemoteError):
934 if isinstance(result, error.RemoteError):
912 raise result
935 raise result
913
936
914 return result
937 return result
915
938
916 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
939 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
917 ident=None):
940 ident=None):
918 """construct and send an apply message via a socket.
941 """construct and send an apply message via a socket.
919
942
920 This is the principal method with which all engine execution is performed by views.
943 This is the principal method with which all engine execution is performed by views.
921 """
944 """
922
945
923 assert not self._closed, "cannot use me anymore, I'm closed!"
946 assert not self._closed, "cannot use me anymore, I'm closed!"
924 # defaults:
947 # defaults:
925 args = args if args is not None else []
948 args = args if args is not None else []
926 kwargs = kwargs if kwargs is not None else {}
949 kwargs = kwargs if kwargs is not None else {}
927 subheader = subheader if subheader is not None else {}
950 subheader = subheader if subheader is not None else {}
928
951
929 # validate arguments
952 # validate arguments
930 if not callable(f):
953 if not callable(f):
931 raise TypeError("f must be callable, not %s"%type(f))
954 raise TypeError("f must be callable, not %s"%type(f))
932 if not isinstance(args, (tuple, list)):
955 if not isinstance(args, (tuple, list)):
933 raise TypeError("args must be tuple or list, not %s"%type(args))
956 raise TypeError("args must be tuple or list, not %s"%type(args))
934 if not isinstance(kwargs, dict):
957 if not isinstance(kwargs, dict):
935 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
958 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
936 if not isinstance(subheader, dict):
959 if not isinstance(subheader, dict):
937 raise TypeError("subheader must be dict, not %s"%type(subheader))
960 raise TypeError("subheader must be dict, not %s"%type(subheader))
938
961
939 bufs = util.pack_apply_message(f,args,kwargs)
962 bufs = util.pack_apply_message(f,args,kwargs)
940
963
941 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
964 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
942 subheader=subheader, track=track)
965 subheader=subheader, track=track)
943
966
944 msg_id = msg['msg_id']
967 msg_id = msg['msg_id']
945 self.outstanding.add(msg_id)
968 self.outstanding.add(msg_id)
946 if ident:
969 if ident:
947 # possibly routed to a specific engine
970 # possibly routed to a specific engine
948 if isinstance(ident, list):
971 if isinstance(ident, list):
949 ident = ident[-1]
972 ident = ident[-1]
950 if ident in self._engines.values():
973 if ident in self._engines.values():
951 # save for later, in case of engine death
974 # save for later, in case of engine death
952 self._outstanding_dict[ident].add(msg_id)
975 self._outstanding_dict[ident].add(msg_id)
953 self.history.append(msg_id)
976 self.history.append(msg_id)
954 self.metadata[msg_id]['submitted'] = datetime.now()
977 self.metadata[msg_id]['submitted'] = datetime.now()
955
978
956 return msg
979 return msg
957
980
958 #--------------------------------------------------------------------------
981 #--------------------------------------------------------------------------
959 # construct a View object
982 # construct a View object
960 #--------------------------------------------------------------------------
983 #--------------------------------------------------------------------------
961
984
962 def load_balanced_view(self, targets=None):
985 def load_balanced_view(self, targets=None):
963 """construct a DirectView object.
986 """construct a DirectView object.
964
987
965 If no arguments are specified, create a LoadBalancedView
988 If no arguments are specified, create a LoadBalancedView
966 using all engines.
989 using all engines.
967
990
968 Parameters
991 Parameters
969 ----------
992 ----------
970
993
971 targets: list,slice,int,etc. [default: use all engines]
994 targets: list,slice,int,etc. [default: use all engines]
972 The subset of engines across which to load-balance
995 The subset of engines across which to load-balance
973 """
996 """
974 if targets is not None:
997 if targets is not None:
975 targets = self._build_targets(targets)[1]
998 targets = self._build_targets(targets)[1]
976 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
999 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
977
1000
978 def direct_view(self, targets='all'):
1001 def direct_view(self, targets='all'):
979 """construct a DirectView object.
1002 """construct a DirectView object.
980
1003
981 If no targets are specified, create a DirectView
1004 If no targets are specified, create a DirectView
982 using all engines.
1005 using all engines.
983
1006
984 Parameters
1007 Parameters
985 ----------
1008 ----------
986
1009
987 targets: list,slice,int,etc. [default: use all engines]
1010 targets: list,slice,int,etc. [default: use all engines]
988 The engines to use for the View
1011 The engines to use for the View
989 """
1012 """
990 single = isinstance(targets, int)
1013 single = isinstance(targets, int)
991 targets = self._build_targets(targets)[1]
1014 targets = self._build_targets(targets)[1]
992 if single:
1015 if single:
993 targets = targets[0]
1016 targets = targets[0]
994 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1017 return DirectView(client=self, socket=self._mux_socket, targets=targets)
995
1018
996 #--------------------------------------------------------------------------
1019 #--------------------------------------------------------------------------
997 # Query methods
1020 # Query methods
998 #--------------------------------------------------------------------------
1021 #--------------------------------------------------------------------------
999
1022
1000 @spin_first
1023 @spin_first
1001 def get_result(self, indices_or_msg_ids=None, block=None):
1024 def get_result(self, indices_or_msg_ids=None, block=None):
1002 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1025 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1003
1026
1004 If the client already has the results, no request to the Hub will be made.
1027 If the client already has the results, no request to the Hub will be made.
1005
1028
1006 This is a convenient way to construct AsyncResult objects, which are wrappers
1029 This is a convenient way to construct AsyncResult objects, which are wrappers
1007 that include metadata about execution, and allow for awaiting results that
1030 that include metadata about execution, and allow for awaiting results that
1008 were not submitted by this Client.
1031 were not submitted by this Client.
1009
1032
1010 It can also be a convenient way to retrieve the metadata associated with
1033 It can also be a convenient way to retrieve the metadata associated with
1011 blocking execution, since it always retrieves
1034 blocking execution, since it always retrieves
1012
1035
1013 Examples
1036 Examples
1014 --------
1037 --------
1015 ::
1038 ::
1016
1039
1017 In [10]: r = client.apply()
1040 In [10]: r = client.apply()
1018
1041
1019 Parameters
1042 Parameters
1020 ----------
1043 ----------
1021
1044
1022 indices_or_msg_ids : integer history index, str msg_id, or list of either
1045 indices_or_msg_ids : integer history index, str msg_id, or list of either
1023 The indices or msg_ids of indices to be retrieved
1046 The indices or msg_ids of indices to be retrieved
1024
1047
1025 block : bool
1048 block : bool
1026 Whether to wait for the result to be done
1049 Whether to wait for the result to be done
1027
1050
1028 Returns
1051 Returns
1029 -------
1052 -------
1030
1053
1031 AsyncResult
1054 AsyncResult
1032 A single AsyncResult object will always be returned.
1055 A single AsyncResult object will always be returned.
1033
1056
1034 AsyncHubResult
1057 AsyncHubResult
1035 A subclass of AsyncResult that retrieves results from the Hub
1058 A subclass of AsyncResult that retrieves results from the Hub
1036
1059
1037 """
1060 """
1038 block = self.block if block is None else block
1061 block = self.block if block is None else block
1039 if indices_or_msg_ids is None:
1062 if indices_or_msg_ids is None:
1040 indices_or_msg_ids = -1
1063 indices_or_msg_ids = -1
1041
1064
1042 if not isinstance(indices_or_msg_ids, (list,tuple)):
1065 if not isinstance(indices_or_msg_ids, (list,tuple)):
1043 indices_or_msg_ids = [indices_or_msg_ids]
1066 indices_or_msg_ids = [indices_or_msg_ids]
1044
1067
1045 theids = []
1068 theids = []
1046 for id in indices_or_msg_ids:
1069 for id in indices_or_msg_ids:
1047 if isinstance(id, int):
1070 if isinstance(id, int):
1048 id = self.history[id]
1071 id = self.history[id]
1049 if not isinstance(id, str):
1072 if not isinstance(id, str):
1050 raise TypeError("indices must be str or int, not %r"%id)
1073 raise TypeError("indices must be str or int, not %r"%id)
1051 theids.append(id)
1074 theids.append(id)
1052
1075
1053 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1076 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1054 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1077 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1055
1078
1056 if remote_ids:
1079 if remote_ids:
1057 ar = AsyncHubResult(self, msg_ids=theids)
1080 ar = AsyncHubResult(self, msg_ids=theids)
1058 else:
1081 else:
1059 ar = AsyncResult(self, msg_ids=theids)
1082 ar = AsyncResult(self, msg_ids=theids)
1060
1083
1061 if block:
1084 if block:
1062 ar.wait()
1085 ar.wait()
1063
1086
1064 return ar
1087 return ar
1065
1088
1066 @spin_first
1089 @spin_first
1067 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1090 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1068 """Resubmit one or more tasks.
1091 """Resubmit one or more tasks.
1069
1092
1070 in-flight tasks may not be resubmitted.
1093 in-flight tasks may not be resubmitted.
1071
1094
1072 Parameters
1095 Parameters
1073 ----------
1096 ----------
1074
1097
1075 indices_or_msg_ids : integer history index, str msg_id, or list of either
1098 indices_or_msg_ids : integer history index, str msg_id, or list of either
1076 The indices or msg_ids of indices to be retrieved
1099 The indices or msg_ids of indices to be retrieved
1077
1100
1078 block : bool
1101 block : bool
1079 Whether to wait for the result to be done
1102 Whether to wait for the result to be done
1080
1103
1081 Returns
1104 Returns
1082 -------
1105 -------
1083
1106
1084 AsyncHubResult
1107 AsyncHubResult
1085 A subclass of AsyncResult that retrieves results from the Hub
1108 A subclass of AsyncResult that retrieves results from the Hub
1086
1109
1087 """
1110 """
1088 block = self.block if block is None else block
1111 block = self.block if block is None else block
1089 if indices_or_msg_ids is None:
1112 if indices_or_msg_ids is None:
1090 indices_or_msg_ids = -1
1113 indices_or_msg_ids = -1
1091
1114
1092 if not isinstance(indices_or_msg_ids, (list,tuple)):
1115 if not isinstance(indices_or_msg_ids, (list,tuple)):
1093 indices_or_msg_ids = [indices_or_msg_ids]
1116 indices_or_msg_ids = [indices_or_msg_ids]
1094
1117
1095 theids = []
1118 theids = []
1096 for id in indices_or_msg_ids:
1119 for id in indices_or_msg_ids:
1097 if isinstance(id, int):
1120 if isinstance(id, int):
1098 id = self.history[id]
1121 id = self.history[id]
1099 if not isinstance(id, str):
1122 if not isinstance(id, str):
1100 raise TypeError("indices must be str or int, not %r"%id)
1123 raise TypeError("indices must be str or int, not %r"%id)
1101 theids.append(id)
1124 theids.append(id)
1102
1125
1103 for msg_id in theids:
1126 for msg_id in theids:
1104 self.outstanding.discard(msg_id)
1127 self.outstanding.discard(msg_id)
1105 if msg_id in self.history:
1128 if msg_id in self.history:
1106 self.history.remove(msg_id)
1129 self.history.remove(msg_id)
1107 self.results.pop(msg_id, None)
1130 self.results.pop(msg_id, None)
1108 self.metadata.pop(msg_id, None)
1131 self.metadata.pop(msg_id, None)
1109 content = dict(msg_ids = theids)
1132 content = dict(msg_ids = theids)
1110
1133
1111 self.session.send(self._query_socket, 'resubmit_request', content)
1134 self.session.send(self._query_socket, 'resubmit_request', content)
1112
1135
1113 zmq.select([self._query_socket], [], [])
1136 zmq.select([self._query_socket], [], [])
1114 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1137 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1115 if self.debug:
1138 if self.debug:
1116 pprint(msg)
1139 pprint(msg)
1117 content = msg['content']
1140 content = msg['content']
1118 if content['status'] != 'ok':
1141 if content['status'] != 'ok':
1119 raise self._unwrap_exception(content)
1142 raise self._unwrap_exception(content)
1120
1143
1121 ar = AsyncHubResult(self, msg_ids=theids)
1144 ar = AsyncHubResult(self, msg_ids=theids)
1122
1145
1123 if block:
1146 if block:
1124 ar.wait()
1147 ar.wait()
1125
1148
1126 return ar
1149 return ar
1127
1150
1128 @spin_first
1151 @spin_first
1129 def result_status(self, msg_ids, status_only=True):
1152 def result_status(self, msg_ids, status_only=True):
1130 """Check on the status of the result(s) of the apply request with `msg_ids`.
1153 """Check on the status of the result(s) of the apply request with `msg_ids`.
1131
1154
1132 If status_only is False, then the actual results will be retrieved, else
1155 If status_only is False, then the actual results will be retrieved, else
1133 only the status of the results will be checked.
1156 only the status of the results will be checked.
1134
1157
1135 Parameters
1158 Parameters
1136 ----------
1159 ----------
1137
1160
1138 msg_ids : list of msg_ids
1161 msg_ids : list of msg_ids
1139 if int:
1162 if int:
1140 Passed as index to self.history for convenience.
1163 Passed as index to self.history for convenience.
1141 status_only : bool (default: True)
1164 status_only : bool (default: True)
1142 if False:
1165 if False:
1143 Retrieve the actual results of completed tasks.
1166 Retrieve the actual results of completed tasks.
1144
1167
1145 Returns
1168 Returns
1146 -------
1169 -------
1147
1170
1148 results : dict
1171 results : dict
1149 There will always be the keys 'pending' and 'completed', which will
1172 There will always be the keys 'pending' and 'completed', which will
1150 be lists of msg_ids that are incomplete or complete. If `status_only`
1173 be lists of msg_ids that are incomplete or complete. If `status_only`
1151 is False, then completed results will be keyed by their `msg_id`.
1174 is False, then completed results will be keyed by their `msg_id`.
1152 """
1175 """
1153 if not isinstance(msg_ids, (list,tuple)):
1176 if not isinstance(msg_ids, (list,tuple)):
1154 msg_ids = [msg_ids]
1177 msg_ids = [msg_ids]
1155
1178
1156 theids = []
1179 theids = []
1157 for msg_id in msg_ids:
1180 for msg_id in msg_ids:
1158 if isinstance(msg_id, int):
1181 if isinstance(msg_id, int):
1159 msg_id = self.history[msg_id]
1182 msg_id = self.history[msg_id]
1160 if not isinstance(msg_id, basestring):
1183 if not isinstance(msg_id, basestring):
1161 raise TypeError("msg_ids must be str, not %r"%msg_id)
1184 raise TypeError("msg_ids must be str, not %r"%msg_id)
1162 theids.append(msg_id)
1185 theids.append(msg_id)
1163
1186
1164 completed = []
1187 completed = []
1165 local_results = {}
1188 local_results = {}
1166
1189
1167 # comment this block out to temporarily disable local shortcut:
1190 # comment this block out to temporarily disable local shortcut:
1168 for msg_id in theids:
1191 for msg_id in theids:
1169 if msg_id in self.results:
1192 if msg_id in self.results:
1170 completed.append(msg_id)
1193 completed.append(msg_id)
1171 local_results[msg_id] = self.results[msg_id]
1194 local_results[msg_id] = self.results[msg_id]
1172 theids.remove(msg_id)
1195 theids.remove(msg_id)
1173
1196
1174 if theids: # some not locally cached
1197 if theids: # some not locally cached
1175 content = dict(msg_ids=theids, status_only=status_only)
1198 content = dict(msg_ids=theids, status_only=status_only)
1176 msg = self.session.send(self._query_socket, "result_request", content=content)
1199 msg = self.session.send(self._query_socket, "result_request", content=content)
1177 zmq.select([self._query_socket], [], [])
1200 zmq.select([self._query_socket], [], [])
1178 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1201 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1179 if self.debug:
1202 if self.debug:
1180 pprint(msg)
1203 pprint(msg)
1181 content = msg['content']
1204 content = msg['content']
1182 if content['status'] != 'ok':
1205 if content['status'] != 'ok':
1183 raise self._unwrap_exception(content)
1206 raise self._unwrap_exception(content)
1184 buffers = msg['buffers']
1207 buffers = msg['buffers']
1185 else:
1208 else:
1186 content = dict(completed=[],pending=[])
1209 content = dict(completed=[],pending=[])
1187
1210
1188 content['completed'].extend(completed)
1211 content['completed'].extend(completed)
1189
1212
1190 if status_only:
1213 if status_only:
1191 return content
1214 return content
1192
1215
1193 failures = []
1216 failures = []
1194 # load cached results into result:
1217 # load cached results into result:
1195 content.update(local_results)
1218 content.update(local_results)
1196
1219
1197 # update cache with results:
1220 # update cache with results:
1198 for msg_id in sorted(theids):
1221 for msg_id in sorted(theids):
1199 if msg_id in content['completed']:
1222 if msg_id in content['completed']:
1200 rec = content[msg_id]
1223 rec = content[msg_id]
1201 parent = rec['header']
1224 parent = rec['header']
1202 header = rec['result_header']
1225 header = rec['result_header']
1203 rcontent = rec['result_content']
1226 rcontent = rec['result_content']
1204 iodict = rec['io']
1227 iodict = rec['io']
1205 if isinstance(rcontent, str):
1228 if isinstance(rcontent, str):
1206 rcontent = self.session.unpack(rcontent)
1229 rcontent = self.session.unpack(rcontent)
1207
1230
1208 md = self.metadata[msg_id]
1231 md = self.metadata[msg_id]
1209 md.update(self._extract_metadata(header, parent, rcontent))
1232 md.update(self._extract_metadata(header, parent, rcontent))
1210 md.update(iodict)
1233 md.update(iodict)
1211
1234
1212 if rcontent['status'] == 'ok':
1235 if rcontent['status'] == 'ok':
1213 res,buffers = util.unserialize_object(buffers)
1236 res,buffers = util.unserialize_object(buffers)
1214 else:
1237 else:
1215 print rcontent
1238 print rcontent
1216 res = self._unwrap_exception(rcontent)
1239 res = self._unwrap_exception(rcontent)
1217 failures.append(res)
1240 failures.append(res)
1218
1241
1219 self.results[msg_id] = res
1242 self.results[msg_id] = res
1220 content[msg_id] = res
1243 content[msg_id] = res
1221
1244
1222 if len(theids) == 1 and failures:
1245 if len(theids) == 1 and failures:
1223 raise failures[0]
1246 raise failures[0]
1224
1247
1225 error.collect_exceptions(failures, "result_status")
1248 error.collect_exceptions(failures, "result_status")
1226 return content
1249 return content
1227
1250
1228 @spin_first
1251 @spin_first
1229 def queue_status(self, targets='all', verbose=False):
1252 def queue_status(self, targets='all', verbose=False):
1230 """Fetch the status of engine queues.
1253 """Fetch the status of engine queues.
1231
1254
1232 Parameters
1255 Parameters
1233 ----------
1256 ----------
1234
1257
1235 targets : int/str/list of ints/strs
1258 targets : int/str/list of ints/strs
1236 the engines whose states are to be queried.
1259 the engines whose states are to be queried.
1237 default : all
1260 default : all
1238 verbose : bool
1261 verbose : bool
1239 Whether to return lengths only, or lists of ids for each element
1262 Whether to return lengths only, or lists of ids for each element
1240 """
1263 """
1241 engine_ids = self._build_targets(targets)[1]
1264 engine_ids = self._build_targets(targets)[1]
1242 content = dict(targets=engine_ids, verbose=verbose)
1265 content = dict(targets=engine_ids, verbose=verbose)
1243 self.session.send(self._query_socket, "queue_request", content=content)
1266 self.session.send(self._query_socket, "queue_request", content=content)
1244 idents,msg = self.session.recv(self._query_socket, 0)
1267 idents,msg = self.session.recv(self._query_socket, 0)
1245 if self.debug:
1268 if self.debug:
1246 pprint(msg)
1269 pprint(msg)
1247 content = msg['content']
1270 content = msg['content']
1248 status = content.pop('status')
1271 status = content.pop('status')
1249 if status != 'ok':
1272 if status != 'ok':
1250 raise self._unwrap_exception(content)
1273 raise self._unwrap_exception(content)
1251 content = rekey(content)
1274 content = rekey(content)
1252 if isinstance(targets, int):
1275 if isinstance(targets, int):
1253 return content[targets]
1276 return content[targets]
1254 else:
1277 else:
1255 return content
1278 return content
1256
1279
1257 @spin_first
1280 @spin_first
1258 def purge_results(self, jobs=[], targets=[]):
1281 def purge_results(self, jobs=[], targets=[]):
1259 """Tell the Hub to forget results.
1282 """Tell the Hub to forget results.
1260
1283
1261 Individual results can be purged by msg_id, or the entire
1284 Individual results can be purged by msg_id, or the entire
1262 history of specific targets can be purged.
1285 history of specific targets can be purged.
1263
1286
1264 Parameters
1287 Parameters
1265 ----------
1288 ----------
1266
1289
1267 jobs : str or list of str or AsyncResult objects
1290 jobs : str or list of str or AsyncResult objects
1268 the msg_ids whose results should be forgotten.
1291 the msg_ids whose results should be forgotten.
1269 targets : int/str/list of ints/strs
1292 targets : int/str/list of ints/strs
1270 The targets, by uuid or int_id, whose entire history is to be purged.
1293 The targets, by uuid or int_id, whose entire history is to be purged.
1271 Use `targets='all'` to scrub everything from the Hub's memory.
1294 Use `targets='all'` to scrub everything from the Hub's memory.
1272
1295
1273 default : None
1296 default : None
1274 """
1297 """
1275 if not targets and not jobs:
1298 if not targets and not jobs:
1276 raise ValueError("Must specify at least one of `targets` and `jobs`")
1299 raise ValueError("Must specify at least one of `targets` and `jobs`")
1277 if targets:
1300 if targets:
1278 targets = self._build_targets(targets)[1]
1301 targets = self._build_targets(targets)[1]
1279
1302
1280 # construct msg_ids from jobs
1303 # construct msg_ids from jobs
1281 msg_ids = []
1304 msg_ids = []
1282 if isinstance(jobs, (basestring,AsyncResult)):
1305 if isinstance(jobs, (basestring,AsyncResult)):
1283 jobs = [jobs]
1306 jobs = [jobs]
1284 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1307 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1285 if bad_ids:
1308 if bad_ids:
1286 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1309 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1287 for j in jobs:
1310 for j in jobs:
1288 if isinstance(j, AsyncResult):
1311 if isinstance(j, AsyncResult):
1289 msg_ids.extend(j.msg_ids)
1312 msg_ids.extend(j.msg_ids)
1290 else:
1313 else:
1291 msg_ids.append(j)
1314 msg_ids.append(j)
1292
1315
1293 content = dict(targets=targets, msg_ids=msg_ids)
1316 content = dict(targets=targets, msg_ids=msg_ids)
1294 self.session.send(self._query_socket, "purge_request", content=content)
1317 self.session.send(self._query_socket, "purge_request", content=content)
1295 idents, msg = self.session.recv(self._query_socket, 0)
1318 idents, msg = self.session.recv(self._query_socket, 0)
1296 if self.debug:
1319 if self.debug:
1297 pprint(msg)
1320 pprint(msg)
1298 content = msg['content']
1321 content = msg['content']
1299 if content['status'] != 'ok':
1322 if content['status'] != 'ok':
1300 raise self._unwrap_exception(content)
1323 raise self._unwrap_exception(content)
1301
1324
1302 @spin_first
1325 @spin_first
1303 def hub_history(self):
1326 def hub_history(self):
1304 """Get the Hub's history
1327 """Get the Hub's history
1305
1328
1306 Just like the Client, the Hub has a history, which is a list of msg_ids.
1329 Just like the Client, the Hub has a history, which is a list of msg_ids.
1307 This will contain the history of all clients, and, depending on configuration,
1330 This will contain the history of all clients, and, depending on configuration,
1308 may contain history across multiple cluster sessions.
1331 may contain history across multiple cluster sessions.
1309
1332
1310 Any msg_id returned here is a valid argument to `get_result`.
1333 Any msg_id returned here is a valid argument to `get_result`.
1311
1334
1312 Returns
1335 Returns
1313 -------
1336 -------
1314
1337
1315 msg_ids : list of strs
1338 msg_ids : list of strs
1316 list of all msg_ids, ordered by task submission time.
1339 list of all msg_ids, ordered by task submission time.
1317 """
1340 """
1318
1341
1319 self.session.send(self._query_socket, "history_request", content={})
1342 self.session.send(self._query_socket, "history_request", content={})
1320 idents, msg = self.session.recv(self._query_socket, 0)
1343 idents, msg = self.session.recv(self._query_socket, 0)
1321
1344
1322 if self.debug:
1345 if self.debug:
1323 pprint(msg)
1346 pprint(msg)
1324 content = msg['content']
1347 content = msg['content']
1325 if content['status'] != 'ok':
1348 if content['status'] != 'ok':
1326 raise self._unwrap_exception(content)
1349 raise self._unwrap_exception(content)
1327 else:
1350 else:
1328 return content['history']
1351 return content['history']
1329
1352
1330 @spin_first
1353 @spin_first
1331 def db_query(self, query, keys=None):
1354 def db_query(self, query, keys=None):
1332 """Query the Hub's TaskRecord database
1355 """Query the Hub's TaskRecord database
1333
1356
1334 This will return a list of task record dicts that match `query`
1357 This will return a list of task record dicts that match `query`
1335
1358
1336 Parameters
1359 Parameters
1337 ----------
1360 ----------
1338
1361
1339 query : mongodb query dict
1362 query : mongodb query dict
1340 The search dict. See mongodb query docs for details.
1363 The search dict. See mongodb query docs for details.
1341 keys : list of strs [optional]
1364 keys : list of strs [optional]
1342 The subset of keys to be returned. The default is to fetch everything but buffers.
1365 The subset of keys to be returned. The default is to fetch everything but buffers.
1343 'msg_id' will *always* be included.
1366 'msg_id' will *always* be included.
1344 """
1367 """
1345 if isinstance(keys, basestring):
1368 if isinstance(keys, basestring):
1346 keys = [keys]
1369 keys = [keys]
1347 content = dict(query=query, keys=keys)
1370 content = dict(query=query, keys=keys)
1348 self.session.send(self._query_socket, "db_request", content=content)
1371 self.session.send(self._query_socket, "db_request", content=content)
1349 idents, msg = self.session.recv(self._query_socket, 0)
1372 idents, msg = self.session.recv(self._query_socket, 0)
1350 if self.debug:
1373 if self.debug:
1351 pprint(msg)
1374 pprint(msg)
1352 content = msg['content']
1375 content = msg['content']
1353 if content['status'] != 'ok':
1376 if content['status'] != 'ok':
1354 raise self._unwrap_exception(content)
1377 raise self._unwrap_exception(content)
1355
1378
1356 records = content['records']
1379 records = content['records']
1357
1380
1358 buffer_lens = content['buffer_lens']
1381 buffer_lens = content['buffer_lens']
1359 result_buffer_lens = content['result_buffer_lens']
1382 result_buffer_lens = content['result_buffer_lens']
1360 buffers = msg['buffers']
1383 buffers = msg['buffers']
1361 has_bufs = buffer_lens is not None
1384 has_bufs = buffer_lens is not None
1362 has_rbufs = result_buffer_lens is not None
1385 has_rbufs = result_buffer_lens is not None
1363 for i,rec in enumerate(records):
1386 for i,rec in enumerate(records):
1364 # relink buffers
1387 # relink buffers
1365 if has_bufs:
1388 if has_bufs:
1366 blen = buffer_lens[i]
1389 blen = buffer_lens[i]
1367 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1390 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1368 if has_rbufs:
1391 if has_rbufs:
1369 blen = result_buffer_lens[i]
1392 blen = result_buffer_lens[i]
1370 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1393 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1371
1394
1372 return records
1395 return records
1373
1396
1374 __all__ = [ 'Client' ]
1397 __all__ = [ 'Client' ]
General Comments 0
You need to be logged in to leave comments. Login now