track display data in the parallel Client
MinRK
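This change adds an `outputs` list to the per-message `Metadata` dict and fills it (together with `pyout`) from `display_data` and `pyout` IOPub messages in `_flush_iopub`. A minimal sketch of reading that tracked data back on the client side, assuming a running cluster on the default profile and engines that publish display data; the executed snippet and variable names are illustrative only, not part of this change:

    from IPython.parallel import Client

    rc = Client()          # connect using ipcontroller-client.json from the default profile
    view = rc[:]           # DirectView on all registered engines

    # Run something on the engines that publishes display data over IOPub.
    ar = view.execute(
        "from IPython.core.display import display, HTML; display(HTML('<b>hi</b>'))",
        block=False)
    ar.wait()
    rc.spin()              # flush queued IOPub messages into rc.metadata (repeat if they lag)

    for msg_id in ar.msg_ids:
        md = rc.metadata[msg_id]
        print(md.outputs)  # list of display_data dicts collected for this request
        print(md.pyout)    # data from the pyout (result repr) message, if any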
@@ -1,1563 +1,1569 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35
35
36 from IPython.utils.jsonutil import rekey
36 from IPython.utils.jsonutil import rekey
37 from IPython.utils.localinterfaces import LOCAL_IPS
37 from IPython.utils.localinterfaces import LOCAL_IPS
38 from IPython.utils.path import get_ipython_dir
38 from IPython.utils.path import get_ipython_dir
39 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
39 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
40 Dict, List, Bool, Set, Any)
40 Dict, List, Bool, Set, Any)
41 from IPython.external.decorator import decorator
41 from IPython.external.decorator import decorator
42 from IPython.external.ssh import tunnel
42 from IPython.external.ssh import tunnel
43
43
44 from IPython.parallel import Reference
44 from IPython.parallel import Reference
45 from IPython.parallel import error
45 from IPython.parallel import error
46 from IPython.parallel import util
46 from IPython.parallel import util
47
47
48 from IPython.zmq.session import Session, Message
48 from IPython.zmq.session import Session, Message
49
49
50 from .asyncresult import AsyncResult, AsyncHubResult
50 from .asyncresult import AsyncResult, AsyncHubResult
51 from IPython.core.profiledir import ProfileDir, ProfileDirError
51 from IPython.core.profiledir import ProfileDir, ProfileDirError
52 from .view import DirectView, LoadBalancedView
52 from .view import DirectView, LoadBalancedView
53
53
54 if sys.version_info[0] >= 3:
54 if sys.version_info[0] >= 3:
55 # xrange is used in a couple 'isinstance' tests in py2
55 # xrange is used in a couple 'isinstance' tests in py2
56 # should be just 'range' in 3k
56 # should be just 'range' in 3k
57 xrange = range
57 xrange = range
58
58
59 #--------------------------------------------------------------------------
59 #--------------------------------------------------------------------------
60 # Decorators for Client methods
60 # Decorators for Client methods
61 #--------------------------------------------------------------------------
61 #--------------------------------------------------------------------------
62
62
63 @decorator
63 @decorator
64 def spin_first(f, self, *args, **kwargs):
64 def spin_first(f, self, *args, **kwargs):
65 """Call spin() to sync state prior to calling the method."""
65 """Call spin() to sync state prior to calling the method."""
66 self.spin()
66 self.spin()
67 return f(self, *args, **kwargs)
67 return f(self, *args, **kwargs)
68
68
69
69
70 #--------------------------------------------------------------------------
70 #--------------------------------------------------------------------------
71 # Classes
71 # Classes
72 #--------------------------------------------------------------------------
72 #--------------------------------------------------------------------------
73
73
74 class Metadata(dict):
74 class Metadata(dict):
75 """Subclass of dict for initializing metadata values.
75 """Subclass of dict for initializing metadata values.
76
76
77 Attribute access works on keys.
77 Attribute access works on keys.
78
78
79 These objects have a strict set of keys - errors will raise if you try
79 These objects have a strict set of keys - errors will raise if you try
80 to add new keys.
80 to add new keys.
81 """
81 """
82 def __init__(self, *args, **kwargs):
82 def __init__(self, *args, **kwargs):
83 dict.__init__(self)
83 dict.__init__(self)
84 md = {'msg_id' : None,
84 md = {'msg_id' : None,
85 'submitted' : None,
85 'submitted' : None,
86 'started' : None,
86 'started' : None,
87 'completed' : None,
87 'completed' : None,
88 'received' : None,
88 'received' : None,
89 'engine_uuid' : None,
89 'engine_uuid' : None,
90 'engine_id' : None,
90 'engine_id' : None,
91 'follow' : None,
91 'follow' : None,
92 'after' : None,
92 'after' : None,
93 'status' : None,
93 'status' : None,
94
94
95 'pyin' : None,
95 'pyin' : None,
96 'pyout' : None,
96 'pyout' : None,
97 'pyerr' : None,
97 'pyerr' : None,
98 'stdout' : '',
98 'stdout' : '',
99 'stderr' : '',
99 'stderr' : '',
100 'outputs' : [],
100 }
101 }
101 self.update(md)
102 self.update(md)
102 self.update(dict(*args, **kwargs))
103 self.update(dict(*args, **kwargs))
103
104
104 def __getattr__(self, key):
105 def __getattr__(self, key):
105 """getattr aliased to getitem"""
106 """getattr aliased to getitem"""
106 if key in self.iterkeys():
107 if key in self.iterkeys():
107 return self[key]
108 return self[key]
108 else:
109 else:
109 raise AttributeError(key)
110 raise AttributeError(key)
110
111
111 def __setattr__(self, key, value):
112 def __setattr__(self, key, value):
112 """setattr aliased to setitem, with strict"""
113 """setattr aliased to setitem, with strict"""
113 if key in self.iterkeys():
114 if key in self.iterkeys():
114 self[key] = value
115 self[key] = value
115 else:
116 else:
116 raise AttributeError(key)
117 raise AttributeError(key)
117
118
118 def __setitem__(self, key, value):
119 def __setitem__(self, key, value):
119 """strict static key enforcement"""
120 """strict static key enforcement"""
120 if key in self.iterkeys():
121 if key in self.iterkeys():
121 dict.__setitem__(self, key, value)
122 dict.__setitem__(self, key, value)
122 else:
123 else:
123 raise KeyError(key)
124 raise KeyError(key)
124
125
125
126
126 class Client(HasTraits):
127 class Client(HasTraits):
127 """A semi-synchronous client to the IPython ZMQ cluster
128 """A semi-synchronous client to the IPython ZMQ cluster
128
129
129 Parameters
130 Parameters
130 ----------
131 ----------
131
132
132 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
133 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
133 Connection information for the Hub's registration. If a json connector
134 Connection information for the Hub's registration. If a json connector
134 file is given, then likely no further configuration is necessary.
135 file is given, then likely no further configuration is necessary.
135 [Default: use profile]
136 [Default: use profile]
136 profile : bytes
137 profile : bytes
137 The name of the Cluster profile to be used to find connector information.
138 The name of the Cluster profile to be used to find connector information.
138 If run from an IPython application, the default profile will be the same
139 If run from an IPython application, the default profile will be the same
139 as the running application, otherwise it will be 'default'.
140 as the running application, otherwise it will be 'default'.
140 context : zmq.Context
141 context : zmq.Context
141 Pass an existing zmq.Context instance, otherwise the client will create its own.
142 Pass an existing zmq.Context instance, otherwise the client will create its own.
142 debug : bool
143 debug : bool
143 flag for lots of message printing for debug purposes
144 flag for lots of message printing for debug purposes
144 timeout : int/float
145 timeout : int/float
145 time (in seconds) to wait for connection replies from the Hub
146 time (in seconds) to wait for connection replies from the Hub
146 [Default: 10]
147 [Default: 10]
147
148
148 #-------------- session related args ----------------
149 #-------------- session related args ----------------
149
150
150 config : Config object
151 config : Config object
151 If specified, this will be relayed to the Session for configuration
152 If specified, this will be relayed to the Session for configuration
152 username : str
153 username : str
153 set username for the session object
154 set username for the session object
154 packer : str (import_string) or callable
155 packer : str (import_string) or callable
155 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
156 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
156 function to serialize messages. Must support same input as
157 function to serialize messages. Must support same input as
157 JSON, and output must be bytes.
158 JSON, and output must be bytes.
158 You can pass a callable directly as `pack`
159 You can pass a callable directly as `pack`
159 unpacker : str (import_string) or callable
160 unpacker : str (import_string) or callable
160 The inverse of packer. Only necessary if packer is specified as *not* one
161 The inverse of packer. Only necessary if packer is specified as *not* one
161 of 'json' or 'pickle'.
162 of 'json' or 'pickle'.
162
163
163 #-------------- ssh related args ----------------
164 #-------------- ssh related args ----------------
164 # These are args for configuring the ssh tunnel to be used
165 # These are args for configuring the ssh tunnel to be used
165 # credentials are used to forward connections over ssh to the Controller
166 # credentials are used to forward connections over ssh to the Controller
166 # Note that the ip given in `addr` needs to be relative to sshserver
167 # Note that the ip given in `addr` needs to be relative to sshserver
167 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
168 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
168 # and set sshserver as the same machine the Controller is on. However,
169 # and set sshserver as the same machine the Controller is on. However,
169 # the only requirement is that sshserver is able to see the Controller
170 # the only requirement is that sshserver is able to see the Controller
170 # (i.e. is within the same trusted network).
171 # (i.e. is within the same trusted network).
171
172
172 sshserver : str
173 sshserver : str
173 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
174 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
174 If keyfile or password is specified, and this is not, it will default to
175 If keyfile or password is specified, and this is not, it will default to
175 the ip given in addr.
176 the ip given in addr.
176 sshkey : str; path to ssh private key file
177 sshkey : str; path to ssh private key file
177 This specifies a key to be used in ssh login, default None.
178 This specifies a key to be used in ssh login, default None.
178 Regular default ssh keys will be used without specifying this argument.
179 Regular default ssh keys will be used without specifying this argument.
179 password : str
180 password : str
180 Your ssh password to sshserver. Note that if this is left None,
181 Your ssh password to sshserver. Note that if this is left None,
181 you will be prompted for it if passwordless key based login is unavailable.
182 you will be prompted for it if passwordless key based login is unavailable.
182 paramiko : bool
183 paramiko : bool
183 flag for whether to use paramiko instead of shell ssh for tunneling.
184 flag for whether to use paramiko instead of shell ssh for tunneling.
184 [default: True on win32, False else]
185 [default: True on win32, False else]
185
186
186 ------- exec authentication args -------
187 ------- exec authentication args -------
187 If even localhost is untrusted, you can have some protection against
188 If even localhost is untrusted, you can have some protection against
188 unauthorized execution by signing messages with HMAC digests.
189 unauthorized execution by signing messages with HMAC digests.
189 Messages are still sent as cleartext, so if someone can snoop your
190 Messages are still sent as cleartext, so if someone can snoop your
190 loopback traffic this will not protect your privacy, but will prevent
191 loopback traffic this will not protect your privacy, but will prevent
191 unauthorized execution.
192 unauthorized execution.
192
193
193 exec_key : str
194 exec_key : str
194 an authentication key or file containing a key
195 an authentication key or file containing a key
195 default: None
196 default: None
196
197
197
198
198 Attributes
199 Attributes
199 ----------
200 ----------
200
201
201 ids : list of int engine IDs
202 ids : list of int engine IDs
202 requesting the ids attribute always synchronizes
203 requesting the ids attribute always synchronizes
203 the registration state. To request ids without synchronization,
204 the registration state. To request ids without synchronization,
204 use the semi-private _ids attribute.
205 use the semi-private _ids attribute.
205
206
206 history : list of msg_ids
207 history : list of msg_ids
207 a list of msg_ids, keeping track of all the execution
208 a list of msg_ids, keeping track of all the execution
208 messages you have submitted in order.
209 messages you have submitted in order.
209
210
210 outstanding : set of msg_ids
211 outstanding : set of msg_ids
211 a set of msg_ids that have been submitted, but whose
212 a set of msg_ids that have been submitted, but whose
212 results have not yet been received.
213 results have not yet been received.
213
214
214 results : dict
215 results : dict
215 a dict of all our results, keyed by msg_id
216 a dict of all our results, keyed by msg_id
216
217
217 block : bool
218 block : bool
218 determines default behavior when block not specified
219 determines default behavior when block not specified
219 in execution methods
220 in execution methods
220
221
221 Methods
222 Methods
222 -------
223 -------
223
224
224 spin
225 spin
225 flushes incoming results and registration state changes
226 flushes incoming results and registration state changes
226 control methods spin, and requesting `ids` also ensures up to date
227 control methods spin, and requesting `ids` also ensures up to date
227
228
228 wait
229 wait
229 wait on one or more msg_ids
230 wait on one or more msg_ids
230
231
231 execution methods
232 execution methods
232 apply
233 apply
233 legacy: execute, run
234 legacy: execute, run
234
235
235 data movement
236 data movement
236 push, pull, scatter, gather
237 push, pull, scatter, gather
237
238
238 query methods
239 query methods
239 queue_status, get_result, purge, result_status
240 queue_status, get_result, purge, result_status
240
241
241 control methods
242 control methods
242 abort, shutdown
243 abort, shutdown
243
244
244 """
245 """
245
246
246
247
247 block = Bool(False)
248 block = Bool(False)
248 outstanding = Set()
249 outstanding = Set()
249 results = Instance('collections.defaultdict', (dict,))
250 results = Instance('collections.defaultdict', (dict,))
250 metadata = Instance('collections.defaultdict', (Metadata,))
251 metadata = Instance('collections.defaultdict', (Metadata,))
251 history = List()
252 history = List()
252 debug = Bool(False)
253 debug = Bool(False)
253 _spin_thread = Any()
254 _spin_thread = Any()
254 _stop_spinning = Any()
255 _stop_spinning = Any()
255
256
256 profile=Unicode()
257 profile=Unicode()
257 def _profile_default(self):
258 def _profile_default(self):
258 if BaseIPythonApplication.initialized():
259 if BaseIPythonApplication.initialized():
259 # an IPython app *might* be running, try to get its profile
260 # an IPython app *might* be running, try to get its profile
260 try:
261 try:
261 return BaseIPythonApplication.instance().profile
262 return BaseIPythonApplication.instance().profile
262 except (AttributeError, MultipleInstanceError):
263 except (AttributeError, MultipleInstanceError):
263 # could be a *different* subclass of config.Application,
264 # could be a *different* subclass of config.Application,
264 # which would raise one of these two errors.
265 # which would raise one of these two errors.
265 return u'default'
266 return u'default'
266 else:
267 else:
267 return u'default'
268 return u'default'
268
269
269
270
270 _outstanding_dict = Instance('collections.defaultdict', (set,))
271 _outstanding_dict = Instance('collections.defaultdict', (set,))
271 _ids = List()
272 _ids = List()
272 _connected=Bool(False)
273 _connected=Bool(False)
273 _ssh=Bool(False)
274 _ssh=Bool(False)
274 _context = Instance('zmq.Context')
275 _context = Instance('zmq.Context')
275 _config = Dict()
276 _config = Dict()
276 _engines=Instance(util.ReverseDict, (), {})
277 _engines=Instance(util.ReverseDict, (), {})
277 # _hub_socket=Instance('zmq.Socket')
278 # _hub_socket=Instance('zmq.Socket')
278 _query_socket=Instance('zmq.Socket')
279 _query_socket=Instance('zmq.Socket')
279 _control_socket=Instance('zmq.Socket')
280 _control_socket=Instance('zmq.Socket')
280 _iopub_socket=Instance('zmq.Socket')
281 _iopub_socket=Instance('zmq.Socket')
281 _notification_socket=Instance('zmq.Socket')
282 _notification_socket=Instance('zmq.Socket')
282 _mux_socket=Instance('zmq.Socket')
283 _mux_socket=Instance('zmq.Socket')
283 _task_socket=Instance('zmq.Socket')
284 _task_socket=Instance('zmq.Socket')
284 _task_scheme=Unicode()
285 _task_scheme=Unicode()
285 _closed = False
286 _closed = False
286 _ignored_control_replies=Integer(0)
287 _ignored_control_replies=Integer(0)
287 _ignored_hub_replies=Integer(0)
288 _ignored_hub_replies=Integer(0)
288
289
289 def __new__(self, *args, **kw):
290 def __new__(self, *args, **kw):
290 # don't raise on positional args
291 # don't raise on positional args
291 return HasTraits.__new__(self, **kw)
292 return HasTraits.__new__(self, **kw)
292
293
293 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
294 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
294 context=None, debug=False, exec_key=None,
295 context=None, debug=False, exec_key=None,
295 sshserver=None, sshkey=None, password=None, paramiko=None,
296 sshserver=None, sshkey=None, password=None, paramiko=None,
296 timeout=10, **extra_args
297 timeout=10, **extra_args
297 ):
298 ):
298 if profile:
299 if profile:
299 super(Client, self).__init__(debug=debug, profile=profile)
300 super(Client, self).__init__(debug=debug, profile=profile)
300 else:
301 else:
301 super(Client, self).__init__(debug=debug)
302 super(Client, self).__init__(debug=debug)
302 if context is None:
303 if context is None:
303 context = zmq.Context.instance()
304 context = zmq.Context.instance()
304 self._context = context
305 self._context = context
305 self._stop_spinning = Event()
306 self._stop_spinning = Event()
306
307
307 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
308 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
308 if self._cd is not None:
309 if self._cd is not None:
309 if url_or_file is None:
310 if url_or_file is None:
310 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
311 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
311 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
312 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
312 " Please specify at least one of url_or_file or profile."
313 " Please specify at least one of url_or_file or profile."
313
314
314 if not util.is_url(url_or_file):
315 if not util.is_url(url_or_file):
315 # it's not a url, try for a file
316 # it's not a url, try for a file
316 if not os.path.exists(url_or_file):
317 if not os.path.exists(url_or_file):
317 if self._cd:
318 if self._cd:
318 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
319 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
319 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
320 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
320 with open(url_or_file) as f:
321 with open(url_or_file) as f:
321 cfg = json.loads(f.read())
322 cfg = json.loads(f.read())
322 else:
323 else:
323 cfg = {'url':url_or_file}
324 cfg = {'url':url_or_file}
324
325
325 # sync defaults from args, json:
326 # sync defaults from args, json:
326 if sshserver:
327 if sshserver:
327 cfg['ssh'] = sshserver
328 cfg['ssh'] = sshserver
328 if exec_key:
329 if exec_key:
329 cfg['exec_key'] = exec_key
330 cfg['exec_key'] = exec_key
330 exec_key = cfg['exec_key']
331 exec_key = cfg['exec_key']
331 location = cfg.setdefault('location', None)
332 location = cfg.setdefault('location', None)
332 cfg['url'] = util.disambiguate_url(cfg['url'], location)
333 cfg['url'] = util.disambiguate_url(cfg['url'], location)
333 url = cfg['url']
334 url = cfg['url']
334 proto,addr,port = util.split_url(url)
335 proto,addr,port = util.split_url(url)
335 if location is not None and addr == '127.0.0.1':
336 if location is not None and addr == '127.0.0.1':
336 # location specified, and connection is expected to be local
337 # location specified, and connection is expected to be local
337 if location not in LOCAL_IPS and not sshserver:
338 if location not in LOCAL_IPS and not sshserver:
338 # load ssh from JSON *only* if the controller is not on
339 # load ssh from JSON *only* if the controller is not on
339 # this machine
340 # this machine
340 sshserver=cfg['ssh']
341 sshserver=cfg['ssh']
341 if location not in LOCAL_IPS and not sshserver:
342 if location not in LOCAL_IPS and not sshserver:
342 # warn if no ssh specified, but SSH is probably needed
343 # warn if no ssh specified, but SSH is probably needed
343 # This is only a warning, because the most likely cause
344 # This is only a warning, because the most likely cause
344 # is a local Controller on a laptop whose IP is dynamic
345 # is a local Controller on a laptop whose IP is dynamic
345 warnings.warn("""
346 warnings.warn("""
346 Controller appears to be listening on localhost, but not on this machine.
347 Controller appears to be listening on localhost, but not on this machine.
347 If this is true, you should specify Client(...,sshserver='you@%s')
348 If this is true, you should specify Client(...,sshserver='you@%s')
348 or instruct your controller to listen on an external IP."""%location,
349 or instruct your controller to listen on an external IP."""%location,
349 RuntimeWarning)
350 RuntimeWarning)
350 elif not sshserver:
351 elif not sshserver:
351 # otherwise sync with cfg
352 # otherwise sync with cfg
352 sshserver = cfg['ssh']
353 sshserver = cfg['ssh']
353
354
354 self._config = cfg
355 self._config = cfg
355
356
356 self._ssh = bool(sshserver or sshkey or password)
357 self._ssh = bool(sshserver or sshkey or password)
357 if self._ssh and sshserver is None:
358 if self._ssh and sshserver is None:
358 # default to ssh via localhost
359 # default to ssh via localhost
359 sshserver = url.split('://')[1].split(':')[0]
360 sshserver = url.split('://')[1].split(':')[0]
360 if self._ssh and password is None:
361 if self._ssh and password is None:
361 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
362 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
362 password=False
363 password=False
363 else:
364 else:
364 password = getpass("SSH Password for %s: "%sshserver)
365 password = getpass("SSH Password for %s: "%sshserver)
365 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
366 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
366
367
367 # configure and construct the session
368 # configure and construct the session
368 if exec_key is not None:
369 if exec_key is not None:
369 if os.path.isfile(exec_key):
370 if os.path.isfile(exec_key):
370 extra_args['keyfile'] = exec_key
371 extra_args['keyfile'] = exec_key
371 else:
372 else:
372 exec_key = util.asbytes(exec_key)
373 exec_key = util.asbytes(exec_key)
373 extra_args['key'] = exec_key
374 extra_args['key'] = exec_key
374 self.session = Session(**extra_args)
375 self.session = Session(**extra_args)
375
376
376 self._query_socket = self._context.socket(zmq.DEALER)
377 self._query_socket = self._context.socket(zmq.DEALER)
377 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
378 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
378 if self._ssh:
379 if self._ssh:
379 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
380 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
380 else:
381 else:
381 self._query_socket.connect(url)
382 self._query_socket.connect(url)
382
383
383 self.session.debug = self.debug
384 self.session.debug = self.debug
384
385
385 self._notification_handlers = {'registration_notification' : self._register_engine,
386 self._notification_handlers = {'registration_notification' : self._register_engine,
386 'unregistration_notification' : self._unregister_engine,
387 'unregistration_notification' : self._unregister_engine,
387 'shutdown_notification' : lambda msg: self.close(),
388 'shutdown_notification' : lambda msg: self.close(),
388 }
389 }
389 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
390 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
390 'apply_reply' : self._handle_apply_reply}
391 'apply_reply' : self._handle_apply_reply}
391 self._connect(sshserver, ssh_kwargs, timeout)
392 self._connect(sshserver, ssh_kwargs, timeout)
392
393
393 def __del__(self):
394 def __del__(self):
394 """cleanup sockets, but _not_ context."""
395 """cleanup sockets, but _not_ context."""
395 self.close()
396 self.close()
396
397
397 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
398 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
398 if ipython_dir is None:
399 if ipython_dir is None:
399 ipython_dir = get_ipython_dir()
400 ipython_dir = get_ipython_dir()
400 if profile_dir is not None:
401 if profile_dir is not None:
401 try:
402 try:
402 self._cd = ProfileDir.find_profile_dir(profile_dir)
403 self._cd = ProfileDir.find_profile_dir(profile_dir)
403 return
404 return
404 except ProfileDirError:
405 except ProfileDirError:
405 pass
406 pass
406 elif profile is not None:
407 elif profile is not None:
407 try:
408 try:
408 self._cd = ProfileDir.find_profile_dir_by_name(
409 self._cd = ProfileDir.find_profile_dir_by_name(
409 ipython_dir, profile)
410 ipython_dir, profile)
410 return
411 return
411 except ProfileDirError:
412 except ProfileDirError:
412 pass
413 pass
413 self._cd = None
414 self._cd = None
414
415
415 def _update_engines(self, engines):
416 def _update_engines(self, engines):
416 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
417 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
417 for k,v in engines.iteritems():
418 for k,v in engines.iteritems():
418 eid = int(k)
419 eid = int(k)
419 self._engines[eid] = v
420 self._engines[eid] = v
420 self._ids.append(eid)
421 self._ids.append(eid)
421 self._ids = sorted(self._ids)
422 self._ids = sorted(self._ids)
422 if sorted(self._engines.keys()) != range(len(self._engines)) and \
423 if sorted(self._engines.keys()) != range(len(self._engines)) and \
423 self._task_scheme == 'pure' and self._task_socket:
424 self._task_scheme == 'pure' and self._task_socket:
424 self._stop_scheduling_tasks()
425 self._stop_scheduling_tasks()
425
426
426 def _stop_scheduling_tasks(self):
427 def _stop_scheduling_tasks(self):
427 """Stop scheduling tasks because an engine has been unregistered
428 """Stop scheduling tasks because an engine has been unregistered
428 from a pure ZMQ scheduler.
429 from a pure ZMQ scheduler.
429 """
430 """
430 self._task_socket.close()
431 self._task_socket.close()
431 self._task_socket = None
432 self._task_socket = None
432 msg = "An engine has been unregistered, and we are using pure " +\
433 msg = "An engine has been unregistered, and we are using pure " +\
433 "ZMQ task scheduling. Task farming will be disabled."
434 "ZMQ task scheduling. Task farming will be disabled."
434 if self.outstanding:
435 if self.outstanding:
435 msg += " If you were running tasks when this happened, " +\
436 msg += " If you were running tasks when this happened, " +\
436 "some `outstanding` msg_ids may never resolve."
437 "some `outstanding` msg_ids may never resolve."
437 warnings.warn(msg, RuntimeWarning)
438 warnings.warn(msg, RuntimeWarning)
438
439
439 def _build_targets(self, targets):
440 def _build_targets(self, targets):
440 """Turn valid target IDs or 'all' into two lists:
441 """Turn valid target IDs or 'all' into two lists:
441 (int_ids, uuids).
442 (int_ids, uuids).
442 """
443 """
443 if not self._ids:
444 if not self._ids:
444 # flush notification socket if no engines yet, just in case
445 # flush notification socket if no engines yet, just in case
445 if not self.ids:
446 if not self.ids:
446 raise error.NoEnginesRegistered("Can't build targets without any engines")
447 raise error.NoEnginesRegistered("Can't build targets without any engines")
447
448
448 if targets is None:
449 if targets is None:
449 targets = self._ids
450 targets = self._ids
450 elif isinstance(targets, basestring):
451 elif isinstance(targets, basestring):
451 if targets.lower() == 'all':
452 if targets.lower() == 'all':
452 targets = self._ids
453 targets = self._ids
453 else:
454 else:
454 raise TypeError("%r not valid str target, must be 'all'"%(targets))
455 raise TypeError("%r not valid str target, must be 'all'"%(targets))
455 elif isinstance(targets, int):
456 elif isinstance(targets, int):
456 if targets < 0:
457 if targets < 0:
457 targets = self.ids[targets]
458 targets = self.ids[targets]
458 if targets not in self._ids:
459 if targets not in self._ids:
459 raise IndexError("No such engine: %i"%targets)
460 raise IndexError("No such engine: %i"%targets)
460 targets = [targets]
461 targets = [targets]
461
462
462 if isinstance(targets, slice):
463 if isinstance(targets, slice):
463 indices = range(len(self._ids))[targets]
464 indices = range(len(self._ids))[targets]
464 ids = self.ids
465 ids = self.ids
465 targets = [ ids[i] for i in indices ]
466 targets = [ ids[i] for i in indices ]
466
467
467 if not isinstance(targets, (tuple, list, xrange)):
468 if not isinstance(targets, (tuple, list, xrange)):
468 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
469 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
469
470
470 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
471 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
471
472
472 def _connect(self, sshserver, ssh_kwargs, timeout):
473 def _connect(self, sshserver, ssh_kwargs, timeout):
473 """setup all our socket connections to the cluster. This is called from
474 """setup all our socket connections to the cluster. This is called from
474 __init__."""
475 __init__."""
475
476
476 # Maybe allow reconnecting?
477 # Maybe allow reconnecting?
477 if self._connected:
478 if self._connected:
478 return
479 return
479 self._connected=True
480 self._connected=True
480
481
481 def connect_socket(s, url):
482 def connect_socket(s, url):
482 url = util.disambiguate_url(url, self._config['location'])
483 url = util.disambiguate_url(url, self._config['location'])
483 if self._ssh:
484 if self._ssh:
484 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
485 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
485 else:
486 else:
486 return s.connect(url)
487 return s.connect(url)
487
488
488 self.session.send(self._query_socket, 'connection_request')
489 self.session.send(self._query_socket, 'connection_request')
489 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
490 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
490 poller = zmq.Poller()
491 poller = zmq.Poller()
491 poller.register(self._query_socket, zmq.POLLIN)
492 poller.register(self._query_socket, zmq.POLLIN)
492 # poll expects milliseconds, timeout is seconds
493 # poll expects milliseconds, timeout is seconds
493 evts = poller.poll(timeout*1000)
494 evts = poller.poll(timeout*1000)
494 if not evts:
495 if not evts:
495 raise error.TimeoutError("Hub connection request timed out")
496 raise error.TimeoutError("Hub connection request timed out")
496 idents,msg = self.session.recv(self._query_socket,mode=0)
497 idents,msg = self.session.recv(self._query_socket,mode=0)
497 if self.debug:
498 if self.debug:
498 pprint(msg)
499 pprint(msg)
499 msg = Message(msg)
500 msg = Message(msg)
500 content = msg.content
501 content = msg.content
501 self._config['registration'] = dict(content)
502 self._config['registration'] = dict(content)
502 if content.status == 'ok':
503 if content.status == 'ok':
503 ident = self.session.bsession
504 ident = self.session.bsession
504 if content.mux:
505 if content.mux:
505 self._mux_socket = self._context.socket(zmq.DEALER)
506 self._mux_socket = self._context.socket(zmq.DEALER)
506 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
507 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
507 connect_socket(self._mux_socket, content.mux)
508 connect_socket(self._mux_socket, content.mux)
508 if content.task:
509 if content.task:
509 self._task_scheme, task_addr = content.task
510 self._task_scheme, task_addr = content.task
510 self._task_socket = self._context.socket(zmq.DEALER)
511 self._task_socket = self._context.socket(zmq.DEALER)
511 self._task_socket.setsockopt(zmq.IDENTITY, ident)
512 self._task_socket.setsockopt(zmq.IDENTITY, ident)
512 connect_socket(self._task_socket, task_addr)
513 connect_socket(self._task_socket, task_addr)
513 if content.notification:
514 if content.notification:
514 self._notification_socket = self._context.socket(zmq.SUB)
515 self._notification_socket = self._context.socket(zmq.SUB)
515 connect_socket(self._notification_socket, content.notification)
516 connect_socket(self._notification_socket, content.notification)
516 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
517 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
517 # if content.query:
518 # if content.query:
518 # self._query_socket = self._context.socket(zmq.DEALER)
519 # self._query_socket = self._context.socket(zmq.DEALER)
519 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
520 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
520 # connect_socket(self._query_socket, content.query)
521 # connect_socket(self._query_socket, content.query)
521 if content.control:
522 if content.control:
522 self._control_socket = self._context.socket(zmq.DEALER)
523 self._control_socket = self._context.socket(zmq.DEALER)
523 self._control_socket.setsockopt(zmq.IDENTITY, ident)
524 self._control_socket.setsockopt(zmq.IDENTITY, ident)
524 connect_socket(self._control_socket, content.control)
525 connect_socket(self._control_socket, content.control)
525 if content.iopub:
526 if content.iopub:
526 self._iopub_socket = self._context.socket(zmq.SUB)
527 self._iopub_socket = self._context.socket(zmq.SUB)
527 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
528 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
528 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
529 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
529 connect_socket(self._iopub_socket, content.iopub)
530 connect_socket(self._iopub_socket, content.iopub)
530 self._update_engines(dict(content.engines))
531 self._update_engines(dict(content.engines))
531 else:
532 else:
532 self._connected = False
533 self._connected = False
533 raise Exception("Failed to connect!")
534 raise Exception("Failed to connect!")
534
535
535 #--------------------------------------------------------------------------
536 #--------------------------------------------------------------------------
536 # handlers and callbacks for incoming messages
537 # handlers and callbacks for incoming messages
537 #--------------------------------------------------------------------------
538 #--------------------------------------------------------------------------
538
539
539 def _unwrap_exception(self, content):
540 def _unwrap_exception(self, content):
540 """unwrap exception, and remap engine_id to int."""
541 """unwrap exception, and remap engine_id to int."""
541 e = error.unwrap_exception(content)
542 e = error.unwrap_exception(content)
542 # print e.traceback
543 # print e.traceback
543 if e.engine_info:
544 if e.engine_info:
544 e_uuid = e.engine_info['engine_uuid']
545 e_uuid = e.engine_info['engine_uuid']
545 eid = self._engines[e_uuid]
546 eid = self._engines[e_uuid]
546 e.engine_info['engine_id'] = eid
547 e.engine_info['engine_id'] = eid
547 return e
548 return e
548
549
549 def _extract_metadata(self, header, parent, content):
550 def _extract_metadata(self, header, parent, content):
550 md = {'msg_id' : parent['msg_id'],
551 md = {'msg_id' : parent['msg_id'],
551 'received' : datetime.now(),
552 'received' : datetime.now(),
552 'engine_uuid' : header.get('engine', None),
553 'engine_uuid' : header.get('engine', None),
553 'follow' : parent.get('follow', []),
554 'follow' : parent.get('follow', []),
554 'after' : parent.get('after', []),
555 'after' : parent.get('after', []),
555 'status' : content['status'],
556 'status' : content['status'],
556 }
557 }
557
558
558 if md['engine_uuid'] is not None:
559 if md['engine_uuid'] is not None:
559 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
560 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
560
561
561 if 'date' in parent:
562 if 'date' in parent:
562 md['submitted'] = parent['date']
563 md['submitted'] = parent['date']
563 if 'started' in header:
564 if 'started' in header:
564 md['started'] = header['started']
565 md['started'] = header['started']
565 if 'date' in header:
566 if 'date' in header:
566 md['completed'] = header['date']
567 md['completed'] = header['date']
567 return md
568 return md
568
569
569 def _register_engine(self, msg):
570 def _register_engine(self, msg):
570 """Register a new engine, and update our connection info."""
571 """Register a new engine, and update our connection info."""
571 content = msg['content']
572 content = msg['content']
572 eid = content['id']
573 eid = content['id']
573 d = {eid : content['queue']}
574 d = {eid : content['queue']}
574 self._update_engines(d)
575 self._update_engines(d)
575
576
576 def _unregister_engine(self, msg):
577 def _unregister_engine(self, msg):
577 """Unregister an engine that has died."""
578 """Unregister an engine that has died."""
578 content = msg['content']
579 content = msg['content']
579 eid = int(content['id'])
580 eid = int(content['id'])
580 if eid in self._ids:
581 if eid in self._ids:
581 self._ids.remove(eid)
582 self._ids.remove(eid)
582 uuid = self._engines.pop(eid)
583 uuid = self._engines.pop(eid)
583
584
584 self._handle_stranded_msgs(eid, uuid)
585 self._handle_stranded_msgs(eid, uuid)
585
586
586 if self._task_socket and self._task_scheme == 'pure':
587 if self._task_socket and self._task_scheme == 'pure':
587 self._stop_scheduling_tasks()
588 self._stop_scheduling_tasks()
588
589
589 def _handle_stranded_msgs(self, eid, uuid):
590 def _handle_stranded_msgs(self, eid, uuid):
590 """Handle messages known to be on an engine when the engine unregisters.
591 """Handle messages known to be on an engine when the engine unregisters.
591
592
592 It is possible that this will fire prematurely - that is, an engine will
593 It is possible that this will fire prematurely - that is, an engine will
593 go down after completing a result, and the client will be notified
594 go down after completing a result, and the client will be notified
594 of the unregistration and later receive the successful result.
595 of the unregistration and later receive the successful result.
595 """
596 """
596
597
597 outstanding = self._outstanding_dict[uuid]
598 outstanding = self._outstanding_dict[uuid]
598
599
599 for msg_id in list(outstanding):
600 for msg_id in list(outstanding):
600 if msg_id in self.results:
601 if msg_id in self.results:
601 # we already have the result
602 # we already have the result
602 continue
603 continue
603 try:
604 try:
604 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
605 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
605 except:
606 except:
606 content = error.wrap_exception()
607 content = error.wrap_exception()
607 # build a fake message:
608 # build a fake message:
608 parent = {}
609 parent = {}
609 header = {}
610 header = {}
610 parent['msg_id'] = msg_id
611 parent['msg_id'] = msg_id
611 header['engine'] = uuid
612 header['engine'] = uuid
612 header['date'] = datetime.now()
613 header['date'] = datetime.now()
613 msg = dict(parent_header=parent, header=header, content=content)
614 msg = dict(parent_header=parent, header=header, content=content)
614 self._handle_apply_reply(msg)
615 self._handle_apply_reply(msg)
615
616
616 def _handle_execute_reply(self, msg):
617 def _handle_execute_reply(self, msg):
617 """Save the reply to an execute_request into our results.
618 """Save the reply to an execute_request into our results.
618
619
619 execute messages are never actually used. apply is used instead.
620 execute messages are never actually used. apply is used instead.
620 """
621 """
621
622
622 parent = msg['parent_header']
623 parent = msg['parent_header']
623 msg_id = parent['msg_id']
624 msg_id = parent['msg_id']
624 if msg_id not in self.outstanding:
625 if msg_id not in self.outstanding:
625 if msg_id in self.history:
626 if msg_id in self.history:
626 print ("got stale result: %s"%msg_id)
627 print ("got stale result: %s"%msg_id)
627 else:
628 else:
628 print ("got unknown result: %s"%msg_id)
629 print ("got unknown result: %s"%msg_id)
629 else:
630 else:
630 self.outstanding.remove(msg_id)
631 self.outstanding.remove(msg_id)
631
632
632 content = msg['content']
633 content = msg['content']
633 header = msg['header']
634 header = msg['header']
634
635
635 # construct metadata:
636 # construct metadata:
636 md = self.metadata[msg_id]
637 md = self.metadata[msg_id]
637 md.update(self._extract_metadata(header, parent, content))
638 md.update(self._extract_metadata(header, parent, content))
638 # is this redundant?
639 # is this redundant?
639 self.metadata[msg_id] = md
640 self.metadata[msg_id] = md
640
641
641 e_outstanding = self._outstanding_dict[md['engine_uuid']]
642 e_outstanding = self._outstanding_dict[md['engine_uuid']]
642 if msg_id in e_outstanding:
643 if msg_id in e_outstanding:
643 e_outstanding.remove(msg_id)
644 e_outstanding.remove(msg_id)
644
645
645 # construct result:
646 # construct result:
646 if content['status'] == 'ok':
647 if content['status'] == 'ok':
647 self.results[msg_id] = content
648 self.results[msg_id] = content
648 elif content['status'] == 'aborted':
649 elif content['status'] == 'aborted':
649 self.results[msg_id] = error.TaskAborted(msg_id)
650 self.results[msg_id] = error.TaskAborted(msg_id)
650 elif content['status'] == 'resubmitted':
651 elif content['status'] == 'resubmitted':
651 # TODO: handle resubmission
652 # TODO: handle resubmission
652 pass
653 pass
653 else:
654 else:
654 self.results[msg_id] = self._unwrap_exception(content)
655 self.results[msg_id] = self._unwrap_exception(content)
655
656
656 def _handle_apply_reply(self, msg):
657 def _handle_apply_reply(self, msg):
657 """Save the reply to an apply_request into our results."""
658 """Save the reply to an apply_request into our results."""
658 parent = msg['parent_header']
659 parent = msg['parent_header']
659 msg_id = parent['msg_id']
660 msg_id = parent['msg_id']
660 if msg_id not in self.outstanding:
661 if msg_id not in self.outstanding:
661 if msg_id in self.history:
662 if msg_id in self.history:
662 print ("got stale result: %s"%msg_id)
663 print ("got stale result: %s"%msg_id)
663 print self.results[msg_id]
664 print self.results[msg_id]
664 print msg
665 print msg
665 else:
666 else:
666 print ("got unknown result: %s"%msg_id)
667 print ("got unknown result: %s"%msg_id)
667 else:
668 else:
668 self.outstanding.remove(msg_id)
669 self.outstanding.remove(msg_id)
669 content = msg['content']
670 content = msg['content']
670 header = msg['header']
671 header = msg['header']
671
672
672 # construct metadata:
673 # construct metadata:
673 md = self.metadata[msg_id]
674 md = self.metadata[msg_id]
674 md.update(self._extract_metadata(header, parent, content))
675 md.update(self._extract_metadata(header, parent, content))
675 # is this redundant?
676 # is this redundant?
676 self.metadata[msg_id] = md
677 self.metadata[msg_id] = md
677
678
678 e_outstanding = self._outstanding_dict[md['engine_uuid']]
679 e_outstanding = self._outstanding_dict[md['engine_uuid']]
679 if msg_id in e_outstanding:
680 if msg_id in e_outstanding:
680 e_outstanding.remove(msg_id)
681 e_outstanding.remove(msg_id)
681
682
682 # construct result:
683 # construct result:
683 if content['status'] == 'ok':
684 if content['status'] == 'ok':
684 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
685 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
685 elif content['status'] == 'aborted':
686 elif content['status'] == 'aborted':
686 self.results[msg_id] = error.TaskAborted(msg_id)
687 self.results[msg_id] = error.TaskAborted(msg_id)
687 elif content['status'] == 'resubmitted':
688 elif content['status'] == 'resubmitted':
688 # TODO: handle resubmission
689 # TODO: handle resubmission
689 pass
690 pass
690 else:
691 else:
691 self.results[msg_id] = self._unwrap_exception(content)
692 self.results[msg_id] = self._unwrap_exception(content)
692
693
693 def _flush_notifications(self):
694 def _flush_notifications(self):
694 """Flush notifications of engine registrations waiting
695 """Flush notifications of engine registrations waiting
695 in ZMQ queue."""
696 in ZMQ queue."""
696 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
697 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
697 while msg is not None:
698 while msg is not None:
698 if self.debug:
699 if self.debug:
699 pprint(msg)
700 pprint(msg)
700 msg_type = msg['header']['msg_type']
701 msg_type = msg['header']['msg_type']
701 handler = self._notification_handlers.get(msg_type, None)
702 handler = self._notification_handlers.get(msg_type, None)
702 if handler is None:
703 if handler is None:
703 raise Exception("Unhandled message type: %s"%msg.msg_type)
704 raise Exception("Unhandled message type: %s"%msg.msg_type)
704 else:
705 else:
705 handler(msg)
706 handler(msg)
706 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
707 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
707
708
708 def _flush_results(self, sock):
709 def _flush_results(self, sock):
709 """Flush task or queue results waiting in ZMQ queue."""
710 """Flush task or queue results waiting in ZMQ queue."""
710 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
711 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
711 while msg is not None:
712 while msg is not None:
712 if self.debug:
713 if self.debug:
713 pprint(msg)
714 pprint(msg)
714 msg_type = msg['header']['msg_type']
715 msg_type = msg['header']['msg_type']
715 handler = self._queue_handlers.get(msg_type, None)
716 handler = self._queue_handlers.get(msg_type, None)
716 if handler is None:
717 if handler is None:
717 raise Exception("Unhandled message type: %s"%msg.msg_type)
718 raise Exception("Unhandled message type: %s"%msg.msg_type)
718 else:
719 else:
719 handler(msg)
720 handler(msg)
720 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
721 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
721
722
722 def _flush_control(self, sock):
723 def _flush_control(self, sock):
723 """Flush replies from the control channel waiting
724 """Flush replies from the control channel waiting
724 in the ZMQ queue.
725 in the ZMQ queue.
725
726
726 Currently: ignore them."""
727 Currently: ignore them."""
727 if self._ignored_control_replies <= 0:
728 if self._ignored_control_replies <= 0:
728 return
729 return
729 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
730 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
730 while msg is not None:
731 while msg is not None:
731 self._ignored_control_replies -= 1
732 self._ignored_control_replies -= 1
732 if self.debug:
733 if self.debug:
733 pprint(msg)
734 pprint(msg)
734 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
735 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
735
736
736 def _flush_ignored_control(self):
737 def _flush_ignored_control(self):
737 """flush ignored control replies"""
738 """flush ignored control replies"""
738 while self._ignored_control_replies > 0:
739 while self._ignored_control_replies > 0:
739 self.session.recv(self._control_socket)
740 self.session.recv(self._control_socket)
740 self._ignored_control_replies -= 1
741 self._ignored_control_replies -= 1
741
742
742 def _flush_ignored_hub_replies(self):
743 def _flush_ignored_hub_replies(self):
743 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
744 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
744 while msg is not None:
745 while msg is not None:
745 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
746 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
746
747
747 def _flush_iopub(self, sock):
748 def _flush_iopub(self, sock):
748 """Flush replies from the iopub channel waiting
749 """Flush replies from the iopub channel waiting
749 in the ZMQ queue.
750 in the ZMQ queue.
750 """
751 """
751 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
752 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
752 while msg is not None:
753 while msg is not None:
753 if self.debug:
754 if self.debug:
754 pprint(msg)
755 pprint(msg)
755 parent = msg['parent_header']
756 parent = msg['parent_header']
756 # ignore IOPub messages with no parent.
757 # ignore IOPub messages with no parent.
757 # Caused by print statements or warnings from before the first execution.
758 # Caused by print statements or warnings from before the first execution.
758 if not parent:
759 if not parent:
759 continue
760 continue
760 msg_id = parent['msg_id']
761 msg_id = parent['msg_id']
761 content = msg['content']
762 content = msg['content']
762 header = msg['header']
763 header = msg['header']
763 msg_type = msg['header']['msg_type']
764 msg_type = msg['header']['msg_type']
764
765
765 # init metadata:
766 # init metadata:
766 md = self.metadata[msg_id]
767 md = self.metadata[msg_id]
767
768
768 if msg_type == 'stream':
769 if msg_type == 'stream':
769 name = content['name']
770 name = content['name']
770 s = md[name] or ''
771 s = md[name] or ''
771 md[name] = s + content['data']
772 md[name] = s + content['data']
772 elif msg_type == 'pyerr':
773 elif msg_type == 'pyerr':
773 md.update({'pyerr' : self._unwrap_exception(content)})
774 md.update({'pyerr' : self._unwrap_exception(content)})
774 elif msg_type == 'pyin':
775 elif msg_type == 'pyin':
775 md.update({'pyin' : content['code']})
776 md.update({'pyin' : content['code']})
777 elif msg_type == 'display_data':
778 md['outputs'].append(content.get('data'))
779 elif msg_type == 'pyout':
780 md['pyout'] = content.get('data')
776 else:
781 else:
777 md.update({msg_type : content.get('data', '')})
782 # unhandled msg_type (status, etc.)
783 pass
778
784
779 # redundant?
785 # redundant?
780 self.metadata[msg_id] = md
786 self.metadata[msg_id] = md
781
787
782 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
788 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
783
789
784 #--------------------------------------------------------------------------
790 #--------------------------------------------------------------------------
785 # len, getitem
791 # len, getitem
786 #--------------------------------------------------------------------------
792 #--------------------------------------------------------------------------
787
793
788 def __len__(self):
794 def __len__(self):
789 """len(client) returns # of engines."""
795 """len(client) returns # of engines."""
790 return len(self.ids)
796 return len(self.ids)
791
797
792 def __getitem__(self, key):
798 def __getitem__(self, key):
793 """index access returns DirectView multiplexer objects
799 """index access returns DirectView multiplexer objects
794
800
795 Must be int, slice, or list/tuple/xrange of ints"""
801 Must be int, slice, or list/tuple/xrange of ints"""
796 if not isinstance(key, (int, slice, tuple, list, xrange)):
802 if not isinstance(key, (int, slice, tuple, list, xrange)):
797 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
803 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
798 else:
804 else:
799 return self.direct_view(key)
805 return self.direct_view(key)
800
806
801 #--------------------------------------------------------------------------
807 #--------------------------------------------------------------------------
802 # Begin public methods
808 # Begin public methods
803 #--------------------------------------------------------------------------
809 #--------------------------------------------------------------------------
804
810
805 @property
811 @property
806 def ids(self):
812 def ids(self):
807 """Always up-to-date ids property."""
813 """Always up-to-date ids property."""
808 self._flush_notifications()
814 self._flush_notifications()
809 # always copy:
815 # always copy:
810 return list(self._ids)
816 return list(self._ids)
811
817
812 def close(self):
818 def close(self):
813 if self._closed:
819 if self._closed:
814 return
820 return
815 self.stop_spin_thread()
821 self.stop_spin_thread()
816 snames = filter(lambda n: n.endswith('socket'), dir(self))
822 snames = filter(lambda n: n.endswith('socket'), dir(self))
817 for socket in map(lambda name: getattr(self, name), snames):
823 for socket in map(lambda name: getattr(self, name), snames):
818 if isinstance(socket, zmq.Socket) and not socket.closed:
824 if isinstance(socket, zmq.Socket) and not socket.closed:
819 socket.close()
825 socket.close()
820 self._closed = True
826 self._closed = True
821
827
822 def _spin_every(self, interval=1):
828 def _spin_every(self, interval=1):
823 """target func for use in spin_thread"""
829 """target func for use in spin_thread"""
824 while True:
830 while True:
825 if self._stop_spinning.is_set():
831 if self._stop_spinning.is_set():
826 return
832 return
827 time.sleep(interval)
833 time.sleep(interval)
828 self.spin()
834 self.spin()
829
835
830 def spin_thread(self, interval=1):
836 def spin_thread(self, interval=1):
831 """call Client.spin() in a background thread on some regular interval
837 """call Client.spin() in a background thread on some regular interval
832
838
833 This helps ensure that messages don't pile up too much in the zmq queue
839 This helps ensure that messages don't pile up too much in the zmq queue
834 while you are working on other things, or just leaving an idle terminal.
840 while you are working on other things, or just leaving an idle terminal.
835
841
836 It also helps limit potential padding of the `received` timestamp
842 It also helps limit potential padding of the `received` timestamp
837 on AsyncResult objects, used for timings.
843 on AsyncResult objects, used for timings.
838
844
839 Parameters
845 Parameters
840 ----------
846 ----------
841
847
842 interval : float, optional
848 interval : float, optional
843 The interval on which to spin the client in the background thread
849 The interval on which to spin the client in the background thread
844 (simply passed to time.sleep).
850 (simply passed to time.sleep).
845
851
846 Notes
852 Notes
847 -----
853 -----
848
854
849 For precision timing, you may want to use this method to put a bound
855 For precision timing, you may want to use this method to put a bound
850 on the jitter (in seconds) in `received` timestamps used
856 on the jitter (in seconds) in `received` timestamps used
851 in AsyncResult.wall_time.
857 in AsyncResult.wall_time.
852
858
853 """
859 """
854 if self._spin_thread is not None:
860 if self._spin_thread is not None:
855 self.stop_spin_thread()
861 self.stop_spin_thread()
856 self._stop_spinning.clear()
862 self._stop_spinning.clear()
857 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
863 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
858 self._spin_thread.daemon = True
864 self._spin_thread.daemon = True
859 self._spin_thread.start()
865 self._spin_thread.start()
860
866
861 def stop_spin_thread(self):
867 def stop_spin_thread(self):
862 """stop background spin_thread, if any"""
868 """stop background spin_thread, if any"""
863 if self._spin_thread is not None:
869 if self._spin_thread is not None:
864 self._stop_spinning.set()
870 self._stop_spinning.set()
865 self._spin_thread.join()
871 self._spin_thread.join()
866 self._spin_thread = None
872 self._spin_thread = None
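A hedged sketch of bounding the `received`-timestamp jitter with the background spin thread; assumes a connected Client as above:

    import time
    from IPython.parallel import Client
    rc = Client()
    rc.spin_thread(interval=0.1)          # flush incoming messages every ~100 ms
    ar = rc[:].apply_async(time.sleep, 1)
    ar.wait()
    print ar.wall_time                    # received-based timing, jitter bounded by the interval
    rc.stop_spin_thread()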
867
873
868 def spin(self):
874 def spin(self):
869 """Flush any registration notifications and execution results
875 """Flush any registration notifications and execution results
870 waiting in the ZMQ queue.
876 waiting in the ZMQ queue.
871 """
877 """
872 if self._notification_socket:
878 if self._notification_socket:
873 self._flush_notifications()
879 self._flush_notifications()
874 if self._mux_socket:
880 if self._mux_socket:
875 self._flush_results(self._mux_socket)
881 self._flush_results(self._mux_socket)
876 if self._task_socket:
882 if self._task_socket:
877 self._flush_results(self._task_socket)
883 self._flush_results(self._task_socket)
878 if self._control_socket:
884 if self._control_socket:
879 self._flush_control(self._control_socket)
885 self._flush_control(self._control_socket)
880 if self._iopub_socket:
886 if self._iopub_socket:
881 self._flush_iopub(self._iopub_socket)
887 self._flush_iopub(self._iopub_socket)
882 if self._query_socket:
888 if self._query_socket:
883 self._flush_ignored_hub_replies()
889 self._flush_ignored_hub_replies()
884
890
885 def wait(self, jobs=None, timeout=-1):
891 def wait(self, jobs=None, timeout=-1):
886 """waits on one or more `jobs`, for up to `timeout` seconds.
892 """waits on one or more `jobs`, for up to `timeout` seconds.
887
893
888 Parameters
894 Parameters
889 ----------
895 ----------
890
896
891 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
897 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
892 ints are indices to self.history
898 ints are indices to self.history
893 strs are msg_ids
899 strs are msg_ids
894 default: wait on all outstanding messages
900 default: wait on all outstanding messages
895 timeout : float
901 timeout : float
896 a time in seconds, after which to give up.
902 a time in seconds, after which to give up.
897 default is -1, which means no timeout
903 default is -1, which means no timeout
898
904
899 Returns
905 Returns
900 -------
906 -------
901
907
902 True : when all msg_ids are done
908 True : when all msg_ids are done
903 False : timeout reached, some msg_ids still outstanding
909 False : timeout reached, some msg_ids still outstanding
904 """
910 """
905 tic = time.time()
911 tic = time.time()
906 if jobs is None:
912 if jobs is None:
907 theids = self.outstanding
913 theids = self.outstanding
908 else:
914 else:
909 if isinstance(jobs, (int, basestring, AsyncResult)):
915 if isinstance(jobs, (int, basestring, AsyncResult)):
910 jobs = [jobs]
916 jobs = [jobs]
911 theids = set()
917 theids = set()
912 for job in jobs:
918 for job in jobs:
913 if isinstance(job, int):
919 if isinstance(job, int):
914 # index access
920 # index access
915 job = self.history[job]
921 job = self.history[job]
916 elif isinstance(job, AsyncResult):
922 elif isinstance(job, AsyncResult):
917 map(theids.add, job.msg_ids)
923 map(theids.add, job.msg_ids)
918 continue
924 continue
919 theids.add(job)
925 theids.add(job)
920 if not theids.intersection(self.outstanding):
926 if not theids.intersection(self.outstanding):
921 return True
927 return True
922 self.spin()
928 self.spin()
923 while theids.intersection(self.outstanding):
929 while theids.intersection(self.outstanding):
924 if timeout >= 0 and ( time.time()-tic ) > timeout:
930 if timeout >= 0 and ( time.time()-tic ) > timeout:
925 break
931 break
926 time.sleep(1e-3)
932 time.sleep(1e-3)
927 self.spin()
933 self.spin()
928 return len(theids.intersection(self.outstanding)) == 0
934 return len(theids.intersection(self.outstanding)) == 0
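A small sketch of waiting with and without a timeout (assumed running cluster):

    import time
    from IPython.parallel import Client
    rc = Client()
    ar = rc[:].apply_async(time.sleep, 2)
    print rc.wait(ar, timeout=0.5)        # False: still outstanding after half a second
    rc.wait(ar)                           # no timeout; returns True once everything finishes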
929
935
930 #--------------------------------------------------------------------------
936 #--------------------------------------------------------------------------
931 # Control methods
937 # Control methods
932 #--------------------------------------------------------------------------
938 #--------------------------------------------------------------------------
933
939
934 @spin_first
940 @spin_first
935 def clear(self, targets=None, block=None):
941 def clear(self, targets=None, block=None):
936 """Clear the namespace in target(s)."""
942 """Clear the namespace in target(s)."""
937 block = self.block if block is None else block
943 block = self.block if block is None else block
938 targets = self._build_targets(targets)[0]
944 targets = self._build_targets(targets)[0]
939 for t in targets:
945 for t in targets:
940 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
946 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
941 error = False
947 error = False
942 if block:
948 if block:
943 self._flush_ignored_control()
949 self._flush_ignored_control()
944 for i in range(len(targets)):
950 for i in range(len(targets)):
945 idents,msg = self.session.recv(self._control_socket,0)
951 idents,msg = self.session.recv(self._control_socket,0)
946 if self.debug:
952 if self.debug:
947 pprint(msg)
953 pprint(msg)
948 if msg['content']['status'] != 'ok':
954 if msg['content']['status'] != 'ok':
949 error = self._unwrap_exception(msg['content'])
955 error = self._unwrap_exception(msg['content'])
950 else:
956 else:
951 self._ignored_control_replies += len(targets)
957 self._ignored_control_replies += len(targets)
952 if error:
958 if error:
953 raise error
959 raise error
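A usage sketch (assumptions as above): push a variable, then wipe the engine namespaces:

    from IPython.parallel import Client
    rc = Client()
    dv = rc[:]
    dv.push(dict(a=1), block=True)
    rc.clear(targets='all', block=True)   # 'a' (and everything else) is gone on the engines
    # dv.pull('a', block=True) would now raise a remote NameError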
954
960
955
961
956 @spin_first
962 @spin_first
957 def abort(self, jobs=None, targets=None, block=None):
963 def abort(self, jobs=None, targets=None, block=None):
958 """Abort specific jobs from the execution queues of target(s).
964 """Abort specific jobs from the execution queues of target(s).
959
965
960 This is a mechanism to prevent jobs that have already been submitted
966 This is a mechanism to prevent jobs that have already been submitted
961 from executing.
967 from executing.
962
968
963 Parameters
969 Parameters
964 ----------
970 ----------
965
971
966 jobs : msg_id, list of msg_ids, or AsyncResult
972 jobs : msg_id, list of msg_ids, or AsyncResult
967 The jobs to be aborted
973 The jobs to be aborted
968
974
969 If unspecified/None: abort all outstanding jobs.
975 If unspecified/None: abort all outstanding jobs.
970
976
971 """
977 """
972 block = self.block if block is None else block
978 block = self.block if block is None else block
973 jobs = jobs if jobs is not None else list(self.outstanding)
979 jobs = jobs if jobs is not None else list(self.outstanding)
974 targets = self._build_targets(targets)[0]
980 targets = self._build_targets(targets)[0]
975
981
976 msg_ids = []
982 msg_ids = []
977 if isinstance(jobs, (basestring,AsyncResult)):
983 if isinstance(jobs, (basestring,AsyncResult)):
978 jobs = [jobs]
984 jobs = [jobs]
979 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
985 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
980 if bad_ids:
986 if bad_ids:
981 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
987 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
982 for j in jobs:
988 for j in jobs:
983 if isinstance(j, AsyncResult):
989 if isinstance(j, AsyncResult):
984 msg_ids.extend(j.msg_ids)
990 msg_ids.extend(j.msg_ids)
985 else:
991 else:
986 msg_ids.append(j)
992 msg_ids.append(j)
987 content = dict(msg_ids=msg_ids)
993 content = dict(msg_ids=msg_ids)
988 for t in targets:
994 for t in targets:
989 self.session.send(self._control_socket, 'abort_request',
995 self.session.send(self._control_socket, 'abort_request',
990 content=content, ident=t)
996 content=content, ident=t)
991 error = False
997 error = False
992 if block:
998 if block:
993 self._flush_ignored_control()
999 self._flush_ignored_control()
994 for i in range(len(targets)):
1000 for i in range(len(targets)):
995 idents,msg = self.session.recv(self._control_socket,0)
1001 idents,msg = self.session.recv(self._control_socket,0)
996 if self.debug:
1002 if self.debug:
997 pprint(msg)
1003 pprint(msg)
998 if msg['content']['status'] != 'ok':
1004 if msg['content']['status'] != 'ok':
999 error = self._unwrap_exception(msg['content'])
1005 error = self._unwrap_exception(msg['content'])
1000 else:
1006 else:
1001 self._ignored_control_replies += len(targets)
1007 self._ignored_control_replies += len(targets)
1002 if error:
1008 if error:
1003 raise error
1009 raise error
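A hedged sketch: queue several jobs, then abort the ones that have not started yet:

    import time
    from IPython.parallel import Client
    rc = Client()
    dv = rc[:]
    ars = [dv.apply_async(time.sleep, 5) for _ in range(3)]
    rc.abort(jobs=ars[1:], block=True)    # only the first job is allowed to run
    # retrieving an aborted AsyncResult raises an exception instead of returning a value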
1004
1010
1005 @spin_first
1011 @spin_first
1006 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1012 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1007 """Terminates one or more engine processes, optionally including the hub."""
1013 """Terminates one or more engine processes, optionally including the hub."""
1008 block = self.block if block is None else block
1014 block = self.block if block is None else block
1009 if hub:
1015 if hub:
1010 targets = 'all'
1016 targets = 'all'
1011 targets = self._build_targets(targets)[0]
1017 targets = self._build_targets(targets)[0]
1012 for t in targets:
1018 for t in targets:
1013 self.session.send(self._control_socket, 'shutdown_request',
1019 self.session.send(self._control_socket, 'shutdown_request',
1014 content={'restart':restart},ident=t)
1020 content={'restart':restart},ident=t)
1015 error = False
1021 error = False
1016 if block or hub:
1022 if block or hub:
1017 self._flush_ignored_control()
1023 self._flush_ignored_control()
1018 for i in range(len(targets)):
1024 for i in range(len(targets)):
1019 idents,msg = self.session.recv(self._control_socket, 0)
1025 idents,msg = self.session.recv(self._control_socket, 0)
1020 if self.debug:
1026 if self.debug:
1021 pprint(msg)
1027 pprint(msg)
1022 if msg['content']['status'] != 'ok':
1028 if msg['content']['status'] != 'ok':
1023 error = self._unwrap_exception(msg['content'])
1029 error = self._unwrap_exception(msg['content'])
1024 else:
1030 else:
1025 self._ignored_control_replies += len(targets)
1031 self._ignored_control_replies += len(targets)
1026
1032
1027 if hub:
1033 if hub:
1028 time.sleep(0.25)
1034 time.sleep(0.25)
1029 self.session.send(self._query_socket, 'shutdown_request')
1035 self.session.send(self._query_socket, 'shutdown_request')
1030 idents,msg = self.session.recv(self._query_socket, 0)
1036 idents,msg = self.session.recv(self._query_socket, 0)
1031 if self.debug:
1037 if self.debug:
1032 pprint(msg)
1038 pprint(msg)
1033 if msg['content']['status'] != 'ok':
1039 if msg['content']['status'] != 'ok':
1034 error = self._unwrap_exception(msg['content'])
1040 error = self._unwrap_exception(msg['content'])
1035
1041
1036 if error:
1042 if error:
1037 raise error
1043 raise error
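Sketch (assumed running cluster; note this really stops processes):

    from IPython.parallel import Client
    rc = Client()
    rc.shutdown(targets=[0], block=True)  # stop engine 0 only
    # rc.shutdown(hub=True) would additionally terminate the controller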
1038
1044
1039 #--------------------------------------------------------------------------
1045 #--------------------------------------------------------------------------
1040 # Execution related methods
1046 # Execution related methods
1041 #--------------------------------------------------------------------------
1047 #--------------------------------------------------------------------------
1042
1048
1043 def _maybe_raise(self, result):
1049 def _maybe_raise(self, result):
1044 """wrapper for maybe raising an exception if apply failed."""
1050 """wrapper for maybe raising an exception if apply failed."""
1045 if isinstance(result, error.RemoteError):
1051 if isinstance(result, error.RemoteError):
1046 raise result
1052 raise result
1047
1053
1048 return result
1054 return result
1049
1055
1050 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1056 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1051 ident=None):
1057 ident=None):
1052 """construct and send an apply message via a socket.
1058 """construct and send an apply message via a socket.
1053
1059
1054 This is the principal method with which all engine execution is performed by views.
1060 This is the principal method with which all engine execution is performed by views.
1055 """
1061 """
1056
1062
1057 assert not self._closed, "cannot use me anymore, I'm closed!"
1063 assert not self._closed, "cannot use me anymore, I'm closed!"
1058 # defaults:
1064 # defaults:
1059 args = args if args is not None else []
1065 args = args if args is not None else []
1060 kwargs = kwargs if kwargs is not None else {}
1066 kwargs = kwargs if kwargs is not None else {}
1061 subheader = subheader if subheader is not None else {}
1067 subheader = subheader if subheader is not None else {}
1062
1068
1063 # validate arguments
1069 # validate arguments
1064 if not callable(f) and not isinstance(f, Reference):
1070 if not callable(f) and not isinstance(f, Reference):
1065 raise TypeError("f must be callable, not %s"%type(f))
1071 raise TypeError("f must be callable, not %s"%type(f))
1066 if not isinstance(args, (tuple, list)):
1072 if not isinstance(args, (tuple, list)):
1067 raise TypeError("args must be tuple or list, not %s"%type(args))
1073 raise TypeError("args must be tuple or list, not %s"%type(args))
1068 if not isinstance(kwargs, dict):
1074 if not isinstance(kwargs, dict):
1069 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1075 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1070 if not isinstance(subheader, dict):
1076 if not isinstance(subheader, dict):
1071 raise TypeError("subheader must be dict, not %s"%type(subheader))
1077 raise TypeError("subheader must be dict, not %s"%type(subheader))
1072
1078
1073 bufs = util.pack_apply_message(f,args,kwargs)
1079 bufs = util.pack_apply_message(f,args,kwargs)
1074
1080
1075 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1081 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1076 subheader=subheader, track=track)
1082 subheader=subheader, track=track)
1077
1083
1078 msg_id = msg['header']['msg_id']
1084 msg_id = msg['header']['msg_id']
1079 self.outstanding.add(msg_id)
1085 self.outstanding.add(msg_id)
1080 if ident:
1086 if ident:
1081 # possibly routed to a specific engine
1087 # possibly routed to a specific engine
1082 if isinstance(ident, list):
1088 if isinstance(ident, list):
1083 ident = ident[-1]
1089 ident = ident[-1]
1084 if ident in self._engines.values():
1090 if ident in self._engines.values():
1085 # save for later, in case of engine death
1091 # save for later, in case of engine death
1086 self._outstanding_dict[ident].add(msg_id)
1092 self._outstanding_dict[ident].add(msg_id)
1087 self.history.append(msg_id)
1093 self.history.append(msg_id)
1088 self.metadata[msg_id]['submitted'] = datetime.now()
1094 self.metadata[msg_id]['submitted'] = datetime.now()
1089
1095
1090 return msg
1096 return msg
1091
1097
1092 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1098 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1093 """construct and send an execute request via a socket.
1099 """construct and send an execute request via a socket.
1094
1100
1095 """
1101 """
1096
1102
1097 assert not self._closed, "cannot use me anymore, I'm closed!"
1103 assert not self._closed, "cannot use me anymore, I'm closed!"
1098 # defaults:
1104 # defaults:
1099 subheader = subheader if subheader is not None else {}
1105 subheader = subheader if subheader is not None else {}
1100
1106
1101 # validate arguments
1107 # validate arguments
1102 if not isinstance(code, basestring):
1108 if not isinstance(code, basestring):
1103 raise TypeError("code must be text, not %s" % type(code))
1109 raise TypeError("code must be text, not %s" % type(code))
1104 if not isinstance(subheader, dict):
1110 if not isinstance(subheader, dict):
1105 raise TypeError("subheader must be dict, not %s" % type(subheader))
1111 raise TypeError("subheader must be dict, not %s" % type(subheader))
1106
1112
1107 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1113 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1108
1114
1109
1115
1110 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1116 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1111 subheader=subheader)
1117 subheader=subheader)
1112
1118
1113 msg_id = msg['header']['msg_id']
1119 msg_id = msg['header']['msg_id']
1114 self.outstanding.add(msg_id)
1120 self.outstanding.add(msg_id)
1115 if ident:
1121 if ident:
1116 # possibly routed to a specific engine
1122 # possibly routed to a specific engine
1117 if isinstance(ident, list):
1123 if isinstance(ident, list):
1118 ident = ident[-1]
1124 ident = ident[-1]
1119 if ident in self._engines.values():
1125 if ident in self._engines.values():
1120 # save for later, in case of engine death
1126 # save for later, in case of engine death
1121 self._outstanding_dict[ident].add(msg_id)
1127 self._outstanding_dict[ident].add(msg_id)
1122 self.history.append(msg_id)
1128 self.history.append(msg_id)
1123 self.metadata[msg_id]['submitted'] = datetime.now()
1129 self.metadata[msg_id]['submitted'] = datetime.now()
1124
1130
1125 return msg
1131 return msg
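Callers normally reach this path through a view's `execute` rather than by building the request themselves; a hedged sketch of that public route:

    from IPython.parallel import Client
    rc = Client()
    dv = rc[:]
    ar = dv.execute("import os; pid = os.getpid()")   # presumably routed as an execute_request per engine
    ar.get()
    print dv.pull('pid', block=True)                  # one pid per engine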
1126
1132
1127 #--------------------------------------------------------------------------
1133 #--------------------------------------------------------------------------
1128 # construct a View object
1134 # construct a View object
1129 #--------------------------------------------------------------------------
1135 #--------------------------------------------------------------------------
1130
1136
1131 def load_balanced_view(self, targets=None):
1137 def load_balanced_view(self, targets=None):
1132 """construct a DirectView object.
1138 """construct a DirectView object.
1133
1139
1134 If no arguments are specified, create a LoadBalancedView
1140 If no arguments are specified, create a LoadBalancedView
1135 using all engines.
1141 using all engines.
1136
1142
1137 Parameters
1143 Parameters
1138 ----------
1144 ----------
1139
1145
1140 targets: list,slice,int,etc. [default: use all engines]
1146 targets: list,slice,int,etc. [default: use all engines]
1141 The subset of engines across which to load-balance
1147 The subset of engines across which to load-balance
1142 """
1148 """
1143 if targets == 'all':
1149 if targets == 'all':
1144 targets = None
1150 targets = None
1145 if targets is not None:
1151 if targets is not None:
1146 targets = self._build_targets(targets)[1]
1152 targets = self._build_targets(targets)[1]
1147 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1153 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
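A load-balanced usage sketch (assumed running cluster):

    from IPython.parallel import Client
    rc = Client()
    lview = rc.load_balanced_view()            # balance across all engines
    amr = lview.map_async(lambda x: x ** 2, range(32))
    print amr.get()                            # [0, 1, 4, 9, ...]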
1148
1154
1149 def direct_view(self, targets='all'):
1155 def direct_view(self, targets='all'):
1150 """construct a DirectView object.
1156 """construct a DirectView object.
1151
1157
1152 If no targets are specified, create a DirectView using all engines.
1158 If no targets are specified, create a DirectView using all engines.
1153
1159
1154 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1160 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1155 evaluate the target engines at each execution, whereas rc[:] will connect to
1161 evaluate the target engines at each execution, whereas rc[:] will connect to
1156 all *current* engines, and that list will not change.
1162 all *current* engines, and that list will not change.
1157
1163
1158 That is, 'all' will always use all engines, whereas rc[:] will not use
1164 That is, 'all' will always use all engines, whereas rc[:] will not use
1159 engines added after the DirectView is constructed.
1165 engines added after the DirectView is constructed.
1160
1166
1161 Parameters
1167 Parameters
1162 ----------
1168 ----------
1163
1169
1164 targets: list,slice,int,etc. [default: use all engines]
1170 targets: list,slice,int,etc. [default: use all engines]
1165 The engines to use for the View
1171 The engines to use for the View
1166 """
1172 """
1167 single = isinstance(targets, int)
1173 single = isinstance(targets, int)
1168 # allow 'all' to be lazily evaluated at each execution
1174 # allow 'all' to be lazily evaluated at each execution
1169 if targets != 'all':
1175 if targets != 'all':
1170 targets = self._build_targets(targets)[1]
1176 targets = self._build_targets(targets)[1]
1171 if single:
1177 if single:
1172 targets = targets[0]
1178 targets = targets[0]
1173 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1179 return DirectView(client=self, socket=self._mux_socket, targets=targets)
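A sketch contrasting the lazily-evaluated 'all' view with a snapshot view (assumptions as above):

    from IPython.parallel import Client
    rc = Client()
    dv_live = rc.direct_view('all')   # re-evaluates the engine list at each call
    dv_snap = rc[:]                   # fixed to the engines registered right now
    print dv_live.apply_sync(lambda: 'hi')   # includes engines added after construction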
1174
1180
1175 #--------------------------------------------------------------------------
1181 #--------------------------------------------------------------------------
1176 # Query methods
1182 # Query methods
1177 #--------------------------------------------------------------------------
1183 #--------------------------------------------------------------------------
1178
1184
1179 @spin_first
1185 @spin_first
1180 def get_result(self, indices_or_msg_ids=None, block=None):
1186 def get_result(self, indices_or_msg_ids=None, block=None):
1181 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1187 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1182
1188
1183 If the client already has the results, no request to the Hub will be made.
1189 If the client already has the results, no request to the Hub will be made.
1184
1190
1185 This is a convenient way to construct AsyncResult objects, which are wrappers
1191 This is a convenient way to construct AsyncResult objects, which are wrappers
1186 that include metadata about execution, and allow for awaiting results that
1192 that include metadata about execution, and allow for awaiting results that
1187 were not submitted by this Client.
1193 were not submitted by this Client.
1188
1194
1189 It can also be a convenient way to retrieve the metadata associated with
1195 It can also be a convenient way to retrieve the metadata associated with
1190 blocking execution, since it always retrieves the metadata along with the results.
1196 blocking execution, since it always retrieves the metadata along with the results.
1191
1197
1192 Examples
1198 Examples
1193 --------
1199 --------
1194 ::
1200 ::
1195
1201
1196 In [10]: ar = client.get_result(msg_id)
1202 In [10]: ar = client.get_result(msg_id)
1197
1203
1198 Parameters
1204 Parameters
1199 ----------
1205 ----------
1200
1206
1201 indices_or_msg_ids : integer history index, str msg_id, or list of either
1207 indices_or_msg_ids : integer history index, str msg_id, or list of either
1202 The history indices or msg_ids of the results to be retrieved
1208 The history indices or msg_ids of the results to be retrieved
1203
1209
1204 block : bool
1210 block : bool
1205 Whether to wait for the result to be done
1211 Whether to wait for the result to be done
1206
1212
1207 Returns
1213 Returns
1208 -------
1214 -------
1209
1215
1210 AsyncResult
1216 AsyncResult
1211 A single AsyncResult object will always be returned.
1217 A single AsyncResult object will always be returned.
1212
1218
1213 AsyncHubResult
1219 AsyncHubResult
1214 A subclass of AsyncResult that retrieves results from the Hub
1220 A subclass of AsyncResult that retrieves results from the Hub
1215
1221
1216 """
1222 """
1217 block = self.block if block is None else block
1223 block = self.block if block is None else block
1218 if indices_or_msg_ids is None:
1224 if indices_or_msg_ids is None:
1219 indices_or_msg_ids = -1
1225 indices_or_msg_ids = -1
1220
1226
1221 if not isinstance(indices_or_msg_ids, (list,tuple)):
1227 if not isinstance(indices_or_msg_ids, (list,tuple)):
1222 indices_or_msg_ids = [indices_or_msg_ids]
1228 indices_or_msg_ids = [indices_or_msg_ids]
1223
1229
1224 theids = []
1230 theids = []
1225 for id in indices_or_msg_ids:
1231 for id in indices_or_msg_ids:
1226 if isinstance(id, int):
1232 if isinstance(id, int):
1227 id = self.history[id]
1233 id = self.history[id]
1228 if not isinstance(id, basestring):
1234 if not isinstance(id, basestring):
1229 raise TypeError("indices must be str or int, not %r"%id)
1235 raise TypeError("indices must be str or int, not %r"%id)
1230 theids.append(id)
1236 theids.append(id)
1231
1237
1232 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1238 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1233 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1239 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1234
1240
1235 if remote_ids:
1241 if remote_ids:
1236 ar = AsyncHubResult(self, msg_ids=theids)
1242 ar = AsyncHubResult(self, msg_ids=theids)
1237 else:
1243 else:
1238 ar = AsyncResult(self, msg_ids=theids)
1244 ar = AsyncResult(self, msg_ids=theids)
1239
1245
1240 if block:
1246 if block:
1241 ar.wait()
1247 ar.wait()
1242
1248
1243 return ar
1249 return ar
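A sketch of reconstructing an AsyncResult from a msg_id or history index (assumed running cluster):

    from IPython.parallel import Client
    rc = Client()
    ar = rc[:].apply_async(sum, [1, 2, 3])
    msg_id = ar.msg_ids[0]
    ar2 = rc.get_result(msg_id)       # same task, fresh AsyncResult wrapper
    print ar2.get()
    ar_last = rc.get_result(-1)       # -1: most recent entry in rc.history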
1244
1250
1245 @spin_first
1251 @spin_first
1246 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1252 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1247 """Resubmit one or more tasks.
1253 """Resubmit one or more tasks.
1248
1254
1249 in-flight tasks may not be resubmitted.
1255 in-flight tasks may not be resubmitted.
1250
1256
1251 Parameters
1257 Parameters
1252 ----------
1258 ----------
1253
1259
1254 indices_or_msg_ids : integer history index, str msg_id, or list of either
1260 indices_or_msg_ids : integer history index, str msg_id, or list of either
1255 The history indices or msg_ids of the tasks to be resubmitted
1261 The history indices or msg_ids of the tasks to be resubmitted
1256
1262
1257 block : bool
1263 block : bool
1258 Whether to wait for the result to be done
1264 Whether to wait for the result to be done
1259
1265
1260 Returns
1266 Returns
1261 -------
1267 -------
1262
1268
1263 AsyncHubResult
1269 AsyncHubResult
1264 A subclass of AsyncResult that retrieves results from the Hub
1270 A subclass of AsyncResult that retrieves results from the Hub
1265
1271
1266 """
1272 """
1267 block = self.block if block is None else block
1273 block = self.block if block is None else block
1268 if indices_or_msg_ids is None:
1274 if indices_or_msg_ids is None:
1269 indices_or_msg_ids = -1
1275 indices_or_msg_ids = -1
1270
1276
1271 if not isinstance(indices_or_msg_ids, (list,tuple)):
1277 if not isinstance(indices_or_msg_ids, (list,tuple)):
1272 indices_or_msg_ids = [indices_or_msg_ids]
1278 indices_or_msg_ids = [indices_or_msg_ids]
1273
1279
1274 theids = []
1280 theids = []
1275 for id in indices_or_msg_ids:
1281 for id in indices_or_msg_ids:
1276 if isinstance(id, int):
1282 if isinstance(id, int):
1277 id = self.history[id]
1283 id = self.history[id]
1278 if not isinstance(id, basestring):
1284 if not isinstance(id, basestring):
1279 raise TypeError("indices must be str or int, not %r"%id)
1285 raise TypeError("indices must be str or int, not %r"%id)
1280 theids.append(id)
1286 theids.append(id)
1281
1287
1282 for msg_id in theids:
1288 for msg_id in theids:
1283 self.outstanding.discard(msg_id)
1289 self.outstanding.discard(msg_id)
1284 if msg_id in self.history:
1290 if msg_id in self.history:
1285 self.history.remove(msg_id)
1291 self.history.remove(msg_id)
1286 self.results.pop(msg_id, None)
1292 self.results.pop(msg_id, None)
1287 self.metadata.pop(msg_id, None)
1293 self.metadata.pop(msg_id, None)
1288 content = dict(msg_ids = theids)
1294 content = dict(msg_ids = theids)
1289
1295
1290 self.session.send(self._query_socket, 'resubmit_request', content)
1296 self.session.send(self._query_socket, 'resubmit_request', content)
1291
1297
1292 zmq.select([self._query_socket], [], [])
1298 zmq.select([self._query_socket], [], [])
1293 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1299 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1294 if self.debug:
1300 if self.debug:
1295 pprint(msg)
1301 pprint(msg)
1296 content = msg['content']
1302 content = msg['content']
1297 if content['status'] != 'ok':
1303 if content['status'] != 'ok':
1298 raise self._unwrap_exception(content)
1304 raise self._unwrap_exception(content)
1299
1305
1300 ar = AsyncHubResult(self, msg_ids=theids)
1306 ar = AsyncHubResult(self, msg_ids=theids)
1301
1307
1302 if block:
1308 if block:
1303 ar.wait()
1309 ar.wait()
1304
1310
1305 return ar
1311 return ar
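A hedged sketch; resubmission applies to tasks the Hub has recorded (and that are no longer in flight):

    import time
    from IPython.parallel import Client
    rc = Client()
    lview = rc.load_balanced_view()
    ar = lview.apply_async(time.sleep, 0)
    ar.get()                              # ensure it is finished before resubmitting
    ar2 = rc.resubmit(ar.msg_ids)         # run the same task again
    ar2.get()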
1306
1312
1307 @spin_first
1313 @spin_first
1308 def result_status(self, msg_ids, status_only=True):
1314 def result_status(self, msg_ids, status_only=True):
1309 """Check on the status of the result(s) of the apply request with `msg_ids`.
1315 """Check on the status of the result(s) of the apply request with `msg_ids`.
1310
1316
1311 If status_only is False, then the actual results will be retrieved, else
1317 If status_only is False, then the actual results will be retrieved, else
1312 only the status of the results will be checked.
1318 only the status of the results will be checked.
1313
1319
1314 Parameters
1320 Parameters
1315 ----------
1321 ----------
1316
1322
1317 msg_ids : list of msg_ids
1323 msg_ids : list of msg_ids
1318 if int:
1324 if int:
1319 Passed as index to self.history for convenience.
1325 Passed as index to self.history for convenience.
1320 status_only : bool (default: True)
1326 status_only : bool (default: True)
1321 if False:
1327 if False:
1322 Retrieve the actual results of completed tasks.
1328 Retrieve the actual results of completed tasks.
1323
1329
1324 Returns
1330 Returns
1325 -------
1331 -------
1326
1332
1327 results : dict
1333 results : dict
1328 There will always be the keys 'pending' and 'completed', which will
1334 There will always be the keys 'pending' and 'completed', which will
1329 be lists of msg_ids that are incomplete or complete. If `status_only`
1335 be lists of msg_ids that are incomplete or complete. If `status_only`
1330 is False, then completed results will be keyed by their `msg_id`.
1336 is False, then completed results will be keyed by their `msg_id`.
1331 """
1337 """
1332 if not isinstance(msg_ids, (list,tuple)):
1338 if not isinstance(msg_ids, (list,tuple)):
1333 msg_ids = [msg_ids]
1339 msg_ids = [msg_ids]
1334
1340
1335 theids = []
1341 theids = []
1336 for msg_id in msg_ids:
1342 for msg_id in msg_ids:
1337 if isinstance(msg_id, int):
1343 if isinstance(msg_id, int):
1338 msg_id = self.history[msg_id]
1344 msg_id = self.history[msg_id]
1339 if not isinstance(msg_id, basestring):
1345 if not isinstance(msg_id, basestring):
1340 raise TypeError("msg_ids must be str, not %r"%msg_id)
1346 raise TypeError("msg_ids must be str, not %r"%msg_id)
1341 theids.append(msg_id)
1347 theids.append(msg_id)
1342
1348
1343 completed = []
1349 completed = []
1344 local_results = {}
1350 local_results = {}
1345
1351
1346 # comment this block out to temporarily disable local shortcut:
1352 # comment this block out to temporarily disable local shortcut:
1347 for msg_id in theids:
1353 for msg_id in theids:
1348 if msg_id in self.results:
1354 if msg_id in self.results:
1349 completed.append(msg_id)
1355 completed.append(msg_id)
1350 local_results[msg_id] = self.results[msg_id]
1356 local_results[msg_id] = self.results[msg_id]
1351 theids.remove(msg_id)
1357 theids.remove(msg_id)
1352
1358
1353 if theids: # some not locally cached
1359 if theids: # some not locally cached
1354 content = dict(msg_ids=theids, status_only=status_only)
1360 content = dict(msg_ids=theids, status_only=status_only)
1355 msg = self.session.send(self._query_socket, "result_request", content=content)
1361 msg = self.session.send(self._query_socket, "result_request", content=content)
1356 zmq.select([self._query_socket], [], [])
1362 zmq.select([self._query_socket], [], [])
1357 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1363 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1358 if self.debug:
1364 if self.debug:
1359 pprint(msg)
1365 pprint(msg)
1360 content = msg['content']
1366 content = msg['content']
1361 if content['status'] != 'ok':
1367 if content['status'] != 'ok':
1362 raise self._unwrap_exception(content)
1368 raise self._unwrap_exception(content)
1363 buffers = msg['buffers']
1369 buffers = msg['buffers']
1364 else:
1370 else:
1365 content = dict(completed=[],pending=[])
1371 content = dict(completed=[],pending=[])
1366
1372
1367 content['completed'].extend(completed)
1373 content['completed'].extend(completed)
1368
1374
1369 if status_only:
1375 if status_only:
1370 return content
1376 return content
1371
1377
1372 failures = []
1378 failures = []
1373 # load cached results into result:
1379 # load cached results into result:
1374 content.update(local_results)
1380 content.update(local_results)
1375
1381
1376 # update cache with results:
1382 # update cache with results:
1377 for msg_id in sorted(theids):
1383 for msg_id in sorted(theids):
1378 if msg_id in content['completed']:
1384 if msg_id in content['completed']:
1379 rec = content[msg_id]
1385 rec = content[msg_id]
1380 parent = rec['header']
1386 parent = rec['header']
1381 header = rec['result_header']
1387 header = rec['result_header']
1382 rcontent = rec['result_content']
1388 rcontent = rec['result_content']
1383 iodict = rec['io']
1389 iodict = rec['io']
1384 if isinstance(rcontent, str):
1390 if isinstance(rcontent, str):
1385 rcontent = self.session.unpack(rcontent)
1391 rcontent = self.session.unpack(rcontent)
1386
1392
1387 md = self.metadata[msg_id]
1393 md = self.metadata[msg_id]
1388 md.update(self._extract_metadata(header, parent, rcontent))
1394 md.update(self._extract_metadata(header, parent, rcontent))
1389 if rec.get('received'):
1395 if rec.get('received'):
1390 md['received'] = rec['received']
1396 md['received'] = rec['received']
1391 md.update(iodict)
1397 md.update(iodict)
1392
1398
1393 if rcontent['status'] == 'ok':
1399 if rcontent['status'] == 'ok':
1394 res,buffers = util.unserialize_object(buffers)
1400 res,buffers = util.unserialize_object(buffers)
1395 else:
1401 else:
1396 print rcontent
1402 print rcontent
1397 res = self._unwrap_exception(rcontent)
1403 res = self._unwrap_exception(rcontent)
1398 failures.append(res)
1404 failures.append(res)
1399
1405
1400 self.results[msg_id] = res
1406 self.results[msg_id] = res
1401 content[msg_id] = res
1407 content[msg_id] = res
1402
1408
1403 if len(theids) == 1 and failures:
1409 if len(theids) == 1 and failures:
1404 raise failures[0]
1410 raise failures[0]
1405
1411
1406 error.collect_exceptions(failures, "result_status")
1412 error.collect_exceptions(failures, "result_status")
1407 return content
1413 return content
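Sketch of a status-only query (assumed running cluster):

    from IPython.parallel import Client
    rc = Client()
    ar = rc[:].apply_async(pow, 2, 8)
    status = rc.result_status(ar.msg_ids, status_only=True)
    print status['pending'], status['completed']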
1408
1414
1409 @spin_first
1415 @spin_first
1410 def queue_status(self, targets='all', verbose=False):
1416 def queue_status(self, targets='all', verbose=False):
1411 """Fetch the status of engine queues.
1417 """Fetch the status of engine queues.
1412
1418
1413 Parameters
1419 Parameters
1414 ----------
1420 ----------
1415
1421
1416 targets : int/str/list of ints/strs
1422 targets : int/str/list of ints/strs
1417 the engines whose states are to be queried.
1423 the engines whose states are to be queried.
1418 default : all
1424 default : all
1419 verbose : bool
1425 verbose : bool
1420 Whether to return lengths only, or lists of ids for each element
1426 Whether to return lengths only, or lists of ids for each element
1421 """
1427 """
1422 if targets == 'all':
1428 if targets == 'all':
1423 # allow 'all' to be evaluated on the engine
1429 # allow 'all' to be evaluated on the engine
1424 engine_ids = None
1430 engine_ids = None
1425 else:
1431 else:
1426 engine_ids = self._build_targets(targets)[1]
1432 engine_ids = self._build_targets(targets)[1]
1427 content = dict(targets=engine_ids, verbose=verbose)
1433 content = dict(targets=engine_ids, verbose=verbose)
1428 self.session.send(self._query_socket, "queue_request", content=content)
1434 self.session.send(self._query_socket, "queue_request", content=content)
1429 idents,msg = self.session.recv(self._query_socket, 0)
1435 idents,msg = self.session.recv(self._query_socket, 0)
1430 if self.debug:
1436 if self.debug:
1431 pprint(msg)
1437 pprint(msg)
1432 content = msg['content']
1438 content = msg['content']
1433 status = content.pop('status')
1439 status = content.pop('status')
1434 if status != 'ok':
1440 if status != 'ok':
1435 raise self._unwrap_exception(content)
1441 raise self._unwrap_exception(content)
1436 content = rekey(content)
1442 content = rekey(content)
1437 if isinstance(targets, int):
1443 if isinstance(targets, int):
1438 return content[targets]
1444 return content[targets]
1439 else:
1445 else:
1440 return content
1446 return content
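Sketch (assumed running cluster); the exact per-engine keys come from the Hub's reply:

    from IPython.parallel import Client
    rc = Client()
    print rc.queue_status()                          # e.g. {0: {'queue': 0, 'completed': 4, 'tasks': 0}, ...}
    print rc.queue_status(targets=0, verbose=True)   # msg_id lists instead of counts, engine 0 only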
1441
1447
1442 @spin_first
1448 @spin_first
1443 def purge_results(self, jobs=[], targets=[]):
1449 def purge_results(self, jobs=[], targets=[]):
1444 """Tell the Hub to forget results.
1450 """Tell the Hub to forget results.
1445
1451
1446 Individual results can be purged by msg_id, or the entire
1452 Individual results can be purged by msg_id, or the entire
1447 history of specific targets can be purged.
1453 history of specific targets can be purged.
1448
1454
1449 Use `purge_results('all')` to scrub everything from the Hub's db.
1455 Use `purge_results('all')` to scrub everything from the Hub's db.
1450
1456
1451 Parameters
1457 Parameters
1452 ----------
1458 ----------
1453
1459
1454 jobs : str or list of str or AsyncResult objects
1460 jobs : str or list of str or AsyncResult objects
1455 the msg_ids whose results should be forgotten.
1461 the msg_ids whose results should be forgotten.
1456 targets : int/str/list of ints/strs
1462 targets : int/str/list of ints/strs
1457 The targets, by int_id, whose entire history is to be purged.
1463 The targets, by int_id, whose entire history is to be purged.
1458
1464
1459 default : None
1465 default : None
1460 """
1466 """
1461 if not targets and not jobs:
1467 if not targets and not jobs:
1462 raise ValueError("Must specify at least one of `targets` and `jobs`")
1468 raise ValueError("Must specify at least one of `targets` and `jobs`")
1463 if targets:
1469 if targets:
1464 targets = self._build_targets(targets)[1]
1470 targets = self._build_targets(targets)[1]
1465
1471
1466 # construct msg_ids from jobs
1472 # construct msg_ids from jobs
1467 if jobs == 'all':
1473 if jobs == 'all':
1468 msg_ids = jobs
1474 msg_ids = jobs
1469 else:
1475 else:
1470 msg_ids = []
1476 msg_ids = []
1471 if isinstance(jobs, (basestring,AsyncResult)):
1477 if isinstance(jobs, (basestring,AsyncResult)):
1472 jobs = [jobs]
1478 jobs = [jobs]
1473 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1479 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1474 if bad_ids:
1480 if bad_ids:
1475 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1481 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1476 for j in jobs:
1482 for j in jobs:
1477 if isinstance(j, AsyncResult):
1483 if isinstance(j, AsyncResult):
1478 msg_ids.extend(j.msg_ids)
1484 msg_ids.extend(j.msg_ids)
1479 else:
1485 else:
1480 msg_ids.append(j)
1486 msg_ids.append(j)
1481
1487
1482 content = dict(engine_ids=targets, msg_ids=msg_ids)
1488 content = dict(engine_ids=targets, msg_ids=msg_ids)
1483 self.session.send(self._query_socket, "purge_request", content=content)
1489 self.session.send(self._query_socket, "purge_request", content=content)
1484 idents, msg = self.session.recv(self._query_socket, 0)
1490 idents, msg = self.session.recv(self._query_socket, 0)
1485 if self.debug:
1491 if self.debug:
1486 pprint(msg)
1492 pprint(msg)
1487 content = msg['content']
1493 content = msg['content']
1488 if content['status'] != 'ok':
1494 if content['status'] != 'ok':
1489 raise self._unwrap_exception(content)
1495 raise self._unwrap_exception(content)
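A sketch of forgetting results on the Hub (assumed running cluster):

    from IPython.parallel import Client
    rc = Client()
    ar = rc[:].apply_async(pow, 2, 8)
    ar.get()
    rc.purge_results(jobs=ar)        # forget just this task's results
    rc.purge_results(targets=[0])    # or everything engine 0 has ever run
    # rc.purge_results('all') would scrub the Hub's entire result database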
1490
1496
1491 @spin_first
1497 @spin_first
1492 def hub_history(self):
1498 def hub_history(self):
1493 """Get the Hub's history
1499 """Get the Hub's history
1494
1500
1495 Just like the Client, the Hub has a history, which is a list of msg_ids.
1501 Just like the Client, the Hub has a history, which is a list of msg_ids.
1496 This will contain the history of all clients, and, depending on configuration,
1502 This will contain the history of all clients, and, depending on configuration,
1497 may contain history across multiple cluster sessions.
1503 may contain history across multiple cluster sessions.
1498
1504
1499 Any msg_id returned here is a valid argument to `get_result`.
1505 Any msg_id returned here is a valid argument to `get_result`.
1500
1506
1501 Returns
1507 Returns
1502 -------
1508 -------
1503
1509
1504 msg_ids : list of strs
1510 msg_ids : list of strs
1505 list of all msg_ids, ordered by task submission time.
1511 list of all msg_ids, ordered by task submission time.
1506 """
1512 """
1507
1513
1508 self.session.send(self._query_socket, "history_request", content={})
1514 self.session.send(self._query_socket, "history_request", content={})
1509 idents, msg = self.session.recv(self._query_socket, 0)
1515 idents, msg = self.session.recv(self._query_socket, 0)
1510
1516
1511 if self.debug:
1517 if self.debug:
1512 pprint(msg)
1518 pprint(msg)
1513 content = msg['content']
1519 content = msg['content']
1514 if content['status'] != 'ok':
1520 if content['status'] != 'ok':
1515 raise self._unwrap_exception(content)
1521 raise self._unwrap_exception(content)
1516 else:
1522 else:
1517 return content['history']
1523 return content['history']
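Sketch: any msg_id from the Hub's history can be handed back to get_result, even if another client submitted it:

    from IPython.parallel import Client
    rc = Client()
    msg_ids = rc.hub_history()
    print len(msg_ids)                      # every task the Hub knows about
    if msg_ids:
        ar = rc.get_result(msg_ids[-1])     # AsyncHubResult for the most recent task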
1518
1524
1519 @spin_first
1525 @spin_first
1520 def db_query(self, query, keys=None):
1526 def db_query(self, query, keys=None):
1521 """Query the Hub's TaskRecord database
1527 """Query the Hub's TaskRecord database
1522
1528
1523 This will return a list of task record dicts that match `query`
1529 This will return a list of task record dicts that match `query`
1524
1530
1525 Parameters
1531 Parameters
1526 ----------
1532 ----------
1527
1533
1528 query : mongodb query dict
1534 query : mongodb query dict
1529 The search dict. See mongodb query docs for details.
1535 The search dict. See mongodb query docs for details.
1530 keys : list of strs [optional]
1536 keys : list of strs [optional]
1531 The subset of keys to be returned. The default is to fetch everything but buffers.
1537 The subset of keys to be returned. The default is to fetch everything but buffers.
1532 'msg_id' will *always* be included.
1538 'msg_id' will *always* be included.
1533 """
1539 """
1534 if isinstance(keys, basestring):
1540 if isinstance(keys, basestring):
1535 keys = [keys]
1541 keys = [keys]
1536 content = dict(query=query, keys=keys)
1542 content = dict(query=query, keys=keys)
1537 self.session.send(self._query_socket, "db_request", content=content)
1543 self.session.send(self._query_socket, "db_request", content=content)
1538 idents, msg = self.session.recv(self._query_socket, 0)
1544 idents, msg = self.session.recv(self._query_socket, 0)
1539 if self.debug:
1545 if self.debug:
1540 pprint(msg)
1546 pprint(msg)
1541 content = msg['content']
1547 content = msg['content']
1542 if content['status'] != 'ok':
1548 if content['status'] != 'ok':
1543 raise self._unwrap_exception(content)
1549 raise self._unwrap_exception(content)
1544
1550
1545 records = content['records']
1551 records = content['records']
1546
1552
1547 buffer_lens = content['buffer_lens']
1553 buffer_lens = content['buffer_lens']
1548 result_buffer_lens = content['result_buffer_lens']
1554 result_buffer_lens = content['result_buffer_lens']
1549 buffers = msg['buffers']
1555 buffers = msg['buffers']
1550 has_bufs = buffer_lens is not None
1556 has_bufs = buffer_lens is not None
1551 has_rbufs = result_buffer_lens is not None
1557 has_rbufs = result_buffer_lens is not None
1552 for i,rec in enumerate(records):
1558 for i,rec in enumerate(records):
1553 # relink buffers
1559 # relink buffers
1554 if has_bufs:
1560 if has_bufs:
1555 blen = buffer_lens[i]
1561 blen = buffer_lens[i]
1556 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1562 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1557 if has_rbufs:
1563 if has_rbufs:
1558 blen = result_buffer_lens[i]
1564 blen = result_buffer_lens[i]
1559 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1565 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1560
1566
1561 return records
1567 return records
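A hedged sketch; the field names and mongodb-style operators below assume the Hub's default task database supports them:

    from datetime import datetime, timedelta
    from IPython.parallel import Client
    rc = Client()
    since = datetime.now() - timedelta(hours=1)
    recs = rc.db_query({'submitted': {'$gte': since}}, keys=['msg_id', 'completed'])
    for rec in recs:
        print rec['msg_id'], rec['completed']   # completed is None while the task is pending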
1562
1568
1563 __all__ = [ 'Client' ]
1569 __all__ = [ 'Client' ]