##// END OF EJS Templates
Don't use asserts when checking for connection files
MinRK -
Show More
@@ -1,1624 +1,1628 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35
35
36 from IPython.utils.jsonutil import rekey
36 from IPython.utils.jsonutil import rekey
37 from IPython.utils.localinterfaces import LOCAL_IPS
37 from IPython.utils.localinterfaces import LOCAL_IPS
38 from IPython.utils.path import get_ipython_dir
38 from IPython.utils.path import get_ipython_dir
39 from IPython.utils.py3compat import cast_bytes
39 from IPython.utils.py3compat import cast_bytes
40 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
40 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
41 Dict, List, Bool, Set, Any)
41 Dict, List, Bool, Set, Any)
42 from IPython.external.decorator import decorator
42 from IPython.external.decorator import decorator
43 from IPython.external.ssh import tunnel
43 from IPython.external.ssh import tunnel
44
44
45 from IPython.parallel import Reference
45 from IPython.parallel import Reference
46 from IPython.parallel import error
46 from IPython.parallel import error
47 from IPython.parallel import util
47 from IPython.parallel import util
48
48
49 from IPython.zmq.session import Session, Message
49 from IPython.zmq.session import Session, Message
50
50
51 from .asyncresult import AsyncResult, AsyncHubResult
51 from .asyncresult import AsyncResult, AsyncHubResult
52 from IPython.core.profiledir import ProfileDir, ProfileDirError
52 from IPython.core.profiledir import ProfileDir, ProfileDirError
53 from .view import DirectView, LoadBalancedView
53 from .view import DirectView, LoadBalancedView
54
54
55 if sys.version_info[0] >= 3:
55 if sys.version_info[0] >= 3:
56 # xrange is used in a couple 'isinstance' tests in py2
56 # xrange is used in a couple 'isinstance' tests in py2
57 # should be just 'range' in 3k
57 # should be just 'range' in 3k
58 xrange = range
58 xrange = range
59
59
60 #--------------------------------------------------------------------------
60 #--------------------------------------------------------------------------
61 # Decorators for Client methods
61 # Decorators for Client methods
62 #--------------------------------------------------------------------------
62 #--------------------------------------------------------------------------
63
63
@decorator
def spin_first(method, self, *args, **kwargs):
    """Synchronize client state via spin() before invoking *method*."""
    self.spin()
    return method(self, *args, **kwargs)
69
69
70
70
71 #--------------------------------------------------------------------------
71 #--------------------------------------------------------------------------
72 # Classes
72 # Classes
73 #--------------------------------------------------------------------------
73 #--------------------------------------------------------------------------
74
74
75
75
class ExecuteReply(object):
    """Wrapper for finished Execute results.

    Provides dict-style (``reply[key]``) and attribute (``reply.key``)
    access to the reply metadata, plus the ``_repr_*_`` display methods
    so rich output captured in the ``pyout`` metadata renders naturally
    in IPython frontends.
    """
    def __init__(self, msg_id, content, metadata):
        self.msg_id = msg_id
        self._content = content
        self.execution_count = content['execution_count']
        self.metadata = metadata

    def _pyout(self):
        """Return the pyout metadata dict, or {} if there was no pyout.

        Centralizes the ``metadata['pyout'] or {}`` guard that every
        ``_repr_*_`` method needs (pyout may be None when the cell
        produced no displayable result).
        """
        return self.metadata['pyout'] or {}

    def __getitem__(self, key):
        return self.metadata[key]

    def __getattr__(self, key):
        # only invoked for attributes not found normally;
        # alias attribute access to metadata keys
        if key not in self.metadata:
            raise AttributeError(key)
        return self.metadata[key]

    def __repr__(self):
        text_out = self._pyout().get('text/plain', '')
        if len(text_out) > 32:
            # truncate long results so the repr stays a short one-liner
            text_out = text_out[:29] + '...'

        return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)

    def _repr_html_(self):
        return self._pyout().get("text/html")

    def _repr_latex_(self):
        return self._pyout().get("text/latex")

    def _repr_json_(self):
        return self._pyout().get("application/json")

    def _repr_javascript_(self):
        return self._pyout().get("application/javascript")

    def _repr_png_(self):
        return self._pyout().get("image/png")

    def _repr_jpeg_(self):
        return self._pyout().get("image/jpeg")

    def _repr_svg_(self):
        return self._pyout().get("image/svg+xml")
127
127
128
128
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # the fixed set of allowed keys, with their default values
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
              'outputs' : [],
            }
        self.update(md)
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # `key in self` is the O(1) dict containment check;
        # `key in self.iterkeys()` scanned an iterator linearly (and is py2-only)
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
180
180
181
181
class Client(HasTraits):
    """A semi-synchronous client to the IPython ZMQ cluster

    Parameters
    ----------

    url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
        Connection information for the Hub's registration.  If a json connector
        file is given, then likely no further configuration is necessary.
        [Default: use profile]
    profile : bytes
        The name of the Cluster profile to be used to find connector information.
        If run from an IPython application, the default profile will be the same
        as the running application, otherwise it will be 'default'.
    context : zmq.Context
        Pass an existing zmq.Context instance, otherwise the client will create its own.
    debug : bool
        flag for lots of message printing for debug purposes
    timeout : int/float
        time (in seconds) to wait for connection replies from the Hub
        [Default: 10]

    #-------------- session related args ----------------

    config : Config object
        If specified, this will be relayed to the Session for configuration
    username : str
        set username for the session object
    packer : str (import_string) or callable
        Can be either the simple keyword 'json' or 'pickle', or an import_string to a
        function to serialize messages. Must support same input as
        JSON, and output must be bytes.
        You can pass a callable directly as `pack`
    unpacker : str (import_string) or callable
        The inverse of packer.  Only necessary if packer is specified as *not* one
        of 'json' or 'pickle'.

    #-------------- ssh related args ----------------
    # These are args for configuring the ssh tunnel to be used
    # credentials are used to forward connections over ssh to the Controller
    # Note that the ip given in `addr` needs to be relative to sshserver
    # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
    # and set sshserver as the same machine the Controller is on. However,
    # the only requirement is that sshserver is able to see the Controller
    # (i.e. is within the same trusted network).

    sshserver : str
        A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
        If keyfile or password is specified, and this is not, it will default to
        the ip given in addr.
    sshkey : str; path to ssh private key file
        This specifies a key to be used in ssh login, default None.
        Regular default ssh keys will be used without specifying this argument.
    password : str
        Your ssh password to sshserver. Note that if this is left None,
        you will be prompted for it if passwordless key based login is unavailable.
    paramiko : bool
        flag for whether to use paramiko instead of shell ssh for tunneling.
        [default: True on win32, False else]

    ------- exec authentication args -------
    If even localhost is untrusted, you can have some protection against
    unauthorized execution by signing messages with HMAC digests.
    Messages are still sent as cleartext, so if someone can snoop your
    loopback traffic this will not protect your privacy, but will prevent
    unauthorized execution.

    exec_key : str
        an authentication key or file containing a key
        default: None


    Attributes
    ----------

    ids : list of int engine IDs
        requesting the ids attribute always synchronizes
        the registration state. To request ids without synchronization,
        use semi-private _ids attributes.

    history : list of msg_ids
        a list of msg_ids, keeping track of all the execution
        messages you have submitted in order.

    outstanding : set of msg_ids
        a set of msg_ids that have been submitted, but whose
        results have not yet been received.

    results : dict
        a dict of all our results, keyed by msg_id

    block : bool
        determines default behavior when block not specified
        in execution methods

    Methods
    -------

    spin
        flushes incoming results and registration state changes
        control methods spin, and requesting `ids` also ensures up to date

    wait
        wait on one or more msg_ids

    execution methods
        apply
        legacy: execute, run

    data movement
        push, pull, scatter, gather

    query methods
        queue_status, get_result, purge, result_status

    control methods
        abort, shutdown

    """


    block = Bool(False)
    outstanding = Set()
    results = Instance('collections.defaultdict', (dict,))
    metadata = Instance('collections.defaultdict', (Metadata,))
    history = List()
    debug = Bool(False)
    _spin_thread = Any()
    _stop_spinning = Any()

    profile = Unicode()
    def _profile_default(self):
        if BaseIPythonApplication.initialized():
            # an IPython app *might* be running, try to get its profile
            try:
                return BaseIPythonApplication.instance().profile
            except (AttributeError, MultipleInstanceError):
                # could be a *different* subclass of config.Application,
                # which would raise one of these two errors.
                return u'default'
        else:
            return u'default'


    _outstanding_dict = Instance('collections.defaultdict', (set,))
    _ids = List()
    _connected = Bool(False)
    _ssh = Bool(False)
    _context = Instance('zmq.Context')
    _config = Dict()
    _engines = Instance(util.ReverseDict, (), {})
    # _hub_socket = Instance('zmq.Socket')
    _query_socket = Instance('zmq.Socket')
    _control_socket = Instance('zmq.Socket')
    _iopub_socket = Instance('zmq.Socket')
    _notification_socket = Instance('zmq.Socket')
    _mux_socket = Instance('zmq.Socket')
    _task_socket = Instance('zmq.Socket')
    _task_scheme = Unicode()
    _closed = False
    _ignored_control_replies = Integer(0)
    _ignored_hub_replies = Integer(0)
344
344
345 def __new__(self, *args, **kw):
345 def __new__(self, *args, **kw):
346 # don't raise on positional args
346 # don't raise on positional args
347 return HasTraits.__new__(self, **kw)
347 return HasTraits.__new__(self, **kw)
348
348
349 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
349 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
350 context=None, debug=False, exec_key=None,
350 context=None, debug=False, exec_key=None,
351 sshserver=None, sshkey=None, password=None, paramiko=None,
351 sshserver=None, sshkey=None, password=None, paramiko=None,
352 timeout=10, **extra_args
352 timeout=10, **extra_args
353 ):
353 ):
354 if profile:
354 if profile:
355 super(Client, self).__init__(debug=debug, profile=profile)
355 super(Client, self).__init__(debug=debug, profile=profile)
356 else:
356 else:
357 super(Client, self).__init__(debug=debug)
357 super(Client, self).__init__(debug=debug)
358 if context is None:
358 if context is None:
359 context = zmq.Context.instance()
359 context = zmq.Context.instance()
360 self._context = context
360 self._context = context
361 self._stop_spinning = Event()
361 self._stop_spinning = Event()
362
362
363 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
363 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
364 if self._cd is not None:
364 if self._cd is not None:
365 if url_or_file is None:
365 if url_or_file is None:
366 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
366 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
367 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
367 if url_or_file is None:
368 " Please specify at least one of url_or_file or profile."
368 raise ValueError(
369 "I can't find enough information to connect to a hub!"
370 " Please specify at least one of url_or_file or profile."
371 )
369
372
370 if not util.is_url(url_or_file):
373 if not util.is_url(url_or_file):
371 # it's not a url, try for a file
374 # it's not a url, try for a file
372 if not os.path.exists(url_or_file):
375 if not os.path.exists(url_or_file):
373 if self._cd:
376 if self._cd:
374 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
377 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
375 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
378 if not os.path.exists(url_or_file):
379 raise IOError("Connection file not found: %r" % url_or_file)
376 with open(url_or_file) as f:
380 with open(url_or_file) as f:
377 cfg = json.loads(f.read())
381 cfg = json.loads(f.read())
378 else:
382 else:
379 cfg = {'url':url_or_file}
383 cfg = {'url':url_or_file}
380
384
381 # sync defaults from args, json:
385 # sync defaults from args, json:
382 if sshserver:
386 if sshserver:
383 cfg['ssh'] = sshserver
387 cfg['ssh'] = sshserver
384 if exec_key:
388 if exec_key:
385 cfg['exec_key'] = exec_key
389 cfg['exec_key'] = exec_key
386 exec_key = cfg['exec_key']
390 exec_key = cfg['exec_key']
387 location = cfg.setdefault('location', None)
391 location = cfg.setdefault('location', None)
388 cfg['url'] = util.disambiguate_url(cfg['url'], location)
392 cfg['url'] = util.disambiguate_url(cfg['url'], location)
389 url = cfg['url']
393 url = cfg['url']
390 proto,addr,port = util.split_url(url)
394 proto,addr,port = util.split_url(url)
391 if location is not None and addr == '127.0.0.1':
395 if location is not None and addr == '127.0.0.1':
392 # location specified, and connection is expected to be local
396 # location specified, and connection is expected to be local
393 if location not in LOCAL_IPS and not sshserver:
397 if location not in LOCAL_IPS and not sshserver:
394 # load ssh from JSON *only* if the controller is not on
398 # load ssh from JSON *only* if the controller is not on
395 # this machine
399 # this machine
396 sshserver=cfg['ssh']
400 sshserver=cfg['ssh']
397 if location not in LOCAL_IPS and not sshserver:
401 if location not in LOCAL_IPS and not sshserver:
398 # warn if no ssh specified, but SSH is probably needed
402 # warn if no ssh specified, but SSH is probably needed
399 # This is only a warning, because the most likely cause
403 # This is only a warning, because the most likely cause
400 # is a local Controller on a laptop whose IP is dynamic
404 # is a local Controller on a laptop whose IP is dynamic
401 warnings.warn("""
405 warnings.warn("""
402 Controller appears to be listening on localhost, but not on this machine.
406 Controller appears to be listening on localhost, but not on this machine.
403 If this is true, you should specify Client(...,sshserver='you@%s')
407 If this is true, you should specify Client(...,sshserver='you@%s')
404 or instruct your controller to listen on an external IP."""%location,
408 or instruct your controller to listen on an external IP."""%location,
405 RuntimeWarning)
409 RuntimeWarning)
406 elif not sshserver:
410 elif not sshserver:
407 # otherwise sync with cfg
411 # otherwise sync with cfg
408 sshserver = cfg['ssh']
412 sshserver = cfg['ssh']
409
413
410 self._config = cfg
414 self._config = cfg
411
415
412 self._ssh = bool(sshserver or sshkey or password)
416 self._ssh = bool(sshserver or sshkey or password)
413 if self._ssh and sshserver is None:
417 if self._ssh and sshserver is None:
414 # default to ssh via localhost
418 # default to ssh via localhost
415 sshserver = url.split('://')[1].split(':')[0]
419 sshserver = url.split('://')[1].split(':')[0]
416 if self._ssh and password is None:
420 if self._ssh and password is None:
417 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
421 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
418 password=False
422 password=False
419 else:
423 else:
420 password = getpass("SSH Password for %s: "%sshserver)
424 password = getpass("SSH Password for %s: "%sshserver)
421 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
425 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
422
426
423 # configure and construct the session
427 # configure and construct the session
424 if exec_key is not None:
428 if exec_key is not None:
425 if os.path.isfile(exec_key):
429 if os.path.isfile(exec_key):
426 extra_args['keyfile'] = exec_key
430 extra_args['keyfile'] = exec_key
427 else:
431 else:
428 exec_key = cast_bytes(exec_key)
432 exec_key = cast_bytes(exec_key)
429 extra_args['key'] = exec_key
433 extra_args['key'] = exec_key
430 self.session = Session(**extra_args)
434 self.session = Session(**extra_args)
431
435
432 self._query_socket = self._context.socket(zmq.DEALER)
436 self._query_socket = self._context.socket(zmq.DEALER)
433 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
437 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
434 if self._ssh:
438 if self._ssh:
435 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
439 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
436 else:
440 else:
437 self._query_socket.connect(url)
441 self._query_socket.connect(url)
438
442
439 self.session.debug = self.debug
443 self.session.debug = self.debug
440
444
441 self._notification_handlers = {'registration_notification' : self._register_engine,
445 self._notification_handlers = {'registration_notification' : self._register_engine,
442 'unregistration_notification' : self._unregister_engine,
446 'unregistration_notification' : self._unregister_engine,
443 'shutdown_notification' : lambda msg: self.close(),
447 'shutdown_notification' : lambda msg: self.close(),
444 }
448 }
445 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
449 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
446 'apply_reply' : self._handle_apply_reply}
450 'apply_reply' : self._handle_apply_reply}
447 self._connect(sshserver, ssh_kwargs, timeout)
451 self._connect(sshserver, ssh_kwargs, timeout)
448
452
449 def __del__(self):
453 def __del__(self):
450 """cleanup sockets, but _not_ context."""
454 """cleanup sockets, but _not_ context."""
451 self.close()
455 self.close()
452
456
453 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
457 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
454 if ipython_dir is None:
458 if ipython_dir is None:
455 ipython_dir = get_ipython_dir()
459 ipython_dir = get_ipython_dir()
456 if profile_dir is not None:
460 if profile_dir is not None:
457 try:
461 try:
458 self._cd = ProfileDir.find_profile_dir(profile_dir)
462 self._cd = ProfileDir.find_profile_dir(profile_dir)
459 return
463 return
460 except ProfileDirError:
464 except ProfileDirError:
461 pass
465 pass
462 elif profile is not None:
466 elif profile is not None:
463 try:
467 try:
464 self._cd = ProfileDir.find_profile_dir_by_name(
468 self._cd = ProfileDir.find_profile_dir_by_name(
465 ipython_dir, profile)
469 ipython_dir, profile)
466 return
470 return
467 except ProfileDirError:
471 except ProfileDirError:
468 pass
472 pass
469 self._cd = None
473 self._cd = None
470
474
471 def _update_engines(self, engines):
475 def _update_engines(self, engines):
472 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
476 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
473 for k,v in engines.iteritems():
477 for k,v in engines.iteritems():
474 eid = int(k)
478 eid = int(k)
475 self._engines[eid] = v
479 self._engines[eid] = v
476 self._ids.append(eid)
480 self._ids.append(eid)
477 self._ids = sorted(self._ids)
481 self._ids = sorted(self._ids)
478 if sorted(self._engines.keys()) != range(len(self._engines)) and \
482 if sorted(self._engines.keys()) != range(len(self._engines)) and \
479 self._task_scheme == 'pure' and self._task_socket:
483 self._task_scheme == 'pure' and self._task_socket:
480 self._stop_scheduling_tasks()
484 self._stop_scheduling_tasks()
481
485
482 def _stop_scheduling_tasks(self):
486 def _stop_scheduling_tasks(self):
483 """Stop scheduling tasks because an engine has been unregistered
487 """Stop scheduling tasks because an engine has been unregistered
484 from a pure ZMQ scheduler.
488 from a pure ZMQ scheduler.
485 """
489 """
486 self._task_socket.close()
490 self._task_socket.close()
487 self._task_socket = None
491 self._task_socket = None
488 msg = "An engine has been unregistered, and we are using pure " +\
492 msg = "An engine has been unregistered, and we are using pure " +\
489 "ZMQ task scheduling. Task farming will be disabled."
493 "ZMQ task scheduling. Task farming will be disabled."
490 if self.outstanding:
494 if self.outstanding:
491 msg += " If you were running tasks when this happened, " +\
495 msg += " If you were running tasks when this happened, " +\
492 "some `outstanding` msg_ids may never resolve."
496 "some `outstanding` msg_ids may never resolve."
493 warnings.warn(msg, RuntimeWarning)
497 warnings.warn(msg, RuntimeWarning)
494
498
495 def _build_targets(self, targets):
499 def _build_targets(self, targets):
496 """Turn valid target IDs or 'all' into two lists:
500 """Turn valid target IDs or 'all' into two lists:
497 (int_ids, uuids).
501 (int_ids, uuids).
498 """
502 """
499 if not self._ids:
503 if not self._ids:
500 # flush notification socket if no engines yet, just in case
504 # flush notification socket if no engines yet, just in case
501 if not self.ids:
505 if not self.ids:
502 raise error.NoEnginesRegistered("Can't build targets without any engines")
506 raise error.NoEnginesRegistered("Can't build targets without any engines")
503
507
504 if targets is None:
508 if targets is None:
505 targets = self._ids
509 targets = self._ids
506 elif isinstance(targets, basestring):
510 elif isinstance(targets, basestring):
507 if targets.lower() == 'all':
511 if targets.lower() == 'all':
508 targets = self._ids
512 targets = self._ids
509 else:
513 else:
510 raise TypeError("%r not valid str target, must be 'all'"%(targets))
514 raise TypeError("%r not valid str target, must be 'all'"%(targets))
511 elif isinstance(targets, int):
515 elif isinstance(targets, int):
512 if targets < 0:
516 if targets < 0:
513 targets = self.ids[targets]
517 targets = self.ids[targets]
514 if targets not in self._ids:
518 if targets not in self._ids:
515 raise IndexError("No such engine: %i"%targets)
519 raise IndexError("No such engine: %i"%targets)
516 targets = [targets]
520 targets = [targets]
517
521
518 if isinstance(targets, slice):
522 if isinstance(targets, slice):
519 indices = range(len(self._ids))[targets]
523 indices = range(len(self._ids))[targets]
520 ids = self.ids
524 ids = self.ids
521 targets = [ ids[i] for i in indices ]
525 targets = [ ids[i] for i in indices ]
522
526
523 if not isinstance(targets, (tuple, list, xrange)):
527 if not isinstance(targets, (tuple, list, xrange)):
524 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
528 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
525
529
526 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
530 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
527
531
528 def _connect(self, sshserver, ssh_kwargs, timeout):
532 def _connect(self, sshserver, ssh_kwargs, timeout):
529 """setup all our socket connections to the cluster. This is called from
533 """setup all our socket connections to the cluster. This is called from
530 __init__."""
534 __init__."""
531
535
532 # Maybe allow reconnecting?
536 # Maybe allow reconnecting?
533 if self._connected:
537 if self._connected:
534 return
538 return
535 self._connected=True
539 self._connected=True
536
540
537 def connect_socket(s, url):
541 def connect_socket(s, url):
538 url = util.disambiguate_url(url, self._config['location'])
542 url = util.disambiguate_url(url, self._config['location'])
539 if self._ssh:
543 if self._ssh:
540 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
544 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
541 else:
545 else:
542 return s.connect(url)
546 return s.connect(url)
543
547
544 self.session.send(self._query_socket, 'connection_request')
548 self.session.send(self._query_socket, 'connection_request')
545 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
549 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
546 poller = zmq.Poller()
550 poller = zmq.Poller()
547 poller.register(self._query_socket, zmq.POLLIN)
551 poller.register(self._query_socket, zmq.POLLIN)
548 # poll expects milliseconds, timeout is seconds
552 # poll expects milliseconds, timeout is seconds
549 evts = poller.poll(timeout*1000)
553 evts = poller.poll(timeout*1000)
550 if not evts:
554 if not evts:
551 raise error.TimeoutError("Hub connection request timed out")
555 raise error.TimeoutError("Hub connection request timed out")
552 idents,msg = self.session.recv(self._query_socket,mode=0)
556 idents,msg = self.session.recv(self._query_socket,mode=0)
553 if self.debug:
557 if self.debug:
554 pprint(msg)
558 pprint(msg)
555 msg = Message(msg)
559 msg = Message(msg)
556 content = msg.content
560 content = msg.content
557 self._config['registration'] = dict(content)
561 self._config['registration'] = dict(content)
558 if content.status == 'ok':
562 if content.status == 'ok':
559 ident = self.session.bsession
563 ident = self.session.bsession
560 if content.mux:
564 if content.mux:
561 self._mux_socket = self._context.socket(zmq.DEALER)
565 self._mux_socket = self._context.socket(zmq.DEALER)
562 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
566 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
563 connect_socket(self._mux_socket, content.mux)
567 connect_socket(self._mux_socket, content.mux)
564 if content.task:
568 if content.task:
565 self._task_scheme, task_addr = content.task
569 self._task_scheme, task_addr = content.task
566 self._task_socket = self._context.socket(zmq.DEALER)
570 self._task_socket = self._context.socket(zmq.DEALER)
567 self._task_socket.setsockopt(zmq.IDENTITY, ident)
571 self._task_socket.setsockopt(zmq.IDENTITY, ident)
568 connect_socket(self._task_socket, task_addr)
572 connect_socket(self._task_socket, task_addr)
569 if content.notification:
573 if content.notification:
570 self._notification_socket = self._context.socket(zmq.SUB)
574 self._notification_socket = self._context.socket(zmq.SUB)
571 connect_socket(self._notification_socket, content.notification)
575 connect_socket(self._notification_socket, content.notification)
572 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
576 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
573 # if content.query:
577 # if content.query:
574 # self._query_socket = self._context.socket(zmq.DEALER)
578 # self._query_socket = self._context.socket(zmq.DEALER)
575 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
579 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
576 # connect_socket(self._query_socket, content.query)
580 # connect_socket(self._query_socket, content.query)
577 if content.control:
581 if content.control:
578 self._control_socket = self._context.socket(zmq.DEALER)
582 self._control_socket = self._context.socket(zmq.DEALER)
579 self._control_socket.setsockopt(zmq.IDENTITY, ident)
583 self._control_socket.setsockopt(zmq.IDENTITY, ident)
580 connect_socket(self._control_socket, content.control)
584 connect_socket(self._control_socket, content.control)
581 if content.iopub:
585 if content.iopub:
582 self._iopub_socket = self._context.socket(zmq.SUB)
586 self._iopub_socket = self._context.socket(zmq.SUB)
583 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
587 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
584 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
588 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
585 connect_socket(self._iopub_socket, content.iopub)
589 connect_socket(self._iopub_socket, content.iopub)
586 self._update_engines(dict(content.engines))
590 self._update_engines(dict(content.engines))
587 else:
591 else:
588 self._connected = False
592 self._connected = False
589 raise Exception("Failed to connect!")
593 raise Exception("Failed to connect!")
590
594
591 #--------------------------------------------------------------------------
595 #--------------------------------------------------------------------------
592 # handlers and callbacks for incoming messages
596 # handlers and callbacks for incoming messages
593 #--------------------------------------------------------------------------
597 #--------------------------------------------------------------------------
594
598
595 def _unwrap_exception(self, content):
599 def _unwrap_exception(self, content):
596 """unwrap exception, and remap engine_id to int."""
600 """unwrap exception, and remap engine_id to int."""
597 e = error.unwrap_exception(content)
601 e = error.unwrap_exception(content)
598 # print e.traceback
602 # print e.traceback
599 if e.engine_info:
603 if e.engine_info:
600 e_uuid = e.engine_info['engine_uuid']
604 e_uuid = e.engine_info['engine_uuid']
601 eid = self._engines[e_uuid]
605 eid = self._engines[e_uuid]
602 e.engine_info['engine_id'] = eid
606 e.engine_info['engine_id'] = eid
603 return e
607 return e
604
608
605 def _extract_metadata(self, header, parent, content):
609 def _extract_metadata(self, header, parent, content):
606 md = {'msg_id' : parent['msg_id'],
610 md = {'msg_id' : parent['msg_id'],
607 'received' : datetime.now(),
611 'received' : datetime.now(),
608 'engine_uuid' : header.get('engine', None),
612 'engine_uuid' : header.get('engine', None),
609 'follow' : parent.get('follow', []),
613 'follow' : parent.get('follow', []),
610 'after' : parent.get('after', []),
614 'after' : parent.get('after', []),
611 'status' : content['status'],
615 'status' : content['status'],
612 }
616 }
613
617
614 if md['engine_uuid'] is not None:
618 if md['engine_uuid'] is not None:
615 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
619 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
616
620
617 if 'date' in parent:
621 if 'date' in parent:
618 md['submitted'] = parent['date']
622 md['submitted'] = parent['date']
619 if 'started' in header:
623 if 'started' in header:
620 md['started'] = header['started']
624 md['started'] = header['started']
621 if 'date' in header:
625 if 'date' in header:
622 md['completed'] = header['date']
626 md['completed'] = header['date']
623 return md
627 return md
624
628
625 def _register_engine(self, msg):
629 def _register_engine(self, msg):
626 """Register a new engine, and update our connection info."""
630 """Register a new engine, and update our connection info."""
627 content = msg['content']
631 content = msg['content']
628 eid = content['id']
632 eid = content['id']
629 d = {eid : content['queue']}
633 d = {eid : content['queue']}
630 self._update_engines(d)
634 self._update_engines(d)
631
635
632 def _unregister_engine(self, msg):
636 def _unregister_engine(self, msg):
633 """Unregister an engine that has died."""
637 """Unregister an engine that has died."""
634 content = msg['content']
638 content = msg['content']
635 eid = int(content['id'])
639 eid = int(content['id'])
636 if eid in self._ids:
640 if eid in self._ids:
637 self._ids.remove(eid)
641 self._ids.remove(eid)
638 uuid = self._engines.pop(eid)
642 uuid = self._engines.pop(eid)
639
643
640 self._handle_stranded_msgs(eid, uuid)
644 self._handle_stranded_msgs(eid, uuid)
641
645
642 if self._task_socket and self._task_scheme == 'pure':
646 if self._task_socket and self._task_scheme == 'pure':
643 self._stop_scheduling_tasks()
647 self._stop_scheduling_tasks()
644
648
645 def _handle_stranded_msgs(self, eid, uuid):
649 def _handle_stranded_msgs(self, eid, uuid):
646 """Handle messages known to be on an engine when the engine unregisters.
650 """Handle messages known to be on an engine when the engine unregisters.
647
651
648 It is possible that this will fire prematurely - that is, an engine will
652 It is possible that this will fire prematurely - that is, an engine will
649 go down after completing a result, and the client will be notified
653 go down after completing a result, and the client will be notified
650 of the unregistration and later receive the successful result.
654 of the unregistration and later receive the successful result.
651 """
655 """
652
656
653 outstanding = self._outstanding_dict[uuid]
657 outstanding = self._outstanding_dict[uuid]
654
658
655 for msg_id in list(outstanding):
659 for msg_id in list(outstanding):
656 if msg_id in self.results:
660 if msg_id in self.results:
657 # we already
661 # we already
658 continue
662 continue
659 try:
663 try:
660 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
664 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
661 except:
665 except:
662 content = error.wrap_exception()
666 content = error.wrap_exception()
663 # build a fake message:
667 # build a fake message:
664 parent = {}
668 parent = {}
665 header = {}
669 header = {}
666 parent['msg_id'] = msg_id
670 parent['msg_id'] = msg_id
667 header['engine'] = uuid
671 header['engine'] = uuid
668 header['date'] = datetime.now()
672 header['date'] = datetime.now()
669 msg = dict(parent_header=parent, header=header, content=content)
673 msg = dict(parent_header=parent, header=header, content=content)
670 self._handle_apply_reply(msg)
674 self._handle_apply_reply(msg)
671
675
672 def _handle_execute_reply(self, msg):
676 def _handle_execute_reply(self, msg):
673 """Save the reply to an execute_request into our results.
677 """Save the reply to an execute_request into our results.
674
678
675 execute messages are never actually used. apply is used instead.
679 execute messages are never actually used. apply is used instead.
676 """
680 """
677
681
678 parent = msg['parent_header']
682 parent = msg['parent_header']
679 msg_id = parent['msg_id']
683 msg_id = parent['msg_id']
680 if msg_id not in self.outstanding:
684 if msg_id not in self.outstanding:
681 if msg_id in self.history:
685 if msg_id in self.history:
682 print ("got stale result: %s"%msg_id)
686 print ("got stale result: %s"%msg_id)
683 else:
687 else:
684 print ("got unknown result: %s"%msg_id)
688 print ("got unknown result: %s"%msg_id)
685 else:
689 else:
686 self.outstanding.remove(msg_id)
690 self.outstanding.remove(msg_id)
687
691
688 content = msg['content']
692 content = msg['content']
689 header = msg['header']
693 header = msg['header']
690
694
691 # construct metadata:
695 # construct metadata:
692 md = self.metadata[msg_id]
696 md = self.metadata[msg_id]
693 md.update(self._extract_metadata(header, parent, content))
697 md.update(self._extract_metadata(header, parent, content))
694 # is this redundant?
698 # is this redundant?
695 self.metadata[msg_id] = md
699 self.metadata[msg_id] = md
696
700
697 e_outstanding = self._outstanding_dict[md['engine_uuid']]
701 e_outstanding = self._outstanding_dict[md['engine_uuid']]
698 if msg_id in e_outstanding:
702 if msg_id in e_outstanding:
699 e_outstanding.remove(msg_id)
703 e_outstanding.remove(msg_id)
700
704
701 # construct result:
705 # construct result:
702 if content['status'] == 'ok':
706 if content['status'] == 'ok':
703 self.results[msg_id] = ExecuteReply(msg_id, content, md)
707 self.results[msg_id] = ExecuteReply(msg_id, content, md)
704 elif content['status'] == 'aborted':
708 elif content['status'] == 'aborted':
705 self.results[msg_id] = error.TaskAborted(msg_id)
709 self.results[msg_id] = error.TaskAborted(msg_id)
706 elif content['status'] == 'resubmitted':
710 elif content['status'] == 'resubmitted':
707 # TODO: handle resubmission
711 # TODO: handle resubmission
708 pass
712 pass
709 else:
713 else:
710 self.results[msg_id] = self._unwrap_exception(content)
714 self.results[msg_id] = self._unwrap_exception(content)
711
715
712 def _handle_apply_reply(self, msg):
716 def _handle_apply_reply(self, msg):
713 """Save the reply to an apply_request into our results."""
717 """Save the reply to an apply_request into our results."""
714 parent = msg['parent_header']
718 parent = msg['parent_header']
715 msg_id = parent['msg_id']
719 msg_id = parent['msg_id']
716 if msg_id not in self.outstanding:
720 if msg_id not in self.outstanding:
717 if msg_id in self.history:
721 if msg_id in self.history:
718 print ("got stale result: %s"%msg_id)
722 print ("got stale result: %s"%msg_id)
719 print self.results[msg_id]
723 print self.results[msg_id]
720 print msg
724 print msg
721 else:
725 else:
722 print ("got unknown result: %s"%msg_id)
726 print ("got unknown result: %s"%msg_id)
723 else:
727 else:
724 self.outstanding.remove(msg_id)
728 self.outstanding.remove(msg_id)
725 content = msg['content']
729 content = msg['content']
726 header = msg['header']
730 header = msg['header']
727
731
728 # construct metadata:
732 # construct metadata:
729 md = self.metadata[msg_id]
733 md = self.metadata[msg_id]
730 md.update(self._extract_metadata(header, parent, content))
734 md.update(self._extract_metadata(header, parent, content))
731 # is this redundant?
735 # is this redundant?
732 self.metadata[msg_id] = md
736 self.metadata[msg_id] = md
733
737
734 e_outstanding = self._outstanding_dict[md['engine_uuid']]
738 e_outstanding = self._outstanding_dict[md['engine_uuid']]
735 if msg_id in e_outstanding:
739 if msg_id in e_outstanding:
736 e_outstanding.remove(msg_id)
740 e_outstanding.remove(msg_id)
737
741
738 # construct result:
742 # construct result:
739 if content['status'] == 'ok':
743 if content['status'] == 'ok':
740 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
744 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
741 elif content['status'] == 'aborted':
745 elif content['status'] == 'aborted':
742 self.results[msg_id] = error.TaskAborted(msg_id)
746 self.results[msg_id] = error.TaskAborted(msg_id)
743 elif content['status'] == 'resubmitted':
747 elif content['status'] == 'resubmitted':
744 # TODO: handle resubmission
748 # TODO: handle resubmission
745 pass
749 pass
746 else:
750 else:
747 self.results[msg_id] = self._unwrap_exception(content)
751 self.results[msg_id] = self._unwrap_exception(content)
748
752
749 def _flush_notifications(self):
753 def _flush_notifications(self):
750 """Flush notifications of engine registrations waiting
754 """Flush notifications of engine registrations waiting
751 in ZMQ queue."""
755 in ZMQ queue."""
752 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
756 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
753 while msg is not None:
757 while msg is not None:
754 if self.debug:
758 if self.debug:
755 pprint(msg)
759 pprint(msg)
756 msg_type = msg['header']['msg_type']
760 msg_type = msg['header']['msg_type']
757 handler = self._notification_handlers.get(msg_type, None)
761 handler = self._notification_handlers.get(msg_type, None)
758 if handler is None:
762 if handler is None:
759 raise Exception("Unhandled message type: %s"%msg.msg_type)
763 raise Exception("Unhandled message type: %s"%msg.msg_type)
760 else:
764 else:
761 handler(msg)
765 handler(msg)
762 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
766 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
763
767
764 def _flush_results(self, sock):
768 def _flush_results(self, sock):
765 """Flush task or queue results waiting in ZMQ queue."""
769 """Flush task or queue results waiting in ZMQ queue."""
766 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
770 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
767 while msg is not None:
771 while msg is not None:
768 if self.debug:
772 if self.debug:
769 pprint(msg)
773 pprint(msg)
770 msg_type = msg['header']['msg_type']
774 msg_type = msg['header']['msg_type']
771 handler = self._queue_handlers.get(msg_type, None)
775 handler = self._queue_handlers.get(msg_type, None)
772 if handler is None:
776 if handler is None:
773 raise Exception("Unhandled message type: %s"%msg.msg_type)
777 raise Exception("Unhandled message type: %s"%msg.msg_type)
774 else:
778 else:
775 handler(msg)
779 handler(msg)
776 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
780 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
777
781
778 def _flush_control(self, sock):
782 def _flush_control(self, sock):
779 """Flush replies from the control channel waiting
783 """Flush replies from the control channel waiting
780 in the ZMQ queue.
784 in the ZMQ queue.
781
785
782 Currently: ignore them."""
786 Currently: ignore them."""
783 if self._ignored_control_replies <= 0:
787 if self._ignored_control_replies <= 0:
784 return
788 return
785 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
789 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
786 while msg is not None:
790 while msg is not None:
787 self._ignored_control_replies -= 1
791 self._ignored_control_replies -= 1
788 if self.debug:
792 if self.debug:
789 pprint(msg)
793 pprint(msg)
790 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
794 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
791
795
792 def _flush_ignored_control(self):
796 def _flush_ignored_control(self):
793 """flush ignored control replies"""
797 """flush ignored control replies"""
794 while self._ignored_control_replies > 0:
798 while self._ignored_control_replies > 0:
795 self.session.recv(self._control_socket)
799 self.session.recv(self._control_socket)
796 self._ignored_control_replies -= 1
800 self._ignored_control_replies -= 1
797
801
798 def _flush_ignored_hub_replies(self):
802 def _flush_ignored_hub_replies(self):
799 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
803 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
800 while msg is not None:
804 while msg is not None:
801 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
805 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
802
806
803 def _flush_iopub(self, sock):
807 def _flush_iopub(self, sock):
804 """Flush replies from the iopub channel waiting
808 """Flush replies from the iopub channel waiting
805 in the ZMQ queue.
809 in the ZMQ queue.
806 """
810 """
807 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
811 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
808 while msg is not None:
812 while msg is not None:
809 if self.debug:
813 if self.debug:
810 pprint(msg)
814 pprint(msg)
811 parent = msg['parent_header']
815 parent = msg['parent_header']
812 # ignore IOPub messages with no parent.
816 # ignore IOPub messages with no parent.
813 # Caused by print statements or warnings from before the first execution.
817 # Caused by print statements or warnings from before the first execution.
814 if not parent:
818 if not parent:
815 continue
819 continue
816 msg_id = parent['msg_id']
820 msg_id = parent['msg_id']
817 content = msg['content']
821 content = msg['content']
818 header = msg['header']
822 header = msg['header']
819 msg_type = msg['header']['msg_type']
823 msg_type = msg['header']['msg_type']
820
824
821 # init metadata:
825 # init metadata:
822 md = self.metadata[msg_id]
826 md = self.metadata[msg_id]
823
827
824 if msg_type == 'stream':
828 if msg_type == 'stream':
825 name = content['name']
829 name = content['name']
826 s = md[name] or ''
830 s = md[name] or ''
827 md[name] = s + content['data']
831 md[name] = s + content['data']
828 elif msg_type == 'pyerr':
832 elif msg_type == 'pyerr':
829 md.update({'pyerr' : self._unwrap_exception(content)})
833 md.update({'pyerr' : self._unwrap_exception(content)})
830 elif msg_type == 'pyin':
834 elif msg_type == 'pyin':
831 md.update({'pyin' : content['code']})
835 md.update({'pyin' : content['code']})
832 elif msg_type == 'display_data':
836 elif msg_type == 'display_data':
833 md['outputs'].append(content.get('data'))
837 md['outputs'].append(content.get('data'))
834 elif msg_type == 'pyout':
838 elif msg_type == 'pyout':
835 md['pyout'] = content.get('data')
839 md['pyout'] = content.get('data')
836 else:
840 else:
837 # unhandled msg_type (status, etc.)
841 # unhandled msg_type (status, etc.)
838 pass
842 pass
839
843
840 # reduntant?
844 # reduntant?
841 self.metadata[msg_id] = md
845 self.metadata[msg_id] = md
842
846
843 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
847 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
844
848
845 #--------------------------------------------------------------------------
849 #--------------------------------------------------------------------------
846 # len, getitem
850 # len, getitem
847 #--------------------------------------------------------------------------
851 #--------------------------------------------------------------------------
848
852
849 def __len__(self):
853 def __len__(self):
850 """len(client) returns # of engines."""
854 """len(client) returns # of engines."""
851 return len(self.ids)
855 return len(self.ids)
852
856
853 def __getitem__(self, key):
857 def __getitem__(self, key):
854 """index access returns DirectView multiplexer objects
858 """index access returns DirectView multiplexer objects
855
859
856 Must be int, slice, or list/tuple/xrange of ints"""
860 Must be int, slice, or list/tuple/xrange of ints"""
857 if not isinstance(key, (int, slice, tuple, list, xrange)):
861 if not isinstance(key, (int, slice, tuple, list, xrange)):
858 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
862 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
859 else:
863 else:
860 return self.direct_view(key)
864 return self.direct_view(key)
861
865
862 #--------------------------------------------------------------------------
866 #--------------------------------------------------------------------------
863 # Begin public methods
867 # Begin public methods
864 #--------------------------------------------------------------------------
868 #--------------------------------------------------------------------------
865
869
866 @property
870 @property
867 def ids(self):
871 def ids(self):
868 """Always up-to-date ids property."""
872 """Always up-to-date ids property."""
869 self._flush_notifications()
873 self._flush_notifications()
870 # always copy:
874 # always copy:
871 return list(self._ids)
875 return list(self._ids)
872
876
873 def close(self):
877 def close(self):
874 if self._closed:
878 if self._closed:
875 return
879 return
876 self.stop_spin_thread()
880 self.stop_spin_thread()
877 snames = filter(lambda n: n.endswith('socket'), dir(self))
881 snames = filter(lambda n: n.endswith('socket'), dir(self))
878 for socket in map(lambda name: getattr(self, name), snames):
882 for socket in map(lambda name: getattr(self, name), snames):
879 if isinstance(socket, zmq.Socket) and not socket.closed:
883 if isinstance(socket, zmq.Socket) and not socket.closed:
880 socket.close()
884 socket.close()
881 self._closed = True
885 self._closed = True
882
886
883 def _spin_every(self, interval=1):
887 def _spin_every(self, interval=1):
884 """target func for use in spin_thread"""
888 """target func for use in spin_thread"""
885 while True:
889 while True:
886 if self._stop_spinning.is_set():
890 if self._stop_spinning.is_set():
887 return
891 return
888 time.sleep(interval)
892 time.sleep(interval)
889 self.spin()
893 self.spin()
890
894
891 def spin_thread(self, interval=1):
895 def spin_thread(self, interval=1):
892 """call Client.spin() in a background thread on some regular interval
896 """call Client.spin() in a background thread on some regular interval
893
897
894 This helps ensure that messages don't pile up too much in the zmq queue
898 This helps ensure that messages don't pile up too much in the zmq queue
895 while you are working on other things, or just leaving an idle terminal.
899 while you are working on other things, or just leaving an idle terminal.
896
900
897 It also helps limit potential padding of the `received` timestamp
901 It also helps limit potential padding of the `received` timestamp
898 on AsyncResult objects, used for timings.
902 on AsyncResult objects, used for timings.
899
903
900 Parameters
904 Parameters
901 ----------
905 ----------
902
906
903 interval : float, optional
907 interval : float, optional
904 The interval on which to spin the client in the background thread
908 The interval on which to spin the client in the background thread
905 (simply passed to time.sleep).
909 (simply passed to time.sleep).
906
910
907 Notes
911 Notes
908 -----
912 -----
909
913
910 For precision timing, you may want to use this method to put a bound
914 For precision timing, you may want to use this method to put a bound
911 on the jitter (in seconds) in `received` timestamps used
915 on the jitter (in seconds) in `received` timestamps used
912 in AsyncResult.wall_time.
916 in AsyncResult.wall_time.
913
917
914 """
918 """
915 if self._spin_thread is not None:
919 if self._spin_thread is not None:
916 self.stop_spin_thread()
920 self.stop_spin_thread()
917 self._stop_spinning.clear()
921 self._stop_spinning.clear()
918 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
922 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
919 self._spin_thread.daemon = True
923 self._spin_thread.daemon = True
920 self._spin_thread.start()
924 self._spin_thread.start()
921
925
922 def stop_spin_thread(self):
926 def stop_spin_thread(self):
923 """stop background spin_thread, if any"""
927 """stop background spin_thread, if any"""
924 if self._spin_thread is not None:
928 if self._spin_thread is not None:
925 self._stop_spinning.set()
929 self._stop_spinning.set()
926 self._spin_thread.join()
930 self._spin_thread.join()
927 self._spin_thread = None
931 self._spin_thread = None
928
932
929 def spin(self):
933 def spin(self):
930 """Flush any registration notifications and execution results
934 """Flush any registration notifications and execution results
931 waiting in the ZMQ queue.
935 waiting in the ZMQ queue.
932 """
936 """
933 if self._notification_socket:
937 if self._notification_socket:
934 self._flush_notifications()
938 self._flush_notifications()
935 if self._iopub_socket:
939 if self._iopub_socket:
936 self._flush_iopub(self._iopub_socket)
940 self._flush_iopub(self._iopub_socket)
937 if self._mux_socket:
941 if self._mux_socket:
938 self._flush_results(self._mux_socket)
942 self._flush_results(self._mux_socket)
939 if self._task_socket:
943 if self._task_socket:
940 self._flush_results(self._task_socket)
944 self._flush_results(self._task_socket)
941 if self._control_socket:
945 if self._control_socket:
942 self._flush_control(self._control_socket)
946 self._flush_control(self._control_socket)
943 if self._query_socket:
947 if self._query_socket:
944 self._flush_ignored_hub_replies()
948 self._flush_ignored_hub_replies()
945
949
946 def wait(self, jobs=None, timeout=-1):
950 def wait(self, jobs=None, timeout=-1):
947 """waits on one or more `jobs`, for up to `timeout` seconds.
951 """waits on one or more `jobs`, for up to `timeout` seconds.
948
952
949 Parameters
953 Parameters
950 ----------
954 ----------
951
955
952 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
956 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
953 ints are indices to self.history
957 ints are indices to self.history
954 strs are msg_ids
958 strs are msg_ids
955 default: wait on all outstanding messages
959 default: wait on all outstanding messages
956 timeout : float
960 timeout : float
957 a time in seconds, after which to give up.
961 a time in seconds, after which to give up.
958 default is -1, which means no timeout
962 default is -1, which means no timeout
959
963
960 Returns
964 Returns
961 -------
965 -------
962
966
963 True : when all msg_ids are done
967 True : when all msg_ids are done
964 False : timeout reached, some msg_ids still outstanding
968 False : timeout reached, some msg_ids still outstanding
965 """
969 """
966 tic = time.time()
970 tic = time.time()
967 if jobs is None:
971 if jobs is None:
968 theids = self.outstanding
972 theids = self.outstanding
969 else:
973 else:
970 if isinstance(jobs, (int, basestring, AsyncResult)):
974 if isinstance(jobs, (int, basestring, AsyncResult)):
971 jobs = [jobs]
975 jobs = [jobs]
972 theids = set()
976 theids = set()
973 for job in jobs:
977 for job in jobs:
974 if isinstance(job, int):
978 if isinstance(job, int):
975 # index access
979 # index access
976 job = self.history[job]
980 job = self.history[job]
977 elif isinstance(job, AsyncResult):
981 elif isinstance(job, AsyncResult):
978 map(theids.add, job.msg_ids)
982 map(theids.add, job.msg_ids)
979 continue
983 continue
980 theids.add(job)
984 theids.add(job)
981 if not theids.intersection(self.outstanding):
985 if not theids.intersection(self.outstanding):
982 return True
986 return True
983 self.spin()
987 self.spin()
984 while theids.intersection(self.outstanding):
988 while theids.intersection(self.outstanding):
985 if timeout >= 0 and ( time.time()-tic ) > timeout:
989 if timeout >= 0 and ( time.time()-tic ) > timeout:
986 break
990 break
987 time.sleep(1e-3)
991 time.sleep(1e-3)
988 self.spin()
992 self.spin()
989 return len(theids.intersection(self.outstanding)) == 0
993 return len(theids.intersection(self.outstanding)) == 0
990
994
991 #--------------------------------------------------------------------------
995 #--------------------------------------------------------------------------
992 # Control methods
996 # Control methods
993 #--------------------------------------------------------------------------
997 #--------------------------------------------------------------------------
994
998
995 @spin_first
999 @spin_first
996 def clear(self, targets=None, block=None):
1000 def clear(self, targets=None, block=None):
997 """Clear the namespace in target(s)."""
1001 """Clear the namespace in target(s)."""
998 block = self.block if block is None else block
1002 block = self.block if block is None else block
999 targets = self._build_targets(targets)[0]
1003 targets = self._build_targets(targets)[0]
1000 for t in targets:
1004 for t in targets:
1001 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1005 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1002 error = False
1006 error = False
1003 if block:
1007 if block:
1004 self._flush_ignored_control()
1008 self._flush_ignored_control()
1005 for i in range(len(targets)):
1009 for i in range(len(targets)):
1006 idents,msg = self.session.recv(self._control_socket,0)
1010 idents,msg = self.session.recv(self._control_socket,0)
1007 if self.debug:
1011 if self.debug:
1008 pprint(msg)
1012 pprint(msg)
1009 if msg['content']['status'] != 'ok':
1013 if msg['content']['status'] != 'ok':
1010 error = self._unwrap_exception(msg['content'])
1014 error = self._unwrap_exception(msg['content'])
1011 else:
1015 else:
1012 self._ignored_control_replies += len(targets)
1016 self._ignored_control_replies += len(targets)
1013 if error:
1017 if error:
1014 raise error
1018 raise error
1015
1019
1016
1020
1017 @spin_first
1021 @spin_first
1018 def abort(self, jobs=None, targets=None, block=None):
1022 def abort(self, jobs=None, targets=None, block=None):
1019 """Abort specific jobs from the execution queues of target(s).
1023 """Abort specific jobs from the execution queues of target(s).
1020
1024
1021 This is a mechanism to prevent jobs that have already been submitted
1025 This is a mechanism to prevent jobs that have already been submitted
1022 from executing.
1026 from executing.
1023
1027
1024 Parameters
1028 Parameters
1025 ----------
1029 ----------
1026
1030
1027 jobs : msg_id, list of msg_ids, or AsyncResult
1031 jobs : msg_id, list of msg_ids, or AsyncResult
1028 The jobs to be aborted
1032 The jobs to be aborted
1029
1033
1030 If unspecified/None: abort all outstanding jobs.
1034 If unspecified/None: abort all outstanding jobs.
1031
1035
1032 """
1036 """
1033 block = self.block if block is None else block
1037 block = self.block if block is None else block
1034 jobs = jobs if jobs is not None else list(self.outstanding)
1038 jobs = jobs if jobs is not None else list(self.outstanding)
1035 targets = self._build_targets(targets)[0]
1039 targets = self._build_targets(targets)[0]
1036
1040
1037 msg_ids = []
1041 msg_ids = []
1038 if isinstance(jobs, (basestring,AsyncResult)):
1042 if isinstance(jobs, (basestring,AsyncResult)):
1039 jobs = [jobs]
1043 jobs = [jobs]
1040 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1044 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1041 if bad_ids:
1045 if bad_ids:
1042 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1046 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1043 for j in jobs:
1047 for j in jobs:
1044 if isinstance(j, AsyncResult):
1048 if isinstance(j, AsyncResult):
1045 msg_ids.extend(j.msg_ids)
1049 msg_ids.extend(j.msg_ids)
1046 else:
1050 else:
1047 msg_ids.append(j)
1051 msg_ids.append(j)
1048 content = dict(msg_ids=msg_ids)
1052 content = dict(msg_ids=msg_ids)
1049 for t in targets:
1053 for t in targets:
1050 self.session.send(self._control_socket, 'abort_request',
1054 self.session.send(self._control_socket, 'abort_request',
1051 content=content, ident=t)
1055 content=content, ident=t)
1052 error = False
1056 error = False
1053 if block:
1057 if block:
1054 self._flush_ignored_control()
1058 self._flush_ignored_control()
1055 for i in range(len(targets)):
1059 for i in range(len(targets)):
1056 idents,msg = self.session.recv(self._control_socket,0)
1060 idents,msg = self.session.recv(self._control_socket,0)
1057 if self.debug:
1061 if self.debug:
1058 pprint(msg)
1062 pprint(msg)
1059 if msg['content']['status'] != 'ok':
1063 if msg['content']['status'] != 'ok':
1060 error = self._unwrap_exception(msg['content'])
1064 error = self._unwrap_exception(msg['content'])
1061 else:
1065 else:
1062 self._ignored_control_replies += len(targets)
1066 self._ignored_control_replies += len(targets)
1063 if error:
1067 if error:
1064 raise error
1068 raise error
1065
1069
    @spin_first
    def shutdown(self, targets=None, restart=False, hub=False, block=None):
        """Terminates one or more engine processes, optionally including the hub.

        Parameters
        ----------

        targets : engine spec (int/list/'all') or None
            which engines to shut down; forced to 'all' when hub=True.
        restart : bool
            forwarded to engines in the shutdown_request content;
            engine-side handling is not visible in this file.
        hub : bool
            also send a shutdown_request to the Hub (after the engines).
        block : bool or None
            wait for shutdown replies; None means use self.block.
            Shutting down the hub always waits for engine replies first.
        """
        block = self.block if block is None else block
        if hub:
            # the hub cannot be shut down without taking all engines with it
            targets = 'all'
        targets = self._build_targets(targets)[0]
        for t in targets:
            self.session.send(self._control_socket, 'shutdown_request',
                        content={'restart':restart},ident=t)
        error = False
        # when shutting down the hub, engine replies must be collected first,
        # even if the caller asked for non-blocking behavior
        if block or hub:
            self._flush_ignored_control()
            for i in range(len(targets)):
                idents,msg = self.session.recv(self._control_socket, 0)
                if self.debug:
                    pprint(msg)
                if msg['content']['status'] != 'ok':
                    # remember the failure, but keep draining remaining replies
                    error = self._unwrap_exception(msg['content'])
        else:
            # replies will arrive later; count them so they can be discarded
            self._ignored_control_replies += len(targets)

        if hub:
            # brief pause to let engine shutdown settle before asking the hub
            time.sleep(0.25)
            self.session.send(self._query_socket, 'shutdown_request')
            idents,msg = self.session.recv(self._query_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                error = self._unwrap_exception(msg['content'])

        if error:
            raise error
1099
1103
1100 #--------------------------------------------------------------------------
1104 #--------------------------------------------------------------------------
1101 # Execution related methods
1105 # Execution related methods
1102 #--------------------------------------------------------------------------
1106 #--------------------------------------------------------------------------
1103
1107
1104 def _maybe_raise(self, result):
1108 def _maybe_raise(self, result):
1105 """wrapper for maybe raising an exception if apply failed."""
1109 """wrapper for maybe raising an exception if apply failed."""
1106 if isinstance(result, error.RemoteError):
1110 if isinstance(result, error.RemoteError):
1107 raise result
1111 raise result
1108
1112
1109 return result
1113 return result
1110
1114
1111 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1115 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1112 ident=None):
1116 ident=None):
1113 """construct and send an apply message via a socket.
1117 """construct and send an apply message via a socket.
1114
1118
1115 This is the principal method with which all engine execution is performed by views.
1119 This is the principal method with which all engine execution is performed by views.
1116 """
1120 """
1117
1121
1118 if self._closed:
1122 if self._closed:
1119 raise RuntimeError("Client cannot be used after its sockets have been closed")
1123 raise RuntimeError("Client cannot be used after its sockets have been closed")
1120
1124
1121 # defaults:
1125 # defaults:
1122 args = args if args is not None else []
1126 args = args if args is not None else []
1123 kwargs = kwargs if kwargs is not None else {}
1127 kwargs = kwargs if kwargs is not None else {}
1124 subheader = subheader if subheader is not None else {}
1128 subheader = subheader if subheader is not None else {}
1125
1129
1126 # validate arguments
1130 # validate arguments
1127 if not callable(f) and not isinstance(f, Reference):
1131 if not callable(f) and not isinstance(f, Reference):
1128 raise TypeError("f must be callable, not %s"%type(f))
1132 raise TypeError("f must be callable, not %s"%type(f))
1129 if not isinstance(args, (tuple, list)):
1133 if not isinstance(args, (tuple, list)):
1130 raise TypeError("args must be tuple or list, not %s"%type(args))
1134 raise TypeError("args must be tuple or list, not %s"%type(args))
1131 if not isinstance(kwargs, dict):
1135 if not isinstance(kwargs, dict):
1132 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1136 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1133 if not isinstance(subheader, dict):
1137 if not isinstance(subheader, dict):
1134 raise TypeError("subheader must be dict, not %s"%type(subheader))
1138 raise TypeError("subheader must be dict, not %s"%type(subheader))
1135
1139
1136 bufs = util.pack_apply_message(f,args,kwargs)
1140 bufs = util.pack_apply_message(f,args,kwargs)
1137
1141
1138 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1142 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1139 subheader=subheader, track=track)
1143 subheader=subheader, track=track)
1140
1144
1141 msg_id = msg['header']['msg_id']
1145 msg_id = msg['header']['msg_id']
1142 self.outstanding.add(msg_id)
1146 self.outstanding.add(msg_id)
1143 if ident:
1147 if ident:
1144 # possibly routed to a specific engine
1148 # possibly routed to a specific engine
1145 if isinstance(ident, list):
1149 if isinstance(ident, list):
1146 ident = ident[-1]
1150 ident = ident[-1]
1147 if ident in self._engines.values():
1151 if ident in self._engines.values():
1148 # save for later, in case of engine death
1152 # save for later, in case of engine death
1149 self._outstanding_dict[ident].add(msg_id)
1153 self._outstanding_dict[ident].add(msg_id)
1150 self.history.append(msg_id)
1154 self.history.append(msg_id)
1151 self.metadata[msg_id]['submitted'] = datetime.now()
1155 self.metadata[msg_id]['submitted'] = datetime.now()
1152
1156
1153 return msg
1157 return msg
1154
1158
1155 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1159 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1156 """construct and send an execute request via a socket.
1160 """construct and send an execute request via a socket.
1157
1161
1158 """
1162 """
1159
1163
1160 if self._closed:
1164 if self._closed:
1161 raise RuntimeError("Client cannot be used after its sockets have been closed")
1165 raise RuntimeError("Client cannot be used after its sockets have been closed")
1162
1166
1163 # defaults:
1167 # defaults:
1164 subheader = subheader if subheader is not None else {}
1168 subheader = subheader if subheader is not None else {}
1165
1169
1166 # validate arguments
1170 # validate arguments
1167 if not isinstance(code, basestring):
1171 if not isinstance(code, basestring):
1168 raise TypeError("code must be text, not %s" % type(code))
1172 raise TypeError("code must be text, not %s" % type(code))
1169 if not isinstance(subheader, dict):
1173 if not isinstance(subheader, dict):
1170 raise TypeError("subheader must be dict, not %s" % type(subheader))
1174 raise TypeError("subheader must be dict, not %s" % type(subheader))
1171
1175
1172 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1176 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1173
1177
1174
1178
1175 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1179 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1176 subheader=subheader)
1180 subheader=subheader)
1177
1181
1178 msg_id = msg['header']['msg_id']
1182 msg_id = msg['header']['msg_id']
1179 self.outstanding.add(msg_id)
1183 self.outstanding.add(msg_id)
1180 if ident:
1184 if ident:
1181 # possibly routed to a specific engine
1185 # possibly routed to a specific engine
1182 if isinstance(ident, list):
1186 if isinstance(ident, list):
1183 ident = ident[-1]
1187 ident = ident[-1]
1184 if ident in self._engines.values():
1188 if ident in self._engines.values():
1185 # save for later, in case of engine death
1189 # save for later, in case of engine death
1186 self._outstanding_dict[ident].add(msg_id)
1190 self._outstanding_dict[ident].add(msg_id)
1187 self.history.append(msg_id)
1191 self.history.append(msg_id)
1188 self.metadata[msg_id]['submitted'] = datetime.now()
1192 self.metadata[msg_id]['submitted'] = datetime.now()
1189
1193
1190 return msg
1194 return msg
1191
1195
1192 #--------------------------------------------------------------------------
1196 #--------------------------------------------------------------------------
1193 # construct a View object
1197 # construct a View object
1194 #--------------------------------------------------------------------------
1198 #--------------------------------------------------------------------------
1195
1199
1196 def load_balanced_view(self, targets=None):
1200 def load_balanced_view(self, targets=None):
1197 """construct a DirectView object.
1201 """construct a DirectView object.
1198
1202
1199 If no arguments are specified, create a LoadBalancedView
1203 If no arguments are specified, create a LoadBalancedView
1200 using all engines.
1204 using all engines.
1201
1205
1202 Parameters
1206 Parameters
1203 ----------
1207 ----------
1204
1208
1205 targets: list,slice,int,etc. [default: use all engines]
1209 targets: list,slice,int,etc. [default: use all engines]
1206 The subset of engines across which to load-balance
1210 The subset of engines across which to load-balance
1207 """
1211 """
1208 if targets == 'all':
1212 if targets == 'all':
1209 targets = None
1213 targets = None
1210 if targets is not None:
1214 if targets is not None:
1211 targets = self._build_targets(targets)[1]
1215 targets = self._build_targets(targets)[1]
1212 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1216 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1213
1217
1214 def direct_view(self, targets='all'):
1218 def direct_view(self, targets='all'):
1215 """construct a DirectView object.
1219 """construct a DirectView object.
1216
1220
1217 If no targets are specified, create a DirectView using all engines.
1221 If no targets are specified, create a DirectView using all engines.
1218
1222
1219 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1223 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1220 evaluate the target engines at each execution, whereas rc[:] will connect to
1224 evaluate the target engines at each execution, whereas rc[:] will connect to
1221 all *current* engines, and that list will not change.
1225 all *current* engines, and that list will not change.
1222
1226
1223 That is, 'all' will always use all engines, whereas rc[:] will not use
1227 That is, 'all' will always use all engines, whereas rc[:] will not use
1224 engines added after the DirectView is constructed.
1228 engines added after the DirectView is constructed.
1225
1229
1226 Parameters
1230 Parameters
1227 ----------
1231 ----------
1228
1232
1229 targets: list,slice,int,etc. [default: use all engines]
1233 targets: list,slice,int,etc. [default: use all engines]
1230 The engines to use for the View
1234 The engines to use for the View
1231 """
1235 """
1232 single = isinstance(targets, int)
1236 single = isinstance(targets, int)
1233 # allow 'all' to be lazily evaluated at each execution
1237 # allow 'all' to be lazily evaluated at each execution
1234 if targets != 'all':
1238 if targets != 'all':
1235 targets = self._build_targets(targets)[1]
1239 targets = self._build_targets(targets)[1]
1236 if single:
1240 if single:
1237 targets = targets[0]
1241 targets = targets[0]
1238 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1242 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1239
1243
1240 #--------------------------------------------------------------------------
1244 #--------------------------------------------------------------------------
1241 # Query methods
1245 # Query methods
1242 #--------------------------------------------------------------------------
1246 #--------------------------------------------------------------------------
1243
1247
1244 @spin_first
1248 @spin_first
1245 def get_result(self, indices_or_msg_ids=None, block=None):
1249 def get_result(self, indices_or_msg_ids=None, block=None):
1246 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1250 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1247
1251
1248 If the client already has the results, no request to the Hub will be made.
1252 If the client already has the results, no request to the Hub will be made.
1249
1253
1250 This is a convenient way to construct AsyncResult objects, which are wrappers
1254 This is a convenient way to construct AsyncResult objects, which are wrappers
1251 that include metadata about execution, and allow for awaiting results that
1255 that include metadata about execution, and allow for awaiting results that
1252 were not submitted by this Client.
1256 were not submitted by this Client.
1253
1257
1254 It can also be a convenient way to retrieve the metadata associated with
1258 It can also be a convenient way to retrieve the metadata associated with
1255 blocking execution, since it always retrieves
1259 blocking execution, since it always retrieves
1256
1260
1257 Examples
1261 Examples
1258 --------
1262 --------
1259 ::
1263 ::
1260
1264
1261 In [10]: r = client.apply()
1265 In [10]: r = client.apply()
1262
1266
1263 Parameters
1267 Parameters
1264 ----------
1268 ----------
1265
1269
1266 indices_or_msg_ids : integer history index, str msg_id, or list of either
1270 indices_or_msg_ids : integer history index, str msg_id, or list of either
1267 The indices or msg_ids of indices to be retrieved
1271 The indices or msg_ids of indices to be retrieved
1268
1272
1269 block : bool
1273 block : bool
1270 Whether to wait for the result to be done
1274 Whether to wait for the result to be done
1271
1275
1272 Returns
1276 Returns
1273 -------
1277 -------
1274
1278
1275 AsyncResult
1279 AsyncResult
1276 A single AsyncResult object will always be returned.
1280 A single AsyncResult object will always be returned.
1277
1281
1278 AsyncHubResult
1282 AsyncHubResult
1279 A subclass of AsyncResult that retrieves results from the Hub
1283 A subclass of AsyncResult that retrieves results from the Hub
1280
1284
1281 """
1285 """
1282 block = self.block if block is None else block
1286 block = self.block if block is None else block
1283 if indices_or_msg_ids is None:
1287 if indices_or_msg_ids is None:
1284 indices_or_msg_ids = -1
1288 indices_or_msg_ids = -1
1285
1289
1286 if not isinstance(indices_or_msg_ids, (list,tuple)):
1290 if not isinstance(indices_or_msg_ids, (list,tuple)):
1287 indices_or_msg_ids = [indices_or_msg_ids]
1291 indices_or_msg_ids = [indices_or_msg_ids]
1288
1292
1289 theids = []
1293 theids = []
1290 for id in indices_or_msg_ids:
1294 for id in indices_or_msg_ids:
1291 if isinstance(id, int):
1295 if isinstance(id, int):
1292 id = self.history[id]
1296 id = self.history[id]
1293 if not isinstance(id, basestring):
1297 if not isinstance(id, basestring):
1294 raise TypeError("indices must be str or int, not %r"%id)
1298 raise TypeError("indices must be str or int, not %r"%id)
1295 theids.append(id)
1299 theids.append(id)
1296
1300
1297 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1301 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1298 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1302 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1299
1303
1300 if remote_ids:
1304 if remote_ids:
1301 ar = AsyncHubResult(self, msg_ids=theids)
1305 ar = AsyncHubResult(self, msg_ids=theids)
1302 else:
1306 else:
1303 ar = AsyncResult(self, msg_ids=theids)
1307 ar = AsyncResult(self, msg_ids=theids)
1304
1308
1305 if block:
1309 if block:
1306 ar.wait()
1310 ar.wait()
1307
1311
1308 return ar
1312 return ar
1309
1313
1310 @spin_first
1314 @spin_first
1311 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1315 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1312 """Resubmit one or more tasks.
1316 """Resubmit one or more tasks.
1313
1317
1314 in-flight tasks may not be resubmitted.
1318 in-flight tasks may not be resubmitted.
1315
1319
1316 Parameters
1320 Parameters
1317 ----------
1321 ----------
1318
1322
1319 indices_or_msg_ids : integer history index, str msg_id, or list of either
1323 indices_or_msg_ids : integer history index, str msg_id, or list of either
1320 The indices or msg_ids of indices to be retrieved
1324 The indices or msg_ids of indices to be retrieved
1321
1325
1322 block : bool
1326 block : bool
1323 Whether to wait for the result to be done
1327 Whether to wait for the result to be done
1324
1328
1325 Returns
1329 Returns
1326 -------
1330 -------
1327
1331
1328 AsyncHubResult
1332 AsyncHubResult
1329 A subclass of AsyncResult that retrieves results from the Hub
1333 A subclass of AsyncResult that retrieves results from the Hub
1330
1334
1331 """
1335 """
1332 block = self.block if block is None else block
1336 block = self.block if block is None else block
1333 if indices_or_msg_ids is None:
1337 if indices_or_msg_ids is None:
1334 indices_or_msg_ids = -1
1338 indices_or_msg_ids = -1
1335
1339
1336 if not isinstance(indices_or_msg_ids, (list,tuple)):
1340 if not isinstance(indices_or_msg_ids, (list,tuple)):
1337 indices_or_msg_ids = [indices_or_msg_ids]
1341 indices_or_msg_ids = [indices_or_msg_ids]
1338
1342
1339 theids = []
1343 theids = []
1340 for id in indices_or_msg_ids:
1344 for id in indices_or_msg_ids:
1341 if isinstance(id, int):
1345 if isinstance(id, int):
1342 id = self.history[id]
1346 id = self.history[id]
1343 if not isinstance(id, basestring):
1347 if not isinstance(id, basestring):
1344 raise TypeError("indices must be str or int, not %r"%id)
1348 raise TypeError("indices must be str or int, not %r"%id)
1345 theids.append(id)
1349 theids.append(id)
1346
1350
1347 content = dict(msg_ids = theids)
1351 content = dict(msg_ids = theids)
1348
1352
1349 self.session.send(self._query_socket, 'resubmit_request', content)
1353 self.session.send(self._query_socket, 'resubmit_request', content)
1350
1354
1351 zmq.select([self._query_socket], [], [])
1355 zmq.select([self._query_socket], [], [])
1352 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1356 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1353 if self.debug:
1357 if self.debug:
1354 pprint(msg)
1358 pprint(msg)
1355 content = msg['content']
1359 content = msg['content']
1356 if content['status'] != 'ok':
1360 if content['status'] != 'ok':
1357 raise self._unwrap_exception(content)
1361 raise self._unwrap_exception(content)
1358 mapping = content['resubmitted']
1362 mapping = content['resubmitted']
1359 new_ids = [ mapping[msg_id] for msg_id in theids ]
1363 new_ids = [ mapping[msg_id] for msg_id in theids ]
1360
1364
1361 ar = AsyncHubResult(self, msg_ids=new_ids)
1365 ar = AsyncHubResult(self, msg_ids=new_ids)
1362
1366
1363 if block:
1367 if block:
1364 ar.wait()
1368 ar.wait()
1365
1369
1366 return ar
1370 return ar
1367
1371
1368 @spin_first
1372 @spin_first
1369 def result_status(self, msg_ids, status_only=True):
1373 def result_status(self, msg_ids, status_only=True):
1370 """Check on the status of the result(s) of the apply request with `msg_ids`.
1374 """Check on the status of the result(s) of the apply request with `msg_ids`.
1371
1375
1372 If status_only is False, then the actual results will be retrieved, else
1376 If status_only is False, then the actual results will be retrieved, else
1373 only the status of the results will be checked.
1377 only the status of the results will be checked.
1374
1378
1375 Parameters
1379 Parameters
1376 ----------
1380 ----------
1377
1381
1378 msg_ids : list of msg_ids
1382 msg_ids : list of msg_ids
1379 if int:
1383 if int:
1380 Passed as index to self.history for convenience.
1384 Passed as index to self.history for convenience.
1381 status_only : bool (default: True)
1385 status_only : bool (default: True)
1382 if False:
1386 if False:
1383 Retrieve the actual results of completed tasks.
1387 Retrieve the actual results of completed tasks.
1384
1388
1385 Returns
1389 Returns
1386 -------
1390 -------
1387
1391
1388 results : dict
1392 results : dict
1389 There will always be the keys 'pending' and 'completed', which will
1393 There will always be the keys 'pending' and 'completed', which will
1390 be lists of msg_ids that are incomplete or complete. If `status_only`
1394 be lists of msg_ids that are incomplete or complete. If `status_only`
1391 is False, then completed results will be keyed by their `msg_id`.
1395 is False, then completed results will be keyed by their `msg_id`.
1392 """
1396 """
1393 if not isinstance(msg_ids, (list,tuple)):
1397 if not isinstance(msg_ids, (list,tuple)):
1394 msg_ids = [msg_ids]
1398 msg_ids = [msg_ids]
1395
1399
1396 theids = []
1400 theids = []
1397 for msg_id in msg_ids:
1401 for msg_id in msg_ids:
1398 if isinstance(msg_id, int):
1402 if isinstance(msg_id, int):
1399 msg_id = self.history[msg_id]
1403 msg_id = self.history[msg_id]
1400 if not isinstance(msg_id, basestring):
1404 if not isinstance(msg_id, basestring):
1401 raise TypeError("msg_ids must be str, not %r"%msg_id)
1405 raise TypeError("msg_ids must be str, not %r"%msg_id)
1402 theids.append(msg_id)
1406 theids.append(msg_id)
1403
1407
1404 completed = []
1408 completed = []
1405 local_results = {}
1409 local_results = {}
1406
1410
1407 # comment this block out to temporarily disable local shortcut:
1411 # comment this block out to temporarily disable local shortcut:
1408 for msg_id in theids:
1412 for msg_id in theids:
1409 if msg_id in self.results:
1413 if msg_id in self.results:
1410 completed.append(msg_id)
1414 completed.append(msg_id)
1411 local_results[msg_id] = self.results[msg_id]
1415 local_results[msg_id] = self.results[msg_id]
1412 theids.remove(msg_id)
1416 theids.remove(msg_id)
1413
1417
1414 if theids: # some not locally cached
1418 if theids: # some not locally cached
1415 content = dict(msg_ids=theids, status_only=status_only)
1419 content = dict(msg_ids=theids, status_only=status_only)
1416 msg = self.session.send(self._query_socket, "result_request", content=content)
1420 msg = self.session.send(self._query_socket, "result_request", content=content)
1417 zmq.select([self._query_socket], [], [])
1421 zmq.select([self._query_socket], [], [])
1418 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1422 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1419 if self.debug:
1423 if self.debug:
1420 pprint(msg)
1424 pprint(msg)
1421 content = msg['content']
1425 content = msg['content']
1422 if content['status'] != 'ok':
1426 if content['status'] != 'ok':
1423 raise self._unwrap_exception(content)
1427 raise self._unwrap_exception(content)
1424 buffers = msg['buffers']
1428 buffers = msg['buffers']
1425 else:
1429 else:
1426 content = dict(completed=[],pending=[])
1430 content = dict(completed=[],pending=[])
1427
1431
1428 content['completed'].extend(completed)
1432 content['completed'].extend(completed)
1429
1433
1430 if status_only:
1434 if status_only:
1431 return content
1435 return content
1432
1436
1433 failures = []
1437 failures = []
1434 # load cached results into result:
1438 # load cached results into result:
1435 content.update(local_results)
1439 content.update(local_results)
1436
1440
1437 # update cache with results:
1441 # update cache with results:
1438 for msg_id in sorted(theids):
1442 for msg_id in sorted(theids):
1439 if msg_id in content['completed']:
1443 if msg_id in content['completed']:
1440 rec = content[msg_id]
1444 rec = content[msg_id]
1441 parent = rec['header']
1445 parent = rec['header']
1442 header = rec['result_header']
1446 header = rec['result_header']
1443 rcontent = rec['result_content']
1447 rcontent = rec['result_content']
1444 iodict = rec['io']
1448 iodict = rec['io']
1445 if isinstance(rcontent, str):
1449 if isinstance(rcontent, str):
1446 rcontent = self.session.unpack(rcontent)
1450 rcontent = self.session.unpack(rcontent)
1447
1451
1448 md = self.metadata[msg_id]
1452 md = self.metadata[msg_id]
1449 md.update(self._extract_metadata(header, parent, rcontent))
1453 md.update(self._extract_metadata(header, parent, rcontent))
1450 if rec.get('received'):
1454 if rec.get('received'):
1451 md['received'] = rec['received']
1455 md['received'] = rec['received']
1452 md.update(iodict)
1456 md.update(iodict)
1453
1457
1454 if rcontent['status'] == 'ok':
1458 if rcontent['status'] == 'ok':
1455 res,buffers = util.unserialize_object(buffers)
1459 res,buffers = util.unserialize_object(buffers)
1456 else:
1460 else:
1457 print rcontent
1461 print rcontent
1458 res = self._unwrap_exception(rcontent)
1462 res = self._unwrap_exception(rcontent)
1459 failures.append(res)
1463 failures.append(res)
1460
1464
1461 self.results[msg_id] = res
1465 self.results[msg_id] = res
1462 content[msg_id] = res
1466 content[msg_id] = res
1463
1467
1464 if len(theids) == 1 and failures:
1468 if len(theids) == 1 and failures:
1465 raise failures[0]
1469 raise failures[0]
1466
1470
1467 error.collect_exceptions(failures, "result_status")
1471 error.collect_exceptions(failures, "result_status")
1468 return content
1472 return content
1469
1473
@spin_first
def queue_status(self, targets='all', verbose=False):
    """Fetch the status of engine queues.

    Parameters
    ----------

    targets : int/str/list of ints/strs
        the engines whose states are to be queried.
        default : all
    verbose : bool
        Whether to return lengths only, or lists of ids for each element
    """
    # 'all' is forwarded untranslated (as None) so the Hub can expand it itself
    engine_ids = None if targets == 'all' else self._build_targets(targets)[1]
    self.session.send(self._query_socket, "queue_request",
                      content=dict(targets=engine_ids, verbose=verbose))
    idents, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)
    reply_content = reply['content']
    if reply_content.pop('status') != 'ok':
        raise self._unwrap_exception(reply_content)
    by_engine = rekey(reply_content)
    # a single int target yields just that engine's entry, not the whole dict
    return by_engine[targets] if isinstance(targets, int) else by_engine
1502
1506
@spin_first
def purge_results(self, jobs=None, targets=None):
    """Tell the Hub to forget results.

    Individual results can be purged by msg_id, or the entire
    history of specific targets can be purged.

    Use `purge_results('all')` to scrub everything from the Hub's db.

    Parameters
    ----------

    jobs : str or list of str or AsyncResult objects
        the msg_ids whose results should be forgotten.
    targets : int/str/list of ints/strs
        The targets, by int_id, whose entire history is to be purged.

    default : None

    Raises
    ------
    ValueError : if neither `jobs` nor `targets` is given.
    TypeError : if `jobs` contains anything other than strs/AsyncResults.
    """
    # None sentinels replace the former mutable [] defaults. Behavior is
    # unchanged ([] and None are both falsy in the guard below), but this
    # avoids the shared-mutable-default pitfall.
    jobs = [] if jobs is None else jobs
    targets = [] if targets is None else targets
    if not targets and not jobs:
        raise ValueError("Must specify at least one of `targets` and `jobs`")
    if targets:
        # translate to the Hub's integer engine ids
        targets = self._build_targets(targets)[1]

    # construct msg_ids from jobs
    if jobs == 'all':
        # pass the literal string through; the Hub interprets it
        msg_ids = jobs
    else:
        msg_ids = []
        if isinstance(jobs, (basestring,AsyncResult)):
            jobs = [jobs]
        # reject anything that is not a msg_id str or an AsyncResult up front
        bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
        if bad_ids:
            raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
        for j in jobs:
            if isinstance(j, AsyncResult):
                # an AsyncResult may cover several msg_ids
                msg_ids.extend(j.msg_ids)
            else:
                msg_ids.append(j)

    content = dict(engine_ids=targets, msg_ids=msg_ids)
    self.session.send(self._query_socket, "purge_request", content=content)
    idents, msg = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(msg)
    content = msg['content']
    if content['status'] != 'ok':
        raise self._unwrap_exception(content)
1551
1555
@spin_first
def hub_history(self):
    """Get the Hub's history

    Just like the Client, the Hub has a history, which is a list of msg_ids.
    This will contain the history of all clients, and, depending on configuration,
    may contain history across multiple cluster sessions.

    Any msg_id returned here is a valid argument to `get_result`.

    Returns
    -------

    msg_ids : list of strs
        list of all msg_ids, ordered by task submission time.
    """

    self.session.send(self._query_socket, "history_request", content={})
    idents, reply = self.session.recv(self._query_socket, 0)

    if self.debug:
        pprint(reply)
    reply_content = reply['content']
    # surface Hub-side errors as local exceptions
    if reply_content['status'] != 'ok':
        raise self._unwrap_exception(reply_content)
    return reply_content['history']
1579
1583
@spin_first
def db_query(self, query, keys=None):
    """Query the Hub's TaskRecord database

    This will return a list of task record dicts that match `query`

    Parameters
    ----------

    query : mongodb query dict
        The search dict. See mongodb query docs for details.
    keys : list of strs [optional]
        The subset of keys to be returned. The default is to fetch everything but buffers.
        'msg_id' will *always* be included.
    """
    # a lone key string is promoted to a one-element list
    if isinstance(keys, basestring):
        keys = [keys]
    self.session.send(self._query_socket, "db_request",
                      content=dict(query=query, keys=keys))
    idents, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)
    reply_content = reply['content']
    if reply_content['status'] != 'ok':
        raise self._unwrap_exception(reply_content)

    records = reply_content['records']

    buffer_lens = reply_content['buffer_lens']
    result_buffer_lens = reply_content['result_buffer_lens']
    # all record buffers arrive flattened in one list; the per-record length
    # tables (when present) tell us how to carve it back up
    remaining = reply['buffers']
    for i, rec in enumerate(records):
        # relink buffers
        if buffer_lens is not None:
            n = buffer_lens[i]
            rec['buffers'], remaining = remaining[:n], remaining[n:]
        if result_buffer_lens is not None:
            n = result_buffer_lens[i]
            rec['result_buffers'], remaining = remaining[:n], remaining[n:]

    return records
1623
1627
# public API of this module: only the Client class
__all__ = ['Client']
General Comments 0
You need to be logged in to leave comments. Login now