##// END OF EJS Templates
Backport PR #4074: close Client sockets if connection fails...
MinRK -
Show More
@@ -1,1839 +1,1852 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
36
36
37 from IPython.utils.coloransi import TermColors
37 from IPython.utils.coloransi import TermColors
38 from IPython.utils.jsonutil import rekey
38 from IPython.utils.jsonutil import rekey
39 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
40 from IPython.utils.path import get_ipython_dir
40 from IPython.utils.path import get_ipython_dir
41 from IPython.utils.py3compat import cast_bytes
41 from IPython.utils.py3compat import cast_bytes
42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
43 Dict, List, Bool, Set, Any)
43 Dict, List, Bool, Set, Any)
44 from IPython.external.decorator import decorator
44 from IPython.external.decorator import decorator
45 from IPython.external.ssh import tunnel
45 from IPython.external.ssh import tunnel
46
46
47 from IPython.parallel import Reference
47 from IPython.parallel import Reference
48 from IPython.parallel import error
48 from IPython.parallel import error
49 from IPython.parallel import util
49 from IPython.parallel import util
50
50
51 from IPython.kernel.zmq.session import Session, Message
51 from IPython.kernel.zmq.session import Session, Message
52 from IPython.kernel.zmq import serialize
52 from IPython.kernel.zmq import serialize
53
53
54 from .asyncresult import AsyncResult, AsyncHubResult
54 from .asyncresult import AsyncResult, AsyncHubResult
55 from .view import DirectView, LoadBalancedView
55 from .view import DirectView, LoadBalancedView
56
56
# Python 3 removed xrange; alias it to range so the couple of
# py2 'isinstance' checks elsewhere in this module keep working.
if sys.version_info[0] >= 3:
    xrange = range
61
61
62 #--------------------------------------------------------------------------
62 #--------------------------------------------------------------------------
63 # Decorators for Client methods
63 # Decorators for Client methods
64 #--------------------------------------------------------------------------
64 #--------------------------------------------------------------------------
65
65
@decorator
def spin_first(method, self, *args, **kwargs):
    """Synchronize client state via spin() before invoking *method*."""
    self.spin()
    return method(self, *args, **kwargs)
71
71
72
72
73 #--------------------------------------------------------------------------
73 #--------------------------------------------------------------------------
74 # Classes
74 # Classes
75 #--------------------------------------------------------------------------
75 #--------------------------------------------------------------------------
76
76
77
77
class ExecuteReply(object):
    """wrapper for finished Execute results"""

    def __init__(self, msg_id, content, metadata):
        self.msg_id = msg_id
        self._content = content
        self.execution_count = content['execution_count']
        self.metadata = metadata

    def _display_data(self):
        """Return the mime-bundle dict from the pyout metadata (may be empty)."""
        pyout = self.metadata['pyout'] or {'data': {}}
        return pyout['data']

    def __getitem__(self, key):
        # item access proxies the metadata dict
        return self.metadata[key]

    def __getattr__(self, key):
        # attribute access is aliased to metadata keys
        if key not in self.metadata:
            raise AttributeError(key)
        return self.metadata[key]

    def __repr__(self):
        text_out = self._display_data().get('text/plain', '')
        # truncate long reprs to 32 characters total
        if len(text_out) > 32:
            text_out = text_out[:29] + '...'
        return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)

    def _repr_pretty_(self, p, cycle):
        text_out = self._display_data().get('text/plain', '')

        if not text_out:
            return

        try:
            ip = get_ipython()
        except NameError:
            colors = "NoColor"
        else:
            colors = ip.colors

        if colors == "NoColor":
            out = normal = ""
        else:
            out = TermColors.Red
            normal = TermColors.Normal

        if '\n' in text_out and not text_out.startswith('\n'):
            # add newline for multiline reprs
            text_out = '\n' + text_out

        p.text(
            out + u'Out[%i:%i]: ' % (
                self.metadata['engine_id'], self.execution_count
            ) + normal + text_out
        )

    def _repr_html_(self):
        return self._display_data().get("text/html")

    def _repr_latex_(self):
        return self._display_data().get("text/latex")

    def _repr_json_(self):
        return self._display_data().get("application/json")

    def _repr_javascript_(self):
        return self._display_data().get("application/javascript")

    def _repr_png_(self):
        return self._display_data().get("image/png")

    def _repr_jpeg_(self):
        return self._display_data().get("image/jpeg")

    def _repr_svg_(self):
        return self._display_data().get("image/svg+xml")
159
159
160
160
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # the full, fixed key set; mutable defaults are rebuilt per instance
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
              'outputs' : [],
              'data': {},
              'outputs_ready' : False,
        }
        self.update(md)
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # NOTE: must use `key in self`, not `self.iterkeys()`:
        # on Python 3 dict has no iterkeys, so that lookup would
        # re-enter __getattr__ and recurse infinitely.
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict key enforcement"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
214
214
215
215
216 class Client(HasTraits):
216 class Client(HasTraits):
217 """A semi-synchronous client to the IPython ZMQ cluster
217 """A semi-synchronous client to the IPython ZMQ cluster
218
218
219 Parameters
219 Parameters
220 ----------
220 ----------
221
221
222 url_file : str/unicode; path to ipcontroller-client.json
222 url_file : str/unicode; path to ipcontroller-client.json
223 This JSON file should contain all the information needed to connect to a cluster,
223 This JSON file should contain all the information needed to connect to a cluster,
224 and is likely the only argument needed.
224 and is likely the only argument needed.
225 Connection information for the Hub's registration. If a json connector
225 Connection information for the Hub's registration. If a json connector
226 file is given, then likely no further configuration is necessary.
226 file is given, then likely no further configuration is necessary.
227 [Default: use profile]
227 [Default: use profile]
228 profile : bytes
228 profile : bytes
229 The name of the Cluster profile to be used to find connector information.
229 The name of the Cluster profile to be used to find connector information.
230 If run from an IPython application, the default profile will be the same
230 If run from an IPython application, the default profile will be the same
231 as the running application, otherwise it will be 'default'.
231 as the running application, otherwise it will be 'default'.
232 cluster_id : str
232 cluster_id : str
233 String id to be added to runtime files, to prevent name collisions when using
233 String id to be added to runtime files, to prevent name collisions when using
234 multiple clusters with a single profile simultaneously.
234 multiple clusters with a single profile simultaneously.
235 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
235 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
236 Since this is text inserted into filenames, typical recommendations apply:
236 Since this is text inserted into filenames, typical recommendations apply:
237 Simple character strings are ideal, and spaces are not recommended (but
237 Simple character strings are ideal, and spaces are not recommended (but
238 should generally work)
238 should generally work)
239 context : zmq.Context
239 context : zmq.Context
240 Pass an existing zmq.Context instance, otherwise the client will create its own.
240 Pass an existing zmq.Context instance, otherwise the client will create its own.
241 debug : bool
241 debug : bool
242 flag for lots of message printing for debug purposes
242 flag for lots of message printing for debug purposes
243 timeout : int/float
243 timeout : int/float
244 time (in seconds) to wait for connection replies from the Hub
244 time (in seconds) to wait for connection replies from the Hub
245 [Default: 10]
245 [Default: 10]
246
246
247 #-------------- session related args ----------------
247 #-------------- session related args ----------------
248
248
249 config : Config object
249 config : Config object
250 If specified, this will be relayed to the Session for configuration
250 If specified, this will be relayed to the Session for configuration
251 username : str
251 username : str
252 set username for the session object
252 set username for the session object
253
253
254 #-------------- ssh related args ----------------
254 #-------------- ssh related args ----------------
255 # These are args for configuring the ssh tunnel to be used
255 # These are args for configuring the ssh tunnel to be used
256 # credentials are used to forward connections over ssh to the Controller
256 # credentials are used to forward connections over ssh to the Controller
257 # Note that the ip given in `addr` needs to be relative to sshserver
257 # Note that the ip given in `addr` needs to be relative to sshserver
258 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
258 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
259 # and set sshserver as the same machine the Controller is on. However,
259 # and set sshserver as the same machine the Controller is on. However,
260 # the only requirement is that sshserver is able to see the Controller
260 # the only requirement is that sshserver is able to see the Controller
261 # (i.e. is within the same trusted network).
261 # (i.e. is within the same trusted network).
262
262
263 sshserver : str
263 sshserver : str
264 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
264 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
265 If keyfile or password is specified, and this is not, it will default to
265 If keyfile or password is specified, and this is not, it will default to
266 the ip given in addr.
266 the ip given in addr.
267 sshkey : str; path to ssh private key file
267 sshkey : str; path to ssh private key file
268 This specifies a key to be used in ssh login, default None.
268 This specifies a key to be used in ssh login, default None.
269 Regular default ssh keys will be used without specifying this argument.
269 Regular default ssh keys will be used without specifying this argument.
270 password : str
270 password : str
271 Your ssh password to sshserver. Note that if this is left None,
271 Your ssh password to sshserver. Note that if this is left None,
272 you will be prompted for it if passwordless key based login is unavailable.
272 you will be prompted for it if passwordless key based login is unavailable.
273 paramiko : bool
273 paramiko : bool
274 flag for whether to use paramiko instead of shell ssh for tunneling.
274 flag for whether to use paramiko instead of shell ssh for tunneling.
275 [default: True on win32, False else]
275 [default: True on win32, False else]
276
276
277
277
278 Attributes
278 Attributes
279 ----------
279 ----------
280
280
281 ids : list of int engine IDs
281 ids : list of int engine IDs
282 requesting the ids attribute always synchronizes
282 requesting the ids attribute always synchronizes
283 the registration state. To request ids without synchronization,
283 the registration state. To request ids without synchronization,
284 use semi-private _ids attributes.
284 use semi-private _ids attributes.
285
285
286 history : list of msg_ids
286 history : list of msg_ids
287 a list of msg_ids, keeping track of all the execution
287 a list of msg_ids, keeping track of all the execution
288 messages you have submitted in order.
288 messages you have submitted in order.
289
289
290 outstanding : set of msg_ids
290 outstanding : set of msg_ids
291 a set of msg_ids that have been submitted, but whose
291 a set of msg_ids that have been submitted, but whose
292 results have not yet been received.
292 results have not yet been received.
293
293
294 results : dict
294 results : dict
295 a dict of all our results, keyed by msg_id
295 a dict of all our results, keyed by msg_id
296
296
297 block : bool
297 block : bool
298 determines default behavior when block not specified
298 determines default behavior when block not specified
299 in execution methods
299 in execution methods
300
300
301 Methods
301 Methods
302 -------
302 -------
303
303
304 spin
304 spin
305 flushes incoming results and registration state changes
305 flushes incoming results and registration state changes
306 control methods spin, and requesting `ids` also ensures up to date
306 control methods spin, and requesting `ids` also ensures up to date
307
307
308 wait
308 wait
309 wait on one or more msg_ids
309 wait on one or more msg_ids
310
310
311 execution methods
311 execution methods
312 apply
312 apply
313 legacy: execute, run
313 legacy: execute, run
314
314
315 data movement
315 data movement
316 push, pull, scatter, gather
316 push, pull, scatter, gather
317
317
318 query methods
318 query methods
319 queue_status, get_result, purge, result_status
319 queue_status, get_result, purge, result_status
320
320
321 control methods
321 control methods
322 abort, shutdown
322 abort, shutdown
323
323
324 """
324 """
325
325
326
326
327 block = Bool(False)
327 block = Bool(False)
328 outstanding = Set()
328 outstanding = Set()
329 results = Instance('collections.defaultdict', (dict,))
329 results = Instance('collections.defaultdict', (dict,))
330 metadata = Instance('collections.defaultdict', (Metadata,))
330 metadata = Instance('collections.defaultdict', (Metadata,))
331 history = List()
331 history = List()
332 debug = Bool(False)
332 debug = Bool(False)
333 _spin_thread = Any()
333 _spin_thread = Any()
334 _stop_spinning = Any()
334 _stop_spinning = Any()
335
335
336 profile=Unicode()
336 profile=Unicode()
337 def _profile_default(self):
337 def _profile_default(self):
338 if BaseIPythonApplication.initialized():
338 if BaseIPythonApplication.initialized():
339 # an IPython app *might* be running, try to get its profile
339 # an IPython app *might* be running, try to get its profile
340 try:
340 try:
341 return BaseIPythonApplication.instance().profile
341 return BaseIPythonApplication.instance().profile
342 except (AttributeError, MultipleInstanceError):
342 except (AttributeError, MultipleInstanceError):
343 # could be a *different* subclass of config.Application,
343 # could be a *different* subclass of config.Application,
344 # which would raise one of these two errors.
344 # which would raise one of these two errors.
345 return u'default'
345 return u'default'
346 else:
346 else:
347 return u'default'
347 return u'default'
348
348
349
349
350 _outstanding_dict = Instance('collections.defaultdict', (set,))
350 _outstanding_dict = Instance('collections.defaultdict', (set,))
351 _ids = List()
351 _ids = List()
352 _connected=Bool(False)
352 _connected=Bool(False)
353 _ssh=Bool(False)
353 _ssh=Bool(False)
354 _context = Instance('zmq.Context')
354 _context = Instance('zmq.Context')
355 _config = Dict()
355 _config = Dict()
356 _engines=Instance(util.ReverseDict, (), {})
356 _engines=Instance(util.ReverseDict, (), {})
357 # _hub_socket=Instance('zmq.Socket')
357 # _hub_socket=Instance('zmq.Socket')
358 _query_socket=Instance('zmq.Socket')
358 _query_socket=Instance('zmq.Socket')
359 _control_socket=Instance('zmq.Socket')
359 _control_socket=Instance('zmq.Socket')
360 _iopub_socket=Instance('zmq.Socket')
360 _iopub_socket=Instance('zmq.Socket')
361 _notification_socket=Instance('zmq.Socket')
361 _notification_socket=Instance('zmq.Socket')
362 _mux_socket=Instance('zmq.Socket')
362 _mux_socket=Instance('zmq.Socket')
363 _task_socket=Instance('zmq.Socket')
363 _task_socket=Instance('zmq.Socket')
364 _task_scheme=Unicode()
364 _task_scheme=Unicode()
365 _closed = False
365 _closed = False
366 _ignored_control_replies=Integer(0)
366 _ignored_control_replies=Integer(0)
367 _ignored_hub_replies=Integer(0)
367 _ignored_hub_replies=Integer(0)
368
368
369 def __new__(self, *args, **kw):
369 def __new__(self, *args, **kw):
370 # don't raise on positional args
370 # don't raise on positional args
371 return HasTraits.__new__(self, **kw)
371 return HasTraits.__new__(self, **kw)
372
372
373 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
373 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
374 context=None, debug=False,
374 context=None, debug=False,
375 sshserver=None, sshkey=None, password=None, paramiko=None,
375 sshserver=None, sshkey=None, password=None, paramiko=None,
376 timeout=10, cluster_id=None, **extra_args
376 timeout=10, cluster_id=None, **extra_args
377 ):
377 ):
378 if profile:
378 if profile:
379 super(Client, self).__init__(debug=debug, profile=profile)
379 super(Client, self).__init__(debug=debug, profile=profile)
380 else:
380 else:
381 super(Client, self).__init__(debug=debug)
381 super(Client, self).__init__(debug=debug)
382 if context is None:
382 if context is None:
383 context = zmq.Context.instance()
383 context = zmq.Context.instance()
384 self._context = context
384 self._context = context
385 self._stop_spinning = Event()
385 self._stop_spinning = Event()
386
386
387 if 'url_or_file' in extra_args:
387 if 'url_or_file' in extra_args:
388 url_file = extra_args['url_or_file']
388 url_file = extra_args['url_or_file']
389 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
389 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
390
390
391 if url_file and util.is_url(url_file):
391 if url_file and util.is_url(url_file):
392 raise ValueError("single urls cannot be specified, url-files must be used.")
392 raise ValueError("single urls cannot be specified, url-files must be used.")
393
393
394 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
394 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
395
395
396 if self._cd is not None:
396 if self._cd is not None:
397 if url_file is None:
397 if url_file is None:
398 if not cluster_id:
398 if not cluster_id:
399 client_json = 'ipcontroller-client.json'
399 client_json = 'ipcontroller-client.json'
400 else:
400 else:
401 client_json = 'ipcontroller-%s-client.json' % cluster_id
401 client_json = 'ipcontroller-%s-client.json' % cluster_id
402 url_file = pjoin(self._cd.security_dir, client_json)
402 url_file = pjoin(self._cd.security_dir, client_json)
403 if url_file is None:
403 if url_file is None:
404 raise ValueError(
404 raise ValueError(
405 "I can't find enough information to connect to a hub!"
405 "I can't find enough information to connect to a hub!"
406 " Please specify at least one of url_file or profile."
406 " Please specify at least one of url_file or profile."
407 )
407 )
408
408
409 with open(url_file) as f:
409 with open(url_file) as f:
410 cfg = json.load(f)
410 cfg = json.load(f)
411
411
412 self._task_scheme = cfg['task_scheme']
412 self._task_scheme = cfg['task_scheme']
413
413
414 # sync defaults from args, json:
414 # sync defaults from args, json:
415 if sshserver:
415 if sshserver:
416 cfg['ssh'] = sshserver
416 cfg['ssh'] = sshserver
417
417
418 location = cfg.setdefault('location', None)
418 location = cfg.setdefault('location', None)
419
419
420 proto,addr = cfg['interface'].split('://')
420 proto,addr = cfg['interface'].split('://')
421 addr = util.disambiguate_ip_address(addr, location)
421 addr = util.disambiguate_ip_address(addr, location)
422 cfg['interface'] = "%s://%s" % (proto, addr)
422 cfg['interface'] = "%s://%s" % (proto, addr)
423
423
424 # turn interface,port into full urls:
424 # turn interface,port into full urls:
425 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
425 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
426 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
426 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
427
427
428 url = cfg['registration']
428 url = cfg['registration']
429
429
430 if location is not None and addr == LOCALHOST:
430 if location is not None and addr == LOCALHOST:
431 # location specified, and connection is expected to be local
431 # location specified, and connection is expected to be local
432 if location not in LOCAL_IPS and not sshserver:
432 if location not in LOCAL_IPS and not sshserver:
433 # load ssh from JSON *only* if the controller is not on
433 # load ssh from JSON *only* if the controller is not on
434 # this machine
434 # this machine
435 sshserver=cfg['ssh']
435 sshserver=cfg['ssh']
436 if location not in LOCAL_IPS and not sshserver:
436 if location not in LOCAL_IPS and not sshserver:
437 # warn if no ssh specified, but SSH is probably needed
437 # warn if no ssh specified, but SSH is probably needed
438 # This is only a warning, because the most likely cause
438 # This is only a warning, because the most likely cause
439 # is a local Controller on a laptop whose IP is dynamic
439 # is a local Controller on a laptop whose IP is dynamic
440 warnings.warn("""
440 warnings.warn("""
441 Controller appears to be listening on localhost, but not on this machine.
441 Controller appears to be listening on localhost, but not on this machine.
442 If this is true, you should specify Client(...,sshserver='you@%s')
442 If this is true, you should specify Client(...,sshserver='you@%s')
443 or instruct your controller to listen on an external IP."""%location,
443 or instruct your controller to listen on an external IP."""%location,
444 RuntimeWarning)
444 RuntimeWarning)
445 elif not sshserver:
445 elif not sshserver:
446 # otherwise sync with cfg
446 # otherwise sync with cfg
447 sshserver = cfg['ssh']
447 sshserver = cfg['ssh']
448
448
449 self._config = cfg
449 self._config = cfg
450
450
451 self._ssh = bool(sshserver or sshkey or password)
451 self._ssh = bool(sshserver or sshkey or password)
452 if self._ssh and sshserver is None:
452 if self._ssh and sshserver is None:
453 # default to ssh via localhost
453 # default to ssh via localhost
454 sshserver = addr
454 sshserver = addr
455 if self._ssh and password is None:
455 if self._ssh and password is None:
456 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
456 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
457 password=False
457 password=False
458 else:
458 else:
459 password = getpass("SSH Password for %s: "%sshserver)
459 password = getpass("SSH Password for %s: "%sshserver)
460 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
460 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
461
461
462 # configure and construct the session
462 # configure and construct the session
463 try:
463 try:
464 extra_args['packer'] = cfg['pack']
464 extra_args['packer'] = cfg['pack']
465 extra_args['unpacker'] = cfg['unpack']
465 extra_args['unpacker'] = cfg['unpack']
466 extra_args['key'] = cast_bytes(cfg['key'])
466 extra_args['key'] = cast_bytes(cfg['key'])
467 extra_args['signature_scheme'] = cfg['signature_scheme']
467 extra_args['signature_scheme'] = cfg['signature_scheme']
468 except KeyError as exc:
468 except KeyError as exc:
469 msg = '\n'.join([
469 msg = '\n'.join([
470 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
470 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
471 "If you are reusing connection files, remove them and start ipcontroller again."
471 "If you are reusing connection files, remove them and start ipcontroller again."
472 ])
472 ])
473 raise ValueError(msg.format(exc.message))
473 raise ValueError(msg.format(exc.message))
474
474
475 self.session = Session(**extra_args)
475 self.session = Session(**extra_args)
476
476
477 self._query_socket = self._context.socket(zmq.DEALER)
477 self._query_socket = self._context.socket(zmq.DEALER)
478
478
479 if self._ssh:
479 if self._ssh:
480 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
480 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
481 else:
481 else:
482 self._query_socket.connect(cfg['registration'])
482 self._query_socket.connect(cfg['registration'])
483
483
484 self.session.debug = self.debug
484 self.session.debug = self.debug
485
485
486 self._notification_handlers = {'registration_notification' : self._register_engine,
486 self._notification_handlers = {'registration_notification' : self._register_engine,
487 'unregistration_notification' : self._unregister_engine,
487 'unregistration_notification' : self._unregister_engine,
488 'shutdown_notification' : lambda msg: self.close(),
488 'shutdown_notification' : lambda msg: self.close(),
489 }
489 }
490 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
490 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
491 'apply_reply' : self._handle_apply_reply}
491 'apply_reply' : self._handle_apply_reply}
492 self._connect(sshserver, ssh_kwargs, timeout)
492
493 try:
494 self._connect(sshserver, ssh_kwargs, timeout)
495 except:
496 self.close(linger=0)
497 raise
493
498
494 # last step: setup magics, if we are in IPython:
499 # last step: setup magics, if we are in IPython:
495
500
496 try:
501 try:
497 ip = get_ipython()
502 ip = get_ipython()
498 except NameError:
503 except NameError:
499 return
504 return
500 else:
505 else:
501 if 'px' not in ip.magics_manager.magics:
506 if 'px' not in ip.magics_manager.magics:
502 # in IPython but we are the first Client.
507 # in IPython but we are the first Client.
503 # activate a default view for parallel magics.
508 # activate a default view for parallel magics.
504 self.activate()
509 self.activate()
505
510
506 def __del__(self):
511 def __del__(self):
507 """cleanup sockets, but _not_ context."""
512 """cleanup sockets, but _not_ context."""
508 self.close()
513 self.close()
509
514
510 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
515 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
511 if ipython_dir is None:
516 if ipython_dir is None:
512 ipython_dir = get_ipython_dir()
517 ipython_dir = get_ipython_dir()
513 if profile_dir is not None:
518 if profile_dir is not None:
514 try:
519 try:
515 self._cd = ProfileDir.find_profile_dir(profile_dir)
520 self._cd = ProfileDir.find_profile_dir(profile_dir)
516 return
521 return
517 except ProfileDirError:
522 except ProfileDirError:
518 pass
523 pass
519 elif profile is not None:
524 elif profile is not None:
520 try:
525 try:
521 self._cd = ProfileDir.find_profile_dir_by_name(
526 self._cd = ProfileDir.find_profile_dir_by_name(
522 ipython_dir, profile)
527 ipython_dir, profile)
523 return
528 return
524 except ProfileDirError:
529 except ProfileDirError:
525 pass
530 pass
526 self._cd = None
531 self._cd = None
527
532
528 def _update_engines(self, engines):
533 def _update_engines(self, engines):
529 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
534 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
530 for k,v in engines.iteritems():
535 for k,v in engines.iteritems():
531 eid = int(k)
536 eid = int(k)
532 if eid not in self._engines:
537 if eid not in self._engines:
533 self._ids.append(eid)
538 self._ids.append(eid)
534 self._engines[eid] = v
539 self._engines[eid] = v
535 self._ids = sorted(self._ids)
540 self._ids = sorted(self._ids)
536 if sorted(self._engines.keys()) != range(len(self._engines)) and \
541 if sorted(self._engines.keys()) != range(len(self._engines)) and \
537 self._task_scheme == 'pure' and self._task_socket:
542 self._task_scheme == 'pure' and self._task_socket:
538 self._stop_scheduling_tasks()
543 self._stop_scheduling_tasks()
539
544
540 def _stop_scheduling_tasks(self):
545 def _stop_scheduling_tasks(self):
541 """Stop scheduling tasks because an engine has been unregistered
546 """Stop scheduling tasks because an engine has been unregistered
542 from a pure ZMQ scheduler.
547 from a pure ZMQ scheduler.
543 """
548 """
544 self._task_socket.close()
549 self._task_socket.close()
545 self._task_socket = None
550 self._task_socket = None
546 msg = "An engine has been unregistered, and we are using pure " +\
551 msg = "An engine has been unregistered, and we are using pure " +\
547 "ZMQ task scheduling. Task farming will be disabled."
552 "ZMQ task scheduling. Task farming will be disabled."
548 if self.outstanding:
553 if self.outstanding:
549 msg += " If you were running tasks when this happened, " +\
554 msg += " If you were running tasks when this happened, " +\
550 "some `outstanding` msg_ids may never resolve."
555 "some `outstanding` msg_ids may never resolve."
551 warnings.warn(msg, RuntimeWarning)
556 warnings.warn(msg, RuntimeWarning)
552
557
553 def _build_targets(self, targets):
558 def _build_targets(self, targets):
554 """Turn valid target IDs or 'all' into two lists:
559 """Turn valid target IDs or 'all' into two lists:
555 (int_ids, uuids).
560 (int_ids, uuids).
556 """
561 """
557 if not self._ids:
562 if not self._ids:
558 # flush notification socket if no engines yet, just in case
563 # flush notification socket if no engines yet, just in case
559 if not self.ids:
564 if not self.ids:
560 raise error.NoEnginesRegistered("Can't build targets without any engines")
565 raise error.NoEnginesRegistered("Can't build targets without any engines")
561
566
562 if targets is None:
567 if targets is None:
563 targets = self._ids
568 targets = self._ids
564 elif isinstance(targets, basestring):
569 elif isinstance(targets, basestring):
565 if targets.lower() == 'all':
570 if targets.lower() == 'all':
566 targets = self._ids
571 targets = self._ids
567 else:
572 else:
568 raise TypeError("%r not valid str target, must be 'all'"%(targets))
573 raise TypeError("%r not valid str target, must be 'all'"%(targets))
569 elif isinstance(targets, int):
574 elif isinstance(targets, int):
570 if targets < 0:
575 if targets < 0:
571 targets = self.ids[targets]
576 targets = self.ids[targets]
572 if targets not in self._ids:
577 if targets not in self._ids:
573 raise IndexError("No such engine: %i"%targets)
578 raise IndexError("No such engine: %i"%targets)
574 targets = [targets]
579 targets = [targets]
575
580
576 if isinstance(targets, slice):
581 if isinstance(targets, slice):
577 indices = range(len(self._ids))[targets]
582 indices = range(len(self._ids))[targets]
578 ids = self.ids
583 ids = self.ids
579 targets = [ ids[i] for i in indices ]
584 targets = [ ids[i] for i in indices ]
580
585
581 if not isinstance(targets, (tuple, list, xrange)):
586 if not isinstance(targets, (tuple, list, xrange)):
582 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
587 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
583
588
584 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
589 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
585
590
586 def _connect(self, sshserver, ssh_kwargs, timeout):
591 def _connect(self, sshserver, ssh_kwargs, timeout):
587 """setup all our socket connections to the cluster. This is called from
592 """setup all our socket connections to the cluster. This is called from
588 __init__."""
593 __init__."""
589
594
590 # Maybe allow reconnecting?
595 # Maybe allow reconnecting?
591 if self._connected:
596 if self._connected:
592 return
597 return
593 self._connected=True
598 self._connected=True
594
599
595 def connect_socket(s, url):
600 def connect_socket(s, url):
596 # url = util.disambiguate_url(url, self._config['location'])
597 if self._ssh:
601 if self._ssh:
598 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
602 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
599 else:
603 else:
600 return s.connect(url)
604 return s.connect(url)
601
605
602 self.session.send(self._query_socket, 'connection_request')
606 self.session.send(self._query_socket, 'connection_request')
603 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
607 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
604 poller = zmq.Poller()
608 poller = zmq.Poller()
605 poller.register(self._query_socket, zmq.POLLIN)
609 poller.register(self._query_socket, zmq.POLLIN)
606 # poll expects milliseconds, timeout is seconds
610 # poll expects milliseconds, timeout is seconds
607 evts = poller.poll(timeout*1000)
611 evts = poller.poll(timeout*1000)
608 if not evts:
612 if not evts:
609 raise error.TimeoutError("Hub connection request timed out")
613 raise error.TimeoutError("Hub connection request timed out")
610 idents,msg = self.session.recv(self._query_socket,mode=0)
614 idents,msg = self.session.recv(self._query_socket,mode=0)
611 if self.debug:
615 if self.debug:
612 pprint(msg)
616 pprint(msg)
613 content = msg['content']
617 content = msg['content']
614 # self._config['registration'] = dict(content)
618 # self._config['registration'] = dict(content)
615 cfg = self._config
619 cfg = self._config
616 if content['status'] == 'ok':
620 if content['status'] == 'ok':
617 self._mux_socket = self._context.socket(zmq.DEALER)
621 self._mux_socket = self._context.socket(zmq.DEALER)
618 connect_socket(self._mux_socket, cfg['mux'])
622 connect_socket(self._mux_socket, cfg['mux'])
619
623
620 self._task_socket = self._context.socket(zmq.DEALER)
624 self._task_socket = self._context.socket(zmq.DEALER)
621 connect_socket(self._task_socket, cfg['task'])
625 connect_socket(self._task_socket, cfg['task'])
622
626
623 self._notification_socket = self._context.socket(zmq.SUB)
627 self._notification_socket = self._context.socket(zmq.SUB)
624 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
628 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
625 connect_socket(self._notification_socket, cfg['notification'])
629 connect_socket(self._notification_socket, cfg['notification'])
626
630
627 self._control_socket = self._context.socket(zmq.DEALER)
631 self._control_socket = self._context.socket(zmq.DEALER)
628 connect_socket(self._control_socket, cfg['control'])
632 connect_socket(self._control_socket, cfg['control'])
629
633
630 self._iopub_socket = self._context.socket(zmq.SUB)
634 self._iopub_socket = self._context.socket(zmq.SUB)
631 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
635 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
632 connect_socket(self._iopub_socket, cfg['iopub'])
636 connect_socket(self._iopub_socket, cfg['iopub'])
633
637
634 self._update_engines(dict(content['engines']))
638 self._update_engines(dict(content['engines']))
635 else:
639 else:
636 self._connected = False
640 self._connected = False
637 raise Exception("Failed to connect!")
641 raise Exception("Failed to connect!")
638
642
639 #--------------------------------------------------------------------------
643 #--------------------------------------------------------------------------
640 # handlers and callbacks for incoming messages
644 # handlers and callbacks for incoming messages
641 #--------------------------------------------------------------------------
645 #--------------------------------------------------------------------------
642
646
643 def _unwrap_exception(self, content):
647 def _unwrap_exception(self, content):
644 """unwrap exception, and remap engine_id to int."""
648 """unwrap exception, and remap engine_id to int."""
645 e = error.unwrap_exception(content)
649 e = error.unwrap_exception(content)
646 # print e.traceback
650 # print e.traceback
647 if e.engine_info:
651 if e.engine_info:
648 e_uuid = e.engine_info['engine_uuid']
652 e_uuid = e.engine_info['engine_uuid']
649 eid = self._engines[e_uuid]
653 eid = self._engines[e_uuid]
650 e.engine_info['engine_id'] = eid
654 e.engine_info['engine_id'] = eid
651 return e
655 return e
652
656
653 def _extract_metadata(self, msg):
657 def _extract_metadata(self, msg):
654 header = msg['header']
658 header = msg['header']
655 parent = msg['parent_header']
659 parent = msg['parent_header']
656 msg_meta = msg['metadata']
660 msg_meta = msg['metadata']
657 content = msg['content']
661 content = msg['content']
658 md = {'msg_id' : parent['msg_id'],
662 md = {'msg_id' : parent['msg_id'],
659 'received' : datetime.now(),
663 'received' : datetime.now(),
660 'engine_uuid' : msg_meta.get('engine', None),
664 'engine_uuid' : msg_meta.get('engine', None),
661 'follow' : msg_meta.get('follow', []),
665 'follow' : msg_meta.get('follow', []),
662 'after' : msg_meta.get('after', []),
666 'after' : msg_meta.get('after', []),
663 'status' : content['status'],
667 'status' : content['status'],
664 }
668 }
665
669
666 if md['engine_uuid'] is not None:
670 if md['engine_uuid'] is not None:
667 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
671 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
668
672
669 if 'date' in parent:
673 if 'date' in parent:
670 md['submitted'] = parent['date']
674 md['submitted'] = parent['date']
671 if 'started' in msg_meta:
675 if 'started' in msg_meta:
672 md['started'] = msg_meta['started']
676 md['started'] = msg_meta['started']
673 if 'date' in header:
677 if 'date' in header:
674 md['completed'] = header['date']
678 md['completed'] = header['date']
675 return md
679 return md
676
680
677 def _register_engine(self, msg):
681 def _register_engine(self, msg):
678 """Register a new engine, and update our connection info."""
682 """Register a new engine, and update our connection info."""
679 content = msg['content']
683 content = msg['content']
680 eid = content['id']
684 eid = content['id']
681 d = {eid : content['uuid']}
685 d = {eid : content['uuid']}
682 self._update_engines(d)
686 self._update_engines(d)
683
687
684 def _unregister_engine(self, msg):
688 def _unregister_engine(self, msg):
685 """Unregister an engine that has died."""
689 """Unregister an engine that has died."""
686 content = msg['content']
690 content = msg['content']
687 eid = int(content['id'])
691 eid = int(content['id'])
688 if eid in self._ids:
692 if eid in self._ids:
689 self._ids.remove(eid)
693 self._ids.remove(eid)
690 uuid = self._engines.pop(eid)
694 uuid = self._engines.pop(eid)
691
695
692 self._handle_stranded_msgs(eid, uuid)
696 self._handle_stranded_msgs(eid, uuid)
693
697
694 if self._task_socket and self._task_scheme == 'pure':
698 if self._task_socket and self._task_scheme == 'pure':
695 self._stop_scheduling_tasks()
699 self._stop_scheduling_tasks()
696
700
697 def _handle_stranded_msgs(self, eid, uuid):
701 def _handle_stranded_msgs(self, eid, uuid):
698 """Handle messages known to be on an engine when the engine unregisters.
702 """Handle messages known to be on an engine when the engine unregisters.
699
703
700 It is possible that this will fire prematurely - that is, an engine will
704 It is possible that this will fire prematurely - that is, an engine will
701 go down after completing a result, and the client will be notified
705 go down after completing a result, and the client will be notified
702 of the unregistration and later receive the successful result.
706 of the unregistration and later receive the successful result.
703 """
707 """
704
708
705 outstanding = self._outstanding_dict[uuid]
709 outstanding = self._outstanding_dict[uuid]
706
710
707 for msg_id in list(outstanding):
711 for msg_id in list(outstanding):
708 if msg_id in self.results:
712 if msg_id in self.results:
709 # we already
713 # we already
710 continue
714 continue
711 try:
715 try:
712 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
716 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
713 except:
717 except:
714 content = error.wrap_exception()
718 content = error.wrap_exception()
715 # build a fake message:
719 # build a fake message:
716 msg = self.session.msg('apply_reply', content=content)
720 msg = self.session.msg('apply_reply', content=content)
717 msg['parent_header']['msg_id'] = msg_id
721 msg['parent_header']['msg_id'] = msg_id
718 msg['metadata']['engine'] = uuid
722 msg['metadata']['engine'] = uuid
719 self._handle_apply_reply(msg)
723 self._handle_apply_reply(msg)
720
724
721 def _handle_execute_reply(self, msg):
725 def _handle_execute_reply(self, msg):
722 """Save the reply to an execute_request into our results.
726 """Save the reply to an execute_request into our results.
723
727
724 execute messages are never actually used. apply is used instead.
728 execute messages are never actually used. apply is used instead.
725 """
729 """
726
730
727 parent = msg['parent_header']
731 parent = msg['parent_header']
728 msg_id = parent['msg_id']
732 msg_id = parent['msg_id']
729 if msg_id not in self.outstanding:
733 if msg_id not in self.outstanding:
730 if msg_id in self.history:
734 if msg_id in self.history:
731 print ("got stale result: %s"%msg_id)
735 print ("got stale result: %s"%msg_id)
732 else:
736 else:
733 print ("got unknown result: %s"%msg_id)
737 print ("got unknown result: %s"%msg_id)
734 else:
738 else:
735 self.outstanding.remove(msg_id)
739 self.outstanding.remove(msg_id)
736
740
737 content = msg['content']
741 content = msg['content']
738 header = msg['header']
742 header = msg['header']
739
743
740 # construct metadata:
744 # construct metadata:
741 md = self.metadata[msg_id]
745 md = self.metadata[msg_id]
742 md.update(self._extract_metadata(msg))
746 md.update(self._extract_metadata(msg))
743 # is this redundant?
747 # is this redundant?
744 self.metadata[msg_id] = md
748 self.metadata[msg_id] = md
745
749
746 e_outstanding = self._outstanding_dict[md['engine_uuid']]
750 e_outstanding = self._outstanding_dict[md['engine_uuid']]
747 if msg_id in e_outstanding:
751 if msg_id in e_outstanding:
748 e_outstanding.remove(msg_id)
752 e_outstanding.remove(msg_id)
749
753
750 # construct result:
754 # construct result:
751 if content['status'] == 'ok':
755 if content['status'] == 'ok':
752 self.results[msg_id] = ExecuteReply(msg_id, content, md)
756 self.results[msg_id] = ExecuteReply(msg_id, content, md)
753 elif content['status'] == 'aborted':
757 elif content['status'] == 'aborted':
754 self.results[msg_id] = error.TaskAborted(msg_id)
758 self.results[msg_id] = error.TaskAborted(msg_id)
755 elif content['status'] == 'resubmitted':
759 elif content['status'] == 'resubmitted':
756 # TODO: handle resubmission
760 # TODO: handle resubmission
757 pass
761 pass
758 else:
762 else:
759 self.results[msg_id] = self._unwrap_exception(content)
763 self.results[msg_id] = self._unwrap_exception(content)
760
764
761 def _handle_apply_reply(self, msg):
765 def _handle_apply_reply(self, msg):
762 """Save the reply to an apply_request into our results."""
766 """Save the reply to an apply_request into our results."""
763 parent = msg['parent_header']
767 parent = msg['parent_header']
764 msg_id = parent['msg_id']
768 msg_id = parent['msg_id']
765 if msg_id not in self.outstanding:
769 if msg_id not in self.outstanding:
766 if msg_id in self.history:
770 if msg_id in self.history:
767 print ("got stale result: %s"%msg_id)
771 print ("got stale result: %s"%msg_id)
768 print self.results[msg_id]
772 print self.results[msg_id]
769 print msg
773 print msg
770 else:
774 else:
771 print ("got unknown result: %s"%msg_id)
775 print ("got unknown result: %s"%msg_id)
772 else:
776 else:
773 self.outstanding.remove(msg_id)
777 self.outstanding.remove(msg_id)
774 content = msg['content']
778 content = msg['content']
775 header = msg['header']
779 header = msg['header']
776
780
777 # construct metadata:
781 # construct metadata:
778 md = self.metadata[msg_id]
782 md = self.metadata[msg_id]
779 md.update(self._extract_metadata(msg))
783 md.update(self._extract_metadata(msg))
780 # is this redundant?
784 # is this redundant?
781 self.metadata[msg_id] = md
785 self.metadata[msg_id] = md
782
786
783 e_outstanding = self._outstanding_dict[md['engine_uuid']]
787 e_outstanding = self._outstanding_dict[md['engine_uuid']]
784 if msg_id in e_outstanding:
788 if msg_id in e_outstanding:
785 e_outstanding.remove(msg_id)
789 e_outstanding.remove(msg_id)
786
790
787 # construct result:
791 # construct result:
788 if content['status'] == 'ok':
792 if content['status'] == 'ok':
789 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
793 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
790 elif content['status'] == 'aborted':
794 elif content['status'] == 'aborted':
791 self.results[msg_id] = error.TaskAborted(msg_id)
795 self.results[msg_id] = error.TaskAborted(msg_id)
792 elif content['status'] == 'resubmitted':
796 elif content['status'] == 'resubmitted':
793 # TODO: handle resubmission
797 # TODO: handle resubmission
794 pass
798 pass
795 else:
799 else:
796 self.results[msg_id] = self._unwrap_exception(content)
800 self.results[msg_id] = self._unwrap_exception(content)
797
801
798 def _flush_notifications(self):
802 def _flush_notifications(self):
799 """Flush notifications of engine registrations waiting
803 """Flush notifications of engine registrations waiting
800 in ZMQ queue."""
804 in ZMQ queue."""
801 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
805 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
802 while msg is not None:
806 while msg is not None:
803 if self.debug:
807 if self.debug:
804 pprint(msg)
808 pprint(msg)
805 msg_type = msg['header']['msg_type']
809 msg_type = msg['header']['msg_type']
806 handler = self._notification_handlers.get(msg_type, None)
810 handler = self._notification_handlers.get(msg_type, None)
807 if handler is None:
811 if handler is None:
808 raise Exception("Unhandled message type: %s" % msg_type)
812 raise Exception("Unhandled message type: %s" % msg_type)
809 else:
813 else:
810 handler(msg)
814 handler(msg)
811 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
815 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
812
816
813 def _flush_results(self, sock):
817 def _flush_results(self, sock):
814 """Flush task or queue results waiting in ZMQ queue."""
818 """Flush task or queue results waiting in ZMQ queue."""
815 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
819 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
816 while msg is not None:
820 while msg is not None:
817 if self.debug:
821 if self.debug:
818 pprint(msg)
822 pprint(msg)
819 msg_type = msg['header']['msg_type']
823 msg_type = msg['header']['msg_type']
820 handler = self._queue_handlers.get(msg_type, None)
824 handler = self._queue_handlers.get(msg_type, None)
821 if handler is None:
825 if handler is None:
822 raise Exception("Unhandled message type: %s" % msg_type)
826 raise Exception("Unhandled message type: %s" % msg_type)
823 else:
827 else:
824 handler(msg)
828 handler(msg)
825 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
829 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
826
830
827 def _flush_control(self, sock):
831 def _flush_control(self, sock):
828 """Flush replies from the control channel waiting
832 """Flush replies from the control channel waiting
829 in the ZMQ queue.
833 in the ZMQ queue.
830
834
831 Currently: ignore them."""
835 Currently: ignore them."""
832 if self._ignored_control_replies <= 0:
836 if self._ignored_control_replies <= 0:
833 return
837 return
834 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
838 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
835 while msg is not None:
839 while msg is not None:
836 self._ignored_control_replies -= 1
840 self._ignored_control_replies -= 1
837 if self.debug:
841 if self.debug:
838 pprint(msg)
842 pprint(msg)
839 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
843 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
840
844
841 def _flush_ignored_control(self):
845 def _flush_ignored_control(self):
842 """flush ignored control replies"""
846 """flush ignored control replies"""
843 while self._ignored_control_replies > 0:
847 while self._ignored_control_replies > 0:
844 self.session.recv(self._control_socket)
848 self.session.recv(self._control_socket)
845 self._ignored_control_replies -= 1
849 self._ignored_control_replies -= 1
846
850
847 def _flush_ignored_hub_replies(self):
851 def _flush_ignored_hub_replies(self):
848 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
852 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
849 while msg is not None:
853 while msg is not None:
850 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
854 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
851
855
852 def _flush_iopub(self, sock):
856 def _flush_iopub(self, sock):
853 """Flush replies from the iopub channel waiting
857 """Flush replies from the iopub channel waiting
854 in the ZMQ queue.
858 in the ZMQ queue.
855 """
859 """
856 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
860 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
857 while msg is not None:
861 while msg is not None:
858 if self.debug:
862 if self.debug:
859 pprint(msg)
863 pprint(msg)
860 parent = msg['parent_header']
864 parent = msg['parent_header']
861 # ignore IOPub messages with no parent.
865 # ignore IOPub messages with no parent.
862 # Caused by print statements or warnings from before the first execution.
866 # Caused by print statements or warnings from before the first execution.
863 if not parent:
867 if not parent:
864 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
868 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
865 continue
869 continue
866 msg_id = parent['msg_id']
870 msg_id = parent['msg_id']
867 content = msg['content']
871 content = msg['content']
868 header = msg['header']
872 header = msg['header']
869 msg_type = msg['header']['msg_type']
873 msg_type = msg['header']['msg_type']
870
874
871 # init metadata:
875 # init metadata:
872 md = self.metadata[msg_id]
876 md = self.metadata[msg_id]
873
877
874 if msg_type == 'stream':
878 if msg_type == 'stream':
875 name = content['name']
879 name = content['name']
876 s = md[name] or ''
880 s = md[name] or ''
877 md[name] = s + content['data']
881 md[name] = s + content['data']
878 elif msg_type == 'pyerr':
882 elif msg_type == 'pyerr':
879 md.update({'pyerr' : self._unwrap_exception(content)})
883 md.update({'pyerr' : self._unwrap_exception(content)})
880 elif msg_type == 'pyin':
884 elif msg_type == 'pyin':
881 md.update({'pyin' : content['code']})
885 md.update({'pyin' : content['code']})
882 elif msg_type == 'display_data':
886 elif msg_type == 'display_data':
883 md['outputs'].append(content)
887 md['outputs'].append(content)
884 elif msg_type == 'pyout':
888 elif msg_type == 'pyout':
885 md['pyout'] = content
889 md['pyout'] = content
886 elif msg_type == 'data_message':
890 elif msg_type == 'data_message':
887 data, remainder = serialize.unserialize_object(msg['buffers'])
891 data, remainder = serialize.unserialize_object(msg['buffers'])
888 md['data'].update(data)
892 md['data'].update(data)
889 elif msg_type == 'status':
893 elif msg_type == 'status':
890 # idle message comes after all outputs
894 # idle message comes after all outputs
891 if content['execution_state'] == 'idle':
895 if content['execution_state'] == 'idle':
892 md['outputs_ready'] = True
896 md['outputs_ready'] = True
893 else:
897 else:
894 # unhandled msg_type (status, etc.)
898 # unhandled msg_type (status, etc.)
895 pass
899 pass
896
900
897 # reduntant?
901 # reduntant?
898 self.metadata[msg_id] = md
902 self.metadata[msg_id] = md
899
903
900 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
904 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
901
905
902 #--------------------------------------------------------------------------
906 #--------------------------------------------------------------------------
903 # len, getitem
907 # len, getitem
904 #--------------------------------------------------------------------------
908 #--------------------------------------------------------------------------
905
909
906 def __len__(self):
910 def __len__(self):
907 """len(client) returns # of engines."""
911 """len(client) returns # of engines."""
908 return len(self.ids)
912 return len(self.ids)
909
913
910 def __getitem__(self, key):
914 def __getitem__(self, key):
911 """index access returns DirectView multiplexer objects
915 """index access returns DirectView multiplexer objects
912
916
913 Must be int, slice, or list/tuple/xrange of ints"""
917 Must be int, slice, or list/tuple/xrange of ints"""
914 if not isinstance(key, (int, slice, tuple, list, xrange)):
918 if not isinstance(key, (int, slice, tuple, list, xrange)):
915 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
919 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
916 else:
920 else:
917 return self.direct_view(key)
921 return self.direct_view(key)
918
922
919 #--------------------------------------------------------------------------
923 #--------------------------------------------------------------------------
920 # Begin public methods
924 # Begin public methods
921 #--------------------------------------------------------------------------
925 #--------------------------------------------------------------------------
922
926
923 @property
927 @property
924 def ids(self):
928 def ids(self):
925 """Always up-to-date ids property."""
929 """Always up-to-date ids property."""
926 self._flush_notifications()
930 self._flush_notifications()
927 # always copy:
931 # always copy:
928 return list(self._ids)
932 return list(self._ids)
929
933
930 def activate(self, targets='all', suffix=''):
934 def activate(self, targets='all', suffix=''):
931 """Create a DirectView and register it with IPython magics
935 """Create a DirectView and register it with IPython magics
932
936
933 Defines the magics `%px, %autopx, %pxresult, %%px`
937 Defines the magics `%px, %autopx, %pxresult, %%px`
934
938
935 Parameters
939 Parameters
936 ----------
940 ----------
937
941
938 targets: int, list of ints, or 'all'
942 targets: int, list of ints, or 'all'
939 The engines on which the view's magics will run
943 The engines on which the view's magics will run
940 suffix: str [default: '']
944 suffix: str [default: '']
941 The suffix, if any, for the magics. This allows you to have
945 The suffix, if any, for the magics. This allows you to have
942 multiple views associated with parallel magics at the same time.
946 multiple views associated with parallel magics at the same time.
943
947
944 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
948 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
945 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
949 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
946 on engine 0.
950 on engine 0.
947 """
951 """
948 view = self.direct_view(targets)
952 view = self.direct_view(targets)
949 view.block = True
953 view.block = True
950 view.activate(suffix)
954 view.activate(suffix)
951 return view
955 return view
952
956
953 def close(self):
957 def close(self, linger=None):
958 """Close my zmq Sockets
959
960 If `linger`, set the zmq LINGER socket option,
961 which allows discarding of messages.
962 """
954 if self._closed:
963 if self._closed:
955 return
964 return
956 self.stop_spin_thread()
965 self.stop_spin_thread()
957 snames = filter(lambda n: n.endswith('socket'), dir(self))
966 snames = [ trait for trait in self.trait_names() if trait.endswith("socket") ]
958 for socket in map(lambda name: getattr(self, name), snames):
967 for name in snames:
959 if isinstance(socket, zmq.Socket) and not socket.closed:
968 socket = getattr(self, name)
960 socket.close()
969 if socket is not None and not socket.closed:
970 if linger is not None:
971 socket.close(linger=linger)
972 else:
973 socket.close()
961 self._closed = True
974 self._closed = True
962
975
963 def _spin_every(self, interval=1):
976 def _spin_every(self, interval=1):
964 """target func for use in spin_thread"""
977 """target func for use in spin_thread"""
965 while True:
978 while True:
966 if self._stop_spinning.is_set():
979 if self._stop_spinning.is_set():
967 return
980 return
968 time.sleep(interval)
981 time.sleep(interval)
969 self.spin()
982 self.spin()
970
983
def spin_thread(self, interval=1):
    """Call Client.spin() in a background thread on some regular interval.

    This helps ensure that messages don't pile up too much in the zmq queue
    while you are working on other things, or just leaving an idle terminal.

    It also helps limit potential padding of the `received` timestamp
    on AsyncResult objects, used for timings.

    Parameters
    ----------
    interval : float, optional
        The interval on which to spin the client in the background thread
        (simply passed to time.sleep).

    Notes
    -----
    For precision timing, you may want to use this method to put a bound
    on the jitter (in seconds) in `received` timestamps used
    in AsyncResult.wall_time.
    """
    if self._spin_thread is not None:
        # replace any previously running spin thread
        self.stop_spin_thread()
    self._stop_spinning.clear()
    worker = Thread(target=self._spin_every, args=(interval,))
    worker.daemon = True
    self._spin_thread = worker
    worker.start()
1001
1014
def stop_spin_thread(self):
    """Stop the background spin thread, if one is running."""
    if self._spin_thread is None:
        return
    # signal the loop in _spin_every, then wait for the thread to exit
    self._stop_spinning.set()
    self._spin_thread.join()
    self._spin_thread = None
1008
1021
1009 def spin(self):
1022 def spin(self):
1010 """Flush any registration notifications and execution results
1023 """Flush any registration notifications and execution results
1011 waiting in the ZMQ queue.
1024 waiting in the ZMQ queue.
1012 """
1025 """
1013 if self._notification_socket:
1026 if self._notification_socket:
1014 self._flush_notifications()
1027 self._flush_notifications()
1015 if self._iopub_socket:
1028 if self._iopub_socket:
1016 self._flush_iopub(self._iopub_socket)
1029 self._flush_iopub(self._iopub_socket)
1017 if self._mux_socket:
1030 if self._mux_socket:
1018 self._flush_results(self._mux_socket)
1031 self._flush_results(self._mux_socket)
1019 if self._task_socket:
1032 if self._task_socket:
1020 self._flush_results(self._task_socket)
1033 self._flush_results(self._task_socket)
1021 if self._control_socket:
1034 if self._control_socket:
1022 self._flush_control(self._control_socket)
1035 self._flush_control(self._control_socket)
1023 if self._query_socket:
1036 if self._query_socket:
1024 self._flush_ignored_hub_replies()
1037 self._flush_ignored_hub_replies()
1025
1038
def wait(self, jobs=None, timeout=-1):
    """waits on one or more `jobs`, for up to `timeout` seconds.

    Parameters
    ----------

    jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
        ints are indices to self.history
        strs are msg_ids
        default: wait on all outstanding messages
    timeout : float
        a time in seconds, after which to give up.
        default is -1, which means no timeout

    Returns
    -------

    True : when all msg_ids are done
    False : timeout reached, some msg_ids still outstanding
    """
    tic = time.time()
    if jobs is None:
        theids = self.outstanding
    else:
        if isinstance(jobs, (int, basestring, AsyncResult)):
            jobs = [jobs]
        theids = set()
        for job in jobs:
            if isinstance(job, int):
                # integer indices are looked up in local history
                job = self.history[job]
            elif isinstance(job, AsyncResult):
                # FIX: was `map(theids.add, job.msg_ids)`, which relies on
                # map()'s side effects -- a silent no-op under Python 3's
                # lazy map, and a throwaway list in Python 2.
                theids.update(job.msg_ids)
                continue
            theids.add(job)
    if not theids.intersection(self.outstanding):
        return True
    self.spin()
    while theids.intersection(self.outstanding):
        if timeout >= 0 and ( time.time()-tic ) > timeout:
            break
        time.sleep(1e-3)
        self.spin()
    return len(theids.intersection(self.outstanding)) == 0
1070
1083
1071 #--------------------------------------------------------------------------
1084 #--------------------------------------------------------------------------
1072 # Control methods
1085 # Control methods
1073 #--------------------------------------------------------------------------
1086 #--------------------------------------------------------------------------
1074
1087
@spin_first
def clear(self, targets=None, block=None):
    """Clear the namespace in target(s).

    Sends a 'clear_request' to each resolved target; when blocking,
    waits for every reply and raises the last error seen, if any.
    """
    block = self.block if block is None else block
    targets = self._build_targets(targets)[0]
    for t in targets:
        self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
    error = False
    if not block:
        # replies will be drained later by _flush_ignored_control
        self._ignored_control_replies += len(targets)
    else:
        self._flush_ignored_control()
        for _ in targets:
            idents, msg = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                error = self._unwrap_exception(msg['content'])
    if error:
        raise error
1095
1108
1096
1109
@spin_first
def abort(self, jobs=None, targets=None, block=None):
    """Abort specific jobs from the execution queues of target(s).

    This is a mechanism to prevent jobs that have already been submitted
    from executing.

    Parameters
    ----------

    jobs : msg_id, list of msg_ids, or AsyncResult
        The jobs to be aborted

        If unspecified/None: abort all outstanding jobs.

    """
    block = self.block if block is None else block
    jobs = jobs if jobs is not None else list(self.outstanding)
    targets = self._build_targets(targets)[0]

    msg_ids = []
    if isinstance(jobs, (basestring,AsyncResult)):
        jobs = [jobs]
    # FIX: use a list comprehension rather than filter(); a Python 3
    # filter object is always truthy, so the `if bad_ids` check would
    # never fire and bad_ids[0] would not be indexable.
    bad_ids = [obj for obj in jobs if not isinstance(obj, (basestring, AsyncResult))]
    if bad_ids:
        raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
    for j in jobs:
        if isinstance(j, AsyncResult):
            msg_ids.extend(j.msg_ids)
        else:
            msg_ids.append(j)
    content = dict(msg_ids=msg_ids)
    for t in targets:
        self.session.send(self._control_socket, 'abort_request',
                content=content, ident=t)
    error = False
    if block:
        self._flush_ignored_control()
        for i in range(len(targets)):
            idents,msg = self.session.recv(self._control_socket,0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                error = self._unwrap_exception(msg['content'])
    else:
        # replies will be drained later by _flush_ignored_control
        self._ignored_control_replies += len(targets)
    if error:
        raise error
1145
1158
@spin_first
def shutdown(self, targets='all', restart=False, hub=False, block=None):
    """Terminates one or more engine processes, optionally including the hub.

    Parameters
    ----------

    targets: list of ints or 'all' [default: all]
        Which engines to shutdown.
    hub: bool [default: False]
        Whether to include the Hub. hub=True implies targets='all'.
    block: bool [default: self.block]
        Whether to wait for clean shutdown replies or not.
    restart: bool [default: False]
        NOT IMPLEMENTED
        whether to restart engines after shutting them down.
    """
    from IPython.parallel.error import NoEnginesRegistered

    if restart:
        raise NotImplementedError("Engine restart is not yet implemented")

    block = self.block if block is None else block
    if hub:
        # shutting down the hub takes every engine with it
        targets = 'all'
    try:
        targets = self._build_targets(targets)[0]
    except NoEnginesRegistered:
        targets = []

    for ident in targets:
        self.session.send(self._control_socket, 'shutdown_request',
                          content={'restart': restart}, ident=ident)

    error = False
    if not (block or hub):
        # replies will be drained later by _flush_ignored_control
        self._ignored_control_replies += len(targets)
    else:
        self._flush_ignored_control()
        for _ in targets:
            idents, msg = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                error = self._unwrap_exception(msg['content'])

    if hub:
        # let the engine shutdown requests go out before killing the hub
        time.sleep(0.25)
        self.session.send(self._query_socket, 'shutdown_request')
        idents, msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        if msg['content']['status'] != 'ok':
            error = self._unwrap_exception(msg['content'])

    if error:
        raise error
1200
1213
1201 #--------------------------------------------------------------------------
1214 #--------------------------------------------------------------------------
1202 # Execution related methods
1215 # Execution related methods
1203 #--------------------------------------------------------------------------
1216 #--------------------------------------------------------------------------
1204
1217
def _maybe_raise(self, result):
    """Raise `result` if it is a RemoteError; otherwise pass it through."""
    if isinstance(result, error.RemoteError):
        raise result
    return result
1211
1224
def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
                        ident=None):
    """Construct and send an apply message via a socket.

    This is the principal method with which all engine execution is
    performed by views.  Validates the call, serializes it, sends it,
    and registers the resulting msg_id in the client's bookkeeping.
    """
    if self._closed:
        raise RuntimeError("Client cannot be used after its sockets have been closed")

    # fill in defaults
    args = [] if args is None else args
    kwargs = {} if kwargs is None else kwargs
    metadata = {} if metadata is None else metadata

    # validate arguments before serializing anything
    if not callable(f) and not isinstance(f, Reference):
        raise TypeError("f must be callable, not %s"%type(f))
    if not isinstance(args, (tuple, list)):
        raise TypeError("args must be tuple or list, not %s"%type(args))
    if not isinstance(kwargs, dict):
        raise TypeError("kwargs must be dict, not %s"%type(kwargs))
    if not isinstance(metadata, dict):
        raise TypeError("metadata must be dict, not %s"%type(metadata))

    bufs = serialize.pack_apply_message(f, args, kwargs,
        buffer_threshold=self.session.buffer_threshold,
        item_threshold=self.session.item_threshold,
    )

    msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
                            metadata=metadata, track=track)

    msg_id = msg['header']['msg_id']
    self.outstanding.add(msg_id)
    if ident:
        # possibly routed to a specific engine
        if isinstance(ident, list):
            ident = ident[-1]
        if ident in self._engines.values():
            # save for later, in case of engine death
            self._outstanding_dict[ident].add(msg_id)
    self.history.append(msg_id)
    self.metadata[msg_id]['submitted'] = datetime.now()

    return msg
1258
1271
def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
    """Construct and send an execute request via a socket.

    Validates the call, sends the request, and registers the resulting
    msg_id in the client's bookkeeping.
    """
    if self._closed:
        raise RuntimeError("Client cannot be used after its sockets have been closed")

    # fill in defaults
    metadata = {} if metadata is None else metadata

    # validate arguments before sending anything
    if not isinstance(code, basestring):
        raise TypeError("code must be text, not %s" % type(code))
    if not isinstance(metadata, dict):
        raise TypeError("metadata must be dict, not %s" % type(metadata))

    content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})

    msg = self.session.send(socket, "execute_request", content=content, ident=ident,
                            metadata=metadata)

    msg_id = msg['header']['msg_id']
    self.outstanding.add(msg_id)
    if ident:
        # possibly routed to a specific engine
        if isinstance(ident, list):
            ident = ident[-1]
        if ident in self._engines.values():
            # save for later, in case of engine death
            self._outstanding_dict[ident].add(msg_id)
    self.history.append(msg_id)
    self.metadata[msg_id]['submitted'] = datetime.now()

    return msg
1295
1308
1296 #--------------------------------------------------------------------------
1309 #--------------------------------------------------------------------------
1297 # construct a View object
1310 # construct a View object
1298 #--------------------------------------------------------------------------
1311 #--------------------------------------------------------------------------
1299
1312
def load_balanced_view(self, targets=None):
    """Construct a LoadBalancedView object.

    If no arguments are specified, create a LoadBalancedView
    using all engines.

    Parameters
    ----------

    targets: list,slice,int,etc. [default: use all engines]
        The subset of engines across which to load-balance
    """
    if targets == 'all':
        # None means "whatever engines exist at each submission"
        targets = None
    if targets is not None:
        targets = self._build_targets(targets)[1]
    return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1317
1330
def direct_view(self, targets='all'):
    """Construct a DirectView object.

    If no targets are specified, create a DirectView using all engines.

    rc.direct_view('all') is distinguished from rc[:] in that 'all' will
    evaluate the target engines at each execution, whereas rc[:] will connect to
    all *current* engines, and that list will not change.

    That is, 'all' will always use all engines, whereas rc[:] will not use
    engines added after the DirectView is constructed.

    Parameters
    ----------

    targets: list,slice,int,etc. [default: use all engines]
        The engines to use for the View
    """
    single = isinstance(targets, int)
    # 'all' stays symbolic so it is lazily re-evaluated at each execution;
    # anything else resolves to concrete engine ids now
    if targets != 'all':
        targets = self._build_targets(targets)[1]
        if single:
            # an int target means a view on exactly one engine
            targets = targets[0]
    return DirectView(client=self, socket=self._mux_socket, targets=targets)
1343
1356
1344 #--------------------------------------------------------------------------
1357 #--------------------------------------------------------------------------
1345 # Query methods
1358 # Query methods
1346 #--------------------------------------------------------------------------
1359 #--------------------------------------------------------------------------
1347
1360
@spin_first
def get_result(self, indices_or_msg_ids=None, block=None):
    """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.

    If the client already has the results, no request to the Hub will be made.

    This is a convenient way to construct AsyncResult objects, which are wrappers
    that include metadata about execution, and allow for awaiting results that
    were not submitted by this Client.

    It can also be a convenient way to retrieve the metadata associated with
    blocking execution, since it always retrieves

    Examples
    --------
    ::

        In [10]: r = client.apply()

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The indices or msg_ids of indices to be retrieved

    block : bool
        Whether to wait for the result to be done

    Returns
    -------

    AsyncResult
        A single AsyncResult object will always be returned.

    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub

    """
    block = self.block if block is None else block
    if indices_or_msg_ids is None:
        indices_or_msg_ids = -1

    single_result = False
    if not isinstance(indices_or_msg_ids, (list,tuple)):
        indices_or_msg_ids = [indices_or_msg_ids]
        single_result = True

    # normalize history indices to msg_id strings
    # (local renamed from `id`, which shadowed the builtin)
    theids = []
    for mid in indices_or_msg_ids:
        if isinstance(mid, int):
            mid = self.history[mid]
        if not isinstance(mid, basestring):
            raise TypeError("indices must be str or int, not %r"%mid)
        theids.append(mid)

    # FIX: list comprehensions rather than filter(); a Python 3 filter
    # object is always truthy, so `if remote_ids` below would always
    # take the AsyncHubResult branch.
    local_ids = [msg_id for msg_id in theids
                 if msg_id in self.outstanding or msg_id in self.results]
    remote_ids = [msg_id for msg_id in theids if msg_id not in local_ids]

    # given a single msg_id initially, get_result should get the result itself,
    # not a length-one list
    if single_result:
        theids = theids[0]

    if remote_ids:
        ar = AsyncHubResult(self, msg_ids=theids)
    else:
        ar = AsyncResult(self, msg_ids=theids)

    if block:
        ar.wait()

    return ar
1420
1433
@spin_first
def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
    """Resubmit one or more tasks.

    in-flight tasks may not be resubmitted.

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The indices or msg_ids of indices to be retrieved

    block : bool
        Whether to wait for the result to be done

    Returns
    -------

    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub

    """
    block = self.block if block is None else block
    if indices_or_msg_ids is None:
        indices_or_msg_ids = -1

    if not isinstance(indices_or_msg_ids, (list, tuple)):
        indices_or_msg_ids = [indices_or_msg_ids]

    # normalize history indices to msg_id strings
    theids = []
    for mid in indices_or_msg_ids:
        if isinstance(mid, int):
            mid = self.history[mid]
        if not isinstance(mid, basestring):
            raise TypeError("indices must be str or int, not %r"%mid)
        theids.append(mid)

    self.session.send(self._query_socket, 'resubmit_request', dict(msg_ids=theids))

    # block until the Hub's reply is readable, then receive it non-blocking
    zmq.select([self._query_socket], [], [])
    idents, msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
    if self.debug:
        pprint(msg)
    content = msg['content']
    if content['status'] != 'ok':
        raise self._unwrap_exception(content)
    # the Hub maps each old msg_id to the id of its resubmitted task
    mapping = content['resubmitted']
    new_ids = [mapping[msg_id] for msg_id in theids]

    ar = AsyncHubResult(self, msg_ids=new_ids)

    if block:
        ar.wait()

    return ar
1478
1491
1479 @spin_first
1492 @spin_first
1480 def result_status(self, msg_ids, status_only=True):
1493 def result_status(self, msg_ids, status_only=True):
1481 """Check on the status of the result(s) of the apply request with `msg_ids`.
1494 """Check on the status of the result(s) of the apply request with `msg_ids`.
1482
1495
1483 If status_only is False, then the actual results will be retrieved, else
1496 If status_only is False, then the actual results will be retrieved, else
1484 only the status of the results will be checked.
1497 only the status of the results will be checked.
1485
1498
1486 Parameters
1499 Parameters
1487 ----------
1500 ----------
1488
1501
1489 msg_ids : list of msg_ids
1502 msg_ids : list of msg_ids
1490 if int:
1503 if int:
1491 Passed as index to self.history for convenience.
1504 Passed as index to self.history for convenience.
1492 status_only : bool (default: True)
1505 status_only : bool (default: True)
1493 if False:
1506 if False:
1494 Retrieve the actual results of completed tasks.
1507 Retrieve the actual results of completed tasks.
1495
1508
1496 Returns
1509 Returns
1497 -------
1510 -------
1498
1511
1499 results : dict
1512 results : dict
1500 There will always be the keys 'pending' and 'completed', which will
1513 There will always be the keys 'pending' and 'completed', which will
1501 be lists of msg_ids that are incomplete or complete. If `status_only`
1514 be lists of msg_ids that are incomplete or complete. If `status_only`
1502 is False, then completed results will be keyed by their `msg_id`.
1515 is False, then completed results will be keyed by their `msg_id`.
1503 """
1516 """
1504 if not isinstance(msg_ids, (list,tuple)):
1517 if not isinstance(msg_ids, (list,tuple)):
1505 msg_ids = [msg_ids]
1518 msg_ids = [msg_ids]
1506
1519
1507 theids = []
1520 theids = []
1508 for msg_id in msg_ids:
1521 for msg_id in msg_ids:
1509 if isinstance(msg_id, int):
1522 if isinstance(msg_id, int):
1510 msg_id = self.history[msg_id]
1523 msg_id = self.history[msg_id]
1511 if not isinstance(msg_id, basestring):
1524 if not isinstance(msg_id, basestring):
1512 raise TypeError("msg_ids must be str, not %r"%msg_id)
1525 raise TypeError("msg_ids must be str, not %r"%msg_id)
1513 theids.append(msg_id)
1526 theids.append(msg_id)
1514
1527
1515 completed = []
1528 completed = []
1516 local_results = {}
1529 local_results = {}
1517
1530
1518 # comment this block out to temporarily disable local shortcut:
1531 # comment this block out to temporarily disable local shortcut:
1519 for msg_id in theids:
1532 for msg_id in theids:
1520 if msg_id in self.results:
1533 if msg_id in self.results:
1521 completed.append(msg_id)
1534 completed.append(msg_id)
1522 local_results[msg_id] = self.results[msg_id]
1535 local_results[msg_id] = self.results[msg_id]
1523 theids.remove(msg_id)
1536 theids.remove(msg_id)
1524
1537
1525 if theids: # some not locally cached
1538 if theids: # some not locally cached
1526 content = dict(msg_ids=theids, status_only=status_only)
1539 content = dict(msg_ids=theids, status_only=status_only)
1527 msg = self.session.send(self._query_socket, "result_request", content=content)
1540 msg = self.session.send(self._query_socket, "result_request", content=content)
1528 zmq.select([self._query_socket], [], [])
1541 zmq.select([self._query_socket], [], [])
1529 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1542 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1530 if self.debug:
1543 if self.debug:
1531 pprint(msg)
1544 pprint(msg)
1532 content = msg['content']
1545 content = msg['content']
1533 if content['status'] != 'ok':
1546 if content['status'] != 'ok':
1534 raise self._unwrap_exception(content)
1547 raise self._unwrap_exception(content)
1535 buffers = msg['buffers']
1548 buffers = msg['buffers']
1536 else:
1549 else:
1537 content = dict(completed=[],pending=[])
1550 content = dict(completed=[],pending=[])
1538
1551
1539 content['completed'].extend(completed)
1552 content['completed'].extend(completed)
1540
1553
1541 if status_only:
1554 if status_only:
1542 return content
1555 return content
1543
1556
1544 failures = []
1557 failures = []
1545 # load cached results into result:
1558 # load cached results into result:
1546 content.update(local_results)
1559 content.update(local_results)
1547
1560
1548 # update cache with results:
1561 # update cache with results:
1549 for msg_id in sorted(theids):
1562 for msg_id in sorted(theids):
1550 if msg_id in content['completed']:
1563 if msg_id in content['completed']:
1551 rec = content[msg_id]
1564 rec = content[msg_id]
1552 parent = rec['header']
1565 parent = rec['header']
1553 header = rec['result_header']
1566 header = rec['result_header']
1554 rcontent = rec['result_content']
1567 rcontent = rec['result_content']
1555 iodict = rec['io']
1568 iodict = rec['io']
1556 if isinstance(rcontent, str):
1569 if isinstance(rcontent, str):
1557 rcontent = self.session.unpack(rcontent)
1570 rcontent = self.session.unpack(rcontent)
1558
1571
1559 md = self.metadata[msg_id]
1572 md = self.metadata[msg_id]
1560 md_msg = dict(
1573 md_msg = dict(
1561 content=rcontent,
1574 content=rcontent,
1562 parent_header=parent,
1575 parent_header=parent,
1563 header=header,
1576 header=header,
1564 metadata=rec['result_metadata'],
1577 metadata=rec['result_metadata'],
1565 )
1578 )
1566 md.update(self._extract_metadata(md_msg))
1579 md.update(self._extract_metadata(md_msg))
1567 if rec.get('received'):
1580 if rec.get('received'):
1568 md['received'] = rec['received']
1581 md['received'] = rec['received']
1569 md.update(iodict)
1582 md.update(iodict)
1570
1583
1571 if rcontent['status'] == 'ok':
1584 if rcontent['status'] == 'ok':
1572 if header['msg_type'] == 'apply_reply':
1585 if header['msg_type'] == 'apply_reply':
1573 res,buffers = serialize.unserialize_object(buffers)
1586 res,buffers = serialize.unserialize_object(buffers)
1574 elif header['msg_type'] == 'execute_reply':
1587 elif header['msg_type'] == 'execute_reply':
1575 res = ExecuteReply(msg_id, rcontent, md)
1588 res = ExecuteReply(msg_id, rcontent, md)
1576 else:
1589 else:
1577 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1590 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1578 else:
1591 else:
1579 res = self._unwrap_exception(rcontent)
1592 res = self._unwrap_exception(rcontent)
1580 failures.append(res)
1593 failures.append(res)
1581
1594
1582 self.results[msg_id] = res
1595 self.results[msg_id] = res
1583 content[msg_id] = res
1596 content[msg_id] = res
1584
1597
1585 if len(theids) == 1 and failures:
1598 if len(theids) == 1 and failures:
1586 raise failures[0]
1599 raise failures[0]
1587
1600
1588 error.collect_exceptions(failures, "result_status")
1601 error.collect_exceptions(failures, "result_status")
1589 return content
1602 return content
1590
1603
1591 @spin_first
1604 @spin_first
1592 def queue_status(self, targets='all', verbose=False):
1605 def queue_status(self, targets='all', verbose=False):
1593 """Fetch the status of engine queues.
1606 """Fetch the status of engine queues.
1594
1607
1595 Parameters
1608 Parameters
1596 ----------
1609 ----------
1597
1610
1598 targets : int/str/list of ints/strs
1611 targets : int/str/list of ints/strs
1599 the engines whose states are to be queried.
1612 the engines whose states are to be queried.
1600 default : all
1613 default : all
1601 verbose : bool
1614 verbose : bool
1602 Whether to return lengths only, or lists of ids for each element
1615 Whether to return lengths only, or lists of ids for each element
1603 """
1616 """
1604 if targets == 'all':
1617 if targets == 'all':
1605 # allow 'all' to be evaluated on the engine
1618 # allow 'all' to be evaluated on the engine
1606 engine_ids = None
1619 engine_ids = None
1607 else:
1620 else:
1608 engine_ids = self._build_targets(targets)[1]
1621 engine_ids = self._build_targets(targets)[1]
1609 content = dict(targets=engine_ids, verbose=verbose)
1622 content = dict(targets=engine_ids, verbose=verbose)
1610 self.session.send(self._query_socket, "queue_request", content=content)
1623 self.session.send(self._query_socket, "queue_request", content=content)
1611 idents,msg = self.session.recv(self._query_socket, 0)
1624 idents,msg = self.session.recv(self._query_socket, 0)
1612 if self.debug:
1625 if self.debug:
1613 pprint(msg)
1626 pprint(msg)
1614 content = msg['content']
1627 content = msg['content']
1615 status = content.pop('status')
1628 status = content.pop('status')
1616 if status != 'ok':
1629 if status != 'ok':
1617 raise self._unwrap_exception(content)
1630 raise self._unwrap_exception(content)
1618 content = rekey(content)
1631 content = rekey(content)
1619 if isinstance(targets, int):
1632 if isinstance(targets, int):
1620 return content[targets]
1633 return content[targets]
1621 else:
1634 else:
1622 return content
1635 return content
1623
1636
1624 def _build_msgids_from_target(self, targets=None):
1637 def _build_msgids_from_target(self, targets=None):
1625 """Build a list of msg_ids from the list of engine targets"""
1638 """Build a list of msg_ids from the list of engine targets"""
1626 if not targets: # needed as _build_targets otherwise uses all engines
1639 if not targets: # needed as _build_targets otherwise uses all engines
1627 return []
1640 return []
1628 target_ids = self._build_targets(targets)[0]
1641 target_ids = self._build_targets(targets)[0]
1629 return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)
1642 return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)
1630
1643
1631 def _build_msgids_from_jobs(self, jobs=None):
1644 def _build_msgids_from_jobs(self, jobs=None):
1632 """Build a list of msg_ids from "jobs" """
1645 """Build a list of msg_ids from "jobs" """
1633 if not jobs:
1646 if not jobs:
1634 return []
1647 return []
1635 msg_ids = []
1648 msg_ids = []
1636 if isinstance(jobs, (basestring,AsyncResult)):
1649 if isinstance(jobs, (basestring,AsyncResult)):
1637 jobs = [jobs]
1650 jobs = [jobs]
1638 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1651 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1639 if bad_ids:
1652 if bad_ids:
1640 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1653 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1641 for j in jobs:
1654 for j in jobs:
1642 if isinstance(j, AsyncResult):
1655 if isinstance(j, AsyncResult):
1643 msg_ids.extend(j.msg_ids)
1656 msg_ids.extend(j.msg_ids)
1644 else:
1657 else:
1645 msg_ids.append(j)
1658 msg_ids.append(j)
1646 return msg_ids
1659 return msg_ids
1647
1660
1648 def purge_local_results(self, jobs=[], targets=[]):
1661 def purge_local_results(self, jobs=[], targets=[]):
1649 """Clears the client caches of results and frees such memory.
1662 """Clears the client caches of results and frees such memory.
1650
1663
1651 Individual results can be purged by msg_id, or the entire
1664 Individual results can be purged by msg_id, or the entire
1652 history of specific targets can be purged.
1665 history of specific targets can be purged.
1653
1666
1654 Use `purge_local_results('all')` to scrub everything from the Clients's db.
1667 Use `purge_local_results('all')` to scrub everything from the Clients's db.
1655
1668
1656 The client must have no outstanding tasks before purging the caches.
1669 The client must have no outstanding tasks before purging the caches.
1657 Raises `AssertionError` if there are still outstanding tasks.
1670 Raises `AssertionError` if there are still outstanding tasks.
1658
1671
1659 After this call all `AsyncResults` are invalid and should be discarded.
1672 After this call all `AsyncResults` are invalid and should be discarded.
1660
1673
1661 If you must "reget" the results, you can still do so by using
1674 If you must "reget" the results, you can still do so by using
1662 `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
1675 `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
1663 redownload the results from the hub if they are still available
1676 redownload the results from the hub if they are still available
1664 (i.e `client.purge_hub_results(...)` has not been called.
1677 (i.e `client.purge_hub_results(...)` has not been called.
1665
1678
1666 Parameters
1679 Parameters
1667 ----------
1680 ----------
1668
1681
1669 jobs : str or list of str or AsyncResult objects
1682 jobs : str or list of str or AsyncResult objects
1670 the msg_ids whose results should be purged.
1683 the msg_ids whose results should be purged.
1671 targets : int/str/list of ints/strs
1684 targets : int/str/list of ints/strs
1672 The targets, by int_id, whose entire results are to be purged.
1685 The targets, by int_id, whose entire results are to be purged.
1673
1686
1674 default : None
1687 default : None
1675 """
1688 """
1676 assert not self.outstanding, "Can't purge a client with outstanding tasks!"
1689 assert not self.outstanding, "Can't purge a client with outstanding tasks!"
1677
1690
1678 if not targets and not jobs:
1691 if not targets and not jobs:
1679 raise ValueError("Must specify at least one of `targets` and `jobs`")
1692 raise ValueError("Must specify at least one of `targets` and `jobs`")
1680
1693
1681 if jobs == 'all':
1694 if jobs == 'all':
1682 self.results.clear()
1695 self.results.clear()
1683 self.metadata.clear()
1696 self.metadata.clear()
1684 return
1697 return
1685 else:
1698 else:
1686 msg_ids = []
1699 msg_ids = []
1687 msg_ids.extend(self._build_msgids_from_target(targets))
1700 msg_ids.extend(self._build_msgids_from_target(targets))
1688 msg_ids.extend(self._build_msgids_from_jobs(jobs))
1701 msg_ids.extend(self._build_msgids_from_jobs(jobs))
1689 map(self.results.pop, msg_ids)
1702 map(self.results.pop, msg_ids)
1690 map(self.metadata.pop, msg_ids)
1703 map(self.metadata.pop, msg_ids)
1691
1704
1692
1705
1693 @spin_first
1706 @spin_first
1694 def purge_hub_results(self, jobs=[], targets=[]):
1707 def purge_hub_results(self, jobs=[], targets=[]):
1695 """Tell the Hub to forget results.
1708 """Tell the Hub to forget results.
1696
1709
1697 Individual results can be purged by msg_id, or the entire
1710 Individual results can be purged by msg_id, or the entire
1698 history of specific targets can be purged.
1711 history of specific targets can be purged.
1699
1712
1700 Use `purge_results('all')` to scrub everything from the Hub's db.
1713 Use `purge_results('all')` to scrub everything from the Hub's db.
1701
1714
1702 Parameters
1715 Parameters
1703 ----------
1716 ----------
1704
1717
1705 jobs : str or list of str or AsyncResult objects
1718 jobs : str or list of str or AsyncResult objects
1706 the msg_ids whose results should be forgotten.
1719 the msg_ids whose results should be forgotten.
1707 targets : int/str/list of ints/strs
1720 targets : int/str/list of ints/strs
1708 The targets, by int_id, whose entire history is to be purged.
1721 The targets, by int_id, whose entire history is to be purged.
1709
1722
1710 default : None
1723 default : None
1711 """
1724 """
1712 if not targets and not jobs:
1725 if not targets and not jobs:
1713 raise ValueError("Must specify at least one of `targets` and `jobs`")
1726 raise ValueError("Must specify at least one of `targets` and `jobs`")
1714 if targets:
1727 if targets:
1715 targets = self._build_targets(targets)[1]
1728 targets = self._build_targets(targets)[1]
1716
1729
1717 # construct msg_ids from jobs
1730 # construct msg_ids from jobs
1718 if jobs == 'all':
1731 if jobs == 'all':
1719 msg_ids = jobs
1732 msg_ids = jobs
1720 else:
1733 else:
1721 msg_ids = self._build_msgids_from_jobs(jobs)
1734 msg_ids = self._build_msgids_from_jobs(jobs)
1722
1735
1723 content = dict(engine_ids=targets, msg_ids=msg_ids)
1736 content = dict(engine_ids=targets, msg_ids=msg_ids)
1724 self.session.send(self._query_socket, "purge_request", content=content)
1737 self.session.send(self._query_socket, "purge_request", content=content)
1725 idents, msg = self.session.recv(self._query_socket, 0)
1738 idents, msg = self.session.recv(self._query_socket, 0)
1726 if self.debug:
1739 if self.debug:
1727 pprint(msg)
1740 pprint(msg)
1728 content = msg['content']
1741 content = msg['content']
1729 if content['status'] != 'ok':
1742 if content['status'] != 'ok':
1730 raise self._unwrap_exception(content)
1743 raise self._unwrap_exception(content)
1731
1744
1732 def purge_results(self, jobs=[], targets=[]):
1745 def purge_results(self, jobs=[], targets=[]):
1733 """Clears the cached results from both the hub and the local client
1746 """Clears the cached results from both the hub and the local client
1734
1747
1735 Individual results can be purged by msg_id, or the entire
1748 Individual results can be purged by msg_id, or the entire
1736 history of specific targets can be purged.
1749 history of specific targets can be purged.
1737
1750
1738 Use `purge_results('all')` to scrub every cached result from both the Hub's and
1751 Use `purge_results('all')` to scrub every cached result from both the Hub's and
1739 the Client's db.
1752 the Client's db.
1740
1753
1741 Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with
1754 Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with
1742 the same arguments.
1755 the same arguments.
1743
1756
1744 Parameters
1757 Parameters
1745 ----------
1758 ----------
1746
1759
1747 jobs : str or list of str or AsyncResult objects
1760 jobs : str or list of str or AsyncResult objects
1748 the msg_ids whose results should be forgotten.
1761 the msg_ids whose results should be forgotten.
1749 targets : int/str/list of ints/strs
1762 targets : int/str/list of ints/strs
1750 The targets, by int_id, whose entire history is to be purged.
1763 The targets, by int_id, whose entire history is to be purged.
1751
1764
1752 default : None
1765 default : None
1753 """
1766 """
1754 self.purge_local_results(jobs=jobs, targets=targets)
1767 self.purge_local_results(jobs=jobs, targets=targets)
1755 self.purge_hub_results(jobs=jobs, targets=targets)
1768 self.purge_hub_results(jobs=jobs, targets=targets)
1756
1769
1757 def purge_everything(self):
1770 def purge_everything(self):
1758 """Clears all content from previous Tasks from both the hub and the local client
1771 """Clears all content from previous Tasks from both the hub and the local client
1759
1772
1760 In addition to calling `purge_results("all")` it also deletes the history and
1773 In addition to calling `purge_results("all")` it also deletes the history and
1761 other bookkeeping lists.
1774 other bookkeeping lists.
1762 """
1775 """
1763 self.purge_results("all")
1776 self.purge_results("all")
1764 self.history = []
1777 self.history = []
1765 self.session.digest_history.clear()
1778 self.session.digest_history.clear()
1766
1779
1767 @spin_first
1780 @spin_first
1768 def hub_history(self):
1781 def hub_history(self):
1769 """Get the Hub's history
1782 """Get the Hub's history
1770
1783
1771 Just like the Client, the Hub has a history, which is a list of msg_ids.
1784 Just like the Client, the Hub has a history, which is a list of msg_ids.
1772 This will contain the history of all clients, and, depending on configuration,
1785 This will contain the history of all clients, and, depending on configuration,
1773 may contain history across multiple cluster sessions.
1786 may contain history across multiple cluster sessions.
1774
1787
1775 Any msg_id returned here is a valid argument to `get_result`.
1788 Any msg_id returned here is a valid argument to `get_result`.
1776
1789
1777 Returns
1790 Returns
1778 -------
1791 -------
1779
1792
1780 msg_ids : list of strs
1793 msg_ids : list of strs
1781 list of all msg_ids, ordered by task submission time.
1794 list of all msg_ids, ordered by task submission time.
1782 """
1795 """
1783
1796
1784 self.session.send(self._query_socket, "history_request", content={})
1797 self.session.send(self._query_socket, "history_request", content={})
1785 idents, msg = self.session.recv(self._query_socket, 0)
1798 idents, msg = self.session.recv(self._query_socket, 0)
1786
1799
1787 if self.debug:
1800 if self.debug:
1788 pprint(msg)
1801 pprint(msg)
1789 content = msg['content']
1802 content = msg['content']
1790 if content['status'] != 'ok':
1803 if content['status'] != 'ok':
1791 raise self._unwrap_exception(content)
1804 raise self._unwrap_exception(content)
1792 else:
1805 else:
1793 return content['history']
1806 return content['history']
1794
1807
1795 @spin_first
1808 @spin_first
1796 def db_query(self, query, keys=None):
1809 def db_query(self, query, keys=None):
1797 """Query the Hub's TaskRecord database
1810 """Query the Hub's TaskRecord database
1798
1811
1799 This will return a list of task record dicts that match `query`
1812 This will return a list of task record dicts that match `query`
1800
1813
1801 Parameters
1814 Parameters
1802 ----------
1815 ----------
1803
1816
1804 query : mongodb query dict
1817 query : mongodb query dict
1805 The search dict. See mongodb query docs for details.
1818 The search dict. See mongodb query docs for details.
1806 keys : list of strs [optional]
1819 keys : list of strs [optional]
1807 The subset of keys to be returned. The default is to fetch everything but buffers.
1820 The subset of keys to be returned. The default is to fetch everything but buffers.
1808 'msg_id' will *always* be included.
1821 'msg_id' will *always* be included.
1809 """
1822 """
1810 if isinstance(keys, basestring):
1823 if isinstance(keys, basestring):
1811 keys = [keys]
1824 keys = [keys]
1812 content = dict(query=query, keys=keys)
1825 content = dict(query=query, keys=keys)
1813 self.session.send(self._query_socket, "db_request", content=content)
1826 self.session.send(self._query_socket, "db_request", content=content)
1814 idents, msg = self.session.recv(self._query_socket, 0)
1827 idents, msg = self.session.recv(self._query_socket, 0)
1815 if self.debug:
1828 if self.debug:
1816 pprint(msg)
1829 pprint(msg)
1817 content = msg['content']
1830 content = msg['content']
1818 if content['status'] != 'ok':
1831 if content['status'] != 'ok':
1819 raise self._unwrap_exception(content)
1832 raise self._unwrap_exception(content)
1820
1833
1821 records = content['records']
1834 records = content['records']
1822
1835
1823 buffer_lens = content['buffer_lens']
1836 buffer_lens = content['buffer_lens']
1824 result_buffer_lens = content['result_buffer_lens']
1837 result_buffer_lens = content['result_buffer_lens']
1825 buffers = msg['buffers']
1838 buffers = msg['buffers']
1826 has_bufs = buffer_lens is not None
1839 has_bufs = buffer_lens is not None
1827 has_rbufs = result_buffer_lens is not None
1840 has_rbufs = result_buffer_lens is not None
1828 for i,rec in enumerate(records):
1841 for i,rec in enumerate(records):
1829 # relink buffers
1842 # relink buffers
1830 if has_bufs:
1843 if has_bufs:
1831 blen = buffer_lens[i]
1844 blen = buffer_lens[i]
1832 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1845 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1833 if has_rbufs:
1846 if has_rbufs:
1834 blen = result_buffer_lens[i]
1847 blen = result_buffer_lens[i]
1835 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1848 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1836
1849
1837 return records
1850 return records
1838
1851
1839 __all__ = [ 'Client' ]
1852 __all__ = [ 'Client' ]
General Comments 0
You need to be logged in to leave comments. Login now