store execute_replies in a nice wrapper
MinRK
@@ -1,1566 +1,1620 @@
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35
35
36 from IPython.utils.jsonutil import rekey
36 from IPython.utils.jsonutil import rekey
37 from IPython.utils.localinterfaces import LOCAL_IPS
37 from IPython.utils.localinterfaces import LOCAL_IPS
38 from IPython.utils.path import get_ipython_dir
38 from IPython.utils.path import get_ipython_dir
39 from IPython.utils.py3compat import cast_bytes
39 from IPython.utils.py3compat import cast_bytes
40 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
40 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
41 Dict, List, Bool, Set, Any)
41 Dict, List, Bool, Set, Any)
42 from IPython.external.decorator import decorator
42 from IPython.external.decorator import decorator
43 from IPython.external.ssh import tunnel
43 from IPython.external.ssh import tunnel
44
44
45 from IPython.parallel import Reference
45 from IPython.parallel import Reference
46 from IPython.parallel import error
46 from IPython.parallel import error
47 from IPython.parallel import util
47 from IPython.parallel import util
48
48
49 from IPython.zmq.session import Session, Message
49 from IPython.zmq.session import Session, Message
50
50
51 from .asyncresult import AsyncResult, AsyncHubResult
51 from .asyncresult import AsyncResult, AsyncHubResult
52 from IPython.core.profiledir import ProfileDir, ProfileDirError
52 from IPython.core.profiledir import ProfileDir, ProfileDirError
53 from .view import DirectView, LoadBalancedView
53 from .view import DirectView, LoadBalancedView
54
54
55 if sys.version_info[0] >= 3:
55 if sys.version_info[0] >= 3:
56 # xrange is used in a couple 'isinstance' tests in py2
56 # xrange is used in a couple 'isinstance' tests in py2
57 # should be just 'range' in 3k
57 # should be just 'range' in 3k
58 xrange = range
58 xrange = range
59
59
60 #--------------------------------------------------------------------------
60 #--------------------------------------------------------------------------
61 # Decorators for Client methods
61 # Decorators for Client methods
62 #--------------------------------------------------------------------------
62 #--------------------------------------------------------------------------
63
63
64 @decorator
64 @decorator
65 def spin_first(f, self, *args, **kwargs):
65 def spin_first(f, self, *args, **kwargs):
66 """Call spin() to sync state prior to calling the method."""
66 """Call spin() to sync state prior to calling the method."""
67 self.spin()
67 self.spin()
68 return f(self, *args, **kwargs)
68 return f(self, *args, **kwargs)
69
69
70
70
71 #--------------------------------------------------------------------------
71 #--------------------------------------------------------------------------
72 # Classes
72 # Classes
73 #--------------------------------------------------------------------------
73 #--------------------------------------------------------------------------
74
74
75
76 class ExecuteReply(object):
77 """wrapper for finished Execute results"""
78 def __init__(self, msg_id, content, metadata):
79 self.msg_id = msg_id
80 self._content = content
81 self.execution_count = content['execution_count']
82 self.metadata = metadata
83
84 def __getitem__(self, key):
85 return self.metadata[key]
86
87 def __getattr__(self, key):
88 if key not in self.metadata:
89 raise AttributeError(key)
90 return self.metadata[key]
91
92 def __repr__(self):
93 pyout = self.metadata['pyout'] or {}
94 text_out = pyout.get('text/plain', '')
95 if len(text_out) > 32:
96 text_out = text_out[:29] + '...'
97
98 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
99
100 def _repr_html_(self):
101 pyout = self.metadata['pyout'] or {}
102 return pyout.get("text/html")
103
104 def _repr_latex_(self):
105 pyout = self.metadata['pyout'] or {}
106 return pyout.get("text/latex")
107
108 def _repr_json_(self):
109 pyout = self.metadata['pyout'] or {}
110 return pyout.get("application/json")
111
112 def _repr_javascript_(self):
113 pyout = self.metadata['pyout'] or {}
114 return pyout.get("application/javascript")
115
116 def _repr_png_(self):
117 pyout = self.metadata['pyout'] or {}
118 return pyout.get("image/png")
119
120 def _repr_jpeg_(self):
121 pyout = self.metadata['pyout'] or {}
122 return pyout.get("image/jpeg")
123
124 def _repr_svg_(self):
125 pyout = self.metadata['pyout'] or {}
126 return pyout.get("image/svg+xml")
127
128
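# A rough usage sketch for the new ExecuteReply wrapper (illustrative only: `rc` and a
# running cluster are assumed, they are not part of this changeset):
#
#   >>> from IPython.parallel import Client
#   >>> rc = Client()
#   >>> ar = rc[:].execute("x = 1 + 1")   # AsyncResult from a DirectView
#   >>> reply = ar.get()[0]               # one ExecuteReply per engine
#   >>> reply.execution_count             # taken from the reply content
#   >>> reply.stdout, reply['pyout']      # metadata keys via attribute or item access
#
# The _repr_*_ methods above simply look up the matching mime type in metadata['pyout'],
# so displaying a reply in an IPython session shows the engine's last result.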
75 class Metadata(dict):
129 class Metadata(dict):
76 """Subclass of dict for initializing metadata values.
130 """Subclass of dict for initializing metadata values.
77
131
78 Attribute access works on keys.
132 Attribute access works on keys.
79
133
80 These objects have a strict set of keys - errors will raise if you try
134 These objects have a strict set of keys - errors will raise if you try
81 to add new keys.
135 to add new keys.
82 """
136 """
83 def __init__(self, *args, **kwargs):
137 def __init__(self, *args, **kwargs):
84 dict.__init__(self)
138 dict.__init__(self)
85 md = {'msg_id' : None,
139 md = {'msg_id' : None,
86 'submitted' : None,
140 'submitted' : None,
87 'started' : None,
141 'started' : None,
88 'completed' : None,
142 'completed' : None,
89 'received' : None,
143 'received' : None,
90 'engine_uuid' : None,
144 'engine_uuid' : None,
91 'engine_id' : None,
145 'engine_id' : None,
92 'follow' : None,
146 'follow' : None,
93 'after' : None,
147 'after' : None,
94 'status' : None,
148 'status' : None,
95
149
96 'pyin' : None,
150 'pyin' : None,
97 'pyout' : None,
151 'pyout' : None,
98 'pyerr' : None,
152 'pyerr' : None,
99 'stdout' : '',
153 'stdout' : '',
100 'stderr' : '',
154 'stderr' : '',
101 'outputs' : [],
155 'outputs' : [],
102 }
156 }
103 self.update(md)
157 self.update(md)
104 self.update(dict(*args, **kwargs))
158 self.update(dict(*args, **kwargs))
105
159
106 def __getattr__(self, key):
160 def __getattr__(self, key):
107 """getattr aliased to getitem"""
161 """getattr aliased to getitem"""
108 if key in self.iterkeys():
162 if key in self.iterkeys():
109 return self[key]
163 return self[key]
110 else:
164 else:
111 raise AttributeError(key)
165 raise AttributeError(key)
112
166
113 def __setattr__(self, key, value):
167 def __setattr__(self, key, value):
114 """setattr aliased to setitem, with strict"""
168 """setattr aliased to setitem, with strict"""
115 if key in self.iterkeys():
169 if key in self.iterkeys():
116 self[key] = value
170 self[key] = value
117 else:
171 else:
118 raise AttributeError(key)
172 raise AttributeError(key)
119
173
120 def __setitem__(self, key, value):
174 def __setitem__(self, key, value):
121 """strict static key enforcement"""
175 """strict static key enforcement"""
122 if key in self.iterkeys():
176 if key in self.iterkeys():
123 dict.__setitem__(self, key, value)
177 dict.__setitem__(self, key, value)
124 else:
178 else:
125 raise KeyError(key)
179 raise KeyError(key)
126
180
127
181
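# Metadata in one sketch: the key set is fixed at construction, attribute access is
# aliased to item access, and unknown keys are rejected (example values illustrative):
#
#   >>> md = Metadata(status='ok')
#   >>> md.status          # same as md['status']
#   >>> md['bogus'] = 1    # raises KeyError: only the predefined keys are allowed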
128 class Client(HasTraits):
182 class Client(HasTraits):
129 """A semi-synchronous client to the IPython ZMQ cluster
183 """A semi-synchronous client to the IPython ZMQ cluster
130
184
131 Parameters
185 Parameters
132 ----------
186 ----------
133
187
134 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
188 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
135 Connection information for the Hub's registration. If a json connector
189 Connection information for the Hub's registration. If a json connector
136 file is given, then likely no further configuration is necessary.
190 file is given, then likely no further configuration is necessary.
137 [Default: use profile]
191 [Default: use profile]
138 profile : bytes
192 profile : bytes
139 The name of the Cluster profile to be used to find connector information.
193 The name of the Cluster profile to be used to find connector information.
140 If run from an IPython application, the default profile will be the same
194 If run from an IPython application, the default profile will be the same
141 as the running application, otherwise it will be 'default'.
195 as the running application, otherwise it will be 'default'.
142 context : zmq.Context
196 context : zmq.Context
143 Pass an existing zmq.Context instance, otherwise the client will create its own.
197 Pass an existing zmq.Context instance, otherwise the client will create its own.
144 debug : bool
198 debug : bool
145 flag for lots of message printing for debug purposes
199 flag for lots of message printing for debug purposes
146 timeout : int/float
200 timeout : int/float
147 time (in seconds) to wait for connection replies from the Hub
201 time (in seconds) to wait for connection replies from the Hub
148 [Default: 10]
202 [Default: 10]
149
203
150 #-------------- session related args ----------------
204 #-------------- session related args ----------------
151
205
152 config : Config object
206 config : Config object
153 If specified, this will be relayed to the Session for configuration
207 If specified, this will be relayed to the Session for configuration
154 username : str
208 username : str
155 set username for the session object
209 set username for the session object
156 packer : str (import_string) or callable
210 packer : str (import_string) or callable
157 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
211 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
158 function to serialize messages. Must support same input as
212 function to serialize messages. Must support same input as
159 JSON, and output must be bytes.
213 JSON, and output must be bytes.
160 You can pass a callable directly as `pack`
214 You can pass a callable directly as `pack`
161 unpacker : str (import_string) or callable
215 unpacker : str (import_string) or callable
162 The inverse of packer. Only necessary if packer is specified as *not* one
216 The inverse of packer. Only necessary if packer is specified as *not* one
163 of 'json' or 'pickle'.
217 of 'json' or 'pickle'.
164
218
165 #-------------- ssh related args ----------------
219 #-------------- ssh related args ----------------
166 # These are args for configuring the ssh tunnel to be used
220 # These are args for configuring the ssh tunnel to be used
167 # credentials are used to forward connections over ssh to the Controller
221 # credentials are used to forward connections over ssh to the Controller
168 # Note that the ip given in `addr` needs to be relative to sshserver
222 # Note that the ip given in `addr` needs to be relative to sshserver
169 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
223 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
170 # and set sshserver as the same machine the Controller is on. However,
224 # and set sshserver as the same machine the Controller is on. However,
171 # the only requirement is that sshserver is able to see the Controller
225 # the only requirement is that sshserver is able to see the Controller
172 # (i.e. is within the same trusted network).
226 # (i.e. is within the same trusted network).
173
227
174 sshserver : str
228 sshserver : str
175 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
229 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
176 If keyfile or password is specified, and this is not, it will default to
230 If keyfile or password is specified, and this is not, it will default to
177 the ip given in addr.
231 the ip given in addr.
178 sshkey : str; path to ssh private key file
232 sshkey : str; path to ssh private key file
179 This specifies a key to be used in ssh login, default None.
233 This specifies a key to be used in ssh login, default None.
180 Regular default ssh keys will be used without specifying this argument.
234 Regular default ssh keys will be used without specifying this argument.
181 password : str
235 password : str
182 Your ssh password to sshserver. Note that if this is left None,
236 Your ssh password to sshserver. Note that if this is left None,
183 you will be prompted for it if passwordless key based login is unavailable.
237 you will be prompted for it if passwordless key based login is unavailable.
184 paramiko : bool
238 paramiko : bool
185 flag for whether to use paramiko instead of shell ssh for tunneling.
239 flag for whether to use paramiko instead of shell ssh for tunneling.
186 [default: True on win32, False else]
240 [default: True on win32, False else]
187
241
188 ------- exec authentication args -------
242 ------- exec authentication args -------
189 If even localhost is untrusted, you can have some protection against
243 If even localhost is untrusted, you can have some protection against
190 unauthorized execution by signing messages with HMAC digests.
244 unauthorized execution by signing messages with HMAC digests.
191 Messages are still sent as cleartext, so if someone can snoop your
245 Messages are still sent as cleartext, so if someone can snoop your
192 loopback traffic this will not protect your privacy, but will prevent
246 loopback traffic this will not protect your privacy, but will prevent
193 unauthorized execution.
247 unauthorized execution.
194
248
195 exec_key : str
249 exec_key : str
196 an authentication key or file containing a key
250 an authentication key or file containing a key
197 default: None
251 default: None
198
252
199
253
200 Attributes
254 Attributes
201 ----------
255 ----------
202
256
203 ids : list of int engine IDs
257 ids : list of int engine IDs
204 requesting the ids attribute always synchronizes
258 requesting the ids attribute always synchronizes
205 the registration state. To request ids without synchronization,
259 the registration state. To request ids without synchronization,
206 use the semi-private _ids attribute.
260 use the semi-private _ids attribute.
207
261
208 history : list of msg_ids
262 history : list of msg_ids
209 a list of msg_ids, keeping track of all the execution
263 a list of msg_ids, keeping track of all the execution
210 messages you have submitted in order.
264 messages you have submitted in order.
211
265
212 outstanding : set of msg_ids
266 outstanding : set of msg_ids
213 a set of msg_ids that have been submitted, but whose
267 a set of msg_ids that have been submitted, but whose
214 results have not yet been received.
268 results have not yet been received.
215
269
216 results : dict
270 results : dict
217 a dict of all our results, keyed by msg_id
271 a dict of all our results, keyed by msg_id
218
272
219 block : bool
273 block : bool
220 determines default behavior when block not specified
274 determines default behavior when block not specified
221 in execution methods
275 in execution methods
222
276
223 Methods
277 Methods
224 -------
278 -------
225
279
226 spin
280 spin
227 flushes incoming results and registration state changes
281 flushes incoming results and registration state changes
228 control methods spin, and requesting `ids` also ensures the state is up to date
282 control methods spin, and requesting `ids` also ensures the state is up to date
229
283
230 wait
284 wait
231 wait on one or more msg_ids
285 wait on one or more msg_ids
232
286
233 execution methods
287 execution methods
234 apply
288 apply
235 legacy: execute, run
289 legacy: execute, run
236
290
237 data movement
291 data movement
238 push, pull, scatter, gather
292 push, pull, scatter, gather
239
293
240 query methods
294 query methods
241 queue_status, get_result, purge, result_status
295 queue_status, get_result, purge, result_status
242
296
243 control methods
297 control methods
244 abort, shutdown
298 abort, shutdown
245
299
246 """
300 """
247
301
248
302
249 block = Bool(False)
303 block = Bool(False)
250 outstanding = Set()
304 outstanding = Set()
251 results = Instance('collections.defaultdict', (dict,))
305 results = Instance('collections.defaultdict', (dict,))
252 metadata = Instance('collections.defaultdict', (Metadata,))
306 metadata = Instance('collections.defaultdict', (Metadata,))
253 history = List()
307 history = List()
254 debug = Bool(False)
308 debug = Bool(False)
255 _spin_thread = Any()
309 _spin_thread = Any()
256 _stop_spinning = Any()
310 _stop_spinning = Any()
257
311
258 profile=Unicode()
312 profile=Unicode()
259 def _profile_default(self):
313 def _profile_default(self):
260 if BaseIPythonApplication.initialized():
314 if BaseIPythonApplication.initialized():
261 # an IPython app *might* be running, try to get its profile
315 # an IPython app *might* be running, try to get its profile
262 try:
316 try:
263 return BaseIPythonApplication.instance().profile
317 return BaseIPythonApplication.instance().profile
264 except (AttributeError, MultipleInstanceError):
318 except (AttributeError, MultipleInstanceError):
265 # could be a *different* subclass of config.Application,
319 # could be a *different* subclass of config.Application,
266 # which would raise one of these two errors.
320 # which would raise one of these two errors.
267 return u'default'
321 return u'default'
268 else:
322 else:
269 return u'default'
323 return u'default'
270
324
271
325
272 _outstanding_dict = Instance('collections.defaultdict', (set,))
326 _outstanding_dict = Instance('collections.defaultdict', (set,))
273 _ids = List()
327 _ids = List()
274 _connected=Bool(False)
328 _connected=Bool(False)
275 _ssh=Bool(False)
329 _ssh=Bool(False)
276 _context = Instance('zmq.Context')
330 _context = Instance('zmq.Context')
277 _config = Dict()
331 _config = Dict()
278 _engines=Instance(util.ReverseDict, (), {})
332 _engines=Instance(util.ReverseDict, (), {})
279 # _hub_socket=Instance('zmq.Socket')
333 # _hub_socket=Instance('zmq.Socket')
280 _query_socket=Instance('zmq.Socket')
334 _query_socket=Instance('zmq.Socket')
281 _control_socket=Instance('zmq.Socket')
335 _control_socket=Instance('zmq.Socket')
282 _iopub_socket=Instance('zmq.Socket')
336 _iopub_socket=Instance('zmq.Socket')
283 _notification_socket=Instance('zmq.Socket')
337 _notification_socket=Instance('zmq.Socket')
284 _mux_socket=Instance('zmq.Socket')
338 _mux_socket=Instance('zmq.Socket')
285 _task_socket=Instance('zmq.Socket')
339 _task_socket=Instance('zmq.Socket')
286 _task_scheme=Unicode()
340 _task_scheme=Unicode()
287 _closed = False
341 _closed = False
288 _ignored_control_replies=Integer(0)
342 _ignored_control_replies=Integer(0)
289 _ignored_hub_replies=Integer(0)
343 _ignored_hub_replies=Integer(0)
290
344
291 def __new__(self, *args, **kw):
345 def __new__(self, *args, **kw):
292 # don't raise on positional args
346 # don't raise on positional args
293 return HasTraits.__new__(self, **kw)
347 return HasTraits.__new__(self, **kw)
294
348
295 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
349 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
296 context=None, debug=False, exec_key=None,
350 context=None, debug=False, exec_key=None,
297 sshserver=None, sshkey=None, password=None, paramiko=None,
351 sshserver=None, sshkey=None, password=None, paramiko=None,
298 timeout=10, **extra_args
352 timeout=10, **extra_args
299 ):
353 ):
300 if profile:
354 if profile:
301 super(Client, self).__init__(debug=debug, profile=profile)
355 super(Client, self).__init__(debug=debug, profile=profile)
302 else:
356 else:
303 super(Client, self).__init__(debug=debug)
357 super(Client, self).__init__(debug=debug)
304 if context is None:
358 if context is None:
305 context = zmq.Context.instance()
359 context = zmq.Context.instance()
306 self._context = context
360 self._context = context
307 self._stop_spinning = Event()
361 self._stop_spinning = Event()
308
362
309 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
363 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
310 if self._cd is not None:
364 if self._cd is not None:
311 if url_or_file is None:
365 if url_or_file is None:
312 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
366 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
313 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
367 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
314 " Please specify at least one of url_or_file or profile."
368 " Please specify at least one of url_or_file or profile."
315
369
316 if not util.is_url(url_or_file):
370 if not util.is_url(url_or_file):
317 # it's not a url, try for a file
371 # it's not a url, try for a file
318 if not os.path.exists(url_or_file):
372 if not os.path.exists(url_or_file):
319 if self._cd:
373 if self._cd:
320 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
374 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
321 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
375 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
322 with open(url_or_file) as f:
376 with open(url_or_file) as f:
323 cfg = json.loads(f.read())
377 cfg = json.loads(f.read())
324 else:
378 else:
325 cfg = {'url':url_or_file}
379 cfg = {'url':url_or_file}
326
380
327 # sync defaults from args, json:
381 # sync defaults from args, json:
328 if sshserver:
382 if sshserver:
329 cfg['ssh'] = sshserver
383 cfg['ssh'] = sshserver
330 if exec_key:
384 if exec_key:
331 cfg['exec_key'] = exec_key
385 cfg['exec_key'] = exec_key
332 exec_key = cfg['exec_key']
386 exec_key = cfg['exec_key']
333 location = cfg.setdefault('location', None)
387 location = cfg.setdefault('location', None)
334 cfg['url'] = util.disambiguate_url(cfg['url'], location)
388 cfg['url'] = util.disambiguate_url(cfg['url'], location)
335 url = cfg['url']
389 url = cfg['url']
336 proto,addr,port = util.split_url(url)
390 proto,addr,port = util.split_url(url)
337 if location is not None and addr == '127.0.0.1':
391 if location is not None and addr == '127.0.0.1':
338 # location specified, and connection is expected to be local
392 # location specified, and connection is expected to be local
339 if location not in LOCAL_IPS and not sshserver:
393 if location not in LOCAL_IPS and not sshserver:
340 # load ssh from JSON *only* if the controller is not on
394 # load ssh from JSON *only* if the controller is not on
341 # this machine
395 # this machine
342 sshserver=cfg['ssh']
396 sshserver=cfg['ssh']
343 if location not in LOCAL_IPS and not sshserver:
397 if location not in LOCAL_IPS and not sshserver:
344 # warn if no ssh specified, but SSH is probably needed
398 # warn if no ssh specified, but SSH is probably needed
345 # This is only a warning, because the most likely cause
399 # This is only a warning, because the most likely cause
346 # is a local Controller on a laptop whose IP is dynamic
400 # is a local Controller on a laptop whose IP is dynamic
347 warnings.warn("""
401 warnings.warn("""
348 Controller appears to be listening on localhost, but not on this machine.
402 Controller appears to be listening on localhost, but not on this machine.
349 If this is true, you should specify Client(...,sshserver='you@%s')
403 If this is true, you should specify Client(...,sshserver='you@%s')
350 or instruct your controller to listen on an external IP."""%location,
404 or instruct your controller to listen on an external IP."""%location,
351 RuntimeWarning)
405 RuntimeWarning)
352 elif not sshserver:
406 elif not sshserver:
353 # otherwise sync with cfg
407 # otherwise sync with cfg
354 sshserver = cfg['ssh']
408 sshserver = cfg['ssh']
355
409
356 self._config = cfg
410 self._config = cfg
357
411
358 self._ssh = bool(sshserver or sshkey or password)
412 self._ssh = bool(sshserver or sshkey or password)
359 if self._ssh and sshserver is None:
413 if self._ssh and sshserver is None:
360 # default to ssh via localhost
414 # default to ssh via localhost
361 sshserver = url.split('://')[1].split(':')[0]
415 sshserver = url.split('://')[1].split(':')[0]
362 if self._ssh and password is None:
416 if self._ssh and password is None:
363 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
417 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
364 password=False
418 password=False
365 else:
419 else:
366 password = getpass("SSH Password for %s: "%sshserver)
420 password = getpass("SSH Password for %s: "%sshserver)
367 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
421 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
368
422
369 # configure and construct the session
423 # configure and construct the session
370 if exec_key is not None:
424 if exec_key is not None:
371 if os.path.isfile(exec_key):
425 if os.path.isfile(exec_key):
372 extra_args['keyfile'] = exec_key
426 extra_args['keyfile'] = exec_key
373 else:
427 else:
374 exec_key = cast_bytes(exec_key)
428 exec_key = cast_bytes(exec_key)
375 extra_args['key'] = exec_key
429 extra_args['key'] = exec_key
376 self.session = Session(**extra_args)
430 self.session = Session(**extra_args)
377
431
378 self._query_socket = self._context.socket(zmq.DEALER)
432 self._query_socket = self._context.socket(zmq.DEALER)
379 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
433 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
380 if self._ssh:
434 if self._ssh:
381 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
435 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
382 else:
436 else:
383 self._query_socket.connect(url)
437 self._query_socket.connect(url)
384
438
385 self.session.debug = self.debug
439 self.session.debug = self.debug
386
440
387 self._notification_handlers = {'registration_notification' : self._register_engine,
441 self._notification_handlers = {'registration_notification' : self._register_engine,
388 'unregistration_notification' : self._unregister_engine,
442 'unregistration_notification' : self._unregister_engine,
389 'shutdown_notification' : lambda msg: self.close(),
443 'shutdown_notification' : lambda msg: self.close(),
390 }
444 }
391 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
445 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
392 'apply_reply' : self._handle_apply_reply}
446 'apply_reply' : self._handle_apply_reply}
393 self._connect(sshserver, ssh_kwargs, timeout)
447 self._connect(sshserver, ssh_kwargs, timeout)
394
448
395 def __del__(self):
449 def __del__(self):
396 """cleanup sockets, but _not_ context."""
450 """cleanup sockets, but _not_ context."""
397 self.close()
451 self.close()
398
452
399 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
453 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
400 if ipython_dir is None:
454 if ipython_dir is None:
401 ipython_dir = get_ipython_dir()
455 ipython_dir = get_ipython_dir()
402 if profile_dir is not None:
456 if profile_dir is not None:
403 try:
457 try:
404 self._cd = ProfileDir.find_profile_dir(profile_dir)
458 self._cd = ProfileDir.find_profile_dir(profile_dir)
405 return
459 return
406 except ProfileDirError:
460 except ProfileDirError:
407 pass
461 pass
408 elif profile is not None:
462 elif profile is not None:
409 try:
463 try:
410 self._cd = ProfileDir.find_profile_dir_by_name(
464 self._cd = ProfileDir.find_profile_dir_by_name(
411 ipython_dir, profile)
465 ipython_dir, profile)
412 return
466 return
413 except ProfileDirError:
467 except ProfileDirError:
414 pass
468 pass
415 self._cd = None
469 self._cd = None
416
470
417 def _update_engines(self, engines):
471 def _update_engines(self, engines):
418 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
472 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
419 for k,v in engines.iteritems():
473 for k,v in engines.iteritems():
420 eid = int(k)
474 eid = int(k)
421 self._engines[eid] = v
475 self._engines[eid] = v
422 self._ids.append(eid)
476 self._ids.append(eid)
423 self._ids = sorted(self._ids)
477 self._ids = sorted(self._ids)
424 if sorted(self._engines.keys()) != range(len(self._engines)) and \
478 if sorted(self._engines.keys()) != range(len(self._engines)) and \
425 self._task_scheme == 'pure' and self._task_socket:
479 self._task_scheme == 'pure' and self._task_socket:
426 self._stop_scheduling_tasks()
480 self._stop_scheduling_tasks()
427
481
428 def _stop_scheduling_tasks(self):
482 def _stop_scheduling_tasks(self):
429 """Stop scheduling tasks because an engine has been unregistered
483 """Stop scheduling tasks because an engine has been unregistered
430 from a pure ZMQ scheduler.
484 from a pure ZMQ scheduler.
431 """
485 """
432 self._task_socket.close()
486 self._task_socket.close()
433 self._task_socket = None
487 self._task_socket = None
434 msg = "An engine has been unregistered, and we are using pure " +\
488 msg = "An engine has been unregistered, and we are using pure " +\
435 "ZMQ task scheduling. Task farming will be disabled."
489 "ZMQ task scheduling. Task farming will be disabled."
436 if self.outstanding:
490 if self.outstanding:
437 msg += " If you were running tasks when this happened, " +\
491 msg += " If you were running tasks when this happened, " +\
438 "some `outstanding` msg_ids may never resolve."
492 "some `outstanding` msg_ids may never resolve."
439 warnings.warn(msg, RuntimeWarning)
493 warnings.warn(msg, RuntimeWarning)
440
494
441 def _build_targets(self, targets):
495 def _build_targets(self, targets):
442 """Turn valid target IDs or 'all' into two lists:
496 """Turn valid target IDs or 'all' into two lists:
443 (int_ids, uuids).
497 (int_ids, uuids).
444 """
498 """
445 if not self._ids:
499 if not self._ids:
446 # flush notification socket if no engines yet, just in case
500 # flush notification socket if no engines yet, just in case
447 if not self.ids:
501 if not self.ids:
448 raise error.NoEnginesRegistered("Can't build targets without any engines")
502 raise error.NoEnginesRegistered("Can't build targets without any engines")
449
503
450 if targets is None:
504 if targets is None:
451 targets = self._ids
505 targets = self._ids
452 elif isinstance(targets, basestring):
506 elif isinstance(targets, basestring):
453 if targets.lower() == 'all':
507 if targets.lower() == 'all':
454 targets = self._ids
508 targets = self._ids
455 else:
509 else:
456 raise TypeError("%r not valid str target, must be 'all'"%(targets))
510 raise TypeError("%r not valid str target, must be 'all'"%(targets))
457 elif isinstance(targets, int):
511 elif isinstance(targets, int):
458 if targets < 0:
512 if targets < 0:
459 targets = self.ids[targets]
513 targets = self.ids[targets]
460 if targets not in self._ids:
514 if targets not in self._ids:
461 raise IndexError("No such engine: %i"%targets)
515 raise IndexError("No such engine: %i"%targets)
462 targets = [targets]
516 targets = [targets]
463
517
464 if isinstance(targets, slice):
518 if isinstance(targets, slice):
465 indices = range(len(self._ids))[targets]
519 indices = range(len(self._ids))[targets]
466 ids = self.ids
520 ids = self.ids
467 targets = [ ids[i] for i in indices ]
521 targets = [ ids[i] for i in indices ]
468
522
469 if not isinstance(targets, (tuple, list, xrange)):
523 if not isinstance(targets, (tuple, list, xrange)):
470 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
524 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
471
525
472 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
526 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
473
527
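# What this private helper accepts, as a sketch (engine ids illustrative):
#   >>> client._build_targets('all')           # every registered engine (same as None)
#   >>> client._build_targets(-1)              # negative ints index from the end of .ids
#   >>> client._build_targets(slice(0, 4, 2))  # slices over the current id list
#   >>> client._build_targets([0, 2])          # explicit lists/tuples of ints
# Each call returns (engine_uuids_as_bytes, int_ids).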
474 def _connect(self, sshserver, ssh_kwargs, timeout):
528 def _connect(self, sshserver, ssh_kwargs, timeout):
475 """setup all our socket connections to the cluster. This is called from
529 """setup all our socket connections to the cluster. This is called from
476 __init__."""
530 __init__."""
477
531
478 # Maybe allow reconnecting?
532 # Maybe allow reconnecting?
479 if self._connected:
533 if self._connected:
480 return
534 return
481 self._connected=True
535 self._connected=True
482
536
483 def connect_socket(s, url):
537 def connect_socket(s, url):
484 url = util.disambiguate_url(url, self._config['location'])
538 url = util.disambiguate_url(url, self._config['location'])
485 if self._ssh:
539 if self._ssh:
486 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
540 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
487 else:
541 else:
488 return s.connect(url)
542 return s.connect(url)
489
543
490 self.session.send(self._query_socket, 'connection_request')
544 self.session.send(self._query_socket, 'connection_request')
491 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
545 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
492 poller = zmq.Poller()
546 poller = zmq.Poller()
493 poller.register(self._query_socket, zmq.POLLIN)
547 poller.register(self._query_socket, zmq.POLLIN)
494 # poll expects milliseconds, timeout is seconds
548 # poll expects milliseconds, timeout is seconds
495 evts = poller.poll(timeout*1000)
549 evts = poller.poll(timeout*1000)
496 if not evts:
550 if not evts:
497 raise error.TimeoutError("Hub connection request timed out")
551 raise error.TimeoutError("Hub connection request timed out")
498 idents,msg = self.session.recv(self._query_socket,mode=0)
552 idents,msg = self.session.recv(self._query_socket,mode=0)
499 if self.debug:
553 if self.debug:
500 pprint(msg)
554 pprint(msg)
501 msg = Message(msg)
555 msg = Message(msg)
502 content = msg.content
556 content = msg.content
503 self._config['registration'] = dict(content)
557 self._config['registration'] = dict(content)
504 if content.status == 'ok':
558 if content.status == 'ok':
505 ident = self.session.bsession
559 ident = self.session.bsession
506 if content.mux:
560 if content.mux:
507 self._mux_socket = self._context.socket(zmq.DEALER)
561 self._mux_socket = self._context.socket(zmq.DEALER)
508 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
562 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
509 connect_socket(self._mux_socket, content.mux)
563 connect_socket(self._mux_socket, content.mux)
510 if content.task:
564 if content.task:
511 self._task_scheme, task_addr = content.task
565 self._task_scheme, task_addr = content.task
512 self._task_socket = self._context.socket(zmq.DEALER)
566 self._task_socket = self._context.socket(zmq.DEALER)
513 self._task_socket.setsockopt(zmq.IDENTITY, ident)
567 self._task_socket.setsockopt(zmq.IDENTITY, ident)
514 connect_socket(self._task_socket, task_addr)
568 connect_socket(self._task_socket, task_addr)
515 if content.notification:
569 if content.notification:
516 self._notification_socket = self._context.socket(zmq.SUB)
570 self._notification_socket = self._context.socket(zmq.SUB)
517 connect_socket(self._notification_socket, content.notification)
571 connect_socket(self._notification_socket, content.notification)
518 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
572 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
519 # if content.query:
573 # if content.query:
520 # self._query_socket = self._context.socket(zmq.DEALER)
574 # self._query_socket = self._context.socket(zmq.DEALER)
521 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
575 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
522 # connect_socket(self._query_socket, content.query)
576 # connect_socket(self._query_socket, content.query)
523 if content.control:
577 if content.control:
524 self._control_socket = self._context.socket(zmq.DEALER)
578 self._control_socket = self._context.socket(zmq.DEALER)
525 self._control_socket.setsockopt(zmq.IDENTITY, ident)
579 self._control_socket.setsockopt(zmq.IDENTITY, ident)
526 connect_socket(self._control_socket, content.control)
580 connect_socket(self._control_socket, content.control)
527 if content.iopub:
581 if content.iopub:
528 self._iopub_socket = self._context.socket(zmq.SUB)
582 self._iopub_socket = self._context.socket(zmq.SUB)
529 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
583 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
530 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
584 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
531 connect_socket(self._iopub_socket, content.iopub)
585 connect_socket(self._iopub_socket, content.iopub)
532 self._update_engines(dict(content.engines))
586 self._update_engines(dict(content.engines))
533 else:
587 else:
534 self._connected = False
588 self._connected = False
535 raise Exception("Failed to connect!")
589 raise Exception("Failed to connect!")
536
590
537 #--------------------------------------------------------------------------
591 #--------------------------------------------------------------------------
538 # handlers and callbacks for incoming messages
592 # handlers and callbacks for incoming messages
539 #--------------------------------------------------------------------------
593 #--------------------------------------------------------------------------
540
594
541 def _unwrap_exception(self, content):
595 def _unwrap_exception(self, content):
542 """unwrap exception, and remap engine_id to int."""
596 """unwrap exception, and remap engine_id to int."""
543 e = error.unwrap_exception(content)
597 e = error.unwrap_exception(content)
544 # print e.traceback
598 # print e.traceback
545 if e.engine_info:
599 if e.engine_info:
546 e_uuid = e.engine_info['engine_uuid']
600 e_uuid = e.engine_info['engine_uuid']
547 eid = self._engines[e_uuid]
601 eid = self._engines[e_uuid]
548 e.engine_info['engine_id'] = eid
602 e.engine_info['engine_id'] = eid
549 return e
603 return e
550
604
551 def _extract_metadata(self, header, parent, content):
605 def _extract_metadata(self, header, parent, content):
552 md = {'msg_id' : parent['msg_id'],
606 md = {'msg_id' : parent['msg_id'],
553 'received' : datetime.now(),
607 'received' : datetime.now(),
554 'engine_uuid' : header.get('engine', None),
608 'engine_uuid' : header.get('engine', None),
555 'follow' : parent.get('follow', []),
609 'follow' : parent.get('follow', []),
556 'after' : parent.get('after', []),
610 'after' : parent.get('after', []),
557 'status' : content['status'],
611 'status' : content['status'],
558 }
612 }
559
613
560 if md['engine_uuid'] is not None:
614 if md['engine_uuid'] is not None:
561 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
615 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
562
616
563 if 'date' in parent:
617 if 'date' in parent:
564 md['submitted'] = parent['date']
618 md['submitted'] = parent['date']
565 if 'started' in header:
619 if 'started' in header:
566 md['started'] = header['started']
620 md['started'] = header['started']
567 if 'date' in header:
621 if 'date' in header:
568 md['completed'] = header['date']
622 md['completed'] = header['date']
569 return md
623 return md
570
624
571 def _register_engine(self, msg):
625 def _register_engine(self, msg):
572 """Register a new engine, and update our connection info."""
626 """Register a new engine, and update our connection info."""
573 content = msg['content']
627 content = msg['content']
574 eid = content['id']
628 eid = content['id']
575 d = {eid : content['queue']}
629 d = {eid : content['queue']}
576 self._update_engines(d)
630 self._update_engines(d)
577
631
578 def _unregister_engine(self, msg):
632 def _unregister_engine(self, msg):
579 """Unregister an engine that has died."""
633 """Unregister an engine that has died."""
580 content = msg['content']
634 content = msg['content']
581 eid = int(content['id'])
635 eid = int(content['id'])
582 if eid in self._ids:
636 if eid in self._ids:
583 self._ids.remove(eid)
637 self._ids.remove(eid)
584 uuid = self._engines.pop(eid)
638 uuid = self._engines.pop(eid)
585
639
586 self._handle_stranded_msgs(eid, uuid)
640 self._handle_stranded_msgs(eid, uuid)
587
641
588 if self._task_socket and self._task_scheme == 'pure':
642 if self._task_socket and self._task_scheme == 'pure':
589 self._stop_scheduling_tasks()
643 self._stop_scheduling_tasks()
590
644
591 def _handle_stranded_msgs(self, eid, uuid):
645 def _handle_stranded_msgs(self, eid, uuid):
592 """Handle messages known to be on an engine when the engine unregisters.
646 """Handle messages known to be on an engine when the engine unregisters.
593
647
594 It is possible that this will fire prematurely - that is, an engine will
648 It is possible that this will fire prematurely - that is, an engine will
595 go down after completing a result, and the client will be notified
649 go down after completing a result, and the client will be notified
596 of the unregistration and later receive the successful result.
650 of the unregistration and later receive the successful result.
597 """
651 """
598
652
599 outstanding = self._outstanding_dict[uuid]
653 outstanding = self._outstanding_dict[uuid]
600
654
601 for msg_id in list(outstanding):
655 for msg_id in list(outstanding):
602 if msg_id in self.results:
656 if msg_id in self.results:
603 # we already have this result
657 # we already have this result
604 continue
658 continue
605 try:
659 try:
606 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
660 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
607 except:
661 except:
608 content = error.wrap_exception()
662 content = error.wrap_exception()
609 # build a fake message:
663 # build a fake message:
610 parent = {}
664 parent = {}
611 header = {}
665 header = {}
612 parent['msg_id'] = msg_id
666 parent['msg_id'] = msg_id
613 header['engine'] = uuid
667 header['engine'] = uuid
614 header['date'] = datetime.now()
668 header['date'] = datetime.now()
615 msg = dict(parent_header=parent, header=header, content=content)
669 msg = dict(parent_header=parent, header=header, content=content)
616 self._handle_apply_reply(msg)
670 self._handle_apply_reply(msg)
617
671
618 def _handle_execute_reply(self, msg):
672 def _handle_execute_reply(self, msg):
619 """Save the reply to an execute_request into our results.
673 """Save the reply to an execute_request into our results.
620
674
621 execute messages are never actually used. apply is used instead.
675 execute messages are never actually used. apply is used instead.
622 """
676 """
623
677
624 parent = msg['parent_header']
678 parent = msg['parent_header']
625 msg_id = parent['msg_id']
679 msg_id = parent['msg_id']
626 if msg_id not in self.outstanding:
680 if msg_id not in self.outstanding:
627 if msg_id in self.history:
681 if msg_id in self.history:
628 print ("got stale result: %s"%msg_id)
682 print ("got stale result: %s"%msg_id)
629 else:
683 else:
630 print ("got unknown result: %s"%msg_id)
684 print ("got unknown result: %s"%msg_id)
631 else:
685 else:
632 self.outstanding.remove(msg_id)
686 self.outstanding.remove(msg_id)
633
687
634 content = msg['content']
688 content = msg['content']
635 header = msg['header']
689 header = msg['header']
636
690
637 # construct metadata:
691 # construct metadata:
638 md = self.metadata[msg_id]
692 md = self.metadata[msg_id]
639 md.update(self._extract_metadata(header, parent, content))
693 md.update(self._extract_metadata(header, parent, content))
640 # is this redundant?
694 # is this redundant?
641 self.metadata[msg_id] = md
695 self.metadata[msg_id] = md
642
696
643 e_outstanding = self._outstanding_dict[md['engine_uuid']]
697 e_outstanding = self._outstanding_dict[md['engine_uuid']]
644 if msg_id in e_outstanding:
698 if msg_id in e_outstanding:
645 e_outstanding.remove(msg_id)
699 e_outstanding.remove(msg_id)
646
700
647 # construct result:
701 # construct result:
648 if content['status'] == 'ok':
702 if content['status'] == 'ok':
649 self.results[msg_id] = content
703 self.results[msg_id] = ExecuteReply(msg_id, content, md)
650 elif content['status'] == 'aborted':
704 elif content['status'] == 'aborted':
651 self.results[msg_id] = error.TaskAborted(msg_id)
705 self.results[msg_id] = error.TaskAborted(msg_id)
652 elif content['status'] == 'resubmitted':
706 elif content['status'] == 'resubmitted':
653 # TODO: handle resubmission
707 # TODO: handle resubmission
654 pass
708 pass
655 else:
709 else:
656 self.results[msg_id] = self._unwrap_exception(content)
710 self.results[msg_id] = self._unwrap_exception(content)
657
711
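# With this change a successful execute_reply is stored as an ExecuteReply rather than
# the raw content dict, so callers keep attribute/item access to the captured metadata.
# Rough sketch of what a consumer now sees (names illustrative):
#   >>> reply = rc.results[msg_id]           # ExecuteReply, not a plain dict
#   >>> reply.execution_count, reply.stdout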
658 def _handle_apply_reply(self, msg):
712 def _handle_apply_reply(self, msg):
659 """Save the reply to an apply_request into our results."""
713 """Save the reply to an apply_request into our results."""
660 parent = msg['parent_header']
714 parent = msg['parent_header']
661 msg_id = parent['msg_id']
715 msg_id = parent['msg_id']
662 if msg_id not in self.outstanding:
716 if msg_id not in self.outstanding:
663 if msg_id in self.history:
717 if msg_id in self.history:
664 print ("got stale result: %s"%msg_id)
718 print ("got stale result: %s"%msg_id)
665 print self.results[msg_id]
719 print self.results[msg_id]
666 print msg
720 print msg
667 else:
721 else:
668 print ("got unknown result: %s"%msg_id)
722 print ("got unknown result: %s"%msg_id)
669 else:
723 else:
670 self.outstanding.remove(msg_id)
724 self.outstanding.remove(msg_id)
671 content = msg['content']
725 content = msg['content']
672 header = msg['header']
726 header = msg['header']
673
727
674 # construct metadata:
728 # construct metadata:
675 md = self.metadata[msg_id]
729 md = self.metadata[msg_id]
676 md.update(self._extract_metadata(header, parent, content))
730 md.update(self._extract_metadata(header, parent, content))
677 # is this redundant?
731 # is this redundant?
678 self.metadata[msg_id] = md
732 self.metadata[msg_id] = md
679
733
680 e_outstanding = self._outstanding_dict[md['engine_uuid']]
734 e_outstanding = self._outstanding_dict[md['engine_uuid']]
681 if msg_id in e_outstanding:
735 if msg_id in e_outstanding:
682 e_outstanding.remove(msg_id)
736 e_outstanding.remove(msg_id)
683
737
684 # construct result:
738 # construct result:
685 if content['status'] == 'ok':
739 if content['status'] == 'ok':
686 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
740 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
687 elif content['status'] == 'aborted':
741 elif content['status'] == 'aborted':
688 self.results[msg_id] = error.TaskAborted(msg_id)
742 self.results[msg_id] = error.TaskAborted(msg_id)
689 elif content['status'] == 'resubmitted':
743 elif content['status'] == 'resubmitted':
690 # TODO: handle resubmission
744 # TODO: handle resubmission
691 pass
745 pass
692 else:
746 else:
693 self.results[msg_id] = self._unwrap_exception(content)
747 self.results[msg_id] = self._unwrap_exception(content)
694
748
695 def _flush_notifications(self):
749 def _flush_notifications(self):
696 """Flush notifications of engine registrations waiting
750 """Flush notifications of engine registrations waiting
697 in ZMQ queue."""
751 in ZMQ queue."""
698 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
752 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
699 while msg is not None:
753 while msg is not None:
700 if self.debug:
754 if self.debug:
701 pprint(msg)
755 pprint(msg)
702 msg_type = msg['header']['msg_type']
756 msg_type = msg['header']['msg_type']
703 handler = self._notification_handlers.get(msg_type, None)
757 handler = self._notification_handlers.get(msg_type, None)
704 if handler is None:
758 if handler is None:
705 raise Exception("Unhandled message type: %s"%msg.msg_type)
759 raise Exception("Unhandled message type: %s"%msg.msg_type)
706 else:
760 else:
707 handler(msg)
761 handler(msg)
708 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
762 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
709
763
710 def _flush_results(self, sock):
764 def _flush_results(self, sock):
711 """Flush task or queue results waiting in ZMQ queue."""
765 """Flush task or queue results waiting in ZMQ queue."""
712 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
766 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
713 while msg is not None:
767 while msg is not None:
714 if self.debug:
768 if self.debug:
715 pprint(msg)
769 pprint(msg)
716 msg_type = msg['header']['msg_type']
770 msg_type = msg['header']['msg_type']
717 handler = self._queue_handlers.get(msg_type, None)
771 handler = self._queue_handlers.get(msg_type, None)
718 if handler is None:
772 if handler is None:
719 raise Exception("Unhandled message type: %s"%msg.msg_type)
773 raise Exception("Unhandled message type: %s"%msg.msg_type)
720 else:
774 else:
721 handler(msg)
775 handler(msg)
722 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
776 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
723
777
724 def _flush_control(self, sock):
778 def _flush_control(self, sock):
725 """Flush replies from the control channel waiting
779 """Flush replies from the control channel waiting
726 in the ZMQ queue.
780 in the ZMQ queue.
727
781
728 Currently: ignore them."""
782 Currently: ignore them."""
729 if self._ignored_control_replies <= 0:
783 if self._ignored_control_replies <= 0:
730 return
784 return
731 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
785 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
732 while msg is not None:
786 while msg is not None:
733 self._ignored_control_replies -= 1
787 self._ignored_control_replies -= 1
734 if self.debug:
788 if self.debug:
735 pprint(msg)
789 pprint(msg)
736 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
790 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
737
791
738 def _flush_ignored_control(self):
792 def _flush_ignored_control(self):
739 """flush ignored control replies"""
793 """flush ignored control replies"""
740 while self._ignored_control_replies > 0:
794 while self._ignored_control_replies > 0:
741 self.session.recv(self._control_socket)
795 self.session.recv(self._control_socket)
742 self._ignored_control_replies -= 1
796 self._ignored_control_replies -= 1
743
797
744 def _flush_ignored_hub_replies(self):
798 def _flush_ignored_hub_replies(self):
745 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
799 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
746 while msg is not None:
800 while msg is not None:
747 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
801 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
748
802
749 def _flush_iopub(self, sock):
803 def _flush_iopub(self, sock):
750 """Flush replies from the iopub channel waiting
804 """Flush replies from the iopub channel waiting
751 in the ZMQ queue.
805 in the ZMQ queue.
752 """
806 """
753 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
807 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
754 while msg is not None:
808 while msg is not None:
755 if self.debug:
809 if self.debug:
756 pprint(msg)
810 pprint(msg)
757 parent = msg['parent_header']
811 parent = msg['parent_header']
758 # ignore IOPub messages with no parent.
812 # ignore IOPub messages with no parent.
759 # Caused by print statements or warnings from before the first execution.
813 # Caused by print statements or warnings from before the first execution.
760 if not parent:
814 if not parent:
761 continue
815 continue
762 msg_id = parent['msg_id']
816 msg_id = parent['msg_id']
763 content = msg['content']
817 content = msg['content']
764 header = msg['header']
818 header = msg['header']
765 msg_type = msg['header']['msg_type']
819 msg_type = msg['header']['msg_type']
766
820
767 # init metadata:
821 # init metadata:
768 md = self.metadata[msg_id]
822 md = self.metadata[msg_id]
769
823
770 if msg_type == 'stream':
824 if msg_type == 'stream':
771 name = content['name']
825 name = content['name']
772 s = md[name] or ''
826 s = md[name] or ''
773 md[name] = s + content['data']
827 md[name] = s + content['data']
774 elif msg_type == 'pyerr':
828 elif msg_type == 'pyerr':
775 md.update({'pyerr' : self._unwrap_exception(content)})
829 md.update({'pyerr' : self._unwrap_exception(content)})
776 elif msg_type == 'pyin':
830 elif msg_type == 'pyin':
777 md.update({'pyin' : content['code']})
831 md.update({'pyin' : content['code']})
778 elif msg_type == 'display_data':
832 elif msg_type == 'display_data':
779 md['outputs'].append(content.get('data'))
833 md['outputs'].append(content.get('data'))
780 elif msg_type == 'pyout':
834 elif msg_type == 'pyout':
781 md['pyout'] = content.get('data')
835 md['pyout'] = content.get('data')
782 else:
836 else:
783 # unhandled msg_type (status, etc.)
837 # unhandled msg_type (status, etc.)
784 pass
838 pass
785
839
786 # redundant?
840 # redundant?
787 self.metadata[msg_id] = md
841 self.metadata[msg_id] = md
788
842
789 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
843 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
790
844
791 #--------------------------------------------------------------------------
845 #--------------------------------------------------------------------------
792 # len, getitem
846 # len, getitem
793 #--------------------------------------------------------------------------
847 #--------------------------------------------------------------------------
794
848
795 def __len__(self):
849 def __len__(self):
796 """len(client) returns # of engines."""
850 """len(client) returns # of engines."""
797 return len(self.ids)
851 return len(self.ids)
798
852
799 def __getitem__(self, key):
853 def __getitem__(self, key):
800 """index access returns DirectView multiplexer objects
854 """index access returns DirectView multiplexer objects
801
855
802 Must be int, slice, or list/tuple/xrange of ints"""
856 Must be int, slice, or list/tuple/xrange of ints"""
803 if not isinstance(key, (int, slice, tuple, list, xrange)):
857 if not isinstance(key, (int, slice, tuple, list, xrange)):
804 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
858 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
805 else:
859 else:
806 return self.direct_view(key)
860 return self.direct_view(key)
807
861
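# Index-access sketch (a connected Client `rc` is assumed, not defined here):
#   >>> len(rc)        # number of registered engines
#   >>> rc[0]          # DirectView on engine 0
#   >>> rc[::2]        # DirectView on every other engine
#   >>> rc[[0, 3]]     # DirectView on an explicit list of engines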
808 #--------------------------------------------------------------------------
862 #--------------------------------------------------------------------------
809 # Begin public methods
863 # Begin public methods
810 #--------------------------------------------------------------------------
864 #--------------------------------------------------------------------------
811
865
812 @property
866 @property
813 def ids(self):
867 def ids(self):
814 """Always up-to-date ids property."""
868 """Always up-to-date ids property."""
815 self._flush_notifications()
869 self._flush_notifications()
816 # always copy:
870 # always copy:
817 return list(self._ids)
871 return list(self._ids)
818
872
819 def close(self):
873 def close(self):
820 if self._closed:
874 if self._closed:
821 return
875 return
822 self.stop_spin_thread()
876 self.stop_spin_thread()
823 snames = filter(lambda n: n.endswith('socket'), dir(self))
877 snames = filter(lambda n: n.endswith('socket'), dir(self))
824 for socket in map(lambda name: getattr(self, name), snames):
878 for socket in map(lambda name: getattr(self, name), snames):
825 if isinstance(socket, zmq.Socket) and not socket.closed:
879 if isinstance(socket, zmq.Socket) and not socket.closed:
826 socket.close()
880 socket.close()
827 self._closed = True
881 self._closed = True
828
882
829 def _spin_every(self, interval=1):
883 def _spin_every(self, interval=1):
830 """target func for use in spin_thread"""
884 """target func for use in spin_thread"""
831 while True:
885 while True:
832 if self._stop_spinning.is_set():
886 if self._stop_spinning.is_set():
833 return
887 return
834 time.sleep(interval)
888 time.sleep(interval)
835 self.spin()
889 self.spin()
836
890
837 def spin_thread(self, interval=1):
891 def spin_thread(self, interval=1):
838 """call Client.spin() in a background thread on some regular interval
892 """call Client.spin() in a background thread on some regular interval
839
893
840 This helps ensure that messages don't pile up too much in the zmq queue
894 This helps ensure that messages don't pile up too much in the zmq queue
841 while you are working on other things, or just leaving an idle terminal.
895 while you are working on other things, or just leaving an idle terminal.
842
896
843 It also helps limit potential padding of the `received` timestamp
897 It also helps limit potential padding of the `received` timestamp
844 on AsyncResult objects, used for timings.
898 on AsyncResult objects, used for timings.
845
899
846 Parameters
900 Parameters
847 ----------
901 ----------
848
902
849 interval : float, optional
903 interval : float, optional
850 The interval on which to spin the client in the background thread
904 The interval on which to spin the client in the background thread
851 (simply passed to time.sleep).
905 (simply passed to time.sleep).
852
906
853 Notes
907 Notes
854 -----
908 -----
855
909
856 For precision timing, you may want to use this method to put a bound
910 For precision timing, you may want to use this method to put a bound
857 on the jitter (in seconds) in `received` timestamps used
911 on the jitter (in seconds) in `received` timestamps used
858 in AsyncResult.wall_time.
912 in AsyncResult.wall_time.
859
913
860 """
914 """
861 if self._spin_thread is not None:
915 if self._spin_thread is not None:
862 self.stop_spin_thread()
916 self.stop_spin_thread()
863 self._stop_spinning.clear()
917 self._stop_spinning.clear()
864 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
918 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
865 self._spin_thread.daemon = True
919 self._spin_thread.daemon = True
866 self._spin_thread.start()
920 self._spin_thread.start()
867
921
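For instance (a sketch, assuming a connected client ``rc``), the background spin thread keeps `received` timestamps accurate while the terminal is otherwise idle::

    rc.spin_thread(interval=0.1)   # flush incoming messages every ~100 ms
    # ... submit work and inspect AsyncResult.wall_time here ...
    rc.stop_spin_thread()          # terminate the background thread when done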
868 def stop_spin_thread(self):
922 def stop_spin_thread(self):
869 """stop background spin_thread, if any"""
923 """stop background spin_thread, if any"""
870 if self._spin_thread is not None:
924 if self._spin_thread is not None:
871 self._stop_spinning.set()
925 self._stop_spinning.set()
872 self._spin_thread.join()
926 self._spin_thread.join()
873 self._spin_thread = None
927 self._spin_thread = None
874
928
875 def spin(self):
929 def spin(self):
876 """Flush any registration notifications and execution results
930 """Flush any registration notifications and execution results
877 waiting in the ZMQ queue.
931 waiting in the ZMQ queue.
878 """
932 """
879 if self._notification_socket:
933 if self._notification_socket:
880 self._flush_notifications()
934 self._flush_notifications()
935 if self._iopub_socket:
936 self._flush_iopub(self._iopub_socket)
881 if self._mux_socket:
937 if self._mux_socket:
882 self._flush_results(self._mux_socket)
938 self._flush_results(self._mux_socket)
883 if self._task_socket:
939 if self._task_socket:
884 self._flush_results(self._task_socket)
940 self._flush_results(self._task_socket)
885 if self._control_socket:
941 if self._control_socket:
886 self._flush_control(self._control_socket)
942 self._flush_control(self._control_socket)
887 if self._iopub_socket:
888 self._flush_iopub(self._iopub_socket)
889 if self._query_socket:
943 if self._query_socket:
890 self._flush_ignored_hub_replies()
944 self._flush_ignored_hub_replies()
891
945
892 def wait(self, jobs=None, timeout=-1):
946 def wait(self, jobs=None, timeout=-1):
893 """waits on one or more `jobs`, for up to `timeout` seconds.
947 """waits on one or more `jobs`, for up to `timeout` seconds.
894
948
895 Parameters
949 Parameters
896 ----------
950 ----------
897
951
898 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
952 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
899 ints are indices to self.history
953 ints are indices to self.history
900 strs are msg_ids
954 strs are msg_ids
901 default: wait on all outstanding messages
955 default: wait on all outstanding messages
902 timeout : float
956 timeout : float
903 a time in seconds, after which to give up.
957 a time in seconds, after which to give up.
904 default is -1, which means no timeout
958 default is -1, which means no timeout
905
959
906 Returns
960 Returns
907 -------
961 -------
908
962
909 True : when all msg_ids are done
963 True : when all msg_ids are done
910 False : timeout reached, some msg_ids still outstanding
964 False : timeout reached, some msg_ids still outstanding
911 """
965 """
912 tic = time.time()
966 tic = time.time()
913 if jobs is None:
967 if jobs is None:
914 theids = self.outstanding
968 theids = self.outstanding
915 else:
969 else:
916 if isinstance(jobs, (int, basestring, AsyncResult)):
970 if isinstance(jobs, (int, basestring, AsyncResult)):
917 jobs = [jobs]
971 jobs = [jobs]
918 theids = set()
972 theids = set()
919 for job in jobs:
973 for job in jobs:
920 if isinstance(job, int):
974 if isinstance(job, int):
921 # index access
975 # index access
922 job = self.history[job]
976 job = self.history[job]
923 elif isinstance(job, AsyncResult):
977 elif isinstance(job, AsyncResult):
924 map(theids.add, job.msg_ids)
978 map(theids.add, job.msg_ids)
925 continue
979 continue
926 theids.add(job)
980 theids.add(job)
927 if not theids.intersection(self.outstanding):
981 if not theids.intersection(self.outstanding):
928 return True
982 return True
929 self.spin()
983 self.spin()
930 while theids.intersection(self.outstanding):
984 while theids.intersection(self.outstanding):
931 if timeout >= 0 and ( time.time()-tic ) > timeout:
985 if timeout >= 0 and ( time.time()-tic ) > timeout:
932 break
986 break
933 time.sleep(1e-3)
987 time.sleep(1e-3)
934 self.spin()
988 self.spin()
935 return len(theids.intersection(self.outstanding)) == 0
989 return len(theids.intersection(self.outstanding)) == 0
936
990
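A minimal usage sketch for wait(), assuming a connected client ``rc``::

    dv = rc[:]
    ar = dv.apply_async(sum, range(10))   # returns immediately
    done = rc.wait([ar], timeout=5.0)     # True if the job finished within 5 s
    if done:
        print ar.get()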
937 #--------------------------------------------------------------------------
991 #--------------------------------------------------------------------------
938 # Control methods
992 # Control methods
939 #--------------------------------------------------------------------------
993 #--------------------------------------------------------------------------
940
994
941 @spin_first
995 @spin_first
942 def clear(self, targets=None, block=None):
996 def clear(self, targets=None, block=None):
943 """Clear the namespace in target(s)."""
997 """Clear the namespace in target(s)."""
944 block = self.block if block is None else block
998 block = self.block if block is None else block
945 targets = self._build_targets(targets)[0]
999 targets = self._build_targets(targets)[0]
946 for t in targets:
1000 for t in targets:
947 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1001 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
948 error = False
1002 error = False
949 if block:
1003 if block:
950 self._flush_ignored_control()
1004 self._flush_ignored_control()
951 for i in range(len(targets)):
1005 for i in range(len(targets)):
952 idents,msg = self.session.recv(self._control_socket,0)
1006 idents,msg = self.session.recv(self._control_socket,0)
953 if self.debug:
1007 if self.debug:
954 pprint(msg)
1008 pprint(msg)
955 if msg['content']['status'] != 'ok':
1009 if msg['content']['status'] != 'ok':
956 error = self._unwrap_exception(msg['content'])
1010 error = self._unwrap_exception(msg['content'])
957 else:
1011 else:
958 self._ignored_control_replies += len(targets)
1012 self._ignored_control_replies += len(targets)
959 if error:
1013 if error:
960 raise error
1014 raise error
961
1015
962
1016
963 @spin_first
1017 @spin_first
964 def abort(self, jobs=None, targets=None, block=None):
1018 def abort(self, jobs=None, targets=None, block=None):
965 """Abort specific jobs from the execution queues of target(s).
1019 """Abort specific jobs from the execution queues of target(s).
966
1020
967 This is a mechanism to prevent jobs that have already been submitted
1021 This is a mechanism to prevent jobs that have already been submitted
968 from executing.
1022 from executing.
969
1023
970 Parameters
1024 Parameters
971 ----------
1025 ----------
972
1026
973 jobs : msg_id, list of msg_ids, or AsyncResult
1027 jobs : msg_id, list of msg_ids, or AsyncResult
974 The jobs to be aborted
1028 The jobs to be aborted
975
1029
976 If unspecified/None: abort all outstanding jobs.
1030 If unspecified/None: abort all outstanding jobs.
977
1031
978 """
1032 """
979 block = self.block if block is None else block
1033 block = self.block if block is None else block
980 jobs = jobs if jobs is not None else list(self.outstanding)
1034 jobs = jobs if jobs is not None else list(self.outstanding)
981 targets = self._build_targets(targets)[0]
1035 targets = self._build_targets(targets)[0]
982
1036
983 msg_ids = []
1037 msg_ids = []
984 if isinstance(jobs, (basestring,AsyncResult)):
1038 if isinstance(jobs, (basestring,AsyncResult)):
985 jobs = [jobs]
1039 jobs = [jobs]
986 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1040 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
987 if bad_ids:
1041 if bad_ids:
988 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1042 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
989 for j in jobs:
1043 for j in jobs:
990 if isinstance(j, AsyncResult):
1044 if isinstance(j, AsyncResult):
991 msg_ids.extend(j.msg_ids)
1045 msg_ids.extend(j.msg_ids)
992 else:
1046 else:
993 msg_ids.append(j)
1047 msg_ids.append(j)
994 content = dict(msg_ids=msg_ids)
1048 content = dict(msg_ids=msg_ids)
995 for t in targets:
1049 for t in targets:
996 self.session.send(self._control_socket, 'abort_request',
1050 self.session.send(self._control_socket, 'abort_request',
997 content=content, ident=t)
1051 content=content, ident=t)
998 error = False
1052 error = False
999 if block:
1053 if block:
1000 self._flush_ignored_control()
1054 self._flush_ignored_control()
1001 for i in range(len(targets)):
1055 for i in range(len(targets)):
1002 idents,msg = self.session.recv(self._control_socket,0)
1056 idents,msg = self.session.recv(self._control_socket,0)
1003 if self.debug:
1057 if self.debug:
1004 pprint(msg)
1058 pprint(msg)
1005 if msg['content']['status'] != 'ok':
1059 if msg['content']['status'] != 'ok':
1006 error = self._unwrap_exception(msg['content'])
1060 error = self._unwrap_exception(msg['content'])
1007 else:
1061 else:
1008 self._ignored_control_replies += len(targets)
1062 self._ignored_control_replies += len(targets)
1009 if error:
1063 if error:
1010 raise error
1064 raise error
1011
1065
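A sketch of aborting queued work (assuming a connected client ``rc``); only jobs that have not yet started can be prevented from running::

    dv = rc[:]
    ars = [dv.apply_async(pow, 2, i) for i in range(10)]
    rc.abort(jobs=ars[5:])   # drop the still-queued tail from the engine queues
    rc.abort()               # or abort everything still outstanding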
1012 @spin_first
1066 @spin_first
1013 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1067 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1014 """Terminates one or more engine processes, optionally including the hub."""
1068 """Terminates one or more engine processes, optionally including the hub."""
1015 block = self.block if block is None else block
1069 block = self.block if block is None else block
1016 if hub:
1070 if hub:
1017 targets = 'all'
1071 targets = 'all'
1018 targets = self._build_targets(targets)[0]
1072 targets = self._build_targets(targets)[0]
1019 for t in targets:
1073 for t in targets:
1020 self.session.send(self._control_socket, 'shutdown_request',
1074 self.session.send(self._control_socket, 'shutdown_request',
1021 content={'restart':restart},ident=t)
1075 content={'restart':restart},ident=t)
1022 error = False
1076 error = False
1023 if block or hub:
1077 if block or hub:
1024 self._flush_ignored_control()
1078 self._flush_ignored_control()
1025 for i in range(len(targets)):
1079 for i in range(len(targets)):
1026 idents,msg = self.session.recv(self._control_socket, 0)
1080 idents,msg = self.session.recv(self._control_socket, 0)
1027 if self.debug:
1081 if self.debug:
1028 pprint(msg)
1082 pprint(msg)
1029 if msg['content']['status'] != 'ok':
1083 if msg['content']['status'] != 'ok':
1030 error = self._unwrap_exception(msg['content'])
1084 error = self._unwrap_exception(msg['content'])
1031 else:
1085 else:
1032 self._ignored_control_replies += len(targets)
1086 self._ignored_control_replies += len(targets)
1033
1087
1034 if hub:
1088 if hub:
1035 time.sleep(0.25)
1089 time.sleep(0.25)
1036 self.session.send(self._query_socket, 'shutdown_request')
1090 self.session.send(self._query_socket, 'shutdown_request')
1037 idents,msg = self.session.recv(self._query_socket, 0)
1091 idents,msg = self.session.recv(self._query_socket, 0)
1038 if self.debug:
1092 if self.debug:
1039 pprint(msg)
1093 pprint(msg)
1040 if msg['content']['status'] != 'ok':
1094 if msg['content']['status'] != 'ok':
1041 error = self._unwrap_exception(msg['content'])
1095 error = self._unwrap_exception(msg['content'])
1042
1096
1043 if error:
1097 if error:
1044 raise error
1098 raise error
1045
1099
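Illustrative calls (assuming a connected client ``rc``)::

    rc.shutdown(targets=[2, 3])   # terminate engines 2 and 3 only
    rc.shutdown(hub=True)         # terminate all engines and the Hub itself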
1046 #--------------------------------------------------------------------------
1100 #--------------------------------------------------------------------------
1047 # Execution related methods
1101 # Execution related methods
1048 #--------------------------------------------------------------------------
1102 #--------------------------------------------------------------------------
1049
1103
1050 def _maybe_raise(self, result):
1104 def _maybe_raise(self, result):
1051 """wrapper for maybe raising an exception if apply failed."""
1105 """wrapper for maybe raising an exception if apply failed."""
1052 if isinstance(result, error.RemoteError):
1106 if isinstance(result, error.RemoteError):
1053 raise result
1107 raise result
1054
1108
1055 return result
1109 return result
1056
1110
1057 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1111 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1058 ident=None):
1112 ident=None):
1059 """construct and send an apply message via a socket.
1113 """construct and send an apply message via a socket.
1060
1114
1061 This is the principal method with which all engine execution is performed by views.
1115 This is the principal method with which all engine execution is performed by views.
1062 """
1116 """
1063
1117
1064 assert not self._closed, "cannot use me anymore, I'm closed!"
1118 assert not self._closed, "cannot use me anymore, I'm closed!"
1065 # defaults:
1119 # defaults:
1066 args = args if args is not None else []
1120 args = args if args is not None else []
1067 kwargs = kwargs if kwargs is not None else {}
1121 kwargs = kwargs if kwargs is not None else {}
1068 subheader = subheader if subheader is not None else {}
1122 subheader = subheader if subheader is not None else {}
1069
1123
1070 # validate arguments
1124 # validate arguments
1071 if not callable(f) and not isinstance(f, Reference):
1125 if not callable(f) and not isinstance(f, Reference):
1072 raise TypeError("f must be callable, not %s"%type(f))
1126 raise TypeError("f must be callable, not %s"%type(f))
1073 if not isinstance(args, (tuple, list)):
1127 if not isinstance(args, (tuple, list)):
1074 raise TypeError("args must be tuple or list, not %s"%type(args))
1128 raise TypeError("args must be tuple or list, not %s"%type(args))
1075 if not isinstance(kwargs, dict):
1129 if not isinstance(kwargs, dict):
1076 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1130 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1077 if not isinstance(subheader, dict):
1131 if not isinstance(subheader, dict):
1078 raise TypeError("subheader must be dict, not %s"%type(subheader))
1132 raise TypeError("subheader must be dict, not %s"%type(subheader))
1079
1133
1080 bufs = util.pack_apply_message(f,args,kwargs)
1134 bufs = util.pack_apply_message(f,args,kwargs)
1081
1135
1082 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1136 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1083 subheader=subheader, track=track)
1137 subheader=subheader, track=track)
1084
1138
1085 msg_id = msg['header']['msg_id']
1139 msg_id = msg['header']['msg_id']
1086 self.outstanding.add(msg_id)
1140 self.outstanding.add(msg_id)
1087 if ident:
1141 if ident:
1088 # possibly routed to a specific engine
1142 # possibly routed to a specific engine
1089 if isinstance(ident, list):
1143 if isinstance(ident, list):
1090 ident = ident[-1]
1144 ident = ident[-1]
1091 if ident in self._engines.values():
1145 if ident in self._engines.values():
1092 # save for later, in case of engine death
1146 # save for later, in case of engine death
1093 self._outstanding_dict[ident].add(msg_id)
1147 self._outstanding_dict[ident].add(msg_id)
1094 self.history.append(msg_id)
1148 self.history.append(msg_id)
1095 self.metadata[msg_id]['submitted'] = datetime.now()
1149 self.metadata[msg_id]['submitted'] = datetime.now()
1096
1150
1097 return msg
1151 return msg
1098
1152
1099 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1153 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1100 """construct and send an execute request via a socket.
1154 """construct and send an execute request via a socket.
1101
1155
1102 """
1156 """
1103
1157
1104 assert not self._closed, "cannot use me anymore, I'm closed!"
1158 assert not self._closed, "cannot use me anymore, I'm closed!"
1105 # defaults:
1159 # defaults:
1106 subheader = subheader if subheader is not None else {}
1160 subheader = subheader if subheader is not None else {}
1107
1161
1108 # validate arguments
1162 # validate arguments
1109 if not isinstance(code, basestring):
1163 if not isinstance(code, basestring):
1110 raise TypeError("code must be text, not %s" % type(code))
1164 raise TypeError("code must be text, not %s" % type(code))
1111 if not isinstance(subheader, dict):
1165 if not isinstance(subheader, dict):
1112 raise TypeError("subheader must be dict, not %s" % type(subheader))
1166 raise TypeError("subheader must be dict, not %s" % type(subheader))
1113
1167
1114 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1168 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1115
1169
1116
1170
1117 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1171 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1118 subheader=subheader)
1172 subheader=subheader)
1119
1173
1120 msg_id = msg['header']['msg_id']
1174 msg_id = msg['header']['msg_id']
1121 self.outstanding.add(msg_id)
1175 self.outstanding.add(msg_id)
1122 if ident:
1176 if ident:
1123 # possibly routed to a specific engine
1177 # possibly routed to a specific engine
1124 if isinstance(ident, list):
1178 if isinstance(ident, list):
1125 ident = ident[-1]
1179 ident = ident[-1]
1126 if ident in self._engines.values():
1180 if ident in self._engines.values():
1127 # save for later, in case of engine death
1181 # save for later, in case of engine death
1128 self._outstanding_dict[ident].add(msg_id)
1182 self._outstanding_dict[ident].add(msg_id)
1129 self.history.append(msg_id)
1183 self.history.append(msg_id)
1130 self.metadata[msg_id]['submitted'] = datetime.now()
1184 self.metadata[msg_id]['submitted'] = datetime.now()
1131
1185
1132 return msg
1186 return msg
1133
1187
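Views are the expected entry point for this method; a sketch of the user-facing path (assuming a connected client ``rc``)::

    dv = rc[:]
    ar = dv.execute('import numpy')   # issues an execute_request to each engine
    ar.get()                          # wait for the execute replies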
1134 #--------------------------------------------------------------------------
1188 #--------------------------------------------------------------------------
1135 # construct a View object
1189 # construct a View object
1136 #--------------------------------------------------------------------------
1190 #--------------------------------------------------------------------------
1137
1191
1138 def load_balanced_view(self, targets=None):
1192 def load_balanced_view(self, targets=None):
1139 """construct a DirectView object.
1193 """construct a DirectView object.
1140
1194
1141 If no arguments are specified, create a LoadBalancedView
1195 If no arguments are specified, create a LoadBalancedView
1142 using all engines.
1196 using all engines.
1143
1197
1144 Parameters
1198 Parameters
1145 ----------
1199 ----------
1146
1200
1147 targets: list,slice,int,etc. [default: use all engines]
1201 targets: list,slice,int,etc. [default: use all engines]
1148 The subset of engines across which to load-balance
1202 The subset of engines across which to load-balance
1149 """
1203 """
1150 if targets == 'all':
1204 if targets == 'all':
1151 targets = None
1205 targets = None
1152 if targets is not None:
1206 if targets is not None:
1153 targets = self._build_targets(targets)[1]
1207 targets = self._build_targets(targets)[1]
1154 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1208 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1155
1209
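A usage sketch (assuming a connected client ``rc``)::

    lview = rc.load_balanced_view()                  # balance over all engines
    lview01 = rc.load_balanced_view(targets=[0, 1])  # restrict to engines 0 and 1
    ar = lview.apply_async(pow, 2, 10)
    ar.get()   # -> 1024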
1156 def direct_view(self, targets='all'):
1210 def direct_view(self, targets='all'):
1157 """construct a DirectView object.
1211 """construct a DirectView object.
1158
1212
1159 If no targets are specified, create a DirectView using all engines.
1213 If no targets are specified, create a DirectView using all engines.
1160
1214
1161 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1215 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1162 evaluate the target engines at each execution, whereas rc[:] will connect to
1216 evaluate the target engines at each execution, whereas rc[:] will connect to
1163 all *current* engines, and that list will not change.
1217 all *current* engines, and that list will not change.
1164
1218
1165 That is, 'all' will always use all engines, whereas rc[:] will not use
1219 That is, 'all' will always use all engines, whereas rc[:] will not use
1166 engines added after the DirectView is constructed.
1220 engines added after the DirectView is constructed.
1167
1221
1168 Parameters
1222 Parameters
1169 ----------
1223 ----------
1170
1224
1171 targets: list,slice,int,etc. [default: use all engines]
1225 targets: list,slice,int,etc. [default: use all engines]
1172 The engines to use for the View
1226 The engines to use for the View
1173 """
1227 """
1174 single = isinstance(targets, int)
1228 single = isinstance(targets, int)
1175 # allow 'all' to be lazily evaluated at each execution
1229 # allow 'all' to be lazily evaluated at each execution
1176 if targets != 'all':
1230 if targets != 'all':
1177 targets = self._build_targets(targets)[1]
1231 targets = self._build_targets(targets)[1]
1178 if single:
1232 if single:
1179 targets = targets[0]
1233 targets = targets[0]
1180 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1234 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1181
1235
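A sketch of the distinction described above (assuming a connected client ``rc``)::

    dv_all = rc.direct_view('all')   # re-evaluates the engine list at each execution
    dv_now = rc[:]                   # pinned to the engines registered right now
    dv_one = rc.direct_view(0)       # single-engine view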
1182 #--------------------------------------------------------------------------
1236 #--------------------------------------------------------------------------
1183 # Query methods
1237 # Query methods
1184 #--------------------------------------------------------------------------
1238 #--------------------------------------------------------------------------
1185
1239
1186 @spin_first
1240 @spin_first
1187 def get_result(self, indices_or_msg_ids=None, block=None):
1241 def get_result(self, indices_or_msg_ids=None, block=None):
1188 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1242 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1189
1243
1190 If the client already has the results, no request to the Hub will be made.
1244 If the client already has the results, no request to the Hub will be made.
1191
1245
1192 This is a convenient way to construct AsyncResult objects, which are wrappers
1246 This is a convenient way to construct AsyncResult objects, which are wrappers
1193 that include metadata about execution, and allow for awaiting results that
1247 that include metadata about execution, and allow for awaiting results that
1194 were not submitted by this Client.
1248 were not submitted by this Client.
1195
1249
1196 It can also be a convenient way to retrieve the metadata associated with
1250 It can also be a convenient way to retrieve the metadata associated with
1197 blocking execution, since it always retrieves the metadata for the requested msg_ids.
1251 blocking execution, since it always retrieves the metadata for the requested msg_ids.
1198
1252
1199 Examples
1253 Examples
1200 --------
1254 --------
1201 ::
1255 ::
1202
1256
1203 In [10]: ar = client.get_result(msg_id)
1257 In [10]: ar = client.get_result(msg_id)
1204
1258
1205 Parameters
1259 Parameters
1206 ----------
1260 ----------
1207
1261
1208 indices_or_msg_ids : integer history index, str msg_id, or list of either
1262 indices_or_msg_ids : integer history index, str msg_id, or list of either
1209 The indices or msg_ids of the results to be retrieved
1263 The indices or msg_ids of the results to be retrieved
1210
1264
1211 block : bool
1265 block : bool
1212 Whether to wait for the result to be done
1266 Whether to wait for the result to be done
1213
1267
1214 Returns
1268 Returns
1215 -------
1269 -------
1216
1270
1217 AsyncResult
1271 AsyncResult
1218 A single AsyncResult object will always be returned.
1272 A single AsyncResult object will always be returned.
1219
1273
1220 AsyncHubResult
1274 AsyncHubResult
1221 A subclass of AsyncResult that retrieves results from the Hub
1275 A subclass of AsyncResult that retrieves results from the Hub
1222
1276
1223 """
1277 """
1224 block = self.block if block is None else block
1278 block = self.block if block is None else block
1225 if indices_or_msg_ids is None:
1279 if indices_or_msg_ids is None:
1226 indices_or_msg_ids = -1
1280 indices_or_msg_ids = -1
1227
1281
1228 if not isinstance(indices_or_msg_ids, (list,tuple)):
1282 if not isinstance(indices_or_msg_ids, (list,tuple)):
1229 indices_or_msg_ids = [indices_or_msg_ids]
1283 indices_or_msg_ids = [indices_or_msg_ids]
1230
1284
1231 theids = []
1285 theids = []
1232 for id in indices_or_msg_ids:
1286 for id in indices_or_msg_ids:
1233 if isinstance(id, int):
1287 if isinstance(id, int):
1234 id = self.history[id]
1288 id = self.history[id]
1235 if not isinstance(id, basestring):
1289 if not isinstance(id, basestring):
1236 raise TypeError("indices must be str or int, not %r"%id)
1290 raise TypeError("indices must be str or int, not %r"%id)
1237 theids.append(id)
1291 theids.append(id)
1238
1292
1239 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1293 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1240 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1294 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1241
1295
1242 if remote_ids:
1296 if remote_ids:
1243 ar = AsyncHubResult(self, msg_ids=theids)
1297 ar = AsyncHubResult(self, msg_ids=theids)
1244 else:
1298 else:
1245 ar = AsyncResult(self, msg_ids=theids)
1299 ar = AsyncResult(self, msg_ids=theids)
1246
1300
1247 if block:
1301 if block:
1248 ar.wait()
1302 ar.wait()
1249
1303
1250 return ar
1304 return ar
1251
1305
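Usage sketch; ``msg_id`` stands in for any known message id (possibly one submitted by another client or session)::

    ar = rc.get_result(-1)        # most recent submission, by history index
    ar = rc.get_result(msg_id)    # or by msg_id
    result = ar.get()             # block until done and fetch the result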
1252 @spin_first
1306 @spin_first
1253 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1307 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1254 """Resubmit one or more tasks.
1308 """Resubmit one or more tasks.
1255
1309
1256 in-flight tasks may not be resubmitted.
1310 in-flight tasks may not be resubmitted.
1257
1311
1258 Parameters
1312 Parameters
1259 ----------
1313 ----------
1260
1314
1261 indices_or_msg_ids : integer history index, str msg_id, or list of either
1315 indices_or_msg_ids : integer history index, str msg_id, or list of either
1262 The indices or msg_ids of the tasks to be resubmitted
1316 The indices or msg_ids of the tasks to be resubmitted
1263
1317
1264 block : bool
1318 block : bool
1265 Whether to wait for the result to be done
1319 Whether to wait for the result to be done
1266
1320
1267 Returns
1321 Returns
1268 -------
1322 -------
1269
1323
1270 AsyncHubResult
1324 AsyncHubResult
1271 A subclass of AsyncResult that retrieves results from the Hub
1325 A subclass of AsyncResult that retrieves results from the Hub
1272
1326
1273 """
1327 """
1274 block = self.block if block is None else block
1328 block = self.block if block is None else block
1275 if indices_or_msg_ids is None:
1329 if indices_or_msg_ids is None:
1276 indices_or_msg_ids = -1
1330 indices_or_msg_ids = -1
1277
1331
1278 if not isinstance(indices_or_msg_ids, (list,tuple)):
1332 if not isinstance(indices_or_msg_ids, (list,tuple)):
1279 indices_or_msg_ids = [indices_or_msg_ids]
1333 indices_or_msg_ids = [indices_or_msg_ids]
1280
1334
1281 theids = []
1335 theids = []
1282 for id in indices_or_msg_ids:
1336 for id in indices_or_msg_ids:
1283 if isinstance(id, int):
1337 if isinstance(id, int):
1284 id = self.history[id]
1338 id = self.history[id]
1285 if not isinstance(id, basestring):
1339 if not isinstance(id, basestring):
1286 raise TypeError("indices must be str or int, not %r"%id)
1340 raise TypeError("indices must be str or int, not %r"%id)
1287 theids.append(id)
1341 theids.append(id)
1288
1342
1289 content = dict(msg_ids = theids)
1343 content = dict(msg_ids = theids)
1290
1344
1291 self.session.send(self._query_socket, 'resubmit_request', content)
1345 self.session.send(self._query_socket, 'resubmit_request', content)
1292
1346
1293 zmq.select([self._query_socket], [], [])
1347 zmq.select([self._query_socket], [], [])
1294 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1348 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1295 if self.debug:
1349 if self.debug:
1296 pprint(msg)
1350 pprint(msg)
1297 content = msg['content']
1351 content = msg['content']
1298 if content['status'] != 'ok':
1352 if content['status'] != 'ok':
1299 raise self._unwrap_exception(content)
1353 raise self._unwrap_exception(content)
1300 mapping = content['resubmitted']
1354 mapping = content['resubmitted']
1301 new_ids = [ mapping[msg_id] for msg_id in theids ]
1355 new_ids = [ mapping[msg_id] for msg_id in theids ]
1302
1356
1303 ar = AsyncHubResult(self, msg_ids=new_ids)
1357 ar = AsyncHubResult(self, msg_ids=new_ids)
1304
1358
1305 if block:
1359 if block:
1306 ar.wait()
1360 ar.wait()
1307
1361
1308 return ar
1362 return ar
1309
1363
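Usage sketch; ``msg_id`` is a placeholder for the id of a finished (not in-flight) task::

    ar = rc.resubmit(msg_id)   # schedule an identical task, returns an AsyncHubResult
    new_result = ar.get()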
1310 @spin_first
1364 @spin_first
1311 def result_status(self, msg_ids, status_only=True):
1365 def result_status(self, msg_ids, status_only=True):
1312 """Check on the status of the result(s) of the apply request with `msg_ids`.
1366 """Check on the status of the result(s) of the apply request with `msg_ids`.
1313
1367
1314 If status_only is False, then the actual results will be retrieved, else
1368 If status_only is False, then the actual results will be retrieved, else
1315 only the status of the results will be checked.
1369 only the status of the results will be checked.
1316
1370
1317 Parameters
1371 Parameters
1318 ----------
1372 ----------
1319
1373
1320 msg_ids : list of msg_ids
1374 msg_ids : list of msg_ids
1321 if int:
1375 if int:
1322 Passed as index to self.history for convenience.
1376 Passed as index to self.history for convenience.
1323 status_only : bool (default: True)
1377 status_only : bool (default: True)
1324 if False:
1378 if False:
1325 Retrieve the actual results of completed tasks.
1379 Retrieve the actual results of completed tasks.
1326
1380
1327 Returns
1381 Returns
1328 -------
1382 -------
1329
1383
1330 results : dict
1384 results : dict
1331 There will always be the keys 'pending' and 'completed', which will
1385 There will always be the keys 'pending' and 'completed', which will
1332 be lists of msg_ids that are incomplete or complete. If `status_only`
1386 be lists of msg_ids that are incomplete or complete. If `status_only`
1333 is False, then completed results will be keyed by their `msg_id`.
1387 is False, then completed results will be keyed by their `msg_id`.
1334 """
1388 """
1335 if not isinstance(msg_ids, (list,tuple)):
1389 if not isinstance(msg_ids, (list,tuple)):
1336 msg_ids = [msg_ids]
1390 msg_ids = [msg_ids]
1337
1391
1338 theids = []
1392 theids = []
1339 for msg_id in msg_ids:
1393 for msg_id in msg_ids:
1340 if isinstance(msg_id, int):
1394 if isinstance(msg_id, int):
1341 msg_id = self.history[msg_id]
1395 msg_id = self.history[msg_id]
1342 if not isinstance(msg_id, basestring):
1396 if not isinstance(msg_id, basestring):
1343 raise TypeError("msg_ids must be str, not %r"%msg_id)
1397 raise TypeError("msg_ids must be str, not %r"%msg_id)
1344 theids.append(msg_id)
1398 theids.append(msg_id)
1345
1399
1346 completed = []
1400 completed = []
1347 local_results = {}
1401 local_results = {}
1348
1402
1349 # comment this block out to temporarily disable local shortcut:
1403 # comment this block out to temporarily disable local shortcut:
1350 for msg_id in list(theids):  # iterate over a copy; theids is modified below
1404 for msg_id in list(theids):  # iterate over a copy; theids is modified below
1351 if msg_id in self.results:
1405 if msg_id in self.results:
1352 completed.append(msg_id)
1406 completed.append(msg_id)
1353 local_results[msg_id] = self.results[msg_id]
1407 local_results[msg_id] = self.results[msg_id]
1354 theids.remove(msg_id)
1408 theids.remove(msg_id)
1355
1409
1356 if theids: # some not locally cached
1410 if theids: # some not locally cached
1357 content = dict(msg_ids=theids, status_only=status_only)
1411 content = dict(msg_ids=theids, status_only=status_only)
1358 msg = self.session.send(self._query_socket, "result_request", content=content)
1412 msg = self.session.send(self._query_socket, "result_request", content=content)
1359 zmq.select([self._query_socket], [], [])
1413 zmq.select([self._query_socket], [], [])
1360 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1414 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1361 if self.debug:
1415 if self.debug:
1362 pprint(msg)
1416 pprint(msg)
1363 content = msg['content']
1417 content = msg['content']
1364 if content['status'] != 'ok':
1418 if content['status'] != 'ok':
1365 raise self._unwrap_exception(content)
1419 raise self._unwrap_exception(content)
1366 buffers = msg['buffers']
1420 buffers = msg['buffers']
1367 else:
1421 else:
1368 content = dict(completed=[],pending=[])
1422 content = dict(completed=[],pending=[])
1369
1423
1370 content['completed'].extend(completed)
1424 content['completed'].extend(completed)
1371
1425
1372 if status_only:
1426 if status_only:
1373 return content
1427 return content
1374
1428
1375 failures = []
1429 failures = []
1376 # load cached results into result:
1430 # load cached results into result:
1377 content.update(local_results)
1431 content.update(local_results)
1378
1432
1379 # update cache with results:
1433 # update cache with results:
1380 for msg_id in sorted(theids):
1434 for msg_id in sorted(theids):
1381 if msg_id in content['completed']:
1435 if msg_id in content['completed']:
1382 rec = content[msg_id]
1436 rec = content[msg_id]
1383 parent = rec['header']
1437 parent = rec['header']
1384 header = rec['result_header']
1438 header = rec['result_header']
1385 rcontent = rec['result_content']
1439 rcontent = rec['result_content']
1386 iodict = rec['io']
1440 iodict = rec['io']
1387 if isinstance(rcontent, str):
1441 if isinstance(rcontent, str):
1388 rcontent = self.session.unpack(rcontent)
1442 rcontent = self.session.unpack(rcontent)
1389
1443
1390 md = self.metadata[msg_id]
1444 md = self.metadata[msg_id]
1391 md.update(self._extract_metadata(header, parent, rcontent))
1445 md.update(self._extract_metadata(header, parent, rcontent))
1392 if rec.get('received'):
1446 if rec.get('received'):
1393 md['received'] = rec['received']
1447 md['received'] = rec['received']
1394 md.update(iodict)
1448 md.update(iodict)
1395
1449
1396 if rcontent['status'] == 'ok':
1450 if rcontent['status'] == 'ok':
1397 res,buffers = util.unserialize_object(buffers)
1451 res,buffers = util.unserialize_object(buffers)
1398 else:
1452 else:
1399 print rcontent
1453 print rcontent
1400 res = self._unwrap_exception(rcontent)
1454 res = self._unwrap_exception(rcontent)
1401 failures.append(res)
1455 failures.append(res)
1402
1456
1403 self.results[msg_id] = res
1457 self.results[msg_id] = res
1404 content[msg_id] = res
1458 content[msg_id] = res
1405
1459
1406 if len(theids) == 1 and failures:
1460 if len(theids) == 1 and failures:
1407 raise failures[0]
1461 raise failures[0]
1408
1462
1409 error.collect_exceptions(failures, "result_status")
1463 error.collect_exceptions(failures, "result_status")
1410 return content
1464 return content
1411
1465
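Usage sketch; ``msg_ids`` is a placeholder list of message ids::

    status = rc.result_status(msg_ids)                     # {'pending': [...], 'completed': [...]}
    full = rc.result_status(msg_ids, status_only=False)    # completed results keyed by msg_id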
1412 @spin_first
1466 @spin_first
1413 def queue_status(self, targets='all', verbose=False):
1467 def queue_status(self, targets='all', verbose=False):
1414 """Fetch the status of engine queues.
1468 """Fetch the status of engine queues.
1415
1469
1416 Parameters
1470 Parameters
1417 ----------
1471 ----------
1418
1472
1419 targets : int/str/list of ints/strs
1473 targets : int/str/list of ints/strs
1420 the engines whose states are to be queried.
1474 the engines whose states are to be queried.
1421 default : all
1475 default : all
1422 verbose : bool
1476 verbose : bool
1423 Whether to return lengths only, or lists of ids for each element
1477 Whether to return lengths only, or lists of ids for each element
1424 """
1478 """
1425 if targets == 'all':
1479 if targets == 'all':
1426 # allow 'all' to be evaluated on the engine
1480 # allow 'all' to be evaluated on the engine
1427 engine_ids = None
1481 engine_ids = None
1428 else:
1482 else:
1429 engine_ids = self._build_targets(targets)[1]
1483 engine_ids = self._build_targets(targets)[1]
1430 content = dict(targets=engine_ids, verbose=verbose)
1484 content = dict(targets=engine_ids, verbose=verbose)
1431 self.session.send(self._query_socket, "queue_request", content=content)
1485 self.session.send(self._query_socket, "queue_request", content=content)
1432 idents,msg = self.session.recv(self._query_socket, 0)
1486 idents,msg = self.session.recv(self._query_socket, 0)
1433 if self.debug:
1487 if self.debug:
1434 pprint(msg)
1488 pprint(msg)
1435 content = msg['content']
1489 content = msg['content']
1436 status = content.pop('status')
1490 status = content.pop('status')
1437 if status != 'ok':
1491 if status != 'ok':
1438 raise self._unwrap_exception(content)
1492 raise self._unwrap_exception(content)
1439 content = rekey(content)
1493 content = rekey(content)
1440 if isinstance(targets, int):
1494 if isinstance(targets, int):
1441 return content[targets]
1495 return content[targets]
1442 else:
1496 else:
1443 return content
1497 return content
1444
1498
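Usage sketch (assuming a connected client ``rc``)::

    rc.queue_status()               # per-engine counts of queued/completed tasks
    rc.queue_status(targets=0)      # just engine 0
    rc.queue_status(verbose=True)   # lists of msg_ids instead of counts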
1445 @spin_first
1499 @spin_first
1446 def purge_results(self, jobs=[], targets=[]):
1500 def purge_results(self, jobs=[], targets=[]):
1447 """Tell the Hub to forget results.
1501 """Tell the Hub to forget results.
1448
1502
1449 Individual results can be purged by msg_id, or the entire
1503 Individual results can be purged by msg_id, or the entire
1450 history of specific targets can be purged.
1504 history of specific targets can be purged.
1451
1505
1452 Use `purge_results('all')` to scrub everything from the Hub's db.
1506 Use `purge_results('all')` to scrub everything from the Hub's db.
1453
1507
1454 Parameters
1508 Parameters
1455 ----------
1509 ----------
1456
1510
1457 jobs : str or list of str or AsyncResult objects
1511 jobs : str or list of str or AsyncResult objects
1458 the msg_ids whose results should be forgotten.
1512 the msg_ids whose results should be forgotten.
1459 targets : int/str/list of ints/strs
1513 targets : int/str/list of ints/strs
1460 The targets, by int_id, whose entire history is to be purged.
1514 The targets, by int_id, whose entire history is to be purged.
1461
1515
1462 default : None
1516 default : None
1463 """
1517 """
1464 if not targets and not jobs:
1518 if not targets and not jobs:
1465 raise ValueError("Must specify at least one of `targets` and `jobs`")
1519 raise ValueError("Must specify at least one of `targets` and `jobs`")
1466 if targets:
1520 if targets:
1467 targets = self._build_targets(targets)[1]
1521 targets = self._build_targets(targets)[1]
1468
1522
1469 # construct msg_ids from jobs
1523 # construct msg_ids from jobs
1470 if jobs == 'all':
1524 if jobs == 'all':
1471 msg_ids = jobs
1525 msg_ids = jobs
1472 else:
1526 else:
1473 msg_ids = []
1527 msg_ids = []
1474 if isinstance(jobs, (basestring,AsyncResult)):
1528 if isinstance(jobs, (basestring,AsyncResult)):
1475 jobs = [jobs]
1529 jobs = [jobs]
1476 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1530 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1477 if bad_ids:
1531 if bad_ids:
1478 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1532 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1479 for j in jobs:
1533 for j in jobs:
1480 if isinstance(j, AsyncResult):
1534 if isinstance(j, AsyncResult):
1481 msg_ids.extend(j.msg_ids)
1535 msg_ids.extend(j.msg_ids)
1482 else:
1536 else:
1483 msg_ids.append(j)
1537 msg_ids.append(j)
1484
1538
1485 content = dict(engine_ids=targets, msg_ids=msg_ids)
1539 content = dict(engine_ids=targets, msg_ids=msg_ids)
1486 self.session.send(self._query_socket, "purge_request", content=content)
1540 self.session.send(self._query_socket, "purge_request", content=content)
1487 idents, msg = self.session.recv(self._query_socket, 0)
1541 idents, msg = self.session.recv(self._query_socket, 0)
1488 if self.debug:
1542 if self.debug:
1489 pprint(msg)
1543 pprint(msg)
1490 content = msg['content']
1544 content = msg['content']
1491 if content['status'] != 'ok':
1545 if content['status'] != 'ok':
1492 raise self._unwrap_exception(content)
1546 raise self._unwrap_exception(content)
1493
1547
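Usage sketch; ``ar`` is a placeholder AsyncResult whose results are no longer needed::

    rc.purge_results(jobs=ar)          # forget the results of one submission
    rc.purge_results(targets=[0, 1])   # drop the entire history of engines 0 and 1
    rc.purge_results('all')            # scrub everything from the Hub's database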
1494 @spin_first
1548 @spin_first
1495 def hub_history(self):
1549 def hub_history(self):
1496 """Get the Hub's history
1550 """Get the Hub's history
1497
1551
1498 Just like the Client, the Hub has a history, which is a list of msg_ids.
1552 Just like the Client, the Hub has a history, which is a list of msg_ids.
1499 This will contain the history of all clients, and, depending on configuration,
1553 This will contain the history of all clients, and, depending on configuration,
1500 may contain history across multiple cluster sessions.
1554 may contain history across multiple cluster sessions.
1501
1555
1502 Any msg_id returned here is a valid argument to `get_result`.
1556 Any msg_id returned here is a valid argument to `get_result`.
1503
1557
1504 Returns
1558 Returns
1505 -------
1559 -------
1506
1560
1507 msg_ids : list of strs
1561 msg_ids : list of strs
1508 list of all msg_ids, ordered by task submission time.
1562 list of all msg_ids, ordered by task submission time.
1509 """
1563 """
1510
1564
1511 self.session.send(self._query_socket, "history_request", content={})
1565 self.session.send(self._query_socket, "history_request", content={})
1512 idents, msg = self.session.recv(self._query_socket, 0)
1566 idents, msg = self.session.recv(self._query_socket, 0)
1513
1567
1514 if self.debug:
1568 if self.debug:
1515 pprint(msg)
1569 pprint(msg)
1516 content = msg['content']
1570 content = msg['content']
1517 if content['status'] != 'ok':
1571 if content['status'] != 'ok':
1518 raise self._unwrap_exception(content)
1572 raise self._unwrap_exception(content)
1519 else:
1573 else:
1520 return content['history']
1574 return content['history']
1521
1575
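Usage sketch (assuming a connected client ``rc``)::

    msg_ids = rc.hub_history()        # every msg_id the Hub has recorded
    ar = rc.get_result(msg_ids[-1])   # any of them is a valid get_result argument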
1522 @spin_first
1576 @spin_first
1523 def db_query(self, query, keys=None):
1577 def db_query(self, query, keys=None):
1524 """Query the Hub's TaskRecord database
1578 """Query the Hub's TaskRecord database
1525
1579
1526 This will return a list of task record dicts that match `query`
1580 This will return a list of task record dicts that match `query`
1527
1581
1528 Parameters
1582 Parameters
1529 ----------
1583 ----------
1530
1584
1531 query : mongodb query dict
1585 query : mongodb query dict
1532 The search dict. See mongodb query docs for details.
1586 The search dict. See mongodb query docs for details.
1533 keys : list of strs [optional]
1587 keys : list of strs [optional]
1534 The subset of keys to be returned. The default is to fetch everything but buffers.
1588 The subset of keys to be returned. The default is to fetch everything but buffers.
1535 'msg_id' will *always* be included.
1589 'msg_id' will *always* be included.
1536 """
1590 """
1537 if isinstance(keys, basestring):
1591 if isinstance(keys, basestring):
1538 keys = [keys]
1592 keys = [keys]
1539 content = dict(query=query, keys=keys)
1593 content = dict(query=query, keys=keys)
1540 self.session.send(self._query_socket, "db_request", content=content)
1594 self.session.send(self._query_socket, "db_request", content=content)
1541 idents, msg = self.session.recv(self._query_socket, 0)
1595 idents, msg = self.session.recv(self._query_socket, 0)
1542 if self.debug:
1596 if self.debug:
1543 pprint(msg)
1597 pprint(msg)
1544 content = msg['content']
1598 content = msg['content']
1545 if content['status'] != 'ok':
1599 if content['status'] != 'ok':
1546 raise self._unwrap_exception(content)
1600 raise self._unwrap_exception(content)
1547
1601
1548 records = content['records']
1602 records = content['records']
1549
1603
1550 buffer_lens = content['buffer_lens']
1604 buffer_lens = content['buffer_lens']
1551 result_buffer_lens = content['result_buffer_lens']
1605 result_buffer_lens = content['result_buffer_lens']
1552 buffers = msg['buffers']
1606 buffers = msg['buffers']
1553 has_bufs = buffer_lens is not None
1607 has_bufs = buffer_lens is not None
1554 has_rbufs = result_buffer_lens is not None
1608 has_rbufs = result_buffer_lens is not None
1555 for i,rec in enumerate(records):
1609 for i,rec in enumerate(records):
1556 # relink buffers
1610 # relink buffers
1557 if has_bufs:
1611 if has_bufs:
1558 blen = buffer_lens[i]
1612 blen = buffer_lens[i]
1559 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1613 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1560 if has_rbufs:
1614 if has_rbufs:
1561 blen = result_buffer_lens[i]
1615 blen = result_buffer_lens[i]
1562 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1616 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1563
1617
1564 return records
1618 return records
1565
1619
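Usage sketch; the record keys shown ('completed', 'engine_uuid') are typical TaskRecord fields, assumed here for illustration::

    recent = rc.hub_history()[-10:]
    recs = rc.db_query({'msg_id': {'$in': recent}},
                       keys=['msg_id', 'completed', 'engine_uuid'])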
1566 __all__ = [ 'Client' ]
1620 __all__ = [ 'Client' ]