##// END OF EJS Templates
allow configuration of packer/unpacker in client
MinRK -
Show More
@@ -1,1351 +1,1368 b''
1 """A semi-synchronous Client for the ZMQ cluster"""
1 """A semi-synchronous Client for the ZMQ cluster"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import json
14 import json
15 import time
15 import time
16 import warnings
16 import warnings
17 from datetime import datetime
17 from datetime import datetime
18 from getpass import getpass
18 from getpass import getpass
19 from pprint import pprint
19 from pprint import pprint
20
20
21 pjoin = os.path.join
21 pjoin = os.path.join
22
22
23 import zmq
23 import zmq
24 # from zmq.eventloop import ioloop, zmqstream
24 # from zmq.eventloop import ioloop, zmqstream
25
25
26 from IPython.utils.path import get_ipython_dir
26 from IPython.utils.path import get_ipython_dir
27 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
27 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
28 Dict, List, Bool, Set)
28 Dict, List, Bool, Set)
29 from IPython.external.decorator import decorator
29 from IPython.external.decorator import decorator
30 from IPython.external.ssh import tunnel
30 from IPython.external.ssh import tunnel
31
31
32 from IPython.parallel import error
32 from IPython.parallel import error
33 from IPython.parallel import util
33 from IPython.parallel import util
34
34
35 from IPython.zmq.session import Session, Message
35 from IPython.zmq.session import Session, Message
36
36
37 from .asyncresult import AsyncResult, AsyncHubResult
37 from .asyncresult import AsyncResult, AsyncHubResult
38 from IPython.core.newapplication import ProfileDir, ProfileDirError
38 from IPython.core.newapplication import ProfileDir, ProfileDirError
39 from .view import DirectView, LoadBalancedView
39 from .view import DirectView, LoadBalancedView
40
40
41 #--------------------------------------------------------------------------
41 #--------------------------------------------------------------------------
42 # Decorators for Client methods
42 # Decorators for Client methods
43 #--------------------------------------------------------------------------
43 #--------------------------------------------------------------------------
44
44
45 @decorator
45 @decorator
46 def spin_first(f, self, *args, **kwargs):
46 def spin_first(f, self, *args, **kwargs):
47 """Call spin() to sync state prior to calling the method."""
47 """Call spin() to sync state prior to calling the method."""
48 self.spin()
48 self.spin()
49 return f(self, *args, **kwargs)
49 return f(self, *args, **kwargs)
50
50
51
51
52 #--------------------------------------------------------------------------
52 #--------------------------------------------------------------------------
53 # Classes
53 # Classes
54 #--------------------------------------------------------------------------
54 #--------------------------------------------------------------------------
55
55
56 class Metadata(dict):
56 class Metadata(dict):
57 """Subclass of dict for initializing metadata values.
57 """Subclass of dict for initializing metadata values.
58
58
59 Attribute access works on keys.
59 Attribute access works on keys.
60
60
61 These objects have a strict set of keys - errors will raise if you try
61 These objects have a strict set of keys - errors will raise if you try
62 to add new keys.
62 to add new keys.
63 """
63 """
64 def __init__(self, *args, **kwargs):
64 def __init__(self, *args, **kwargs):
65 dict.__init__(self)
65 dict.__init__(self)
66 md = {'msg_id' : None,
66 md = {'msg_id' : None,
67 'submitted' : None,
67 'submitted' : None,
68 'started' : None,
68 'started' : None,
69 'completed' : None,
69 'completed' : None,
70 'received' : None,
70 'received' : None,
71 'engine_uuid' : None,
71 'engine_uuid' : None,
72 'engine_id' : None,
72 'engine_id' : None,
73 'follow' : None,
73 'follow' : None,
74 'after' : None,
74 'after' : None,
75 'status' : None,
75 'status' : None,
76
76
77 'pyin' : None,
77 'pyin' : None,
78 'pyout' : None,
78 'pyout' : None,
79 'pyerr' : None,
79 'pyerr' : None,
80 'stdout' : '',
80 'stdout' : '',
81 'stderr' : '',
81 'stderr' : '',
82 }
82 }
83 self.update(md)
83 self.update(md)
84 self.update(dict(*args, **kwargs))
84 self.update(dict(*args, **kwargs))
85
85
86 def __getattr__(self, key):
86 def __getattr__(self, key):
87 """getattr aliased to getitem"""
87 """getattr aliased to getitem"""
88 if key in self.iterkeys():
88 if key in self.iterkeys():
89 return self[key]
89 return self[key]
90 else:
90 else:
91 raise AttributeError(key)
91 raise AttributeError(key)
92
92
93 def __setattr__(self, key, value):
93 def __setattr__(self, key, value):
94 """setattr aliased to setitem, with strict"""
94 """setattr aliased to setitem, with strict"""
95 if key in self.iterkeys():
95 if key in self.iterkeys():
96 self[key] = value
96 self[key] = value
97 else:
97 else:
98 raise AttributeError(key)
98 raise AttributeError(key)
99
99
100 def __setitem__(self, key, value):
100 def __setitem__(self, key, value):
101 """strict static key enforcement"""
101 """strict static key enforcement"""
102 if key in self.iterkeys():
102 if key in self.iterkeys():
103 dict.__setitem__(self, key, value)
103 dict.__setitem__(self, key, value)
104 else:
104 else:
105 raise KeyError(key)
105 raise KeyError(key)
106
106
107
107
108 class Client(HasTraits):
108 class Client(HasTraits):
109 """A semi-synchronous client to the IPython ZMQ cluster
109 """A semi-synchronous client to the IPython ZMQ cluster
110
110
111 Parameters
111 Parameters
112 ----------
112 ----------
113
113
114 url_or_file : bytes; zmq url or path to ipcontroller-client.json
114 url_or_file : bytes; zmq url or path to ipcontroller-client.json
115 Connection information for the Hub's registration. If a json connector
115 Connection information for the Hub's registration. If a json connector
116 file is given, then likely no further configuration is necessary.
116 file is given, then likely no further configuration is necessary.
117 [Default: use profile]
117 [Default: use profile]
118 profile : bytes
118 profile : bytes
119 The name of the Cluster profile to be used to find connector information.
119 The name of the Cluster profile to be used to find connector information.
120 [Default: 'default']
120 [Default: 'default']
121 context : zmq.Context
121 context : zmq.Context
122 Pass an existing zmq.Context instance, otherwise the client will create its own.
122 Pass an existing zmq.Context instance, otherwise the client will create its own.
123 username : bytes
124 set username to be passed to the Session object
125 debug : bool
123 debug : bool
126 flag for lots of message printing for debug purposes
124 flag for lots of message printing for debug purposes
125 timeout : int/float
126 time (in seconds) to wait for connection replies from the Hub
127 [Default: 10]
127
128
129 #-------------- session related args ----------------
130
131 config : Config object
132 If specified, this will be relayed to the Session for configuration
133 username : str
134 set username for the session object
135 packer : str (import_string) or callable
136 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
137 function to serialize messages. Must support same input as
138 JSON, and output must be bytes.
139 You can pass a callable directly as `pack`
140 unpacker : str (import_string) or callable
141 The inverse of packer. Only necessary if packer is specified as *not* one
142 of 'json' or 'pickle'.
143
128 #-------------- ssh related args ----------------
144 #-------------- ssh related args ----------------
129 # These are args for configuring the ssh tunnel to be used
145 # These are args for configuring the ssh tunnel to be used
130 # credentials are used to forward connections over ssh to the Controller
146 # credentials are used to forward connections over ssh to the Controller
131 # Note that the ip given in `addr` needs to be relative to sshserver
147 # Note that the ip given in `addr` needs to be relative to sshserver
132 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
148 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
133 # and set sshserver as the same machine the Controller is on. However,
149 # and set sshserver as the same machine the Controller is on. However,
134 # the only requirement is that sshserver is able to see the Controller
150 # the only requirement is that sshserver is able to see the Controller
135 # (i.e. is within the same trusted network).
151 # (i.e. is within the same trusted network).
136
152
137 sshserver : str
153 sshserver : str
138 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
154 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
139 If keyfile or password is specified, and this is not, it will default to
155 If keyfile or password is specified, and this is not, it will default to
140 the ip given in addr.
156 the ip given in addr.
141 sshkey : str; path to public ssh key file
157 sshkey : str; path to public ssh key file
142 This specifies a key to be used in ssh login, default None.
158 This specifies a key to be used in ssh login, default None.
143 Regular default ssh keys will be used without specifying this argument.
159 Regular default ssh keys will be used without specifying this argument.
144 password : str
160 password : str
145 Your ssh password to sshserver. Note that if this is left None,
161 Your ssh password to sshserver. Note that if this is left None,
146 you will be prompted for it if passwordless key based login is unavailable.
162 you will be prompted for it if passwordless key based login is unavailable.
147 paramiko : bool
163 paramiko : bool
148 flag for whether to use paramiko instead of shell ssh for tunneling.
164 flag for whether to use paramiko instead of shell ssh for tunneling.
149 [default: True on win32, False else]
165 [default: True on win32, False else]
150
166
151 ------- exec authentication args -------
167 ------- exec authentication args -------
152 If even localhost is untrusted, you can have some protection against
168 If even localhost is untrusted, you can have some protection against
153 unauthorized execution by using a key. Messages are still sent
169 unauthorized execution by signing messages with HMAC digests.
154 as cleartext, so if someone can snoop your loopback traffic this will
170 Messages are still sent as cleartext, so if someone can snoop your
155 not help against malicious attacks.
171 loopback traffic this will not protect your privacy, but will prevent
172 unauthorized execution.
156
173
157 exec_key : str
174 exec_key : str
158 an authentication key or file containing a key
175 an authentication key or file containing a key
159 default: None
176 default: None
160
177
161
178
162 Attributes
179 Attributes
163 ----------
180 ----------
164
181
165 ids : list of int engine IDs
182 ids : list of int engine IDs
166 requesting the ids attribute always synchronizes
183 requesting the ids attribute always synchronizes
167 the registration state. To request ids without synchronization,
184 the registration state. To request ids without synchronization,
168 use semi-private _ids attributes.
185 use semi-private _ids attributes.
169
186
170 history : list of msg_ids
187 history : list of msg_ids
171 a list of msg_ids, keeping track of all the execution
188 a list of msg_ids, keeping track of all the execution
172 messages you have submitted in order.
189 messages you have submitted in order.
173
190
174 outstanding : set of msg_ids
191 outstanding : set of msg_ids
175 a set of msg_ids that have been submitted, but whose
192 a set of msg_ids that have been submitted, but whose
176 results have not yet been received.
193 results have not yet been received.
177
194
178 results : dict
195 results : dict
179 a dict of all our results, keyed by msg_id
196 a dict of all our results, keyed by msg_id
180
197
181 block : bool
198 block : bool
182 determines default behavior when block not specified
199 determines default behavior when block not specified
183 in execution methods
200 in execution methods
184
201
185 Methods
202 Methods
186 -------
203 -------
187
204
188 spin
205 spin
189 flushes incoming results and registration state changes
206 flushes incoming results and registration state changes
190 control methods spin, and requesting `ids` also ensures up to date
207 control methods spin, and requesting `ids` also ensures up to date
191
208
192 wait
209 wait
193 wait on one or more msg_ids
210 wait on one or more msg_ids
194
211
195 execution methods
212 execution methods
196 apply
213 apply
197 legacy: execute, run
214 legacy: execute, run
198
215
199 data movement
216 data movement
200 push, pull, scatter, gather
217 push, pull, scatter, gather
201
218
202 query methods
219 query methods
203 queue_status, get_result, purge, result_status
220 queue_status, get_result, purge, result_status
204
221
205 control methods
222 control methods
206 abort, shutdown
223 abort, shutdown
207
224
208 """
225 """
209
226
210
227
211 block = Bool(False)
228 block = Bool(False)
212 outstanding = Set()
229 outstanding = Set()
213 results = Instance('collections.defaultdict', (dict,))
230 results = Instance('collections.defaultdict', (dict,))
214 metadata = Instance('collections.defaultdict', (Metadata,))
231 metadata = Instance('collections.defaultdict', (Metadata,))
215 history = List()
232 history = List()
216 debug = Bool(False)
233 debug = Bool(False)
217 profile=Unicode('default')
234 profile=Unicode('default')
218
235
219 _outstanding_dict = Instance('collections.defaultdict', (set,))
236 _outstanding_dict = Instance('collections.defaultdict', (set,))
220 _ids = List()
237 _ids = List()
221 _connected=Bool(False)
238 _connected=Bool(False)
222 _ssh=Bool(False)
239 _ssh=Bool(False)
223 _context = Instance('zmq.Context')
240 _context = Instance('zmq.Context')
224 _config = Dict()
241 _config = Dict()
225 _engines=Instance(util.ReverseDict, (), {})
242 _engines=Instance(util.ReverseDict, (), {})
226 # _hub_socket=Instance('zmq.Socket')
243 # _hub_socket=Instance('zmq.Socket')
227 _query_socket=Instance('zmq.Socket')
244 _query_socket=Instance('zmq.Socket')
228 _control_socket=Instance('zmq.Socket')
245 _control_socket=Instance('zmq.Socket')
229 _iopub_socket=Instance('zmq.Socket')
246 _iopub_socket=Instance('zmq.Socket')
230 _notification_socket=Instance('zmq.Socket')
247 _notification_socket=Instance('zmq.Socket')
231 _mux_socket=Instance('zmq.Socket')
248 _mux_socket=Instance('zmq.Socket')
232 _task_socket=Instance('zmq.Socket')
249 _task_socket=Instance('zmq.Socket')
233 _task_scheme=Unicode()
250 _task_scheme=Unicode()
234 _closed = False
251 _closed = False
235 _ignored_control_replies=Int(0)
252 _ignored_control_replies=Int(0)
236 _ignored_hub_replies=Int(0)
253 _ignored_hub_replies=Int(0)
237
254
238 def __init__(self, url_or_file=None, profile='default', profile_dir=None, ipython_dir=None,
255 def __init__(self, url_or_file=None, profile='default', profile_dir=None, ipython_dir=None,
239 context=None, username=None, debug=False, exec_key=None,
256 context=None, debug=False, exec_key=None,
240 sshserver=None, sshkey=None, password=None, paramiko=None,
257 sshserver=None, sshkey=None, password=None, paramiko=None,
241 timeout=10
258 timeout=10, **extra_args
242 ):
259 ):
243 super(Client, self).__init__(debug=debug, profile=profile)
260 super(Client, self).__init__(debug=debug, profile=profile)
244 if context is None:
261 if context is None:
245 context = zmq.Context.instance()
262 context = zmq.Context.instance()
246 self._context = context
263 self._context = context
247
264
248
265
249 self._setup_profile_dir(profile, profile_dir, ipython_dir)
266 self._setup_profile_dir(profile, profile_dir, ipython_dir)
250 if self._cd is not None:
267 if self._cd is not None:
251 if url_or_file is None:
268 if url_or_file is None:
252 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
269 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
253 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
270 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
254 " Please specify at least one of url_or_file or profile."
271 " Please specify at least one of url_or_file or profile."
255
272
256 try:
273 try:
257 util.validate_url(url_or_file)
274 util.validate_url(url_or_file)
258 except AssertionError:
275 except AssertionError:
259 if not os.path.exists(url_or_file):
276 if not os.path.exists(url_or_file):
260 if self._cd:
277 if self._cd:
261 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
278 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
262 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
279 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
263 with open(url_or_file) as f:
280 with open(url_or_file) as f:
264 cfg = json.loads(f.read())
281 cfg = json.loads(f.read())
265 else:
282 else:
266 cfg = {'url':url_or_file}
283 cfg = {'url':url_or_file}
267
284
268 # sync defaults from args, json:
285 # sync defaults from args, json:
269 if sshserver:
286 if sshserver:
270 cfg['ssh'] = sshserver
287 cfg['ssh'] = sshserver
271 if exec_key:
288 if exec_key:
272 cfg['exec_key'] = exec_key
289 cfg['exec_key'] = exec_key
273 exec_key = cfg['exec_key']
290 exec_key = cfg['exec_key']
274 sshserver=cfg['ssh']
291 sshserver=cfg['ssh']
275 url = cfg['url']
292 url = cfg['url']
276 location = cfg.setdefault('location', None)
293 location = cfg.setdefault('location', None)
277 cfg['url'] = util.disambiguate_url(cfg['url'], location)
294 cfg['url'] = util.disambiguate_url(cfg['url'], location)
278 url = cfg['url']
295 url = cfg['url']
279
296
280 self._config = cfg
297 self._config = cfg
281
298
282 self._ssh = bool(sshserver or sshkey or password)
299 self._ssh = bool(sshserver or sshkey or password)
283 if self._ssh and sshserver is None:
300 if self._ssh and sshserver is None:
284 # default to ssh via localhost
301 # default to ssh via localhost
285 sshserver = url.split('://')[1].split(':')[0]
302 sshserver = url.split('://')[1].split(':')[0]
286 if self._ssh and password is None:
303 if self._ssh and password is None:
287 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
304 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
288 password=False
305 password=False
289 else:
306 else:
290 password = getpass("SSH Password for %s: "%sshserver)
307 password = getpass("SSH Password for %s: "%sshserver)
291 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
308 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
292 if exec_key is not None and os.path.isfile(exec_key):
309
293 arg = 'keyfile'
310 # configure and construct the session
294 else:
311 if exec_key is not None:
295 arg = 'key'
312 if os.path.isfile(exec_key):
296 key_arg = {arg:exec_key}
313 extra_args['keyfile'] = exec_key
297 if username is None:
314 else:
298 self.session = Session(**key_arg)
315 extra_args['key'] = exec_key
299 else:
316 self.session = Session(**extra_args)
300 self.session = Session(username=username, **key_arg)
317
301 self._query_socket = self._context.socket(zmq.XREQ)
318 self._query_socket = self._context.socket(zmq.XREQ)
302 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
319 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
303 if self._ssh:
320 if self._ssh:
304 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
321 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
305 else:
322 else:
306 self._query_socket.connect(url)
323 self._query_socket.connect(url)
307
324
308 self.session.debug = self.debug
325 self.session.debug = self.debug
309
326
310 self._notification_handlers = {'registration_notification' : self._register_engine,
327 self._notification_handlers = {'registration_notification' : self._register_engine,
311 'unregistration_notification' : self._unregister_engine,
328 'unregistration_notification' : self._unregister_engine,
312 'shutdown_notification' : lambda msg: self.close(),
329 'shutdown_notification' : lambda msg: self.close(),
313 }
330 }
314 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
331 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
315 'apply_reply' : self._handle_apply_reply}
332 'apply_reply' : self._handle_apply_reply}
316 self._connect(sshserver, ssh_kwargs, timeout)
333 self._connect(sshserver, ssh_kwargs, timeout)
317
334
318 def __del__(self):
335 def __del__(self):
319 """cleanup sockets, but _not_ context."""
336 """cleanup sockets, but _not_ context."""
320 self.close()
337 self.close()
321
338
322 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
339 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
323 if ipython_dir is None:
340 if ipython_dir is None:
324 ipython_dir = get_ipython_dir()
341 ipython_dir = get_ipython_dir()
325 if profile_dir is not None:
342 if profile_dir is not None:
326 try:
343 try:
327 self._cd = ProfileDir.find_profile_dir(profile_dir)
344 self._cd = ProfileDir.find_profile_dir(profile_dir)
328 return
345 return
329 except ProfileDirError:
346 except ProfileDirError:
330 pass
347 pass
331 elif profile is not None:
348 elif profile is not None:
332 try:
349 try:
333 self._cd = ProfileDir.find_profile_dir_by_name(
350 self._cd = ProfileDir.find_profile_dir_by_name(
334 ipython_dir, profile)
351 ipython_dir, profile)
335 return
352 return
336 except ProfileDirError:
353 except ProfileDirError:
337 pass
354 pass
338 self._cd = None
355 self._cd = None
339
356
340 def _update_engines(self, engines):
357 def _update_engines(self, engines):
341 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
358 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
342 for k,v in engines.iteritems():
359 for k,v in engines.iteritems():
343 eid = int(k)
360 eid = int(k)
344 self._engines[eid] = bytes(v) # force not unicode
361 self._engines[eid] = bytes(v) # force not unicode
345 self._ids.append(eid)
362 self._ids.append(eid)
346 self._ids = sorted(self._ids)
363 self._ids = sorted(self._ids)
347 if sorted(self._engines.keys()) != range(len(self._engines)) and \
364 if sorted(self._engines.keys()) != range(len(self._engines)) and \
348 self._task_scheme == 'pure' and self._task_socket:
365 self._task_scheme == 'pure' and self._task_socket:
349 self._stop_scheduling_tasks()
366 self._stop_scheduling_tasks()
350
367
351 def _stop_scheduling_tasks(self):
368 def _stop_scheduling_tasks(self):
352 """Stop scheduling tasks because an engine has been unregistered
369 """Stop scheduling tasks because an engine has been unregistered
353 from a pure ZMQ scheduler.
370 from a pure ZMQ scheduler.
354 """
371 """
355 self._task_socket.close()
372 self._task_socket.close()
356 self._task_socket = None
373 self._task_socket = None
357 msg = "An engine has been unregistered, and we are using pure " +\
374 msg = "An engine has been unregistered, and we are using pure " +\
358 "ZMQ task scheduling. Task farming will be disabled."
375 "ZMQ task scheduling. Task farming will be disabled."
359 if self.outstanding:
376 if self.outstanding:
360 msg += " If you were running tasks when this happened, " +\
377 msg += " If you were running tasks when this happened, " +\
361 "some `outstanding` msg_ids may never resolve."
378 "some `outstanding` msg_ids may never resolve."
362 warnings.warn(msg, RuntimeWarning)
379 warnings.warn(msg, RuntimeWarning)
363
380
364 def _build_targets(self, targets):
381 def _build_targets(self, targets):
365 """Turn valid target IDs or 'all' into two lists:
382 """Turn valid target IDs or 'all' into two lists:
366 (int_ids, uuids).
383 (int_ids, uuids).
367 """
384 """
368 if not self._ids:
385 if not self._ids:
369 # flush notification socket if no engines yet, just in case
386 # flush notification socket if no engines yet, just in case
370 if not self.ids:
387 if not self.ids:
371 raise error.NoEnginesRegistered("Can't build targets without any engines")
388 raise error.NoEnginesRegistered("Can't build targets without any engines")
372
389
373 if targets is None:
390 if targets is None:
374 targets = self._ids
391 targets = self._ids
375 elif isinstance(targets, str):
392 elif isinstance(targets, str):
376 if targets.lower() == 'all':
393 if targets.lower() == 'all':
377 targets = self._ids
394 targets = self._ids
378 else:
395 else:
379 raise TypeError("%r not valid str target, must be 'all'"%(targets))
396 raise TypeError("%r not valid str target, must be 'all'"%(targets))
380 elif isinstance(targets, int):
397 elif isinstance(targets, int):
381 if targets < 0:
398 if targets < 0:
382 targets = self.ids[targets]
399 targets = self.ids[targets]
383 if targets not in self._ids:
400 if targets not in self._ids:
384 raise IndexError("No such engine: %i"%targets)
401 raise IndexError("No such engine: %i"%targets)
385 targets = [targets]
402 targets = [targets]
386
403
387 if isinstance(targets, slice):
404 if isinstance(targets, slice):
388 indices = range(len(self._ids))[targets]
405 indices = range(len(self._ids))[targets]
389 ids = self.ids
406 ids = self.ids
390 targets = [ ids[i] for i in indices ]
407 targets = [ ids[i] for i in indices ]
391
408
392 if not isinstance(targets, (tuple, list, xrange)):
409 if not isinstance(targets, (tuple, list, xrange)):
393 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
410 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
394
411
395 return [self._engines[t] for t in targets], list(targets)
412 return [self._engines[t] for t in targets], list(targets)
396
413
397 def _connect(self, sshserver, ssh_kwargs, timeout):
414 def _connect(self, sshserver, ssh_kwargs, timeout):
398 """setup all our socket connections to the cluster. This is called from
415 """setup all our socket connections to the cluster. This is called from
399 __init__."""
416 __init__."""
400
417
401 # Maybe allow reconnecting?
418 # Maybe allow reconnecting?
402 if self._connected:
419 if self._connected:
403 return
420 return
404 self._connected=True
421 self._connected=True
405
422
406 def connect_socket(s, url):
423 def connect_socket(s, url):
407 url = util.disambiguate_url(url, self._config['location'])
424 url = util.disambiguate_url(url, self._config['location'])
408 if self._ssh:
425 if self._ssh:
409 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
426 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
410 else:
427 else:
411 return s.connect(url)
428 return s.connect(url)
412
429
413 self.session.send(self._query_socket, 'connection_request')
430 self.session.send(self._query_socket, 'connection_request')
414 r,w,x = zmq.select([self._query_socket],[],[], timeout)
431 r,w,x = zmq.select([self._query_socket],[],[], timeout)
415 if not r:
432 if not r:
416 raise error.TimeoutError("Hub connection request timed out")
433 raise error.TimeoutError("Hub connection request timed out")
417 idents,msg = self.session.recv(self._query_socket,mode=0)
434 idents,msg = self.session.recv(self._query_socket,mode=0)
418 if self.debug:
435 if self.debug:
419 pprint(msg)
436 pprint(msg)
420 msg = Message(msg)
437 msg = Message(msg)
421 content = msg.content
438 content = msg.content
422 self._config['registration'] = dict(content)
439 self._config['registration'] = dict(content)
423 if content.status == 'ok':
440 if content.status == 'ok':
424 if content.mux:
441 if content.mux:
425 self._mux_socket = self._context.socket(zmq.XREQ)
442 self._mux_socket = self._context.socket(zmq.XREQ)
426 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
443 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
427 connect_socket(self._mux_socket, content.mux)
444 connect_socket(self._mux_socket, content.mux)
428 if content.task:
445 if content.task:
429 self._task_scheme, task_addr = content.task
446 self._task_scheme, task_addr = content.task
430 self._task_socket = self._context.socket(zmq.XREQ)
447 self._task_socket = self._context.socket(zmq.XREQ)
431 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
448 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
432 connect_socket(self._task_socket, task_addr)
449 connect_socket(self._task_socket, task_addr)
433 if content.notification:
450 if content.notification:
434 self._notification_socket = self._context.socket(zmq.SUB)
451 self._notification_socket = self._context.socket(zmq.SUB)
435 connect_socket(self._notification_socket, content.notification)
452 connect_socket(self._notification_socket, content.notification)
436 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
453 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
437 # if content.query:
454 # if content.query:
438 # self._query_socket = self._context.socket(zmq.XREQ)
455 # self._query_socket = self._context.socket(zmq.XREQ)
439 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
456 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
440 # connect_socket(self._query_socket, content.query)
457 # connect_socket(self._query_socket, content.query)
441 if content.control:
458 if content.control:
442 self._control_socket = self._context.socket(zmq.XREQ)
459 self._control_socket = self._context.socket(zmq.XREQ)
443 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
460 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
444 connect_socket(self._control_socket, content.control)
461 connect_socket(self._control_socket, content.control)
445 if content.iopub:
462 if content.iopub:
446 self._iopub_socket = self._context.socket(zmq.SUB)
463 self._iopub_socket = self._context.socket(zmq.SUB)
447 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
464 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
448 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
465 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
449 connect_socket(self._iopub_socket, content.iopub)
466 connect_socket(self._iopub_socket, content.iopub)
450 self._update_engines(dict(content.engines))
467 self._update_engines(dict(content.engines))
451 else:
468 else:
452 self._connected = False
469 self._connected = False
453 raise Exception("Failed to connect!")
470 raise Exception("Failed to connect!")
454
471
455 #--------------------------------------------------------------------------
472 #--------------------------------------------------------------------------
456 # handlers and callbacks for incoming messages
473 # handlers and callbacks for incoming messages
457 #--------------------------------------------------------------------------
474 #--------------------------------------------------------------------------
458
475
459 def _unwrap_exception(self, content):
476 def _unwrap_exception(self, content):
460 """unwrap exception, and remap engine_id to int."""
477 """unwrap exception, and remap engine_id to int."""
461 e = error.unwrap_exception(content)
478 e = error.unwrap_exception(content)
462 # print e.traceback
479 # print e.traceback
463 if e.engine_info:
480 if e.engine_info:
464 e_uuid = e.engine_info['engine_uuid']
481 e_uuid = e.engine_info['engine_uuid']
465 eid = self._engines[e_uuid]
482 eid = self._engines[e_uuid]
466 e.engine_info['engine_id'] = eid
483 e.engine_info['engine_id'] = eid
467 return e
484 return e
468
485
469 def _extract_metadata(self, header, parent, content):
486 def _extract_metadata(self, header, parent, content):
470 md = {'msg_id' : parent['msg_id'],
487 md = {'msg_id' : parent['msg_id'],
471 'received' : datetime.now(),
488 'received' : datetime.now(),
472 'engine_uuid' : header.get('engine', None),
489 'engine_uuid' : header.get('engine', None),
473 'follow' : parent.get('follow', []),
490 'follow' : parent.get('follow', []),
474 'after' : parent.get('after', []),
491 'after' : parent.get('after', []),
475 'status' : content['status'],
492 'status' : content['status'],
476 }
493 }
477
494
478 if md['engine_uuid'] is not None:
495 if md['engine_uuid'] is not None:
479 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
496 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
480
497
481 if 'date' in parent:
498 if 'date' in parent:
482 md['submitted'] = parent['date']
499 md['submitted'] = parent['date']
483 if 'started' in header:
500 if 'started' in header:
484 md['started'] = header['started']
501 md['started'] = header['started']
485 if 'date' in header:
502 if 'date' in header:
486 md['completed'] = header['date']
503 md['completed'] = header['date']
487 return md
504 return md
488
505
489 def _register_engine(self, msg):
506 def _register_engine(self, msg):
490 """Register a new engine, and update our connection info."""
507 """Register a new engine, and update our connection info."""
491 content = msg['content']
508 content = msg['content']
492 eid = content['id']
509 eid = content['id']
493 d = {eid : content['queue']}
510 d = {eid : content['queue']}
494 self._update_engines(d)
511 self._update_engines(d)
495
512
496 def _unregister_engine(self, msg):
513 def _unregister_engine(self, msg):
497 """Unregister an engine that has died."""
514 """Unregister an engine that has died."""
498 content = msg['content']
515 content = msg['content']
499 eid = int(content['id'])
516 eid = int(content['id'])
500 if eid in self._ids:
517 if eid in self._ids:
501 self._ids.remove(eid)
518 self._ids.remove(eid)
502 uuid = self._engines.pop(eid)
519 uuid = self._engines.pop(eid)
503
520
504 self._handle_stranded_msgs(eid, uuid)
521 self._handle_stranded_msgs(eid, uuid)
505
522
506 if self._task_socket and self._task_scheme == 'pure':
523 if self._task_socket and self._task_scheme == 'pure':
507 self._stop_scheduling_tasks()
524 self._stop_scheduling_tasks()
508
525
509 def _handle_stranded_msgs(self, eid, uuid):
526 def _handle_stranded_msgs(self, eid, uuid):
510 """Handle messages known to be on an engine when the engine unregisters.
527 """Handle messages known to be on an engine when the engine unregisters.
511
528
512 It is possible that this will fire prematurely - that is, an engine will
529 It is possible that this will fire prematurely - that is, an engine will
513 go down after completing a result, and the client will be notified
530 go down after completing a result, and the client will be notified
514 of the unregistration and later receive the successful result.
531 of the unregistration and later receive the successful result.
515 """
532 """
516
533
517 outstanding = self._outstanding_dict[uuid]
534 outstanding = self._outstanding_dict[uuid]
518
535
519 for msg_id in list(outstanding):
536 for msg_id in list(outstanding):
520 if msg_id in self.results:
537 if msg_id in self.results:
521 # we already
538 # we already
522 continue
539 continue
523 try:
540 try:
524 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
541 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
525 except:
542 except:
526 content = error.wrap_exception()
543 content = error.wrap_exception()
527 # build a fake message:
544 # build a fake message:
528 parent = {}
545 parent = {}
529 header = {}
546 header = {}
530 parent['msg_id'] = msg_id
547 parent['msg_id'] = msg_id
531 header['engine'] = uuid
548 header['engine'] = uuid
532 header['date'] = datetime.now()
549 header['date'] = datetime.now()
533 msg = dict(parent_header=parent, header=header, content=content)
550 msg = dict(parent_header=parent, header=header, content=content)
534 self._handle_apply_reply(msg)
551 self._handle_apply_reply(msg)
535
552
536 def _handle_execute_reply(self, msg):
553 def _handle_execute_reply(self, msg):
537 """Save the reply to an execute_request into our results.
554 """Save the reply to an execute_request into our results.
538
555
539 execute messages are never actually used. apply is used instead.
556 execute messages are never actually used. apply is used instead.
540 """
557 """
541
558
542 parent = msg['parent_header']
559 parent = msg['parent_header']
543 msg_id = parent['msg_id']
560 msg_id = parent['msg_id']
544 if msg_id not in self.outstanding:
561 if msg_id not in self.outstanding:
545 if msg_id in self.history:
562 if msg_id in self.history:
546 print ("got stale result: %s"%msg_id)
563 print ("got stale result: %s"%msg_id)
547 else:
564 else:
548 print ("got unknown result: %s"%msg_id)
565 print ("got unknown result: %s"%msg_id)
549 else:
566 else:
550 self.outstanding.remove(msg_id)
567 self.outstanding.remove(msg_id)
551 self.results[msg_id] = self._unwrap_exception(msg['content'])
568 self.results[msg_id] = self._unwrap_exception(msg['content'])
552
569
553 def _handle_apply_reply(self, msg):
570 def _handle_apply_reply(self, msg):
554 """Save the reply to an apply_request into our results."""
571 """Save the reply to an apply_request into our results."""
555 parent = msg['parent_header']
572 parent = msg['parent_header']
556 msg_id = parent['msg_id']
573 msg_id = parent['msg_id']
557 if msg_id not in self.outstanding:
574 if msg_id not in self.outstanding:
558 if msg_id in self.history:
575 if msg_id in self.history:
559 print ("got stale result: %s"%msg_id)
576 print ("got stale result: %s"%msg_id)
560 print self.results[msg_id]
577 print self.results[msg_id]
561 print msg
578 print msg
562 else:
579 else:
563 print ("got unknown result: %s"%msg_id)
580 print ("got unknown result: %s"%msg_id)
564 else:
581 else:
565 self.outstanding.remove(msg_id)
582 self.outstanding.remove(msg_id)
566 content = msg['content']
583 content = msg['content']
567 header = msg['header']
584 header = msg['header']
568
585
569 # construct metadata:
586 # construct metadata:
570 md = self.metadata[msg_id]
587 md = self.metadata[msg_id]
571 md.update(self._extract_metadata(header, parent, content))
588 md.update(self._extract_metadata(header, parent, content))
572 # is this redundant?
589 # is this redundant?
573 self.metadata[msg_id] = md
590 self.metadata[msg_id] = md
574
591
575 e_outstanding = self._outstanding_dict[md['engine_uuid']]
592 e_outstanding = self._outstanding_dict[md['engine_uuid']]
576 if msg_id in e_outstanding:
593 if msg_id in e_outstanding:
577 e_outstanding.remove(msg_id)
594 e_outstanding.remove(msg_id)
578
595
579 # construct result:
596 # construct result:
580 if content['status'] == 'ok':
597 if content['status'] == 'ok':
581 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
598 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
582 elif content['status'] == 'aborted':
599 elif content['status'] == 'aborted':
583 self.results[msg_id] = error.TaskAborted(msg_id)
600 self.results[msg_id] = error.TaskAborted(msg_id)
584 elif content['status'] == 'resubmitted':
601 elif content['status'] == 'resubmitted':
585 # TODO: handle resubmission
602 # TODO: handle resubmission
586 pass
603 pass
587 else:
604 else:
588 self.results[msg_id] = self._unwrap_exception(content)
605 self.results[msg_id] = self._unwrap_exception(content)
589
606
590 def _flush_notifications(self):
607 def _flush_notifications(self):
591 """Flush notifications of engine registrations waiting
608 """Flush notifications of engine registrations waiting
592 in ZMQ queue."""
609 in ZMQ queue."""
593 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
610 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
594 while msg is not None:
611 while msg is not None:
595 if self.debug:
612 if self.debug:
596 pprint(msg)
613 pprint(msg)
597 msg_type = msg['msg_type']
614 msg_type = msg['msg_type']
598 handler = self._notification_handlers.get(msg_type, None)
615 handler = self._notification_handlers.get(msg_type, None)
599 if handler is None:
616 if handler is None:
600 raise Exception("Unhandled message type: %s"%msg.msg_type)
617 raise Exception("Unhandled message type: %s"%msg.msg_type)
601 else:
618 else:
602 handler(msg)
619 handler(msg)
603 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
620 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
604
621
605 def _flush_results(self, sock):
622 def _flush_results(self, sock):
606 """Flush task or queue results waiting in ZMQ queue."""
623 """Flush task or queue results waiting in ZMQ queue."""
607 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
624 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
608 while msg is not None:
625 while msg is not None:
609 if self.debug:
626 if self.debug:
610 pprint(msg)
627 pprint(msg)
611 msg_type = msg['msg_type']
628 msg_type = msg['msg_type']
612 handler = self._queue_handlers.get(msg_type, None)
629 handler = self._queue_handlers.get(msg_type, None)
613 if handler is None:
630 if handler is None:
614 raise Exception("Unhandled message type: %s"%msg.msg_type)
631 raise Exception("Unhandled message type: %s"%msg.msg_type)
615 else:
632 else:
616 handler(msg)
633 handler(msg)
617 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
634 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
618
635
619 def _flush_control(self, sock):
636 def _flush_control(self, sock):
620 """Flush replies from the control channel waiting
637 """Flush replies from the control channel waiting
621 in the ZMQ queue.
638 in the ZMQ queue.
622
639
623 Currently: ignore them."""
640 Currently: ignore them."""
624 if self._ignored_control_replies <= 0:
641 if self._ignored_control_replies <= 0:
625 return
642 return
626 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
643 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
627 while msg is not None:
644 while msg is not None:
628 self._ignored_control_replies -= 1
645 self._ignored_control_replies -= 1
629 if self.debug:
646 if self.debug:
630 pprint(msg)
647 pprint(msg)
631 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
648 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
632
649
633 def _flush_ignored_control(self):
650 def _flush_ignored_control(self):
634 """flush ignored control replies"""
651 """flush ignored control replies"""
635 while self._ignored_control_replies > 0:
652 while self._ignored_control_replies > 0:
636 self.session.recv(self._control_socket)
653 self.session.recv(self._control_socket)
637 self._ignored_control_replies -= 1
654 self._ignored_control_replies -= 1
638
655
639 def _flush_ignored_hub_replies(self):
656 def _flush_ignored_hub_replies(self):
640 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
657 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
641 while msg is not None:
658 while msg is not None:
642 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
659 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
643
660
644 def _flush_iopub(self, sock):
661 def _flush_iopub(self, sock):
645 """Flush replies from the iopub channel waiting
662 """Flush replies from the iopub channel waiting
646 in the ZMQ queue.
663 in the ZMQ queue.
647 """
664 """
648 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
665 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
649 while msg is not None:
666 while msg is not None:
650 if self.debug:
667 if self.debug:
651 pprint(msg)
668 pprint(msg)
652 parent = msg['parent_header']
669 parent = msg['parent_header']
653 msg_id = parent['msg_id']
670 msg_id = parent['msg_id']
654 content = msg['content']
671 content = msg['content']
655 header = msg['header']
672 header = msg['header']
656 msg_type = msg['msg_type']
673 msg_type = msg['msg_type']
657
674
658 # init metadata:
675 # init metadata:
659 md = self.metadata[msg_id]
676 md = self.metadata[msg_id]
660
677
661 if msg_type == 'stream':
678 if msg_type == 'stream':
662 name = content['name']
679 name = content['name']
663 s = md[name] or ''
680 s = md[name] or ''
664 md[name] = s + content['data']
681 md[name] = s + content['data']
665 elif msg_type == 'pyerr':
682 elif msg_type == 'pyerr':
666 md.update({'pyerr' : self._unwrap_exception(content)})
683 md.update({'pyerr' : self._unwrap_exception(content)})
667 elif msg_type == 'pyin':
684 elif msg_type == 'pyin':
668 md.update({'pyin' : content['code']})
685 md.update({'pyin' : content['code']})
669 else:
686 else:
670 md.update({msg_type : content.get('data', '')})
687 md.update({msg_type : content.get('data', '')})
671
688
672 # reduntant?
689 # reduntant?
673 self.metadata[msg_id] = md
690 self.metadata[msg_id] = md
674
691
675 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
692 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
676
693
677 #--------------------------------------------------------------------------
694 #--------------------------------------------------------------------------
678 # len, getitem
695 # len, getitem
679 #--------------------------------------------------------------------------
696 #--------------------------------------------------------------------------
680
697
681 def __len__(self):
698 def __len__(self):
682 """len(client) returns # of engines."""
699 """len(client) returns # of engines."""
683 return len(self.ids)
700 return len(self.ids)
684
701
685 def __getitem__(self, key):
702 def __getitem__(self, key):
686 """index access returns DirectView multiplexer objects
703 """index access returns DirectView multiplexer objects
687
704
688 Must be int, slice, or list/tuple/xrange of ints"""
705 Must be int, slice, or list/tuple/xrange of ints"""
689 if not isinstance(key, (int, slice, tuple, list, xrange)):
706 if not isinstance(key, (int, slice, tuple, list, xrange)):
690 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
707 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
691 else:
708 else:
692 return self.direct_view(key)
709 return self.direct_view(key)
693
710
694 #--------------------------------------------------------------------------
711 #--------------------------------------------------------------------------
695 # Begin public methods
712 # Begin public methods
696 #--------------------------------------------------------------------------
713 #--------------------------------------------------------------------------
697
714
698 @property
715 @property
699 def ids(self):
716 def ids(self):
700 """Always up-to-date ids property."""
717 """Always up-to-date ids property."""
701 self._flush_notifications()
718 self._flush_notifications()
702 # always copy:
719 # always copy:
703 return list(self._ids)
720 return list(self._ids)
704
721
705 def close(self):
722 def close(self):
706 if self._closed:
723 if self._closed:
707 return
724 return
708 snames = filter(lambda n: n.endswith('socket'), dir(self))
725 snames = filter(lambda n: n.endswith('socket'), dir(self))
709 for socket in map(lambda name: getattr(self, name), snames):
726 for socket in map(lambda name: getattr(self, name), snames):
710 if isinstance(socket, zmq.Socket) and not socket.closed:
727 if isinstance(socket, zmq.Socket) and not socket.closed:
711 socket.close()
728 socket.close()
712 self._closed = True
729 self._closed = True
713
730
714 def spin(self):
731 def spin(self):
715 """Flush any registration notifications and execution results
732 """Flush any registration notifications and execution results
716 waiting in the ZMQ queue.
733 waiting in the ZMQ queue.
717 """
734 """
718 if self._notification_socket:
735 if self._notification_socket:
719 self._flush_notifications()
736 self._flush_notifications()
720 if self._mux_socket:
737 if self._mux_socket:
721 self._flush_results(self._mux_socket)
738 self._flush_results(self._mux_socket)
722 if self._task_socket:
739 if self._task_socket:
723 self._flush_results(self._task_socket)
740 self._flush_results(self._task_socket)
724 if self._control_socket:
741 if self._control_socket:
725 self._flush_control(self._control_socket)
742 self._flush_control(self._control_socket)
726 if self._iopub_socket:
743 if self._iopub_socket:
727 self._flush_iopub(self._iopub_socket)
744 self._flush_iopub(self._iopub_socket)
728 if self._query_socket:
745 if self._query_socket:
729 self._flush_ignored_hub_replies()
746 self._flush_ignored_hub_replies()
730
747
731 def wait(self, jobs=None, timeout=-1):
748 def wait(self, jobs=None, timeout=-1):
732 """waits on one or more `jobs`, for up to `timeout` seconds.
749 """waits on one or more `jobs`, for up to `timeout` seconds.
733
750
734 Parameters
751 Parameters
735 ----------
752 ----------
736
753
737 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
754 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
738 ints are indices to self.history
755 ints are indices to self.history
739 strs are msg_ids
756 strs are msg_ids
740 default: wait on all outstanding messages
757 default: wait on all outstanding messages
741 timeout : float
758 timeout : float
742 a time in seconds, after which to give up.
759 a time in seconds, after which to give up.
743 default is -1, which means no timeout
760 default is -1, which means no timeout
744
761
745 Returns
762 Returns
746 -------
763 -------
747
764
748 True : when all msg_ids are done
765 True : when all msg_ids are done
749 False : timeout reached, some msg_ids still outstanding
766 False : timeout reached, some msg_ids still outstanding
750 """
767 """
751 tic = time.time()
768 tic = time.time()
752 if jobs is None:
769 if jobs is None:
753 theids = self.outstanding
770 theids = self.outstanding
754 else:
771 else:
755 if isinstance(jobs, (int, str, AsyncResult)):
772 if isinstance(jobs, (int, str, AsyncResult)):
756 jobs = [jobs]
773 jobs = [jobs]
757 theids = set()
774 theids = set()
758 for job in jobs:
775 for job in jobs:
759 if isinstance(job, int):
776 if isinstance(job, int):
760 # index access
777 # index access
761 job = self.history[job]
778 job = self.history[job]
762 elif isinstance(job, AsyncResult):
779 elif isinstance(job, AsyncResult):
763 map(theids.add, job.msg_ids)
780 map(theids.add, job.msg_ids)
764 continue
781 continue
765 theids.add(job)
782 theids.add(job)
766 if not theids.intersection(self.outstanding):
783 if not theids.intersection(self.outstanding):
767 return True
784 return True
768 self.spin()
785 self.spin()
769 while theids.intersection(self.outstanding):
786 while theids.intersection(self.outstanding):
770 if timeout >= 0 and ( time.time()-tic ) > timeout:
787 if timeout >= 0 and ( time.time()-tic ) > timeout:
771 break
788 break
772 time.sleep(1e-3)
789 time.sleep(1e-3)
773 self.spin()
790 self.spin()
774 return len(theids.intersection(self.outstanding)) == 0
791 return len(theids.intersection(self.outstanding)) == 0
775
792
776 #--------------------------------------------------------------------------
793 #--------------------------------------------------------------------------
777 # Control methods
794 # Control methods
778 #--------------------------------------------------------------------------
795 #--------------------------------------------------------------------------
779
796
780 @spin_first
797 @spin_first
781 def clear(self, targets=None, block=None):
798 def clear(self, targets=None, block=None):
782 """Clear the namespace in target(s)."""
799 """Clear the namespace in target(s)."""
783 block = self.block if block is None else block
800 block = self.block if block is None else block
784 targets = self._build_targets(targets)[0]
801 targets = self._build_targets(targets)[0]
785 for t in targets:
802 for t in targets:
786 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
803 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
787 error = False
804 error = False
788 if block:
805 if block:
789 self._flush_ignored_control()
806 self._flush_ignored_control()
790 for i in range(len(targets)):
807 for i in range(len(targets)):
791 idents,msg = self.session.recv(self._control_socket,0)
808 idents,msg = self.session.recv(self._control_socket,0)
792 if self.debug:
809 if self.debug:
793 pprint(msg)
810 pprint(msg)
794 if msg['content']['status'] != 'ok':
811 if msg['content']['status'] != 'ok':
795 error = self._unwrap_exception(msg['content'])
812 error = self._unwrap_exception(msg['content'])
796 else:
813 else:
797 self._ignored_control_replies += len(targets)
814 self._ignored_control_replies += len(targets)
798 if error:
815 if error:
799 raise error
816 raise error
800
817
801
818
802 @spin_first
819 @spin_first
803 def abort(self, jobs=None, targets=None, block=None):
820 def abort(self, jobs=None, targets=None, block=None):
804 """Abort specific jobs from the execution queues of target(s).
821 """Abort specific jobs from the execution queues of target(s).
805
822
806 This is a mechanism to prevent jobs that have already been submitted
823 This is a mechanism to prevent jobs that have already been submitted
807 from executing.
824 from executing.
808
825
809 Parameters
826 Parameters
810 ----------
827 ----------
811
828
812 jobs : msg_id, list of msg_ids, or AsyncResult
829 jobs : msg_id, list of msg_ids, or AsyncResult
813 The jobs to be aborted
830 The jobs to be aborted
814
831
815
832
816 """
833 """
817 block = self.block if block is None else block
834 block = self.block if block is None else block
818 targets = self._build_targets(targets)[0]
835 targets = self._build_targets(targets)[0]
819 msg_ids = []
836 msg_ids = []
820 if isinstance(jobs, (basestring,AsyncResult)):
837 if isinstance(jobs, (basestring,AsyncResult)):
821 jobs = [jobs]
838 jobs = [jobs]
822 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
839 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
823 if bad_ids:
840 if bad_ids:
824 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
841 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
825 for j in jobs:
842 for j in jobs:
826 if isinstance(j, AsyncResult):
843 if isinstance(j, AsyncResult):
827 msg_ids.extend(j.msg_ids)
844 msg_ids.extend(j.msg_ids)
828 else:
845 else:
829 msg_ids.append(j)
846 msg_ids.append(j)
830 content = dict(msg_ids=msg_ids)
847 content = dict(msg_ids=msg_ids)
831 for t in targets:
848 for t in targets:
832 self.session.send(self._control_socket, 'abort_request',
849 self.session.send(self._control_socket, 'abort_request',
833 content=content, ident=t)
850 content=content, ident=t)
834 error = False
851 error = False
835 if block:
852 if block:
836 self._flush_ignored_control()
853 self._flush_ignored_control()
837 for i in range(len(targets)):
854 for i in range(len(targets)):
838 idents,msg = self.session.recv(self._control_socket,0)
855 idents,msg = self.session.recv(self._control_socket,0)
839 if self.debug:
856 if self.debug:
840 pprint(msg)
857 pprint(msg)
841 if msg['content']['status'] != 'ok':
858 if msg['content']['status'] != 'ok':
842 error = self._unwrap_exception(msg['content'])
859 error = self._unwrap_exception(msg['content'])
843 else:
860 else:
844 self._ignored_control_replies += len(targets)
861 self._ignored_control_replies += len(targets)
845 if error:
862 if error:
846 raise error
863 raise error
847
864
848 @spin_first
865 @spin_first
849 def shutdown(self, targets=None, restart=False, hub=False, block=None):
866 def shutdown(self, targets=None, restart=False, hub=False, block=None):
850 """Terminates one or more engine processes, optionally including the hub."""
867 """Terminates one or more engine processes, optionally including the hub."""
851 block = self.block if block is None else block
868 block = self.block if block is None else block
852 if hub:
869 if hub:
853 targets = 'all'
870 targets = 'all'
854 targets = self._build_targets(targets)[0]
871 targets = self._build_targets(targets)[0]
855 for t in targets:
872 for t in targets:
856 self.session.send(self._control_socket, 'shutdown_request',
873 self.session.send(self._control_socket, 'shutdown_request',
857 content={'restart':restart},ident=t)
874 content={'restart':restart},ident=t)
858 error = False
875 error = False
859 if block or hub:
876 if block or hub:
860 self._flush_ignored_control()
877 self._flush_ignored_control()
861 for i in range(len(targets)):
878 for i in range(len(targets)):
862 idents,msg = self.session.recv(self._control_socket, 0)
879 idents,msg = self.session.recv(self._control_socket, 0)
863 if self.debug:
880 if self.debug:
864 pprint(msg)
881 pprint(msg)
865 if msg['content']['status'] != 'ok':
882 if msg['content']['status'] != 'ok':
866 error = self._unwrap_exception(msg['content'])
883 error = self._unwrap_exception(msg['content'])
867 else:
884 else:
868 self._ignored_control_replies += len(targets)
885 self._ignored_control_replies += len(targets)
869
886
870 if hub:
887 if hub:
871 time.sleep(0.25)
888 time.sleep(0.25)
872 self.session.send(self._query_socket, 'shutdown_request')
889 self.session.send(self._query_socket, 'shutdown_request')
873 idents,msg = self.session.recv(self._query_socket, 0)
890 idents,msg = self.session.recv(self._query_socket, 0)
874 if self.debug:
891 if self.debug:
875 pprint(msg)
892 pprint(msg)
876 if msg['content']['status'] != 'ok':
893 if msg['content']['status'] != 'ok':
877 error = self._unwrap_exception(msg['content'])
894 error = self._unwrap_exception(msg['content'])
878
895
879 if error:
896 if error:
880 raise error
897 raise error
881
898
882 #--------------------------------------------------------------------------
899 #--------------------------------------------------------------------------
883 # Execution related methods
900 # Execution related methods
884 #--------------------------------------------------------------------------
901 #--------------------------------------------------------------------------
885
902
886 def _maybe_raise(self, result):
903 def _maybe_raise(self, result):
887 """wrapper for maybe raising an exception if apply failed."""
904 """wrapper for maybe raising an exception if apply failed."""
888 if isinstance(result, error.RemoteError):
905 if isinstance(result, error.RemoteError):
889 raise result
906 raise result
890
907
891 return result
908 return result
892
909
893 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
910 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
894 ident=None):
911 ident=None):
895 """construct and send an apply message via a socket.
912 """construct and send an apply message via a socket.
896
913
897 This is the principal method with which all engine execution is performed by views.
914 This is the principal method with which all engine execution is performed by views.
898 """
915 """
899
916
900 assert not self._closed, "cannot use me anymore, I'm closed!"
917 assert not self._closed, "cannot use me anymore, I'm closed!"
901 # defaults:
918 # defaults:
902 args = args if args is not None else []
919 args = args if args is not None else []
903 kwargs = kwargs if kwargs is not None else {}
920 kwargs = kwargs if kwargs is not None else {}
904 subheader = subheader if subheader is not None else {}
921 subheader = subheader if subheader is not None else {}
905
922
906 # validate arguments
923 # validate arguments
907 if not callable(f):
924 if not callable(f):
908 raise TypeError("f must be callable, not %s"%type(f))
925 raise TypeError("f must be callable, not %s"%type(f))
909 if not isinstance(args, (tuple, list)):
926 if not isinstance(args, (tuple, list)):
910 raise TypeError("args must be tuple or list, not %s"%type(args))
927 raise TypeError("args must be tuple or list, not %s"%type(args))
911 if not isinstance(kwargs, dict):
928 if not isinstance(kwargs, dict):
912 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
929 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
913 if not isinstance(subheader, dict):
930 if not isinstance(subheader, dict):
914 raise TypeError("subheader must be dict, not %s"%type(subheader))
931 raise TypeError("subheader must be dict, not %s"%type(subheader))
915
932
916 bufs = util.pack_apply_message(f,args,kwargs)
933 bufs = util.pack_apply_message(f,args,kwargs)
917
934
918 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
935 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
919 subheader=subheader, track=track)
936 subheader=subheader, track=track)
920
937
921 msg_id = msg['msg_id']
938 msg_id = msg['msg_id']
922 self.outstanding.add(msg_id)
939 self.outstanding.add(msg_id)
923 if ident:
940 if ident:
924 # possibly routed to a specific engine
941 # possibly routed to a specific engine
925 if isinstance(ident, list):
942 if isinstance(ident, list):
926 ident = ident[-1]
943 ident = ident[-1]
927 if ident in self._engines.values():
944 if ident in self._engines.values():
928 # save for later, in case of engine death
945 # save for later, in case of engine death
929 self._outstanding_dict[ident].add(msg_id)
946 self._outstanding_dict[ident].add(msg_id)
930 self.history.append(msg_id)
947 self.history.append(msg_id)
931 self.metadata[msg_id]['submitted'] = datetime.now()
948 self.metadata[msg_id]['submitted'] = datetime.now()
932
949
933 return msg
950 return msg
934
951
935 #--------------------------------------------------------------------------
952 #--------------------------------------------------------------------------
936 # construct a View object
953 # construct a View object
937 #--------------------------------------------------------------------------
954 #--------------------------------------------------------------------------
938
955
939 def load_balanced_view(self, targets=None):
956 def load_balanced_view(self, targets=None):
940 """construct a DirectView object.
957 """construct a DirectView object.
941
958
942 If no arguments are specified, create a LoadBalancedView
959 If no arguments are specified, create a LoadBalancedView
943 using all engines.
960 using all engines.
944
961
945 Parameters
962 Parameters
946 ----------
963 ----------
947
964
948 targets: list,slice,int,etc. [default: use all engines]
965 targets: list,slice,int,etc. [default: use all engines]
949 The subset of engines across which to load-balance
966 The subset of engines across which to load-balance
950 """
967 """
951 if targets is not None:
968 if targets is not None:
952 targets = self._build_targets(targets)[1]
969 targets = self._build_targets(targets)[1]
953 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
970 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
954
971
955 def direct_view(self, targets='all'):
972 def direct_view(self, targets='all'):
956 """construct a DirectView object.
973 """construct a DirectView object.
957
974
958 If no targets are specified, create a DirectView
975 If no targets are specified, create a DirectView
959 using all engines.
976 using all engines.
960
977
961 Parameters
978 Parameters
962 ----------
979 ----------
963
980
964 targets: list,slice,int,etc. [default: use all engines]
981 targets: list,slice,int,etc. [default: use all engines]
965 The engines to use for the View
982 The engines to use for the View
966 """
983 """
967 single = isinstance(targets, int)
984 single = isinstance(targets, int)
968 targets = self._build_targets(targets)[1]
985 targets = self._build_targets(targets)[1]
969 if single:
986 if single:
970 targets = targets[0]
987 targets = targets[0]
971 return DirectView(client=self, socket=self._mux_socket, targets=targets)
988 return DirectView(client=self, socket=self._mux_socket, targets=targets)
972
989
973 #--------------------------------------------------------------------------
990 #--------------------------------------------------------------------------
974 # Query methods
991 # Query methods
975 #--------------------------------------------------------------------------
992 #--------------------------------------------------------------------------
976
993
977 @spin_first
994 @spin_first
978 def get_result(self, indices_or_msg_ids=None, block=None):
995 def get_result(self, indices_or_msg_ids=None, block=None):
979 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
996 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
980
997
981 If the client already has the results, no request to the Hub will be made.
998 If the client already has the results, no request to the Hub will be made.
982
999
983 This is a convenient way to construct AsyncResult objects, which are wrappers
1000 This is a convenient way to construct AsyncResult objects, which are wrappers
984 that include metadata about execution, and allow for awaiting results that
1001 that include metadata about execution, and allow for awaiting results that
985 were not submitted by this Client.
1002 were not submitted by this Client.
986
1003
987 It can also be a convenient way to retrieve the metadata associated with
1004 It can also be a convenient way to retrieve the metadata associated with
988 blocking execution, since it always retrieves
1005 blocking execution, since it always retrieves
989
1006
990 Examples
1007 Examples
991 --------
1008 --------
992 ::
1009 ::
993
1010
994 In [10]: r = client.apply()
1011 In [10]: r = client.apply()
995
1012
996 Parameters
1013 Parameters
997 ----------
1014 ----------
998
1015
999 indices_or_msg_ids : integer history index, str msg_id, or list of either
1016 indices_or_msg_ids : integer history index, str msg_id, or list of either
1000 The indices or msg_ids of indices to be retrieved
1017 The indices or msg_ids of indices to be retrieved
1001
1018
1002 block : bool
1019 block : bool
1003 Whether to wait for the result to be done
1020 Whether to wait for the result to be done
1004
1021
1005 Returns
1022 Returns
1006 -------
1023 -------
1007
1024
1008 AsyncResult
1025 AsyncResult
1009 A single AsyncResult object will always be returned.
1026 A single AsyncResult object will always be returned.
1010
1027
1011 AsyncHubResult
1028 AsyncHubResult
1012 A subclass of AsyncResult that retrieves results from the Hub
1029 A subclass of AsyncResult that retrieves results from the Hub
1013
1030
1014 """
1031 """
1015 block = self.block if block is None else block
1032 block = self.block if block is None else block
1016 if indices_or_msg_ids is None:
1033 if indices_or_msg_ids is None:
1017 indices_or_msg_ids = -1
1034 indices_or_msg_ids = -1
1018
1035
1019 if not isinstance(indices_or_msg_ids, (list,tuple)):
1036 if not isinstance(indices_or_msg_ids, (list,tuple)):
1020 indices_or_msg_ids = [indices_or_msg_ids]
1037 indices_or_msg_ids = [indices_or_msg_ids]
1021
1038
1022 theids = []
1039 theids = []
1023 for id in indices_or_msg_ids:
1040 for id in indices_or_msg_ids:
1024 if isinstance(id, int):
1041 if isinstance(id, int):
1025 id = self.history[id]
1042 id = self.history[id]
1026 if not isinstance(id, str):
1043 if not isinstance(id, str):
1027 raise TypeError("indices must be str or int, not %r"%id)
1044 raise TypeError("indices must be str or int, not %r"%id)
1028 theids.append(id)
1045 theids.append(id)
1029
1046
1030 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1047 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1031 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1048 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1032
1049
1033 if remote_ids:
1050 if remote_ids:
1034 ar = AsyncHubResult(self, msg_ids=theids)
1051 ar = AsyncHubResult(self, msg_ids=theids)
1035 else:
1052 else:
1036 ar = AsyncResult(self, msg_ids=theids)
1053 ar = AsyncResult(self, msg_ids=theids)
1037
1054
1038 if block:
1055 if block:
1039 ar.wait()
1056 ar.wait()
1040
1057
1041 return ar
1058 return ar
1042
1059
1043 @spin_first
1060 @spin_first
1044 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1061 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1045 """Resubmit one or more tasks.
1062 """Resubmit one or more tasks.
1046
1063
1047 in-flight tasks may not be resubmitted.
1064 in-flight tasks may not be resubmitted.
1048
1065
1049 Parameters
1066 Parameters
1050 ----------
1067 ----------
1051
1068
1052 indices_or_msg_ids : integer history index, str msg_id, or list of either
1069 indices_or_msg_ids : integer history index, str msg_id, or list of either
1053 The indices or msg_ids of indices to be retrieved
1070 The indices or msg_ids of indices to be retrieved
1054
1071
1055 block : bool
1072 block : bool
1056 Whether to wait for the result to be done
1073 Whether to wait for the result to be done
1057
1074
1058 Returns
1075 Returns
1059 -------
1076 -------
1060
1077
1061 AsyncHubResult
1078 AsyncHubResult
1062 A subclass of AsyncResult that retrieves results from the Hub
1079 A subclass of AsyncResult that retrieves results from the Hub
1063
1080
1064 """
1081 """
1065 block = self.block if block is None else block
1082 block = self.block if block is None else block
1066 if indices_or_msg_ids is None:
1083 if indices_or_msg_ids is None:
1067 indices_or_msg_ids = -1
1084 indices_or_msg_ids = -1
1068
1085
1069 if not isinstance(indices_or_msg_ids, (list,tuple)):
1086 if not isinstance(indices_or_msg_ids, (list,tuple)):
1070 indices_or_msg_ids = [indices_or_msg_ids]
1087 indices_or_msg_ids = [indices_or_msg_ids]
1071
1088
1072 theids = []
1089 theids = []
1073 for id in indices_or_msg_ids:
1090 for id in indices_or_msg_ids:
1074 if isinstance(id, int):
1091 if isinstance(id, int):
1075 id = self.history[id]
1092 id = self.history[id]
1076 if not isinstance(id, str):
1093 if not isinstance(id, str):
1077 raise TypeError("indices must be str or int, not %r"%id)
1094 raise TypeError("indices must be str or int, not %r"%id)
1078 theids.append(id)
1095 theids.append(id)
1079
1096
1080 for msg_id in theids:
1097 for msg_id in theids:
1081 self.outstanding.discard(msg_id)
1098 self.outstanding.discard(msg_id)
1082 if msg_id in self.history:
1099 if msg_id in self.history:
1083 self.history.remove(msg_id)
1100 self.history.remove(msg_id)
1084 self.results.pop(msg_id, None)
1101 self.results.pop(msg_id, None)
1085 self.metadata.pop(msg_id, None)
1102 self.metadata.pop(msg_id, None)
1086 content = dict(msg_ids = theids)
1103 content = dict(msg_ids = theids)
1087
1104
1088 self.session.send(self._query_socket, 'resubmit_request', content)
1105 self.session.send(self._query_socket, 'resubmit_request', content)
1089
1106
1090 zmq.select([self._query_socket], [], [])
1107 zmq.select([self._query_socket], [], [])
1091 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1108 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1092 if self.debug:
1109 if self.debug:
1093 pprint(msg)
1110 pprint(msg)
1094 content = msg['content']
1111 content = msg['content']
1095 if content['status'] != 'ok':
1112 if content['status'] != 'ok':
1096 raise self._unwrap_exception(content)
1113 raise self._unwrap_exception(content)
1097
1114
1098 ar = AsyncHubResult(self, msg_ids=theids)
1115 ar = AsyncHubResult(self, msg_ids=theids)
1099
1116
1100 if block:
1117 if block:
1101 ar.wait()
1118 ar.wait()
1102
1119
1103 return ar
1120 return ar
1104
1121
1105 @spin_first
1122 @spin_first
1106 def result_status(self, msg_ids, status_only=True):
1123 def result_status(self, msg_ids, status_only=True):
1107 """Check on the status of the result(s) of the apply request with `msg_ids`.
1124 """Check on the status of the result(s) of the apply request with `msg_ids`.
1108
1125
1109 If status_only is False, then the actual results will be retrieved, else
1126 If status_only is False, then the actual results will be retrieved, else
1110 only the status of the results will be checked.
1127 only the status of the results will be checked.
1111
1128
1112 Parameters
1129 Parameters
1113 ----------
1130 ----------
1114
1131
1115 msg_ids : list of msg_ids
1132 msg_ids : list of msg_ids
1116 if int:
1133 if int:
1117 Passed as index to self.history for convenience.
1134 Passed as index to self.history for convenience.
1118 status_only : bool (default: True)
1135 status_only : bool (default: True)
1119 if False:
1136 if False:
1120 Retrieve the actual results of completed tasks.
1137 Retrieve the actual results of completed tasks.
1121
1138
1122 Returns
1139 Returns
1123 -------
1140 -------
1124
1141
1125 results : dict
1142 results : dict
1126 There will always be the keys 'pending' and 'completed', which will
1143 There will always be the keys 'pending' and 'completed', which will
1127 be lists of msg_ids that are incomplete or complete. If `status_only`
1144 be lists of msg_ids that are incomplete or complete. If `status_only`
1128 is False, then completed results will be keyed by their `msg_id`.
1145 is False, then completed results will be keyed by their `msg_id`.
1129 """
1146 """
1130 if not isinstance(msg_ids, (list,tuple)):
1147 if not isinstance(msg_ids, (list,tuple)):
1131 msg_ids = [msg_ids]
1148 msg_ids = [msg_ids]
1132
1149
1133 theids = []
1150 theids = []
1134 for msg_id in msg_ids:
1151 for msg_id in msg_ids:
1135 if isinstance(msg_id, int):
1152 if isinstance(msg_id, int):
1136 msg_id = self.history[msg_id]
1153 msg_id = self.history[msg_id]
1137 if not isinstance(msg_id, basestring):
1154 if not isinstance(msg_id, basestring):
1138 raise TypeError("msg_ids must be str, not %r"%msg_id)
1155 raise TypeError("msg_ids must be str, not %r"%msg_id)
1139 theids.append(msg_id)
1156 theids.append(msg_id)
1140
1157
1141 completed = []
1158 completed = []
1142 local_results = {}
1159 local_results = {}
1143
1160
1144 # comment this block out to temporarily disable local shortcut:
1161 # comment this block out to temporarily disable local shortcut:
1145 for msg_id in theids:
1162 for msg_id in theids:
1146 if msg_id in self.results:
1163 if msg_id in self.results:
1147 completed.append(msg_id)
1164 completed.append(msg_id)
1148 local_results[msg_id] = self.results[msg_id]
1165 local_results[msg_id] = self.results[msg_id]
1149 theids.remove(msg_id)
1166 theids.remove(msg_id)
1150
1167
1151 if theids: # some not locally cached
1168 if theids: # some not locally cached
1152 content = dict(msg_ids=theids, status_only=status_only)
1169 content = dict(msg_ids=theids, status_only=status_only)
1153 msg = self.session.send(self._query_socket, "result_request", content=content)
1170 msg = self.session.send(self._query_socket, "result_request", content=content)
1154 zmq.select([self._query_socket], [], [])
1171 zmq.select([self._query_socket], [], [])
1155 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1172 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1156 if self.debug:
1173 if self.debug:
1157 pprint(msg)
1174 pprint(msg)
1158 content = msg['content']
1175 content = msg['content']
1159 if content['status'] != 'ok':
1176 if content['status'] != 'ok':
1160 raise self._unwrap_exception(content)
1177 raise self._unwrap_exception(content)
1161 buffers = msg['buffers']
1178 buffers = msg['buffers']
1162 else:
1179 else:
1163 content = dict(completed=[],pending=[])
1180 content = dict(completed=[],pending=[])
1164
1181
1165 content['completed'].extend(completed)
1182 content['completed'].extend(completed)
1166
1183
1167 if status_only:
1184 if status_only:
1168 return content
1185 return content
1169
1186
1170 failures = []
1187 failures = []
1171 # load cached results into result:
1188 # load cached results into result:
1172 content.update(local_results)
1189 content.update(local_results)
1173
1190
1174 # update cache with results:
1191 # update cache with results:
1175 for msg_id in sorted(theids):
1192 for msg_id in sorted(theids):
1176 if msg_id in content['completed']:
1193 if msg_id in content['completed']:
1177 rec = content[msg_id]
1194 rec = content[msg_id]
1178 parent = rec['header']
1195 parent = rec['header']
1179 header = rec['result_header']
1196 header = rec['result_header']
1180 rcontent = rec['result_content']
1197 rcontent = rec['result_content']
1181 iodict = rec['io']
1198 iodict = rec['io']
1182 if isinstance(rcontent, str):
1199 if isinstance(rcontent, str):
1183 rcontent = self.session.unpack(rcontent)
1200 rcontent = self.session.unpack(rcontent)
1184
1201
1185 md = self.metadata[msg_id]
1202 md = self.metadata[msg_id]
1186 md.update(self._extract_metadata(header, parent, rcontent))
1203 md.update(self._extract_metadata(header, parent, rcontent))
1187 md.update(iodict)
1204 md.update(iodict)
1188
1205
1189 if rcontent['status'] == 'ok':
1206 if rcontent['status'] == 'ok':
1190 res,buffers = util.unserialize_object(buffers)
1207 res,buffers = util.unserialize_object(buffers)
1191 else:
1208 else:
1192 print rcontent
1209 print rcontent
1193 res = self._unwrap_exception(rcontent)
1210 res = self._unwrap_exception(rcontent)
1194 failures.append(res)
1211 failures.append(res)
1195
1212
1196 self.results[msg_id] = res
1213 self.results[msg_id] = res
1197 content[msg_id] = res
1214 content[msg_id] = res
1198
1215
1199 if len(theids) == 1 and failures:
1216 if len(theids) == 1 and failures:
1200 raise failures[0]
1217 raise failures[0]
1201
1218
1202 error.collect_exceptions(failures, "result_status")
1219 error.collect_exceptions(failures, "result_status")
1203 return content
1220 return content
1204
1221
1205 @spin_first
1222 @spin_first
1206 def queue_status(self, targets='all', verbose=False):
1223 def queue_status(self, targets='all', verbose=False):
1207 """Fetch the status of engine queues.
1224 """Fetch the status of engine queues.
1208
1225
1209 Parameters
1226 Parameters
1210 ----------
1227 ----------
1211
1228
1212 targets : int/str/list of ints/strs
1229 targets : int/str/list of ints/strs
1213 the engines whose states are to be queried.
1230 the engines whose states are to be queried.
1214 default : all
1231 default : all
1215 verbose : bool
1232 verbose : bool
1216 Whether to return lengths only, or lists of ids for each element
1233 Whether to return lengths only, or lists of ids for each element
1217 """
1234 """
1218 engine_ids = self._build_targets(targets)[1]
1235 engine_ids = self._build_targets(targets)[1]
1219 content = dict(targets=engine_ids, verbose=verbose)
1236 content = dict(targets=engine_ids, verbose=verbose)
1220 self.session.send(self._query_socket, "queue_request", content=content)
1237 self.session.send(self._query_socket, "queue_request", content=content)
1221 idents,msg = self.session.recv(self._query_socket, 0)
1238 idents,msg = self.session.recv(self._query_socket, 0)
1222 if self.debug:
1239 if self.debug:
1223 pprint(msg)
1240 pprint(msg)
1224 content = msg['content']
1241 content = msg['content']
1225 status = content.pop('status')
1242 status = content.pop('status')
1226 if status != 'ok':
1243 if status != 'ok':
1227 raise self._unwrap_exception(content)
1244 raise self._unwrap_exception(content)
1228 content = util.rekey(content)
1245 content = util.rekey(content)
1229 if isinstance(targets, int):
1246 if isinstance(targets, int):
1230 return content[targets]
1247 return content[targets]
1231 else:
1248 else:
1232 return content
1249 return content
1233
1250
1234 @spin_first
1251 @spin_first
1235 def purge_results(self, jobs=[], targets=[]):
1252 def purge_results(self, jobs=[], targets=[]):
1236 """Tell the Hub to forget results.
1253 """Tell the Hub to forget results.
1237
1254
1238 Individual results can be purged by msg_id, or the entire
1255 Individual results can be purged by msg_id, or the entire
1239 history of specific targets can be purged.
1256 history of specific targets can be purged.
1240
1257
1241 Parameters
1258 Parameters
1242 ----------
1259 ----------
1243
1260
1244 jobs : str or list of str or AsyncResult objects
1261 jobs : str or list of str or AsyncResult objects
1245 the msg_ids whose results should be forgotten.
1262 the msg_ids whose results should be forgotten.
1246 targets : int/str/list of ints/strs
1263 targets : int/str/list of ints/strs
1247 The targets, by uuid or int_id, whose entire history is to be purged.
1264 The targets, by uuid or int_id, whose entire history is to be purged.
1248 Use `targets='all'` to scrub everything from the Hub's memory.
1265 Use `targets='all'` to scrub everything from the Hub's memory.
1249
1266
1250 default : None
1267 default : None
1251 """
1268 """
1252 if not targets and not jobs:
1269 if not targets and not jobs:
1253 raise ValueError("Must specify at least one of `targets` and `jobs`")
1270 raise ValueError("Must specify at least one of `targets` and `jobs`")
1254 if targets:
1271 if targets:
1255 targets = self._build_targets(targets)[1]
1272 targets = self._build_targets(targets)[1]
1256
1273
1257 # construct msg_ids from jobs
1274 # construct msg_ids from jobs
1258 msg_ids = []
1275 msg_ids = []
1259 if isinstance(jobs, (basestring,AsyncResult)):
1276 if isinstance(jobs, (basestring,AsyncResult)):
1260 jobs = [jobs]
1277 jobs = [jobs]
1261 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1278 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1262 if bad_ids:
1279 if bad_ids:
1263 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1280 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1264 for j in jobs:
1281 for j in jobs:
1265 if isinstance(j, AsyncResult):
1282 if isinstance(j, AsyncResult):
1266 msg_ids.extend(j.msg_ids)
1283 msg_ids.extend(j.msg_ids)
1267 else:
1284 else:
1268 msg_ids.append(j)
1285 msg_ids.append(j)
1269
1286
1270 content = dict(targets=targets, msg_ids=msg_ids)
1287 content = dict(targets=targets, msg_ids=msg_ids)
1271 self.session.send(self._query_socket, "purge_request", content=content)
1288 self.session.send(self._query_socket, "purge_request", content=content)
1272 idents, msg = self.session.recv(self._query_socket, 0)
1289 idents, msg = self.session.recv(self._query_socket, 0)
1273 if self.debug:
1290 if self.debug:
1274 pprint(msg)
1291 pprint(msg)
1275 content = msg['content']
1292 content = msg['content']
1276 if content['status'] != 'ok':
1293 if content['status'] != 'ok':
1277 raise self._unwrap_exception(content)
1294 raise self._unwrap_exception(content)
1278
1295
1279 @spin_first
1296 @spin_first
1280 def hub_history(self):
1297 def hub_history(self):
1281 """Get the Hub's history
1298 """Get the Hub's history
1282
1299
1283 Just like the Client, the Hub has a history, which is a list of msg_ids.
1300 Just like the Client, the Hub has a history, which is a list of msg_ids.
1284 This will contain the history of all clients, and, depending on configuration,
1301 This will contain the history of all clients, and, depending on configuration,
1285 may contain history across multiple cluster sessions.
1302 may contain history across multiple cluster sessions.
1286
1303
1287 Any msg_id returned here is a valid argument to `get_result`.
1304 Any msg_id returned here is a valid argument to `get_result`.
1288
1305
1289 Returns
1306 Returns
1290 -------
1307 -------
1291
1308
1292 msg_ids : list of strs
1309 msg_ids : list of strs
1293 list of all msg_ids, ordered by task submission time.
1310 list of all msg_ids, ordered by task submission time.
1294 """
1311 """
1295
1312
1296 self.session.send(self._query_socket, "history_request", content={})
1313 self.session.send(self._query_socket, "history_request", content={})
1297 idents, msg = self.session.recv(self._query_socket, 0)
1314 idents, msg = self.session.recv(self._query_socket, 0)
1298
1315
1299 if self.debug:
1316 if self.debug:
1300 pprint(msg)
1317 pprint(msg)
1301 content = msg['content']
1318 content = msg['content']
1302 if content['status'] != 'ok':
1319 if content['status'] != 'ok':
1303 raise self._unwrap_exception(content)
1320 raise self._unwrap_exception(content)
1304 else:
1321 else:
1305 return content['history']
1322 return content['history']
1306
1323
1307 @spin_first
1324 @spin_first
1308 def db_query(self, query, keys=None):
1325 def db_query(self, query, keys=None):
1309 """Query the Hub's TaskRecord database
1326 """Query the Hub's TaskRecord database
1310
1327
1311 This will return a list of task record dicts that match `query`
1328 This will return a list of task record dicts that match `query`
1312
1329
1313 Parameters
1330 Parameters
1314 ----------
1331 ----------
1315
1332
1316 query : mongodb query dict
1333 query : mongodb query dict
1317 The search dict. See mongodb query docs for details.
1334 The search dict. See mongodb query docs for details.
1318 keys : list of strs [optional]
1335 keys : list of strs [optional]
1319 The subset of keys to be returned. The default is to fetch everything but buffers.
1336 The subset of keys to be returned. The default is to fetch everything but buffers.
1320 'msg_id' will *always* be included.
1337 'msg_id' will *always* be included.
1321 """
1338 """
1322 if isinstance(keys, basestring):
1339 if isinstance(keys, basestring):
1323 keys = [keys]
1340 keys = [keys]
1324 content = dict(query=query, keys=keys)
1341 content = dict(query=query, keys=keys)
1325 self.session.send(self._query_socket, "db_request", content=content)
1342 self.session.send(self._query_socket, "db_request", content=content)
1326 idents, msg = self.session.recv(self._query_socket, 0)
1343 idents, msg = self.session.recv(self._query_socket, 0)
1327 if self.debug:
1344 if self.debug:
1328 pprint(msg)
1345 pprint(msg)
1329 content = msg['content']
1346 content = msg['content']
1330 if content['status'] != 'ok':
1347 if content['status'] != 'ok':
1331 raise self._unwrap_exception(content)
1348 raise self._unwrap_exception(content)
1332
1349
1333 records = content['records']
1350 records = content['records']
1334
1351
1335 buffer_lens = content['buffer_lens']
1352 buffer_lens = content['buffer_lens']
1336 result_buffer_lens = content['result_buffer_lens']
1353 result_buffer_lens = content['result_buffer_lens']
1337 buffers = msg['buffers']
1354 buffers = msg['buffers']
1338 has_bufs = buffer_lens is not None
1355 has_bufs = buffer_lens is not None
1339 has_rbufs = result_buffer_lens is not None
1356 has_rbufs = result_buffer_lens is not None
1340 for i,rec in enumerate(records):
1357 for i,rec in enumerate(records):
1341 # relink buffers
1358 # relink buffers
1342 if has_bufs:
1359 if has_bufs:
1343 blen = buffer_lens[i]
1360 blen = buffer_lens[i]
1344 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1361 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1345 if has_rbufs:
1362 if has_rbufs:
1346 blen = result_buffer_lens[i]
1363 blen = result_buffer_lens[i]
1347 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1364 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1348
1365
1349 return records
1366 return records
1350
1367
1351 __all__ = [ 'Client' ]
1368 __all__ = [ 'Client' ]
General Comments 0
You need to be logged in to leave comments. Login now