fix is_url typo in parallel client...
MinRK
@@ -1,1433 +1,1434 @@
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 import time
21 import time
22 import warnings
22 import warnings
23 from datetime import datetime
23 from datetime import datetime
24 from getpass import getpass
24 from getpass import getpass
25 from pprint import pprint
25 from pprint import pprint
26
26
27 pjoin = os.path.join
27 pjoin = os.path.join
28
28
29 import zmq
29 import zmq
30 # from zmq.eventloop import ioloop, zmqstream
30 # from zmq.eventloop import ioloop, zmqstream
31
31
32 from IPython.config.configurable import MultipleInstanceError
32 from IPython.config.configurable import MultipleInstanceError
33 from IPython.core.application import BaseIPythonApplication
33 from IPython.core.application import BaseIPythonApplication
34
34
35 from IPython.utils.jsonutil import rekey
35 from IPython.utils.jsonutil import rekey
36 from IPython.utils.localinterfaces import LOCAL_IPS
36 from IPython.utils.localinterfaces import LOCAL_IPS
37 from IPython.utils.path import get_ipython_dir
37 from IPython.utils.path import get_ipython_dir
38 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
38 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
39 Dict, List, Bool, Set)
39 Dict, List, Bool, Set)
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.external.ssh import tunnel
41 from IPython.external.ssh import tunnel
42
42
43 from IPython.parallel import error
43 from IPython.parallel import error
44 from IPython.parallel import util
44 from IPython.parallel import util
45
45
46 from IPython.zmq.session import Session, Message
46 from IPython.zmq.session import Session, Message
47
47
48 from .asyncresult import AsyncResult, AsyncHubResult
48 from .asyncresult import AsyncResult, AsyncHubResult
49 from IPython.core.profiledir import ProfileDir, ProfileDirError
49 from IPython.core.profiledir import ProfileDir, ProfileDirError
50 from .view import DirectView, LoadBalancedView
50 from .view import DirectView, LoadBalancedView
51
51
52 if sys.version_info[0] >= 3:
52 if sys.version_info[0] >= 3:
53 # xrange is used in a couple 'isinstance' tests in py2
53 # xrange is used in a couple 'isinstance' tests in py2
54 # should be just 'range' in 3k
54 # should be just 'range' in 3k
55 xrange = range
55 xrange = range
56
56
57 #--------------------------------------------------------------------------
57 #--------------------------------------------------------------------------
58 # Decorators for Client methods
58 # Decorators for Client methods
59 #--------------------------------------------------------------------------
59 #--------------------------------------------------------------------------
60
60
61 @decorator
61 @decorator
62 def spin_first(f, self, *args, **kwargs):
62 def spin_first(f, self, *args, **kwargs):
63 """Call spin() to sync state prior to calling the method."""
63 """Call spin() to sync state prior to calling the method."""
64 self.spin()
64 self.spin()
65 return f(self, *args, **kwargs)
65 return f(self, *args, **kwargs)
66
66
67
67
68 #--------------------------------------------------------------------------
68 #--------------------------------------------------------------------------
69 # Classes
69 # Classes
70 #--------------------------------------------------------------------------
70 #--------------------------------------------------------------------------
71
71
72 class Metadata(dict):
72 class Metadata(dict):
73 """Subclass of dict for initializing metadata values.
73 """Subclass of dict for initializing metadata values.
74
74
75 Attribute access works on keys.
75 Attribute access works on keys.
76
76
77 These objects have a strict set of keys - an error will be raised if you try
77 These objects have a strict set of keys - an error will be raised if you try
78 to add new keys.
78 to add new keys.
79 """
79 """
80 def __init__(self, *args, **kwargs):
80 def __init__(self, *args, **kwargs):
81 dict.__init__(self)
81 dict.__init__(self)
82 md = {'msg_id' : None,
82 md = {'msg_id' : None,
83 'submitted' : None,
83 'submitted' : None,
84 'started' : None,
84 'started' : None,
85 'completed' : None,
85 'completed' : None,
86 'received' : None,
86 'received' : None,
87 'engine_uuid' : None,
87 'engine_uuid' : None,
88 'engine_id' : None,
88 'engine_id' : None,
89 'follow' : None,
89 'follow' : None,
90 'after' : None,
90 'after' : None,
91 'status' : None,
91 'status' : None,
92
92
93 'pyin' : None,
93 'pyin' : None,
94 'pyout' : None,
94 'pyout' : None,
95 'pyerr' : None,
95 'pyerr' : None,
96 'stdout' : '',
96 'stdout' : '',
97 'stderr' : '',
97 'stderr' : '',
98 }
98 }
99 self.update(md)
99 self.update(md)
100 self.update(dict(*args, **kwargs))
100 self.update(dict(*args, **kwargs))
101
101
102 def __getattr__(self, key):
102 def __getattr__(self, key):
103 """getattr aliased to getitem"""
103 """getattr aliased to getitem"""
104 if key in self.iterkeys():
104 if key in self.iterkeys():
105 return self[key]
105 return self[key]
106 else:
106 else:
107 raise AttributeError(key)
107 raise AttributeError(key)
108
108
109 def __setattr__(self, key, value):
109 def __setattr__(self, key, value):
110 """setattr aliased to setitem, with strict"""
110 """setattr aliased to setitem, with strict"""
111 if key in self.iterkeys():
111 if key in self.iterkeys():
112 self[key] = value
112 self[key] = value
113 else:
113 else:
114 raise AttributeError(key)
114 raise AttributeError(key)
115
115
116 def __setitem__(self, key, value):
116 def __setitem__(self, key, value):
117 """strict static key enforcement"""
117 """strict static key enforcement"""
118 if key in self.iterkeys():
118 if key in self.iterkeys():
119 dict.__setitem__(self, key, value)
119 dict.__setitem__(self, key, value)
120 else:
120 else:
121 raise KeyError(key)
121 raise KeyError(key)
122
122
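# A minimal usage sketch of the strict-key behavior described above (keys are
# those pre-populated in __init__):
#
#     md = Metadata(status='ok')
#     md.status              # -> 'ok'  (attribute access aliases item access)
#     md['stdout']           # -> ''    (every known key has a default)
#     md.no_such_key = 1     # raises AttributeError
#     md['no_such_key'] = 1  # raises KeyError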
123
123
124 class Client(HasTraits):
124 class Client(HasTraits):
125 """A semi-synchronous client to the IPython ZMQ cluster
125 """A semi-synchronous client to the IPython ZMQ cluster
126
126
127 Parameters
127 Parameters
128 ----------
128 ----------
129
129
130 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
130 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
131 Connection information for the Hub's registration. If a json connector
131 Connection information for the Hub's registration. If a json connector
132 file is given, then likely no further configuration is necessary.
132 file is given, then likely no further configuration is necessary.
133 [Default: use profile]
133 [Default: use profile]
134 profile : bytes
134 profile : bytes
135 The name of the Cluster profile to be used to find connector information.
135 The name of the Cluster profile to be used to find connector information.
136 If run from an IPython application, the default profile will be the same
136 If run from an IPython application, the default profile will be the same
137 as the running application, otherwise it will be 'default'.
137 as the running application, otherwise it will be 'default'.
138 context : zmq.Context
138 context : zmq.Context
139 Pass an existing zmq.Context instance, otherwise the client will create its own.
139 Pass an existing zmq.Context instance, otherwise the client will create its own.
140 debug : bool
140 debug : bool
141 flag for lots of message printing for debug purposes
141 flag for lots of message printing for debug purposes
142 timeout : int/float
142 timeout : int/float
143 time (in seconds) to wait for connection replies from the Hub
143 time (in seconds) to wait for connection replies from the Hub
144 [Default: 10]
144 [Default: 10]
145
145
146 #-------------- session related args ----------------
146 #-------------- session related args ----------------
147
147
148 config : Config object
148 config : Config object
149 If specified, this will be relayed to the Session for configuration
149 If specified, this will be relayed to the Session for configuration
150 username : str
150 username : str
151 set username for the session object
151 set username for the session object
152 packer : str (import_string) or callable
152 packer : str (import_string) or callable
153 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
153 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
154 function to serialize messages. Must support same input as
154 function to serialize messages. Must support same input as
155 JSON, and output must be bytes.
155 JSON, and output must be bytes.
156 You can pass a callable directly as `pack`
156 You can pass a callable directly as `pack`
157 unpacker : str (import_string) or callable
157 unpacker : str (import_string) or callable
158 The inverse of packer. Only necessary if packer is specified as *not* one
158 The inverse of packer. Only necessary if packer is specified as *not* one
159 of 'json' or 'pickle'.
159 of 'json' or 'pickle'.
160
160
161 #-------------- ssh related args ----------------
161 #-------------- ssh related args ----------------
162 # These are args for configuring the ssh tunnel to be used
162 # These are args for configuring the ssh tunnel to be used
163 # credentials are used to forward connections over ssh to the Controller
163 # credentials are used to forward connections over ssh to the Controller
164 # Note that the ip given in `addr` needs to be relative to sshserver
164 # Note that the ip given in `addr` needs to be relative to sshserver
165 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
165 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
166 # and set sshserver as the same machine the Controller is on. However,
166 # and set sshserver as the same machine the Controller is on. However,
167 # the only requirement is that sshserver is able to see the Controller
167 # the only requirement is that sshserver is able to see the Controller
168 # (i.e. is within the same trusted network).
168 # (i.e. is within the same trusted network).
169
169
170 sshserver : str
170 sshserver : str
171 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
171 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
172 If keyfile or password is specified, and this is not, it will default to
172 If keyfile or password is specified, and this is not, it will default to
173 the ip given in addr.
173 the ip given in addr.
174 sshkey : str; path to ssh private key file
174 sshkey : str; path to ssh private key file
175 This specifies a key to be used in ssh login, default None.
175 This specifies a key to be used in ssh login, default None.
176 Regular default ssh keys will be used without specifying this argument.
176 Regular default ssh keys will be used without specifying this argument.
177 password : str
177 password : str
178 Your ssh password to sshserver. Note that if this is left None,
178 Your ssh password to sshserver. Note that if this is left None,
179 you will be prompted for it if passwordless key based login is unavailable.
179 you will be prompted for it if passwordless key based login is unavailable.
180 paramiko : bool
180 paramiko : bool
181 flag for whether to use paramiko instead of shell ssh for tunneling.
181 flag for whether to use paramiko instead of shell ssh for tunneling.
182 [default: True on win32, False else]
182 [default: True on win32, False else]
183
183
184 #-------------- exec authentication args ----------------
184 #-------------- exec authentication args ----------------
185 If even localhost is untrusted, you can have some protection against
185 If even localhost is untrusted, you can have some protection against
186 unauthorized execution by signing messages with HMAC digests.
186 unauthorized execution by signing messages with HMAC digests.
187 Messages are still sent as cleartext, so if someone can snoop your
187 Messages are still sent as cleartext, so if someone can snoop your
188 loopback traffic this will not protect your privacy, but will prevent
188 loopback traffic this will not protect your privacy, but will prevent
189 unauthorized execution.
189 unauthorized execution.
190
190
191 exec_key : str
191 exec_key : str
192 an authentication key or file containing a key
192 an authentication key or file containing a key
193 default: None
193 default: None
194
194
195
195
196 Attributes
196 Attributes
197 ----------
197 ----------
198
198
199 ids : list of int engine IDs
199 ids : list of int engine IDs
200 requesting the ids attribute always synchronizes
200 requesting the ids attribute always synchronizes
201 the registration state. To request ids without synchronization,
201 the registration state. To request ids without synchronization,
202 use the semi-private _ids attribute.
202 use the semi-private _ids attribute.
203
203
204 history : list of msg_ids
204 history : list of msg_ids
205 a list of msg_ids, keeping track of all the execution
205 a list of msg_ids, keeping track of all the execution
206 messages you have submitted in order.
206 messages you have submitted in order.
207
207
208 outstanding : set of msg_ids
208 outstanding : set of msg_ids
209 a set of msg_ids that have been submitted, but whose
209 a set of msg_ids that have been submitted, but whose
210 results have not yet been received.
210 results have not yet been received.
211
211
212 results : dict
212 results : dict
213 a dict of all our results, keyed by msg_id
213 a dict of all our results, keyed by msg_id
214
214
215 block : bool
215 block : bool
216 determines default behavior when block is not specified
216 determines default behavior when block is not specified
217 in execution methods
217 in execution methods
218
218
219 Methods
219 Methods
220 -------
220 -------
221
221
222 spin
222 spin
223 flushes incoming results and registration state changes
223 flushes incoming results and registration state changes
224 control methods spin, and requesting `ids` also ensures an up-to-date state
224 control methods spin, and requesting `ids` also ensures an up-to-date state
225
225
226 wait
226 wait
227 wait on one or more msg_ids
227 wait on one or more msg_ids
228
228
229 execution methods
229 execution methods
230 apply
230 apply
231 legacy: execute, run
231 legacy: execute, run
232
232
233 data movement
233 data movement
234 push, pull, scatter, gather
234 push, pull, scatter, gather
235
235
236 query methods
236 query methods
237 queue_status, get_result, purge, result_status
237 queue_status, get_result, purge, result_status
238
238
239 control methods
239 control methods
240 abort, shutdown
240 abort, shutdown
241
241
242 """
242 """
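# A minimal connection sketch (assumes a running cluster, e.g. started with
# ``ipcluster start``, so that a connector file exists for the profile):
#
#     from IPython.parallel import Client
#     rc = Client()                       # default profile
#     # rc = Client(profile='mycluster')  # a named profile
#     # rc = Client('/path/to/ipcontroller-client.json', sshserver='user@server.tld')
#     rc.ids                              # currently registered engine ids
#     dv = rc[:]                          # DirectView on all engines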
243
243
244
244
245 block = Bool(False)
245 block = Bool(False)
246 outstanding = Set()
246 outstanding = Set()
247 results = Instance('collections.defaultdict', (dict,))
247 results = Instance('collections.defaultdict', (dict,))
248 metadata = Instance('collections.defaultdict', (Metadata,))
248 metadata = Instance('collections.defaultdict', (Metadata,))
249 history = List()
249 history = List()
250 debug = Bool(False)
250 debug = Bool(False)
251
251
252 profile=Unicode()
252 profile=Unicode()
253 def _profile_default(self):
253 def _profile_default(self):
254 if BaseIPythonApplication.initialized():
254 if BaseIPythonApplication.initialized():
255 # an IPython app *might* be running, try to get its profile
255 # an IPython app *might* be running, try to get its profile
256 try:
256 try:
257 return BaseIPythonApplication.instance().profile
257 return BaseIPythonApplication.instance().profile
258 except (AttributeError, MultipleInstanceError):
258 except (AttributeError, MultipleInstanceError):
259 # could be a *different* subclass of config.Application,
259 # could be a *different* subclass of config.Application,
260 # which would raise one of these two errors.
260 # which would raise one of these two errors.
261 return u'default'
261 return u'default'
262 else:
262 else:
263 return u'default'
263 return u'default'
264
264
265
265
266 _outstanding_dict = Instance('collections.defaultdict', (set,))
266 _outstanding_dict = Instance('collections.defaultdict', (set,))
267 _ids = List()
267 _ids = List()
268 _connected=Bool(False)
268 _connected=Bool(False)
269 _ssh=Bool(False)
269 _ssh=Bool(False)
270 _context = Instance('zmq.Context')
270 _context = Instance('zmq.Context')
271 _config = Dict()
271 _config = Dict()
272 _engines=Instance(util.ReverseDict, (), {})
272 _engines=Instance(util.ReverseDict, (), {})
273 # _hub_socket=Instance('zmq.Socket')
273 # _hub_socket=Instance('zmq.Socket')
274 _query_socket=Instance('zmq.Socket')
274 _query_socket=Instance('zmq.Socket')
275 _control_socket=Instance('zmq.Socket')
275 _control_socket=Instance('zmq.Socket')
276 _iopub_socket=Instance('zmq.Socket')
276 _iopub_socket=Instance('zmq.Socket')
277 _notification_socket=Instance('zmq.Socket')
277 _notification_socket=Instance('zmq.Socket')
278 _mux_socket=Instance('zmq.Socket')
278 _mux_socket=Instance('zmq.Socket')
279 _task_socket=Instance('zmq.Socket')
279 _task_socket=Instance('zmq.Socket')
280 _task_scheme=Unicode()
280 _task_scheme=Unicode()
281 _closed = False
281 _closed = False
282 _ignored_control_replies=Int(0)
282 _ignored_control_replies=Int(0)
283 _ignored_hub_replies=Int(0)
283 _ignored_hub_replies=Int(0)
284
284
285 def __new__(self, *args, **kw):
285 def __new__(self, *args, **kw):
286 # don't raise on positional args
286 # don't raise on positional args
287 return HasTraits.__new__(self, **kw)
287 return HasTraits.__new__(self, **kw)
288
288
289 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
289 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
290 context=None, debug=False, exec_key=None,
290 context=None, debug=False, exec_key=None,
291 sshserver=None, sshkey=None, password=None, paramiko=None,
291 sshserver=None, sshkey=None, password=None, paramiko=None,
292 timeout=10, **extra_args
292 timeout=10, **extra_args
293 ):
293 ):
294 if profile:
294 if profile:
295 super(Client, self).__init__(debug=debug, profile=profile)
295 super(Client, self).__init__(debug=debug, profile=profile)
296 else:
296 else:
297 super(Client, self).__init__(debug=debug)
297 super(Client, self).__init__(debug=debug)
298 if context is None:
298 if context is None:
299 context = zmq.Context.instance()
299 context = zmq.Context.instance()
300 self._context = context
300 self._context = context
301
301
302 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
302 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
303 if self._cd is not None:
303 if self._cd is not None:
304 if url_or_file is None:
304 if url_or_file is None:
305 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
305 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
306 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
306 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
307 " Please specify at least one of url_or_file or profile."
307 " Please specify at least one of url_or_file or profile."
308
308
309 if not is_url(url_or_file):
309 if not util.is_url(url_or_file):
310 # it's not a url, try for a file
310 if not os.path.exists(url_or_file):
311 if not os.path.exists(url_or_file):
311 if self._cd:
312 if self._cd:
312 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
313 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
313 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
314 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
314 with open(url_or_file) as f:
315 with open(url_or_file) as f:
315 cfg = json.loads(f.read())
316 cfg = json.loads(f.read())
316 else:
317 else:
317 cfg = {'url':url_or_file}
318 cfg = {'url':url_or_file}
318
319
319 # sync defaults from args, json:
320 # sync defaults from args, json:
320 if sshserver:
321 if sshserver:
321 cfg['ssh'] = sshserver
322 cfg['ssh'] = sshserver
322 if exec_key:
323 if exec_key:
323 cfg['exec_key'] = exec_key
324 cfg['exec_key'] = exec_key
324 exec_key = cfg['exec_key']
325 exec_key = cfg['exec_key']
325 location = cfg.setdefault('location', None)
326 location = cfg.setdefault('location', None)
326 cfg['url'] = util.disambiguate_url(cfg['url'], location)
327 cfg['url'] = util.disambiguate_url(cfg['url'], location)
327 url = cfg['url']
328 url = cfg['url']
328 proto,addr,port = util.split_url(url)
329 proto,addr,port = util.split_url(url)
329 if location is not None and addr == '127.0.0.1':
330 if location is not None and addr == '127.0.0.1':
330 # location specified, and connection is expected to be local
331 # location specified, and connection is expected to be local
331 if location not in LOCAL_IPS and not sshserver:
332 if location not in LOCAL_IPS and not sshserver:
332 # load ssh from JSON *only* if the controller is not on
333 # load ssh from JSON *only* if the controller is not on
333 # this machine
334 # this machine
334 sshserver=cfg['ssh']
335 sshserver=cfg['ssh']
335 if location not in LOCAL_IPS and not sshserver:
336 if location not in LOCAL_IPS and not sshserver:
336 # warn if no ssh specified, but SSH is probably needed
337 # warn if no ssh specified, but SSH is probably needed
337 # This is only a warning, because the most likely cause
338 # This is only a warning, because the most likely cause
338 # is a local Controller on a laptop whose IP is dynamic
339 # is a local Controller on a laptop whose IP is dynamic
339 warnings.warn("""
340 warnings.warn("""
340 Controller appears to be listening on localhost, but not on this machine.
341 Controller appears to be listening on localhost, but not on this machine.
341 If this is true, you should specify Client(...,sshserver='you@%s')
342 If this is true, you should specify Client(...,sshserver='you@%s')
342 or instruct your controller to listen on an external IP."""%location,
343 or instruct your controller to listen on an external IP."""%location,
343 RuntimeWarning)
344 RuntimeWarning)
344 elif not sshserver:
345 elif not sshserver:
345 # otherwise sync with cfg
346 # otherwise sync with cfg
346 sshserver = cfg['ssh']
347 sshserver = cfg['ssh']
347
348
348 self._config = cfg
349 self._config = cfg
349
350
350 self._ssh = bool(sshserver or sshkey or password)
351 self._ssh = bool(sshserver or sshkey or password)
351 if self._ssh and sshserver is None:
352 if self._ssh and sshserver is None:
352 # default to ssh via localhost
353 # default to ssh via localhost
353 sshserver = url.split('://')[1].split(':')[0]
354 sshserver = url.split('://')[1].split(':')[0]
354 if self._ssh and password is None:
355 if self._ssh and password is None:
355 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
356 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
356 password=False
357 password=False
357 else:
358 else:
358 password = getpass("SSH Password for %s: "%sshserver)
359 password = getpass("SSH Password for %s: "%sshserver)
359 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
360 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
360
361
361 # configure and construct the session
362 # configure and construct the session
362 if exec_key is not None:
363 if exec_key is not None:
363 if os.path.isfile(exec_key):
364 if os.path.isfile(exec_key):
364 extra_args['keyfile'] = exec_key
365 extra_args['keyfile'] = exec_key
365 else:
366 else:
366 exec_key = util.asbytes(exec_key)
367 exec_key = util.asbytes(exec_key)
367 extra_args['key'] = exec_key
368 extra_args['key'] = exec_key
368 self.session = Session(**extra_args)
369 self.session = Session(**extra_args)
369
370
370 self._query_socket = self._context.socket(zmq.DEALER)
371 self._query_socket = self._context.socket(zmq.DEALER)
371 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
372 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
372 if self._ssh:
373 if self._ssh:
373 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
374 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
374 else:
375 else:
375 self._query_socket.connect(url)
376 self._query_socket.connect(url)
376
377
377 self.session.debug = self.debug
378 self.session.debug = self.debug
378
379
379 self._notification_handlers = {'registration_notification' : self._register_engine,
380 self._notification_handlers = {'registration_notification' : self._register_engine,
380 'unregistration_notification' : self._unregister_engine,
381 'unregistration_notification' : self._unregister_engine,
381 'shutdown_notification' : lambda msg: self.close(),
382 'shutdown_notification' : lambda msg: self.close(),
382 }
383 }
383 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
384 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
384 'apply_reply' : self._handle_apply_reply}
385 'apply_reply' : self._handle_apply_reply}
385 self._connect(sshserver, ssh_kwargs, timeout)
386 self._connect(sshserver, ssh_kwargs, timeout)
386
387
387 def __del__(self):
388 def __del__(self):
388 """cleanup sockets, but _not_ context."""
389 """cleanup sockets, but _not_ context."""
389 self.close()
390 self.close()
390
391
391 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
392 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
392 if ipython_dir is None:
393 if ipython_dir is None:
393 ipython_dir = get_ipython_dir()
394 ipython_dir = get_ipython_dir()
394 if profile_dir is not None:
395 if profile_dir is not None:
395 try:
396 try:
396 self._cd = ProfileDir.find_profile_dir(profile_dir)
397 self._cd = ProfileDir.find_profile_dir(profile_dir)
397 return
398 return
398 except ProfileDirError:
399 except ProfileDirError:
399 pass
400 pass
400 elif profile is not None:
401 elif profile is not None:
401 try:
402 try:
402 self._cd = ProfileDir.find_profile_dir_by_name(
403 self._cd = ProfileDir.find_profile_dir_by_name(
403 ipython_dir, profile)
404 ipython_dir, profile)
404 return
405 return
405 except ProfileDirError:
406 except ProfileDirError:
406 pass
407 pass
407 self._cd = None
408 self._cd = None
408
409
409 def _update_engines(self, engines):
410 def _update_engines(self, engines):
410 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
411 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
411 for k,v in engines.iteritems():
412 for k,v in engines.iteritems():
412 eid = int(k)
413 eid = int(k)
413 self._engines[eid] = v
414 self._engines[eid] = v
414 self._ids.append(eid)
415 self._ids.append(eid)
415 self._ids = sorted(self._ids)
416 self._ids = sorted(self._ids)
416 if sorted(self._engines.keys()) != range(len(self._engines)) and \
417 if sorted(self._engines.keys()) != range(len(self._engines)) and \
417 self._task_scheme == 'pure' and self._task_socket:
418 self._task_scheme == 'pure' and self._task_socket:
418 self._stop_scheduling_tasks()
419 self._stop_scheduling_tasks()
419
420
420 def _stop_scheduling_tasks(self):
421 def _stop_scheduling_tasks(self):
421 """Stop scheduling tasks because an engine has been unregistered
422 """Stop scheduling tasks because an engine has been unregistered
422 from a pure ZMQ scheduler.
423 from a pure ZMQ scheduler.
423 """
424 """
424 self._task_socket.close()
425 self._task_socket.close()
425 self._task_socket = None
426 self._task_socket = None
426 msg = "An engine has been unregistered, and we are using pure " +\
427 msg = "An engine has been unregistered, and we are using pure " +\
427 "ZMQ task scheduling. Task farming will be disabled."
428 "ZMQ task scheduling. Task farming will be disabled."
428 if self.outstanding:
429 if self.outstanding:
429 msg += " If you were running tasks when this happened, " +\
430 msg += " If you were running tasks when this happened, " +\
430 "some `outstanding` msg_ids may never resolve."
431 "some `outstanding` msg_ids may never resolve."
431 warnings.warn(msg, RuntimeWarning)
432 warnings.warn(msg, RuntimeWarning)
432
433
433 def _build_targets(self, targets):
434 def _build_targets(self, targets):
434 """Turn valid target IDs or 'all' into two lists:
435 """Turn valid target IDs or 'all' into two lists:
435 (uuids, int_ids).
436 (uuids, int_ids).
436 """
437 """
437 if not self._ids:
438 if not self._ids:
438 # flush notification socket if no engines yet, just in case
439 # flush notification socket if no engines yet, just in case
439 if not self.ids:
440 if not self.ids:
440 raise error.NoEnginesRegistered("Can't build targets without any engines")
441 raise error.NoEnginesRegistered("Can't build targets without any engines")
441
442
442 if targets is None:
443 if targets is None:
443 targets = self._ids
444 targets = self._ids
444 elif isinstance(targets, basestring):
445 elif isinstance(targets, basestring):
445 if targets.lower() == 'all':
446 if targets.lower() == 'all':
446 targets = self._ids
447 targets = self._ids
447 else:
448 else:
448 raise TypeError("%r not valid str target, must be 'all'"%(targets))
449 raise TypeError("%r not valid str target, must be 'all'"%(targets))
449 elif isinstance(targets, int):
450 elif isinstance(targets, int):
450 if targets < 0:
451 if targets < 0:
451 targets = self.ids[targets]
452 targets = self.ids[targets]
452 if targets not in self._ids:
453 if targets not in self._ids:
453 raise IndexError("No such engine: %i"%targets)
454 raise IndexError("No such engine: %i"%targets)
454 targets = [targets]
455 targets = [targets]
455
456
456 if isinstance(targets, slice):
457 if isinstance(targets, slice):
457 indices = range(len(self._ids))[targets]
458 indices = range(len(self._ids))[targets]
458 ids = self.ids
459 ids = self.ids
459 targets = [ ids[i] for i in indices ]
460 targets = [ ids[i] for i in indices ]
460
461
461 if not isinstance(targets, (tuple, list, xrange)):
462 if not isinstance(targets, (tuple, list, xrange)):
462 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
463 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
463
464
464 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
465 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
465
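# Illustrative `targets` values accepted above (a sketch, not exhaustive):
# None and 'all' resolve to every registered engine, a single int becomes a
# one-element list (negative ints index from the end), and slices or
# collections of ints select subsets of the registered ids, e.g.
#
#     self._build_targets(None)            # all engines
#     self._build_targets('all')           # all engines
#     self._build_targets(-1)              # the highest-numbered engine
#     self._build_targets(slice(0, 4))     # the first four registered engines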
466
466 def _connect(self, sshserver, ssh_kwargs, timeout):
467 def _connect(self, sshserver, ssh_kwargs, timeout):
467 """setup all our socket connections to the cluster. This is called from
468 """setup all our socket connections to the cluster. This is called from
468 __init__."""
469 __init__."""
469
470
470 # Maybe allow reconnecting?
471 # Maybe allow reconnecting?
471 if self._connected:
472 if self._connected:
472 return
473 return
473 self._connected=True
474 self._connected=True
474
475
475 def connect_socket(s, url):
476 def connect_socket(s, url):
476 url = util.disambiguate_url(url, self._config['location'])
477 url = util.disambiguate_url(url, self._config['location'])
477 if self._ssh:
478 if self._ssh:
478 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
479 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
479 else:
480 else:
480 return s.connect(url)
481 return s.connect(url)
481
482
482 self.session.send(self._query_socket, 'connection_request')
483 self.session.send(self._query_socket, 'connection_request')
483 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
484 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
484 poller = zmq.Poller()
485 poller = zmq.Poller()
485 poller.register(self._query_socket, zmq.POLLIN)
486 poller.register(self._query_socket, zmq.POLLIN)
486 # poll expects milliseconds, timeout is seconds
487 # poll expects milliseconds, timeout is seconds
487 evts = poller.poll(timeout*1000)
488 evts = poller.poll(timeout*1000)
488 if not evts:
489 if not evts:
489 raise error.TimeoutError("Hub connection request timed out")
490 raise error.TimeoutError("Hub connection request timed out")
490 idents,msg = self.session.recv(self._query_socket,mode=0)
491 idents,msg = self.session.recv(self._query_socket,mode=0)
491 if self.debug:
492 if self.debug:
492 pprint(msg)
493 pprint(msg)
493 msg = Message(msg)
494 msg = Message(msg)
494 content = msg.content
495 content = msg.content
495 self._config['registration'] = dict(content)
496 self._config['registration'] = dict(content)
496 if content.status == 'ok':
497 if content.status == 'ok':
497 ident = self.session.bsession
498 ident = self.session.bsession
498 if content.mux:
499 if content.mux:
499 self._mux_socket = self._context.socket(zmq.DEALER)
500 self._mux_socket = self._context.socket(zmq.DEALER)
500 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
501 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
501 connect_socket(self._mux_socket, content.mux)
502 connect_socket(self._mux_socket, content.mux)
502 if content.task:
503 if content.task:
503 self._task_scheme, task_addr = content.task
504 self._task_scheme, task_addr = content.task
504 self._task_socket = self._context.socket(zmq.DEALER)
505 self._task_socket = self._context.socket(zmq.DEALER)
505 self._task_socket.setsockopt(zmq.IDENTITY, ident)
506 self._task_socket.setsockopt(zmq.IDENTITY, ident)
506 connect_socket(self._task_socket, task_addr)
507 connect_socket(self._task_socket, task_addr)
507 if content.notification:
508 if content.notification:
508 self._notification_socket = self._context.socket(zmq.SUB)
509 self._notification_socket = self._context.socket(zmq.SUB)
509 connect_socket(self._notification_socket, content.notification)
510 connect_socket(self._notification_socket, content.notification)
510 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
511 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
511 # if content.query:
512 # if content.query:
512 # self._query_socket = self._context.socket(zmq.DEALER)
513 # self._query_socket = self._context.socket(zmq.DEALER)
513 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
514 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
514 # connect_socket(self._query_socket, content.query)
515 # connect_socket(self._query_socket, content.query)
515 if content.control:
516 if content.control:
516 self._control_socket = self._context.socket(zmq.DEALER)
517 self._control_socket = self._context.socket(zmq.DEALER)
517 self._control_socket.setsockopt(zmq.IDENTITY, ident)
518 self._control_socket.setsockopt(zmq.IDENTITY, ident)
518 connect_socket(self._control_socket, content.control)
519 connect_socket(self._control_socket, content.control)
519 if content.iopub:
520 if content.iopub:
520 self._iopub_socket = self._context.socket(zmq.SUB)
521 self._iopub_socket = self._context.socket(zmq.SUB)
521 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
522 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
522 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
523 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
523 connect_socket(self._iopub_socket, content.iopub)
524 connect_socket(self._iopub_socket, content.iopub)
524 self._update_engines(dict(content.engines))
525 self._update_engines(dict(content.engines))
525 else:
526 else:
526 self._connected = False
527 self._connected = False
527 raise Exception("Failed to connect!")
528 raise Exception("Failed to connect!")
528
529
529 #--------------------------------------------------------------------------
530 #--------------------------------------------------------------------------
530 # handlers and callbacks for incoming messages
531 # handlers and callbacks for incoming messages
531 #--------------------------------------------------------------------------
532 #--------------------------------------------------------------------------
532
533
533 def _unwrap_exception(self, content):
534 def _unwrap_exception(self, content):
534 """unwrap exception, and remap engine_id to int."""
535 """unwrap exception, and remap engine_id to int."""
535 e = error.unwrap_exception(content)
536 e = error.unwrap_exception(content)
536 # print e.traceback
537 # print e.traceback
537 if e.engine_info:
538 if e.engine_info:
538 e_uuid = e.engine_info['engine_uuid']
539 e_uuid = e.engine_info['engine_uuid']
539 eid = self._engines[e_uuid]
540 eid = self._engines[e_uuid]
540 e.engine_info['engine_id'] = eid
541 e.engine_info['engine_id'] = eid
541 return e
542 return e
542
543
543 def _extract_metadata(self, header, parent, content):
544 def _extract_metadata(self, header, parent, content):
544 md = {'msg_id' : parent['msg_id'],
545 md = {'msg_id' : parent['msg_id'],
545 'received' : datetime.now(),
546 'received' : datetime.now(),
546 'engine_uuid' : header.get('engine', None),
547 'engine_uuid' : header.get('engine', None),
547 'follow' : parent.get('follow', []),
548 'follow' : parent.get('follow', []),
548 'after' : parent.get('after', []),
549 'after' : parent.get('after', []),
549 'status' : content['status'],
550 'status' : content['status'],
550 }
551 }
551
552
552 if md['engine_uuid'] is not None:
553 if md['engine_uuid'] is not None:
553 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
554 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
554
555
555 if 'date' in parent:
556 if 'date' in parent:
556 md['submitted'] = parent['date']
557 md['submitted'] = parent['date']
557 if 'started' in header:
558 if 'started' in header:
558 md['started'] = header['started']
559 md['started'] = header['started']
559 if 'date' in header:
560 if 'date' in header:
560 md['completed'] = header['date']
561 md['completed'] = header['date']
561 return md
562 return md
562
563
563 def _register_engine(self, msg):
564 def _register_engine(self, msg):
564 """Register a new engine, and update our connection info."""
565 """Register a new engine, and update our connection info."""
565 content = msg['content']
566 content = msg['content']
566 eid = content['id']
567 eid = content['id']
567 d = {eid : content['queue']}
568 d = {eid : content['queue']}
568 self._update_engines(d)
569 self._update_engines(d)
569
570
570 def _unregister_engine(self, msg):
571 def _unregister_engine(self, msg):
571 """Unregister an engine that has died."""
572 """Unregister an engine that has died."""
572 content = msg['content']
573 content = msg['content']
573 eid = int(content['id'])
574 eid = int(content['id'])
574 if eid in self._ids:
575 if eid in self._ids:
575 self._ids.remove(eid)
576 self._ids.remove(eid)
576 uuid = self._engines.pop(eid)
577 uuid = self._engines.pop(eid)
577
578
578 self._handle_stranded_msgs(eid, uuid)
579 self._handle_stranded_msgs(eid, uuid)
579
580
580 if self._task_socket and self._task_scheme == 'pure':
581 if self._task_socket and self._task_scheme == 'pure':
581 self._stop_scheduling_tasks()
582 self._stop_scheduling_tasks()
582
583
583 def _handle_stranded_msgs(self, eid, uuid):
584 def _handle_stranded_msgs(self, eid, uuid):
584 """Handle messages known to be on an engine when the engine unregisters.
585 """Handle messages known to be on an engine when the engine unregisters.
585
586
586 It is possible that this will fire prematurely - that is, an engine will
587 It is possible that this will fire prematurely - that is, an engine will
587 go down after completing a result, and the client will be notified
588 go down after completing a result, and the client will be notified
588 of the unregistration and later receive the successful result.
589 of the unregistration and later receive the successful result.
589 """
590 """
590
591
591 outstanding = self._outstanding_dict[uuid]
592 outstanding = self._outstanding_dict[uuid]
592
593
593 for msg_id in list(outstanding):
594 for msg_id in list(outstanding):
594 if msg_id in self.results:
595 if msg_id in self.results:
595 # we already have this result
596 # we already have this result
596 continue
597 continue
597 try:
598 try:
598 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
599 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
599 except:
600 except:
600 content = error.wrap_exception()
601 content = error.wrap_exception()
601 # build a fake message:
602 # build a fake message:
602 parent = {}
603 parent = {}
603 header = {}
604 header = {}
604 parent['msg_id'] = msg_id
605 parent['msg_id'] = msg_id
605 header['engine'] = uuid
606 header['engine'] = uuid
606 header['date'] = datetime.now()
607 header['date'] = datetime.now()
607 msg = dict(parent_header=parent, header=header, content=content)
608 msg = dict(parent_header=parent, header=header, content=content)
608 self._handle_apply_reply(msg)
609 self._handle_apply_reply(msg)
609
610
610 def _handle_execute_reply(self, msg):
611 def _handle_execute_reply(self, msg):
611 """Save the reply to an execute_request into our results.
612 """Save the reply to an execute_request into our results.
612
613
613 execute messages are never actually used. apply is used instead.
614 execute messages are never actually used. apply is used instead.
614 """
615 """
615
616
616 parent = msg['parent_header']
617 parent = msg['parent_header']
617 msg_id = parent['msg_id']
618 msg_id = parent['msg_id']
618 if msg_id not in self.outstanding:
619 if msg_id not in self.outstanding:
619 if msg_id in self.history:
620 if msg_id in self.history:
620 print ("got stale result: %s"%msg_id)
621 print ("got stale result: %s"%msg_id)
621 else:
622 else:
622 print ("got unknown result: %s"%msg_id)
623 print ("got unknown result: %s"%msg_id)
623 else:
624 else:
624 self.outstanding.remove(msg_id)
625 self.outstanding.remove(msg_id)
625 self.results[msg_id] = self._unwrap_exception(msg['content'])
626 self.results[msg_id] = self._unwrap_exception(msg['content'])
626
627
627 def _handle_apply_reply(self, msg):
628 def _handle_apply_reply(self, msg):
628 """Save the reply to an apply_request into our results."""
629 """Save the reply to an apply_request into our results."""
629 parent = msg['parent_header']
630 parent = msg['parent_header']
630 msg_id = parent['msg_id']
631 msg_id = parent['msg_id']
631 if msg_id not in self.outstanding:
632 if msg_id not in self.outstanding:
632 if msg_id in self.history:
633 if msg_id in self.history:
633 print ("got stale result: %s"%msg_id)
634 print ("got stale result: %s"%msg_id)
634 print(self.results[msg_id])
635 print(self.results[msg_id])
635 print(msg)
636 print(msg)
636 else:
637 else:
637 print ("got unknown result: %s"%msg_id)
638 print ("got unknown result: %s"%msg_id)
638 else:
639 else:
639 self.outstanding.remove(msg_id)
640 self.outstanding.remove(msg_id)
640 content = msg['content']
641 content = msg['content']
641 header = msg['header']
642 header = msg['header']
642
643
643 # construct metadata:
644 # construct metadata:
644 md = self.metadata[msg_id]
645 md = self.metadata[msg_id]
645 md.update(self._extract_metadata(header, parent, content))
646 md.update(self._extract_metadata(header, parent, content))
646 # is this redundant?
647 # is this redundant?
647 self.metadata[msg_id] = md
648 self.metadata[msg_id] = md
648
649
649 e_outstanding = self._outstanding_dict[md['engine_uuid']]
650 e_outstanding = self._outstanding_dict[md['engine_uuid']]
650 if msg_id in e_outstanding:
651 if msg_id in e_outstanding:
651 e_outstanding.remove(msg_id)
652 e_outstanding.remove(msg_id)
652
653
653 # construct result:
654 # construct result:
654 if content['status'] == 'ok':
655 if content['status'] == 'ok':
655 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
656 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
656 elif content['status'] == 'aborted':
657 elif content['status'] == 'aborted':
657 self.results[msg_id] = error.TaskAborted(msg_id)
658 self.results[msg_id] = error.TaskAborted(msg_id)
658 elif content['status'] == 'resubmitted':
659 elif content['status'] == 'resubmitted':
659 # TODO: handle resubmission
660 # TODO: handle resubmission
660 pass
661 pass
661 else:
662 else:
662 self.results[msg_id] = self._unwrap_exception(content)
663 self.results[msg_id] = self._unwrap_exception(content)
663
664
664 def _flush_notifications(self):
665 def _flush_notifications(self):
665 """Flush notifications of engine registrations waiting
666 """Flush notifications of engine registrations waiting
666 in ZMQ queue."""
667 in ZMQ queue."""
667 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
668 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
668 while msg is not None:
669 while msg is not None:
669 if self.debug:
670 if self.debug:
670 pprint(msg)
671 pprint(msg)
671 msg_type = msg['header']['msg_type']
672 msg_type = msg['header']['msg_type']
672 handler = self._notification_handlers.get(msg_type, None)
673 handler = self._notification_handlers.get(msg_type, None)
673 if handler is None:
674 if handler is None:
674 raise Exception("Unhandled message type: %s"%msg_type)
675 raise Exception("Unhandled message type: %s"%msg_type)
675 else:
676 else:
676 handler(msg)
677 handler(msg)
677 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
678 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
678
679
679 def _flush_results(self, sock):
680 def _flush_results(self, sock):
680 """Flush task or queue results waiting in ZMQ queue."""
681 """Flush task or queue results waiting in ZMQ queue."""
681 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
682 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
682 while msg is not None:
683 while msg is not None:
683 if self.debug:
684 if self.debug:
684 pprint(msg)
685 pprint(msg)
685 msg_type = msg['header']['msg_type']
686 msg_type = msg['header']['msg_type']
686 handler = self._queue_handlers.get(msg_type, None)
687 handler = self._queue_handlers.get(msg_type, None)
687 if handler is None:
688 if handler is None:
688 raise Exception("Unhandled message type: %s"%msg_type)
689 raise Exception("Unhandled message type: %s"%msg_type)
689 else:
690 else:
690 handler(msg)
691 handler(msg)
691 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
692 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
692
693
693 def _flush_control(self, sock):
694 def _flush_control(self, sock):
694 """Flush replies from the control channel waiting
695 """Flush replies from the control channel waiting
695 in the ZMQ queue.
696 in the ZMQ queue.
696
697
697 Currently: ignore them."""
698 Currently: ignore them."""
698 if self._ignored_control_replies <= 0:
699 if self._ignored_control_replies <= 0:
699 return
700 return
700 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
701 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
701 while msg is not None:
702 while msg is not None:
702 self._ignored_control_replies -= 1
703 self._ignored_control_replies -= 1
703 if self.debug:
704 if self.debug:
704 pprint(msg)
705 pprint(msg)
705 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
706 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
706
707
707 def _flush_ignored_control(self):
708 def _flush_ignored_control(self):
708 """flush ignored control replies"""
709 """flush ignored control replies"""
709 while self._ignored_control_replies > 0:
710 while self._ignored_control_replies > 0:
710 self.session.recv(self._control_socket)
711 self.session.recv(self._control_socket)
711 self._ignored_control_replies -= 1
712 self._ignored_control_replies -= 1
712
713
713 def _flush_ignored_hub_replies(self):
714 def _flush_ignored_hub_replies(self):
714 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
715 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
715 while msg is not None:
716 while msg is not None:
716 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
717 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
717
718
718 def _flush_iopub(self, sock):
719 def _flush_iopub(self, sock):
719 """Flush replies from the iopub channel waiting
720 """Flush replies from the iopub channel waiting
720 in the ZMQ queue.
721 in the ZMQ queue.
721 """
722 """
722 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
723 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
723 while msg is not None:
724 while msg is not None:
724 if self.debug:
725 if self.debug:
725 pprint(msg)
726 pprint(msg)
726 parent = msg['parent_header']
727 parent = msg['parent_header']
727 msg_id = parent['msg_id']
728 msg_id = parent['msg_id']
728 content = msg['content']
729 content = msg['content']
729 header = msg['header']
730 header = msg['header']
730 msg_type = msg['header']['msg_type']
731 msg_type = msg['header']['msg_type']
731
732
732 # init metadata:
733 # init metadata:
733 md = self.metadata[msg_id]
734 md = self.metadata[msg_id]
734
735
735 if msg_type == 'stream':
736 if msg_type == 'stream':
736 name = content['name']
737 name = content['name']
737 s = md[name] or ''
738 s = md[name] or ''
738 md[name] = s + content['data']
739 md[name] = s + content['data']
739 elif msg_type == 'pyerr':
740 elif msg_type == 'pyerr':
740 md.update({'pyerr' : self._unwrap_exception(content)})
741 md.update({'pyerr' : self._unwrap_exception(content)})
741 elif msg_type == 'pyin':
742 elif msg_type == 'pyin':
742 md.update({'pyin' : content['code']})
743 md.update({'pyin' : content['code']})
743 else:
744 else:
744 md.update({msg_type : content.get('data', '')})
745 md.update({msg_type : content.get('data', '')})
745
746
746 # redundant?
747 # redundant?
747 self.metadata[msg_id] = md
748 self.metadata[msg_id] = md
748
749
749 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
750 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
750
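# A sketch of how flushed iopub output surfaces to the user (`rc` and `ar` are
# hypothetical handles for a connected Client and an AsyncResult):
#
#     ar = rc[:].execute("print 'hi'")
#     rc.wait(ar)
#     rc.spin()                              # pulls pending iopub messages
#     rc.metadata[ar.msg_ids[0]]['stdout']   # -> 'hi\n'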
751
751 #--------------------------------------------------------------------------
752 #--------------------------------------------------------------------------
752 # len, getitem
753 # len, getitem
753 #--------------------------------------------------------------------------
754 #--------------------------------------------------------------------------
754
755
755 def __len__(self):
756 def __len__(self):
756 """len(client) returns # of engines."""
757 """len(client) returns # of engines."""
757 return len(self.ids)
758 return len(self.ids)
758
759
759 def __getitem__(self, key):
760 def __getitem__(self, key):
760 """index access returns DirectView multiplexer objects
761 """index access returns DirectView multiplexer objects
761
762
762 Must be int, slice, or list/tuple/xrange of ints"""
763 Must be int, slice, or list/tuple/xrange of ints"""
763 if not isinstance(key, (int, slice, tuple, list, xrange)):
764 if not isinstance(key, (int, slice, tuple, list, xrange)):
764 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
765 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
765 else:
766 else:
766 return self.direct_view(key)
767 return self.direct_view(key)
767
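# Index-access sketches, mirroring the types accepted above (`rc` is a
# hypothetical connected Client):
#
#     rc[0]         # DirectView on engine 0
#     rc[::2]       # DirectView on every other engine
#     rc[[1, 3]]    # DirectView on engines 1 and 3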
768
768 #--------------------------------------------------------------------------
769 #--------------------------------------------------------------------------
769 # Begin public methods
770 # Begin public methods
770 #--------------------------------------------------------------------------
771 #--------------------------------------------------------------------------
771
772
772 @property
773 @property
773 def ids(self):
774 def ids(self):
774 """Always up-to-date ids property."""
775 """Always up-to-date ids property."""
775 self._flush_notifications()
776 self._flush_notifications()
776 # always copy:
777 # always copy:
777 return list(self._ids)
778 return list(self._ids)
778
779
779 def close(self):
780 def close(self):
780 if self._closed:
781 if self._closed:
781 return
782 return
782 snames = filter(lambda n: n.endswith('socket'), dir(self))
783 snames = filter(lambda n: n.endswith('socket'), dir(self))
783 for socket in map(lambda name: getattr(self, name), snames):
784 for socket in map(lambda name: getattr(self, name), snames):
784 if isinstance(socket, zmq.Socket) and not socket.closed:
785 if isinstance(socket, zmq.Socket) and not socket.closed:
785 socket.close()
786 socket.close()
786 self._closed = True
787 self._closed = True
787
788
788 def spin(self):
789 def spin(self):
789 """Flush any registration notifications and execution results
790 """Flush any registration notifications and execution results
790 waiting in the ZMQ queue.
791 waiting in the ZMQ queue.
791 """
792 """
792 if self._notification_socket:
793 if self._notification_socket:
793 self._flush_notifications()
794 self._flush_notifications()
794 if self._mux_socket:
795 if self._mux_socket:
795 self._flush_results(self._mux_socket)
796 self._flush_results(self._mux_socket)
796 if self._task_socket:
797 if self._task_socket:
797 self._flush_results(self._task_socket)
798 self._flush_results(self._task_socket)
798 if self._control_socket:
799 if self._control_socket:
799 self._flush_control(self._control_socket)
800 self._flush_control(self._control_socket)
800 if self._iopub_socket:
801 if self._iopub_socket:
801 self._flush_iopub(self._iopub_socket)
802 self._flush_iopub(self._iopub_socket)
802 if self._query_socket:
803 if self._query_socket:
803 self._flush_ignored_hub_replies()
804 self._flush_ignored_hub_replies()
804
805
805 def wait(self, jobs=None, timeout=-1):
806 def wait(self, jobs=None, timeout=-1):
806 """waits on one or more `jobs`, for up to `timeout` seconds.
807 """waits on one or more `jobs`, for up to `timeout` seconds.
807
808
808 Parameters
809 Parameters
809 ----------
810 ----------
810
811
811 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
812 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
812 ints are indices to self.history
813 ints are indices to self.history
813 strs are msg_ids
814 strs are msg_ids
814 default: wait on all outstanding messages
815 default: wait on all outstanding messages
815 timeout : float
816 timeout : float
816 a time in seconds, after which to give up.
817 a time in seconds, after which to give up.
817 default is -1, which means no timeout
818 default is -1, which means no timeout
818
819
819 Returns
820 Returns
820 -------
821 -------
821
822
822 True : when all msg_ids are done
823 True : when all msg_ids are done
823 False : timeout reached, some msg_ids still outstanding
824 False : timeout reached, some msg_ids still outstanding
824 """
825 """
825 tic = time.time()
826 tic = time.time()
826 if jobs is None:
827 if jobs is None:
827 theids = self.outstanding
828 theids = self.outstanding
828 else:
829 else:
829 if isinstance(jobs, (int, basestring, AsyncResult)):
830 if isinstance(jobs, (int, basestring, AsyncResult)):
830 jobs = [jobs]
831 jobs = [jobs]
831 theids = set()
832 theids = set()
832 for job in jobs:
833 for job in jobs:
833 if isinstance(job, int):
834 if isinstance(job, int):
834 # index access
835 # index access
835 job = self.history[job]
836 job = self.history[job]
836 elif isinstance(job, AsyncResult):
837 elif isinstance(job, AsyncResult):
837 map(theids.add, job.msg_ids)
838 map(theids.add, job.msg_ids)
838 continue
839 continue
839 theids.add(job)
840 theids.add(job)
840 if not theids.intersection(self.outstanding):
841 if not theids.intersection(self.outstanding):
841 return True
842 return True
842 self.spin()
843 self.spin()
843 while theids.intersection(self.outstanding):
844 while theids.intersection(self.outstanding):
844 if timeout >= 0 and ( time.time()-tic ) > timeout:
845 if timeout >= 0 and ( time.time()-tic ) > timeout:
845 break
846 break
846 time.sleep(1e-3)
847 time.sleep(1e-3)
847 self.spin()
848 self.spin()
848 return len(theids.intersection(self.outstanding)) == 0
849 return len(theids.intersection(self.outstanding)) == 0
849
850
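# A usage sketch of spin/wait, assuming a cluster is already running
# (e.g. started with `ipcluster start`); the workload here is illustrative only.
from IPython.parallel import Client

rc = Client()                        # connect with the default profile
view = rc.load_balanced_view()
ar = view.apply_async(pow, 2, 10)    # submit one task asynchronously
if rc.wait([ar], timeout=5):         # block for up to 5 seconds on this job
    print ar.get()                   # the result is now cached locally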
850 #--------------------------------------------------------------------------
851 #--------------------------------------------------------------------------
851 # Control methods
852 # Control methods
852 #--------------------------------------------------------------------------
853 #--------------------------------------------------------------------------
853
854
854 @spin_first
855 @spin_first
855 def clear(self, targets=None, block=None):
856 def clear(self, targets=None, block=None):
856 """Clear the namespace in target(s)."""
857 """Clear the namespace in target(s)."""
857 block = self.block if block is None else block
858 block = self.block if block is None else block
858 targets = self._build_targets(targets)[0]
859 targets = self._build_targets(targets)[0]
859 for t in targets:
860 for t in targets:
860 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
861 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
861 error = False
862 error = False
862 if block:
863 if block:
863 self._flush_ignored_control()
864 self._flush_ignored_control()
864 for i in range(len(targets)):
865 for i in range(len(targets)):
865 idents,msg = self.session.recv(self._control_socket,0)
866 idents,msg = self.session.recv(self._control_socket,0)
866 if self.debug:
867 if self.debug:
867 pprint(msg)
868 pprint(msg)
868 if msg['content']['status'] != 'ok':
869 if msg['content']['status'] != 'ok':
869 error = self._unwrap_exception(msg['content'])
870 error = self._unwrap_exception(msg['content'])
870 else:
871 else:
871 self._ignored_control_replies += len(targets)
872 self._ignored_control_replies += len(targets)
872 if error:
873 if error:
873 raise error
874 raise error
874
875
875
876
876 @spin_first
877 @spin_first
877 def abort(self, jobs=None, targets=None, block=None):
878 def abort(self, jobs=None, targets=None, block=None):
878 """Abort specific jobs from the execution queues of target(s).
879 """Abort specific jobs from the execution queues of target(s).
879
880
880 This is a mechanism to prevent jobs that have already been submitted
881 This is a mechanism to prevent jobs that have already been submitted
881 from executing.
882 from executing.
882
883
883 Parameters
884 Parameters
884 ----------
885 ----------
885
886
886 jobs : msg_id, list of msg_ids, or AsyncResult
887 jobs : msg_id, list of msg_ids, or AsyncResult
887 The jobs to be aborted
888 The jobs to be aborted
888
889
889
890
890 """
891 """
891 block = self.block if block is None else block
892 block = self.block if block is None else block
892 targets = self._build_targets(targets)[0]
893 targets = self._build_targets(targets)[0]
893 msg_ids = []
894 msg_ids = []
894 if isinstance(jobs, (basestring,AsyncResult)):
895 if isinstance(jobs, (basestring,AsyncResult)):
895 jobs = [jobs]
896 jobs = [jobs]
896 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
897 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
897 if bad_ids:
898 if bad_ids:
898 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
899 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
899 for j in jobs:
900 for j in jobs:
900 if isinstance(j, AsyncResult):
901 if isinstance(j, AsyncResult):
901 msg_ids.extend(j.msg_ids)
902 msg_ids.extend(j.msg_ids)
902 else:
903 else:
903 msg_ids.append(j)
904 msg_ids.append(j)
904 content = dict(msg_ids=msg_ids)
905 content = dict(msg_ids=msg_ids)
905 for t in targets:
906 for t in targets:
906 self.session.send(self._control_socket, 'abort_request',
907 self.session.send(self._control_socket, 'abort_request',
907 content=content, ident=t)
908 content=content, ident=t)
908 error = False
909 error = False
909 if block:
910 if block:
910 self._flush_ignored_control()
911 self._flush_ignored_control()
911 for i in range(len(targets)):
912 for i in range(len(targets)):
912 idents,msg = self.session.recv(self._control_socket,0)
913 idents,msg = self.session.recv(self._control_socket,0)
913 if self.debug:
914 if self.debug:
914 pprint(msg)
915 pprint(msg)
915 if msg['content']['status'] != 'ok':
916 if msg['content']['status'] != 'ok':
916 error = self._unwrap_exception(msg['content'])
917 error = self._unwrap_exception(msg['content'])
917 else:
918 else:
918 self._ignored_control_replies += len(targets)
919 self._ignored_control_replies += len(targets)
919 if error:
920 if error:
920 raise error
921 raise error
921
922
922 @spin_first
923 @spin_first
923 def shutdown(self, targets=None, restart=False, hub=False, block=None):
924 def shutdown(self, targets=None, restart=False, hub=False, block=None):
924 """Terminates one or more engine processes, optionally including the hub."""
925 """Terminates one or more engine processes, optionally including the hub."""
925 block = self.block if block is None else block
926 block = self.block if block is None else block
926 if hub:
927 if hub:
927 targets = 'all'
928 targets = 'all'
928 targets = self._build_targets(targets)[0]
929 targets = self._build_targets(targets)[0]
929 for t in targets:
930 for t in targets:
930 self.session.send(self._control_socket, 'shutdown_request',
931 self.session.send(self._control_socket, 'shutdown_request',
931 content={'restart':restart},ident=t)
932 content={'restart':restart},ident=t)
932 error = False
933 error = False
933 if block or hub:
934 if block or hub:
934 self._flush_ignored_control()
935 self._flush_ignored_control()
935 for i in range(len(targets)):
936 for i in range(len(targets)):
936 idents,msg = self.session.recv(self._control_socket, 0)
937 idents,msg = self.session.recv(self._control_socket, 0)
937 if self.debug:
938 if self.debug:
938 pprint(msg)
939 pprint(msg)
939 if msg['content']['status'] != 'ok':
940 if msg['content']['status'] != 'ok':
940 error = self._unwrap_exception(msg['content'])
941 error = self._unwrap_exception(msg['content'])
941 else:
942 else:
942 self._ignored_control_replies += len(targets)
943 self._ignored_control_replies += len(targets)
943
944
944 if hub:
945 if hub:
945 time.sleep(0.25)
946 time.sleep(0.25)
946 self.session.send(self._query_socket, 'shutdown_request')
947 self.session.send(self._query_socket, 'shutdown_request')
947 idents,msg = self.session.recv(self._query_socket, 0)
948 idents,msg = self.session.recv(self._query_socket, 0)
948 if self.debug:
949 if self.debug:
949 pprint(msg)
950 pprint(msg)
950 if msg['content']['status'] != 'ok':
951 if msg['content']['status'] != 'ok':
951 error = self._unwrap_exception(msg['content'])
952 error = self._unwrap_exception(msg['content'])
952
953
953 if error:
954 if error:
954 raise error
955 raise error
955
956
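# A sketch of the control methods above (clear/abort/shutdown); the targets and
# the sleep workload are placeholders, assuming a running cluster.
import time
from IPython.parallel import Client

rc = Client()
rc.clear(targets='all', block=True)            # wipe engine namespaces
ar = rc[:].apply_async(time.sleep, 10)         # something slow enough to abort
rc.abort(jobs=ar)                              # drop it if it has not started yet
rc.shutdown(hub=True, block=True)              # stop all engines and the hub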
956 #--------------------------------------------------------------------------
957 #--------------------------------------------------------------------------
957 # Execution related methods
958 # Execution related methods
958 #--------------------------------------------------------------------------
959 #--------------------------------------------------------------------------
959
960
960 def _maybe_raise(self, result):
961 def _maybe_raise(self, result):
961 """wrapper for maybe raising an exception if apply failed."""
962 """wrapper for maybe raising an exception if apply failed."""
962 if isinstance(result, error.RemoteError):
963 if isinstance(result, error.RemoteError):
963 raise result
964 raise result
964
965
965 return result
966 return result
966
967
967 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
968 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
968 ident=None):
969 ident=None):
969 """construct and send an apply message via a socket.
970 """construct and send an apply message via a socket.
970
971
971 This is the principal method with which all engine execution is performed by views.
972 This is the principal method with which all engine execution is performed by views.
972 """
973 """
973
974
974 assert not self._closed, "cannot use me anymore, I'm closed!"
975 assert not self._closed, "cannot use me anymore, I'm closed!"
975 # defaults:
976 # defaults:
976 args = args if args is not None else []
977 args = args if args is not None else []
977 kwargs = kwargs if kwargs is not None else {}
978 kwargs = kwargs if kwargs is not None else {}
978 subheader = subheader if subheader is not None else {}
979 subheader = subheader if subheader is not None else {}
979
980
980 # validate arguments
981 # validate arguments
981 if not callable(f):
982 if not callable(f):
982 raise TypeError("f must be callable, not %s"%type(f))
983 raise TypeError("f must be callable, not %s"%type(f))
983 if not isinstance(args, (tuple, list)):
984 if not isinstance(args, (tuple, list)):
984 raise TypeError("args must be tuple or list, not %s"%type(args))
985 raise TypeError("args must be tuple or list, not %s"%type(args))
985 if not isinstance(kwargs, dict):
986 if not isinstance(kwargs, dict):
986 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
987 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
987 if not isinstance(subheader, dict):
988 if not isinstance(subheader, dict):
988 raise TypeError("subheader must be dict, not %s"%type(subheader))
989 raise TypeError("subheader must be dict, not %s"%type(subheader))
989
990
990 bufs = util.pack_apply_message(f,args,kwargs)
991 bufs = util.pack_apply_message(f,args,kwargs)
991
992
992 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
993 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
993 subheader=subheader, track=track)
994 subheader=subheader, track=track)
994
995
995 msg_id = msg['header']['msg_id']
996 msg_id = msg['header']['msg_id']
996 self.outstanding.add(msg_id)
997 self.outstanding.add(msg_id)
997 if ident:
998 if ident:
998 # possibly routed to a specific engine
999 # possibly routed to a specific engine
999 if isinstance(ident, list):
1000 if isinstance(ident, list):
1000 ident = ident[-1]
1001 ident = ident[-1]
1001 if ident in self._engines.values():
1002 if ident in self._engines.values():
1002 # save for later, in case of engine death
1003 # save for later, in case of engine death
1003 self._outstanding_dict[ident].add(msg_id)
1004 self._outstanding_dict[ident].add(msg_id)
1004 self.history.append(msg_id)
1005 self.history.append(msg_id)
1005 self.metadata[msg_id]['submitted'] = datetime.now()
1006 self.metadata[msg_id]['submitted'] = datetime.now()
1006
1007
1007 return msg
1008 return msg
1008
1009
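# send_apply_message is the low-level path; user code normally reaches it
# through a view, as in this sketch (the lambda is a stand-in workload).
from IPython.parallel import Client

rc = Client()
dv = rc.direct_view(targets=0)                 # single-engine DirectView
ar = dv.apply_async(lambda x: x ** 2, 7)       # routed via send_apply_message
print ar.get()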
1009 #--------------------------------------------------------------------------
1010 #--------------------------------------------------------------------------
1010 # construct a View object
1011 # construct a View object
1011 #--------------------------------------------------------------------------
1012 #--------------------------------------------------------------------------
1012
1013
1013 def load_balanced_view(self, targets=None):
1014 def load_balanced_view(self, targets=None):
1014 """construct a DirectView object.
1015 """construct a DirectView object.
1015
1016
1016 If no arguments are specified, create a LoadBalancedView
1017 If no arguments are specified, create a LoadBalancedView
1017 using all engines.
1018 using all engines.
1018
1019
1019 Parameters
1020 Parameters
1020 ----------
1021 ----------
1021
1022
1022 targets: list,slice,int,etc. [default: use all engines]
1023 targets: list,slice,int,etc. [default: use all engines]
1023 The subset of engines across which to load-balance
1024 The subset of engines across which to load-balance
1024 """
1025 """
1025 if targets == 'all':
1026 if targets == 'all':
1026 targets = None
1027 targets = None
1027 if targets is not None:
1028 if targets is not None:
1028 targets = self._build_targets(targets)[1]
1029 targets = self._build_targets(targets)[1]
1029 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1030 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1030
1031
1031 def direct_view(self, targets='all'):
1032 def direct_view(self, targets='all'):
1032 """construct a DirectView object.
1033 """construct a DirectView object.
1033
1034
1034 If no targets are specified, create a DirectView
1035 If no targets are specified, create a DirectView
1035 using all engines.
1036 using all engines.
1036
1037
1037 Parameters
1038 Parameters
1038 ----------
1039 ----------
1039
1040
1040 targets: list,slice,int,etc. [default: use all engines]
1041 targets: list,slice,int,etc. [default: use all engines]
1041 The engines to use for the View
1042 The engines to use for the View
1042 """
1043 """
1043 single = isinstance(targets, int)
1044 single = isinstance(targets, int)
1044 # allow 'all' to be lazily evaluated at each execution
1045 # allow 'all' to be lazily evaluated at each execution
1045 if targets != 'all':
1046 if targets != 'all':
1046 targets = self._build_targets(targets)[1]
1047 targets = self._build_targets(targets)[1]
1047 if single:
1048 if single:
1048 targets = targets[0]
1049 targets = targets[0]
1049 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1050 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1050
1051
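# A minimal sketch contrasting the two view types; the pushed value and the
# workload are illustrative, assuming a running cluster.
from IPython.parallel import Client

rc = Client()
dv = rc.direct_view('all')            # broadcast to the same engines each call
lv = rc.load_balanced_view()          # let the scheduler pick an engine per task
dv.push(dict(a=5), block=True)        # direct views can push/pull namespaces
ar = lv.apply_async(lambda x: x + 1, 41)
print ar.get()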
1051 #--------------------------------------------------------------------------
1052 #--------------------------------------------------------------------------
1052 # Query methods
1053 # Query methods
1053 #--------------------------------------------------------------------------
1054 #--------------------------------------------------------------------------
1054
1055
1055 @spin_first
1056 @spin_first
1056 def get_result(self, indices_or_msg_ids=None, block=None):
1057 def get_result(self, indices_or_msg_ids=None, block=None):
1057 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1058 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1058
1059
1059 If the client already has the results, no request to the Hub will be made.
1060 If the client already has the results, no request to the Hub will be made.
1060
1061
1061 This is a convenient way to construct AsyncResult objects, which are wrappers
1062 This is a convenient way to construct AsyncResult objects, which are wrappers
1062 that include metadata about execution, and allow for awaiting results that
1063 that include metadata about execution, and allow for awaiting results that
1063 were not submitted by this Client.
1064 were not submitted by this Client.
1064
1065
1065 It can also be a convenient way to retrieve the metadata associated with
1066 It can also be a convenient way to retrieve the metadata associated with
1066 blocking execution, since it always retrieves the metadata along with the result.
1067 blocking execution, since it always retrieves the metadata along with the result.
1067
1068
1068 Examples
1069 Examples
1069 --------
1070 --------
1070 ::
1071 ::
1071
1072
1072 In [10]: ar = client.get_result(-1)  # most recent task, by history index
1073 In [10]: ar = client.get_result(-1)  # most recent task, by history index
1073
1074
1074 Parameters
1075 Parameters
1075 ----------
1076 ----------
1076
1077
1077 indices_or_msg_ids : integer history index, str msg_id, or list of either
1078 indices_or_msg_ids : integer history index, str msg_id, or list of either
1078 The indices or msg_ids of the results to be retrieved
1079 The indices or msg_ids of the results to be retrieved
1079
1080
1080 block : bool
1081 block : bool
1081 Whether to wait for the result to be done
1082 Whether to wait for the result to be done
1082
1083
1083 Returns
1084 Returns
1084 -------
1085 -------
1085
1086
1086 AsyncResult
1087 AsyncResult
1087 A single AsyncResult object will always be returned.
1088 A single AsyncResult object will always be returned.
1088
1089
1089 AsyncHubResult
1090 AsyncHubResult
1090 A subclass of AsyncResult that retrieves results from the Hub
1091 A subclass of AsyncResult that retrieves results from the Hub
1091
1092
1092 """
1093 """
1093 block = self.block if block is None else block
1094 block = self.block if block is None else block
1094 if indices_or_msg_ids is None:
1095 if indices_or_msg_ids is None:
1095 indices_or_msg_ids = -1
1096 indices_or_msg_ids = -1
1096
1097
1097 if not isinstance(indices_or_msg_ids, (list,tuple)):
1098 if not isinstance(indices_or_msg_ids, (list,tuple)):
1098 indices_or_msg_ids = [indices_or_msg_ids]
1099 indices_or_msg_ids = [indices_or_msg_ids]
1099
1100
1100 theids = []
1101 theids = []
1101 for id in indices_or_msg_ids:
1102 for id in indices_or_msg_ids:
1102 if isinstance(id, int):
1103 if isinstance(id, int):
1103 id = self.history[id]
1104 id = self.history[id]
1104 if not isinstance(id, basestring):
1105 if not isinstance(id, basestring):
1105 raise TypeError("indices must be str or int, not %r"%id)
1106 raise TypeError("indices must be str or int, not %r"%id)
1106 theids.append(id)
1107 theids.append(id)
1107
1108
1108 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1109 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1109 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1110 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1110
1111
1111 if remote_ids:
1112 if remote_ids:
1112 ar = AsyncHubResult(self, msg_ids=theids)
1113 ar = AsyncHubResult(self, msg_ids=theids)
1113 else:
1114 else:
1114 ar = AsyncResult(self, msg_ids=theids)
1115 ar = AsyncResult(self, msg_ids=theids)
1115
1116
1116 if block:
1117 if block:
1117 ar.wait()
1118 ar.wait()
1118
1119
1119 return ar
1120 return ar
1120
1121
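# A sketch of rebuilding an AsyncResult from a msg_id with get_result; the
# msg_id comes from a job submitted by this same client.
from IPython.parallel import Client

rc = Client()
ar = rc[:].apply_async(len, range(5))
msg_id = ar.msg_ids[0]
ar2 = rc.get_result(msg_id, block=True)        # served from the local cache
print rc.metadata[msg_id]['completed']         # per-task metadata is populated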
1121 @spin_first
1122 @spin_first
1122 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1123 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1123 """Resubmit one or more tasks.
1124 """Resubmit one or more tasks.
1124
1125
1125 In-flight tasks may not be resubmitted.
1126 In-flight tasks may not be resubmitted.
1126
1127
1127 Parameters
1128 Parameters
1128 ----------
1129 ----------
1129
1130
1130 indices_or_msg_ids : integer history index, str msg_id, or list of either
1131 indices_or_msg_ids : integer history index, str msg_id, or list of either
1131 The indices or msg_ids of the tasks to be resubmitted
1132 The indices or msg_ids of the tasks to be resubmitted
1132
1133
1133 block : bool
1134 block : bool
1134 Whether to wait for the result to be done
1135 Whether to wait for the result to be done
1135
1136
1136 Returns
1137 Returns
1137 -------
1138 -------
1138
1139
1139 AsyncHubResult
1140 AsyncHubResult
1140 A subclass of AsyncResult that retrieves results from the Hub
1141 A subclass of AsyncResult that retrieves results from the Hub
1141
1142
1142 """
1143 """
1143 block = self.block if block is None else block
1144 block = self.block if block is None else block
1144 if indices_or_msg_ids is None:
1145 if indices_or_msg_ids is None:
1145 indices_or_msg_ids = -1
1146 indices_or_msg_ids = -1
1146
1147
1147 if not isinstance(indices_or_msg_ids, (list,tuple)):
1148 if not isinstance(indices_or_msg_ids, (list,tuple)):
1148 indices_or_msg_ids = [indices_or_msg_ids]
1149 indices_or_msg_ids = [indices_or_msg_ids]
1149
1150
1150 theids = []
1151 theids = []
1151 for id in indices_or_msg_ids:
1152 for id in indices_or_msg_ids:
1152 if isinstance(id, int):
1153 if isinstance(id, int):
1153 id = self.history[id]
1154 id = self.history[id]
1154 if not isinstance(id, basestring):
1155 if not isinstance(id, basestring):
1155 raise TypeError("indices must be str or int, not %r"%id)
1156 raise TypeError("indices must be str or int, not %r"%id)
1156 theids.append(id)
1157 theids.append(id)
1157
1158
1158 for msg_id in theids:
1159 for msg_id in theids:
1159 self.outstanding.discard(msg_id)
1160 self.outstanding.discard(msg_id)
1160 if msg_id in self.history:
1161 if msg_id in self.history:
1161 self.history.remove(msg_id)
1162 self.history.remove(msg_id)
1162 self.results.pop(msg_id, None)
1163 self.results.pop(msg_id, None)
1163 self.metadata.pop(msg_id, None)
1164 self.metadata.pop(msg_id, None)
1164 content = dict(msg_ids = theids)
1165 content = dict(msg_ids = theids)
1165
1166
1166 self.session.send(self._query_socket, 'resubmit_request', content)
1167 self.session.send(self._query_socket, 'resubmit_request', content)
1167
1168
1168 zmq.select([self._query_socket], [], [])
1169 zmq.select([self._query_socket], [], [])
1169 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1170 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1170 if self.debug:
1171 if self.debug:
1171 pprint(msg)
1172 pprint(msg)
1172 content = msg['content']
1173 content = msg['content']
1173 if content['status'] != 'ok':
1174 if content['status'] != 'ok':
1174 raise self._unwrap_exception(content)
1175 raise self._unwrap_exception(content)
1175
1176
1176 ar = AsyncHubResult(self, msg_ids=theids)
1177 ar = AsyncHubResult(self, msg_ids=theids)
1177
1178
1178 if block:
1179 if block:
1179 ar.wait()
1180 ar.wait()
1180
1181
1181 return ar
1182 return ar
1182
1183
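# A sketch of resubmitting a completed task by msg_id (in-flight tasks would
# be rejected by the Hub); the workload is illustrative.
from IPython.parallel import Client

rc = Client()
ar = rc.load_balanced_view().apply_async(max, [1, 2, 3])
ar.get()                                       # make sure it is no longer in flight
ar2 = rc.resubmit(ar.msg_ids, block=True)      # run the same task again
print ar2.get()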
1183 @spin_first
1184 @spin_first
1184 def result_status(self, msg_ids, status_only=True):
1185 def result_status(self, msg_ids, status_only=True):
1185 """Check on the status of the result(s) of the apply request with `msg_ids`.
1186 """Check on the status of the result(s) of the apply request with `msg_ids`.
1186
1187
1187 If status_only is False, then the actual results will be retrieved, else
1188 If status_only is False, then the actual results will be retrieved, else
1188 only the status of the results will be checked.
1189 only the status of the results will be checked.
1189
1190
1190 Parameters
1191 Parameters
1191 ----------
1192 ----------
1192
1193
1193 msg_ids : list of msg_ids
1194 msg_ids : list of msg_ids
1194 if int:
1195 if int:
1195 Passed as index to self.history for convenience.
1196 Passed as index to self.history for convenience.
1196 status_only : bool (default: True)
1197 status_only : bool (default: True)
1197 if False:
1198 if False:
1198 Retrieve the actual results of completed tasks.
1199 Retrieve the actual results of completed tasks.
1199
1200
1200 Returns
1201 Returns
1201 -------
1202 -------
1202
1203
1203 results : dict
1204 results : dict
1204 There will always be the keys 'pending' and 'completed', which will
1205 There will always be the keys 'pending' and 'completed', which will
1205 be lists of msg_ids that are incomplete or complete. If `status_only`
1206 be lists of msg_ids that are incomplete or complete. If `status_only`
1206 is False, then completed results will be keyed by their `msg_id`.
1207 is False, then completed results will be keyed by their `msg_id`.
1207 """
1208 """
1208 if not isinstance(msg_ids, (list,tuple)):
1209 if not isinstance(msg_ids, (list,tuple)):
1209 msg_ids = [msg_ids]
1210 msg_ids = [msg_ids]
1210
1211
1211 theids = []
1212 theids = []
1212 for msg_id in msg_ids:
1213 for msg_id in msg_ids:
1213 if isinstance(msg_id, int):
1214 if isinstance(msg_id, int):
1214 msg_id = self.history[msg_id]
1215 msg_id = self.history[msg_id]
1215 if not isinstance(msg_id, basestring):
1216 if not isinstance(msg_id, basestring):
1216 raise TypeError("msg_ids must be str, not %r"%msg_id)
1217 raise TypeError("msg_ids must be str, not %r"%msg_id)
1217 theids.append(msg_id)
1218 theids.append(msg_id)
1218
1219
1219 completed = []
1220 completed = []
1220 local_results = {}
1221 local_results = {}
1221
1222
1222 # comment this block out to temporarily disable local shortcut:
1223 # comment this block out to temporarily disable local shortcut:
1223 for msg_id in theids:
1224 for msg_id in theids:
1224 if msg_id in self.results:
1225 if msg_id in self.results:
1225 completed.append(msg_id)
1226 completed.append(msg_id)
1226 local_results[msg_id] = self.results[msg_id]
1227 local_results[msg_id] = self.results[msg_id]
1227 theids.remove(msg_id)
1228 theids.remove(msg_id)
1228
1229
1229 if theids: # some not locally cached
1230 if theids: # some not locally cached
1230 content = dict(msg_ids=theids, status_only=status_only)
1231 content = dict(msg_ids=theids, status_only=status_only)
1231 msg = self.session.send(self._query_socket, "result_request", content=content)
1232 msg = self.session.send(self._query_socket, "result_request", content=content)
1232 zmq.select([self._query_socket], [], [])
1233 zmq.select([self._query_socket], [], [])
1233 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1234 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1234 if self.debug:
1235 if self.debug:
1235 pprint(msg)
1236 pprint(msg)
1236 content = msg['content']
1237 content = msg['content']
1237 if content['status'] != 'ok':
1238 if content['status'] != 'ok':
1238 raise self._unwrap_exception(content)
1239 raise self._unwrap_exception(content)
1239 buffers = msg['buffers']
1240 buffers = msg['buffers']
1240 else:
1241 else:
1241 content = dict(completed=[],pending=[])
1242 content = dict(completed=[],pending=[])
1242
1243
1243 content['completed'].extend(completed)
1244 content['completed'].extend(completed)
1244
1245
1245 if status_only:
1246 if status_only:
1246 return content
1247 return content
1247
1248
1248 failures = []
1249 failures = []
1249 # load cached results into result:
1250 # load cached results into result:
1250 content.update(local_results)
1251 content.update(local_results)
1251
1252
1252 # update cache with results:
1253 # update cache with results:
1253 for msg_id in sorted(theids):
1254 for msg_id in sorted(theids):
1254 if msg_id in content['completed']:
1255 if msg_id in content['completed']:
1255 rec = content[msg_id]
1256 rec = content[msg_id]
1256 parent = rec['header']
1257 parent = rec['header']
1257 header = rec['result_header']
1258 header = rec['result_header']
1258 rcontent = rec['result_content']
1259 rcontent = rec['result_content']
1259 iodict = rec['io']
1260 iodict = rec['io']
1260 if isinstance(rcontent, str):
1261 if isinstance(rcontent, str):
1261 rcontent = self.session.unpack(rcontent)
1262 rcontent = self.session.unpack(rcontent)
1262
1263
1263 md = self.metadata[msg_id]
1264 md = self.metadata[msg_id]
1264 md.update(self._extract_metadata(header, parent, rcontent))
1265 md.update(self._extract_metadata(header, parent, rcontent))
1265 md.update(iodict)
1266 md.update(iodict)
1266
1267
1267 if rcontent['status'] == 'ok':
1268 if rcontent['status'] == 'ok':
1268 res,buffers = util.unserialize_object(buffers)
1269 res,buffers = util.unserialize_object(buffers)
1269 else:
1270 else:
1270 print rcontent
1271 print rcontent
1271 res = self._unwrap_exception(rcontent)
1272 res = self._unwrap_exception(rcontent)
1272 failures.append(res)
1273 failures.append(res)
1273
1274
1274 self.results[msg_id] = res
1275 self.results[msg_id] = res
1275 content[msg_id] = res
1276 content[msg_id] = res
1276
1277
1277 if len(theids) == 1 and failures:
1278 if len(theids) == 1 and failures:
1278 raise failures[0]
1279 raise failures[0]
1279
1280
1280 error.collect_exceptions(failures, "result_status")
1281 error.collect_exceptions(failures, "result_status")
1281 return content
1282 return content
1282
1283
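# A sketch of checking completion state without pulling result payloads
# (status_only=True keeps the reply to lists of msg_ids).
from IPython.parallel import Client

rc = Client()
ar = rc.load_balanced_view().apply_async(sum, range(100))
status = rc.result_status(ar.msg_ids, status_only=True)
print status['pending'], status['completed']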
1283 @spin_first
1284 @spin_first
1284 def queue_status(self, targets='all', verbose=False):
1285 def queue_status(self, targets='all', verbose=False):
1285 """Fetch the status of engine queues.
1286 """Fetch the status of engine queues.
1286
1287
1287 Parameters
1288 Parameters
1288 ----------
1289 ----------
1289
1290
1290 targets : int/str/list of ints/strs
1291 targets : int/str/list of ints/strs
1291 the engines whose states are to be queried.
1292 the engines whose states are to be queried.
1292 default : all
1293 default : all
1293 verbose : bool
1294 verbose : bool
1294 Whether to return lengths only, or lists of ids for each element
1295 Whether to return lengths only, or lists of ids for each element
1295 """
1296 """
1296 engine_ids = self._build_targets(targets)[1]
1297 engine_ids = self._build_targets(targets)[1]
1297 content = dict(targets=engine_ids, verbose=verbose)
1298 content = dict(targets=engine_ids, verbose=verbose)
1298 self.session.send(self._query_socket, "queue_request", content=content)
1299 self.session.send(self._query_socket, "queue_request", content=content)
1299 idents,msg = self.session.recv(self._query_socket, 0)
1300 idents,msg = self.session.recv(self._query_socket, 0)
1300 if self.debug:
1301 if self.debug:
1301 pprint(msg)
1302 pprint(msg)
1302 content = msg['content']
1303 content = msg['content']
1303 status = content.pop('status')
1304 status = content.pop('status')
1304 if status != 'ok':
1305 if status != 'ok':
1305 raise self._unwrap_exception(content)
1306 raise self._unwrap_exception(content)
1306 content = rekey(content)
1307 content = rekey(content)
1307 if isinstance(targets, int):
1308 if isinstance(targets, int):
1308 return content[targets]
1309 return content[targets]
1309 else:
1310 else:
1310 return content
1311 return content
1311
1312
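# A sketch of inspecting engine queues; the counts are whatever the Hub
# reports at that moment, plus an 'unassigned' entry for unscheduled tasks.
from IPython.parallel import Client

rc = Client()
status = rc.queue_status(verbose=False)
for key, value in sorted(status.items()):
    print key, value            # engine id -> dict of queue/completed/tasks counts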
1312 @spin_first
1313 @spin_first
1313 def purge_results(self, jobs=[], targets=[]):
1314 def purge_results(self, jobs=[], targets=[]):
1314 """Tell the Hub to forget results.
1315 """Tell the Hub to forget results.
1315
1316
1316 Individual results can be purged by msg_id, or the entire
1317 Individual results can be purged by msg_id, or the entire
1317 history of specific targets can be purged.
1318 history of specific targets can be purged.
1318
1319
1319 Use `purge_results('all')` to scrub everything from the Hub's db.
1320 Use `purge_results('all')` to scrub everything from the Hub's db.
1320
1321
1321 Parameters
1322 Parameters
1322 ----------
1323 ----------
1323
1324
1324 jobs : str or list of str or AsyncResult objects
1325 jobs : str or list of str or AsyncResult objects
1325 the msg_ids whose results should be forgotten.
1326 the msg_ids whose results should be forgotten.
1326 targets : int/str/list of ints/strs
1327 targets : int/str/list of ints/strs
1327 The targets, by int_id, whose entire history is to be purged.
1328 The targets, by int_id, whose entire history is to be purged.
1328
1329
1329 default : None
1330 default : None
1330 """
1331 """
1331 if not targets and not jobs:
1332 if not targets and not jobs:
1332 raise ValueError("Must specify at least one of `targets` and `jobs`")
1333 raise ValueError("Must specify at least one of `targets` and `jobs`")
1333 if targets:
1334 if targets:
1334 targets = self._build_targets(targets)[1]
1335 targets = self._build_targets(targets)[1]
1335
1336
1336 # construct msg_ids from jobs
1337 # construct msg_ids from jobs
1337 if jobs == 'all':
1338 if jobs == 'all':
1338 msg_ids = jobs
1339 msg_ids = jobs
1339 else:
1340 else:
1340 msg_ids = []
1341 msg_ids = []
1341 if isinstance(jobs, (basestring,AsyncResult)):
1342 if isinstance(jobs, (basestring,AsyncResult)):
1342 jobs = [jobs]
1343 jobs = [jobs]
1343 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1344 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1344 if bad_ids:
1345 if bad_ids:
1345 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1346 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1346 for j in jobs:
1347 for j in jobs:
1347 if isinstance(j, AsyncResult):
1348 if isinstance(j, AsyncResult):
1348 msg_ids.extend(j.msg_ids)
1349 msg_ids.extend(j.msg_ids)
1349 else:
1350 else:
1350 msg_ids.append(j)
1351 msg_ids.append(j)
1351
1352
1352 content = dict(engine_ids=targets, msg_ids=msg_ids)
1353 content = dict(engine_ids=targets, msg_ids=msg_ids)
1353 self.session.send(self._query_socket, "purge_request", content=content)
1354 self.session.send(self._query_socket, "purge_request", content=content)
1354 idents, msg = self.session.recv(self._query_socket, 0)
1355 idents, msg = self.session.recv(self._query_socket, 0)
1355 if self.debug:
1356 if self.debug:
1356 pprint(msg)
1357 pprint(msg)
1357 content = msg['content']
1358 content = msg['content']
1358 if content['status'] != 'ok':
1359 if content['status'] != 'ok':
1359 raise self._unwrap_exception(content)
1360 raise self._unwrap_exception(content)
1360
1361
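# A sketch of purging Hub records, selectively and then wholesale.
from IPython.parallel import Client

rc = Client()
ar = rc[:].apply_async(str, 1)
ar.get()
rc.purge_results(jobs=ar)       # forget just this job's records
rc.purge_results('all')         # or scrub the Hub's entire result database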
1361 @spin_first
1362 @spin_first
1362 def hub_history(self):
1363 def hub_history(self):
1363 """Get the Hub's history
1364 """Get the Hub's history
1364
1365
1365 Just like the Client, the Hub has a history, which is a list of msg_ids.
1366 Just like the Client, the Hub has a history, which is a list of msg_ids.
1366 This will contain the history of all clients, and, depending on configuration,
1367 This will contain the history of all clients, and, depending on configuration,
1367 may contain history across multiple cluster sessions.
1368 may contain history across multiple cluster sessions.
1368
1369
1369 Any msg_id returned here is a valid argument to `get_result`.
1370 Any msg_id returned here is a valid argument to `get_result`.
1370
1371
1371 Returns
1372 Returns
1372 -------
1373 -------
1373
1374
1374 msg_ids : list of strs
1375 msg_ids : list of strs
1375 list of all msg_ids, ordered by task submission time.
1376 list of all msg_ids, ordered by task submission time.
1376 """
1377 """
1377
1378
1378 self.session.send(self._query_socket, "history_request", content={})
1379 self.session.send(self._query_socket, "history_request", content={})
1379 idents, msg = self.session.recv(self._query_socket, 0)
1380 idents, msg = self.session.recv(self._query_socket, 0)
1380
1381
1381 if self.debug:
1382 if self.debug:
1382 pprint(msg)
1383 pprint(msg)
1383 content = msg['content']
1384 content = msg['content']
1384 if content['status'] != 'ok':
1385 if content['status'] != 'ok':
1385 raise self._unwrap_exception(content)
1386 raise self._unwrap_exception(content)
1386 else:
1387 else:
1387 return content['history']
1388 return content['history']
1388
1389
1389 @spin_first
1390 @spin_first
1390 def db_query(self, query, keys=None):
1391 def db_query(self, query, keys=None):
1391 """Query the Hub's TaskRecord database
1392 """Query the Hub's TaskRecord database
1392
1393
1393 This will return a list of task record dicts that match `query`
1394 This will return a list of task record dicts that match `query`
1394
1395
1395 Parameters
1396 Parameters
1396 ----------
1397 ----------
1397
1398
1398 query : mongodb query dict
1399 query : mongodb query dict
1399 The search dict. See mongodb query docs for details.
1400 The search dict. See mongodb query docs for details.
1400 keys : list of strs [optional]
1401 keys : list of strs [optional]
1401 The subset of keys to be returned. The default is to fetch everything but buffers.
1402 The subset of keys to be returned. The default is to fetch everything but buffers.
1402 'msg_id' will *always* be included.
1403 'msg_id' will *always* be included.
1403 """
1404 """
1404 if isinstance(keys, basestring):
1405 if isinstance(keys, basestring):
1405 keys = [keys]
1406 keys = [keys]
1406 content = dict(query=query, keys=keys)
1407 content = dict(query=query, keys=keys)
1407 self.session.send(self._query_socket, "db_request", content=content)
1408 self.session.send(self._query_socket, "db_request", content=content)
1408 idents, msg = self.session.recv(self._query_socket, 0)
1409 idents, msg = self.session.recv(self._query_socket, 0)
1409 if self.debug:
1410 if self.debug:
1410 pprint(msg)
1411 pprint(msg)
1411 content = msg['content']
1412 content = msg['content']
1412 if content['status'] != 'ok':
1413 if content['status'] != 'ok':
1413 raise self._unwrap_exception(content)
1414 raise self._unwrap_exception(content)
1414
1415
1415 records = content['records']
1416 records = content['records']
1416
1417
1417 buffer_lens = content['buffer_lens']
1418 buffer_lens = content['buffer_lens']
1418 result_buffer_lens = content['result_buffer_lens']
1419 result_buffer_lens = content['result_buffer_lens']
1419 buffers = msg['buffers']
1420 buffers = msg['buffers']
1420 has_bufs = buffer_lens is not None
1421 has_bufs = buffer_lens is not None
1421 has_rbufs = result_buffer_lens is not None
1422 has_rbufs = result_buffer_lens is not None
1422 for i,rec in enumerate(records):
1423 for i,rec in enumerate(records):
1423 # relink buffers
1424 # relink buffers
1424 if has_bufs:
1425 if has_bufs:
1425 blen = buffer_lens[i]
1426 blen = buffer_lens[i]
1426 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1427 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1427 if has_rbufs:
1428 if has_rbufs:
1428 blen = result_buffer_lens[i]
1429 blen = result_buffer_lens[i]
1429 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1430 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1430
1431
1431 return records
1432 return records
1432
1433
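# A sketch combining hub_history and db_query; the key names follow the Hub's
# TaskRecord schema and the query uses the MongoDB-style operators described
# in the docstring above.
from IPython.parallel import Client

rc = Client()
recent = rc.hub_history()[-10:]                     # last ten msg_ids the Hub knows
records = rc.db_query({'msg_id': {'$in': recent}},
                      keys=['started', 'completed'])
for rec in records:
    print rec['msg_id'], rec['completed']           # msg_id is always included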
1433 __all__ = [ 'Client' ]
1434 __all__ = [ 'Client' ]