##// END OF EJS Templates
Allow shutdown when no engines are registered.
David Hirschfeld -
Show More
@@ -1,1726 +1,1729 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
36
36
37 from IPython.utils.coloransi import TermColors
37 from IPython.utils.coloransi import TermColors
38 from IPython.utils.jsonutil import rekey
38 from IPython.utils.jsonutil import rekey
39 from IPython.utils.localinterfaces import LOCAL_IPS
39 from IPython.utils.localinterfaces import LOCAL_IPS
40 from IPython.utils.path import get_ipython_dir
40 from IPython.utils.path import get_ipython_dir
41 from IPython.utils.py3compat import cast_bytes
41 from IPython.utils.py3compat import cast_bytes
42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
43 Dict, List, Bool, Set, Any)
43 Dict, List, Bool, Set, Any)
44 from IPython.external.decorator import decorator
44 from IPython.external.decorator import decorator
45 from IPython.external.ssh import tunnel
45 from IPython.external.ssh import tunnel
46
46
47 from IPython.parallel import Reference
47 from IPython.parallel import Reference
48 from IPython.parallel import error
48 from IPython.parallel import error
49 from IPython.parallel import util
49 from IPython.parallel import util
50
50
51 from IPython.zmq.session import Session, Message
51 from IPython.zmq.session import Session, Message
52 from IPython.zmq import serialize
52 from IPython.zmq import serialize
53
53
54 from .asyncresult import AsyncResult, AsyncHubResult
54 from .asyncresult import AsyncResult, AsyncHubResult
55 from .view import DirectView, LoadBalancedView
55 from .view import DirectView, LoadBalancedView
56
56
57 if sys.version_info[0] >= 3:
57 if sys.version_info[0] >= 3:
58 # xrange is used in a couple 'isinstance' tests in py2
58 # xrange is used in a couple 'isinstance' tests in py2
59 # should be just 'range' in 3k
59 # should be just 'range' in 3k
60 xrange = range
60 xrange = range
61
61
62 #--------------------------------------------------------------------------
62 #--------------------------------------------------------------------------
63 # Decorators for Client methods
63 # Decorators for Client methods
64 #--------------------------------------------------------------------------
64 #--------------------------------------------------------------------------
65
65
66 @decorator
66 @decorator
67 def spin_first(f, self, *args, **kwargs):
67 def spin_first(f, self, *args, **kwargs):
68 """Call spin() to sync state prior to calling the method."""
68 """Call spin() to sync state prior to calling the method."""
69 self.spin()
69 self.spin()
70 return f(self, *args, **kwargs)
70 return f(self, *args, **kwargs)
71
71
72
72
73 #--------------------------------------------------------------------------
73 #--------------------------------------------------------------------------
74 # Classes
74 # Classes
75 #--------------------------------------------------------------------------
75 #--------------------------------------------------------------------------
76
76
77
77
78 class ExecuteReply(object):
78 class ExecuteReply(object):
79 """wrapper for finished Execute results"""
79 """wrapper for finished Execute results"""
80 def __init__(self, msg_id, content, metadata):
80 def __init__(self, msg_id, content, metadata):
81 self.msg_id = msg_id
81 self.msg_id = msg_id
82 self._content = content
82 self._content = content
83 self.execution_count = content['execution_count']
83 self.execution_count = content['execution_count']
84 self.metadata = metadata
84 self.metadata = metadata
85
85
86 def __getitem__(self, key):
86 def __getitem__(self, key):
87 return self.metadata[key]
87 return self.metadata[key]
88
88
89 def __getattr__(self, key):
89 def __getattr__(self, key):
90 if key not in self.metadata:
90 if key not in self.metadata:
91 raise AttributeError(key)
91 raise AttributeError(key)
92 return self.metadata[key]
92 return self.metadata[key]
93
93
94 def __repr__(self):
94 def __repr__(self):
95 pyout = self.metadata['pyout'] or {'data':{}}
95 pyout = self.metadata['pyout'] or {'data':{}}
96 text_out = pyout['data'].get('text/plain', '')
96 text_out = pyout['data'].get('text/plain', '')
97 if len(text_out) > 32:
97 if len(text_out) > 32:
98 text_out = text_out[:29] + '...'
98 text_out = text_out[:29] + '...'
99
99
100 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
100 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
101
101
102 def _repr_pretty_(self, p, cycle):
102 def _repr_pretty_(self, p, cycle):
103 pyout = self.metadata['pyout'] or {'data':{}}
103 pyout = self.metadata['pyout'] or {'data':{}}
104 text_out = pyout['data'].get('text/plain', '')
104 text_out = pyout['data'].get('text/plain', '')
105
105
106 if not text_out:
106 if not text_out:
107 return
107 return
108
108
109 try:
109 try:
110 ip = get_ipython()
110 ip = get_ipython()
111 except NameError:
111 except NameError:
112 colors = "NoColor"
112 colors = "NoColor"
113 else:
113 else:
114 colors = ip.colors
114 colors = ip.colors
115
115
116 if colors == "NoColor":
116 if colors == "NoColor":
117 out = normal = ""
117 out = normal = ""
118 else:
118 else:
119 out = TermColors.Red
119 out = TermColors.Red
120 normal = TermColors.Normal
120 normal = TermColors.Normal
121
121
122 if '\n' in text_out and not text_out.startswith('\n'):
122 if '\n' in text_out and not text_out.startswith('\n'):
123 # add newline for multiline reprs
123 # add newline for multiline reprs
124 text_out = '\n' + text_out
124 text_out = '\n' + text_out
125
125
126 p.text(
126 p.text(
127 out + u'Out[%i:%i]: ' % (
127 out + u'Out[%i:%i]: ' % (
128 self.metadata['engine_id'], self.execution_count
128 self.metadata['engine_id'], self.execution_count
129 ) + normal + text_out
129 ) + normal + text_out
130 )
130 )
131
131
132 def _repr_html_(self):
132 def _repr_html_(self):
133 pyout = self.metadata['pyout'] or {'data':{}}
133 pyout = self.metadata['pyout'] or {'data':{}}
134 return pyout['data'].get("text/html")
134 return pyout['data'].get("text/html")
135
135
136 def _repr_latex_(self):
136 def _repr_latex_(self):
137 pyout = self.metadata['pyout'] or {'data':{}}
137 pyout = self.metadata['pyout'] or {'data':{}}
138 return pyout['data'].get("text/latex")
138 return pyout['data'].get("text/latex")
139
139
140 def _repr_json_(self):
140 def _repr_json_(self):
141 pyout = self.metadata['pyout'] or {'data':{}}
141 pyout = self.metadata['pyout'] or {'data':{}}
142 return pyout['data'].get("application/json")
142 return pyout['data'].get("application/json")
143
143
144 def _repr_javascript_(self):
144 def _repr_javascript_(self):
145 pyout = self.metadata['pyout'] or {'data':{}}
145 pyout = self.metadata['pyout'] or {'data':{}}
146 return pyout['data'].get("application/javascript")
146 return pyout['data'].get("application/javascript")
147
147
148 def _repr_png_(self):
148 def _repr_png_(self):
149 pyout = self.metadata['pyout'] or {'data':{}}
149 pyout = self.metadata['pyout'] or {'data':{}}
150 return pyout['data'].get("image/png")
150 return pyout['data'].get("image/png")
151
151
152 def _repr_jpeg_(self):
152 def _repr_jpeg_(self):
153 pyout = self.metadata['pyout'] or {'data':{}}
153 pyout = self.metadata['pyout'] or {'data':{}}
154 return pyout['data'].get("image/jpeg")
154 return pyout['data'].get("image/jpeg")
155
155
156 def _repr_svg_(self):
156 def _repr_svg_(self):
157 pyout = self.metadata['pyout'] or {'data':{}}
157 pyout = self.metadata['pyout'] or {'data':{}}
158 return pyout['data'].get("image/svg+xml")
158 return pyout['data'].get("image/svg+xml")
159
159
160
160
161 class Metadata(dict):
161 class Metadata(dict):
162 """Subclass of dict for initializing metadata values.
162 """Subclass of dict for initializing metadata values.
163
163
164 Attribute access works on keys.
164 Attribute access works on keys.
165
165
166 These objects have a strict set of keys - errors will raise if you try
166 These objects have a strict set of keys - errors will raise if you try
167 to add new keys.
167 to add new keys.
168 """
168 """
169 def __init__(self, *args, **kwargs):
169 def __init__(self, *args, **kwargs):
170 dict.__init__(self)
170 dict.__init__(self)
171 md = {'msg_id' : None,
171 md = {'msg_id' : None,
172 'submitted' : None,
172 'submitted' : None,
173 'started' : None,
173 'started' : None,
174 'completed' : None,
174 'completed' : None,
175 'received' : None,
175 'received' : None,
176 'engine_uuid' : None,
176 'engine_uuid' : None,
177 'engine_id' : None,
177 'engine_id' : None,
178 'follow' : None,
178 'follow' : None,
179 'after' : None,
179 'after' : None,
180 'status' : None,
180 'status' : None,
181
181
182 'pyin' : None,
182 'pyin' : None,
183 'pyout' : None,
183 'pyout' : None,
184 'pyerr' : None,
184 'pyerr' : None,
185 'stdout' : '',
185 'stdout' : '',
186 'stderr' : '',
186 'stderr' : '',
187 'outputs' : [],
187 'outputs' : [],
188 'data': {},
188 'data': {},
189 'outputs_ready' : False,
189 'outputs_ready' : False,
190 }
190 }
191 self.update(md)
191 self.update(md)
192 self.update(dict(*args, **kwargs))
192 self.update(dict(*args, **kwargs))
193
193
194 def __getattr__(self, key):
194 def __getattr__(self, key):
195 """getattr aliased to getitem"""
195 """getattr aliased to getitem"""
196 if key in self.iterkeys():
196 if key in self.iterkeys():
197 return self[key]
197 return self[key]
198 else:
198 else:
199 raise AttributeError(key)
199 raise AttributeError(key)
200
200
201 def __setattr__(self, key, value):
201 def __setattr__(self, key, value):
202 """setattr aliased to setitem, with strict"""
202 """setattr aliased to setitem, with strict"""
203 if key in self.iterkeys():
203 if key in self.iterkeys():
204 self[key] = value
204 self[key] = value
205 else:
205 else:
206 raise AttributeError(key)
206 raise AttributeError(key)
207
207
208 def __setitem__(self, key, value):
208 def __setitem__(self, key, value):
209 """strict static key enforcement"""
209 """strict static key enforcement"""
210 if key in self.iterkeys():
210 if key in self.iterkeys():
211 dict.__setitem__(self, key, value)
211 dict.__setitem__(self, key, value)
212 else:
212 else:
213 raise KeyError(key)
213 raise KeyError(key)
214
214
215
215
216 class Client(HasTraits):
216 class Client(HasTraits):
217 """A semi-synchronous client to the IPython ZMQ cluster
217 """A semi-synchronous client to the IPython ZMQ cluster
218
218
219 Parameters
219 Parameters
220 ----------
220 ----------
221
221
222 url_file : str/unicode; path to ipcontroller-client.json
222 url_file : str/unicode; path to ipcontroller-client.json
223 This JSON file should contain all the information needed to connect to a cluster,
223 This JSON file should contain all the information needed to connect to a cluster,
224 and is likely the only argument needed.
224 and is likely the only argument needed.
225 Connection information for the Hub's registration. If a json connector
225 Connection information for the Hub's registration. If a json connector
226 file is given, then likely no further configuration is necessary.
226 file is given, then likely no further configuration is necessary.
227 [Default: use profile]
227 [Default: use profile]
228 profile : bytes
228 profile : bytes
229 The name of the Cluster profile to be used to find connector information.
229 The name of the Cluster profile to be used to find connector information.
230 If run from an IPython application, the default profile will be the same
230 If run from an IPython application, the default profile will be the same
231 as the running application, otherwise it will be 'default'.
231 as the running application, otherwise it will be 'default'.
232 cluster_id : str
232 cluster_id : str
233 String id to added to runtime files, to prevent name collisions when using
233 String id to added to runtime files, to prevent name collisions when using
234 multiple clusters with a single profile simultaneously.
234 multiple clusters with a single profile simultaneously.
235 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
235 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
236 Since this is text inserted into filenames, typical recommendations apply:
236 Since this is text inserted into filenames, typical recommendations apply:
237 Simple character strings are ideal, and spaces are not recommended (but
237 Simple character strings are ideal, and spaces are not recommended (but
238 should generally work)
238 should generally work)
239 context : zmq.Context
239 context : zmq.Context
240 Pass an existing zmq.Context instance, otherwise the client will create its own.
240 Pass an existing zmq.Context instance, otherwise the client will create its own.
241 debug : bool
241 debug : bool
242 flag for lots of message printing for debug purposes
242 flag for lots of message printing for debug purposes
243 timeout : int/float
243 timeout : int/float
244 time (in seconds) to wait for connection replies from the Hub
244 time (in seconds) to wait for connection replies from the Hub
245 [Default: 10]
245 [Default: 10]
246
246
247 #-------------- session related args ----------------
247 #-------------- session related args ----------------
248
248
249 config : Config object
249 config : Config object
250 If specified, this will be relayed to the Session for configuration
250 If specified, this will be relayed to the Session for configuration
251 username : str
251 username : str
252 set username for the session object
252 set username for the session object
253
253
254 #-------------- ssh related args ----------------
254 #-------------- ssh related args ----------------
255 # These are args for configuring the ssh tunnel to be used
255 # These are args for configuring the ssh tunnel to be used
256 # credentials are used to forward connections over ssh to the Controller
256 # credentials are used to forward connections over ssh to the Controller
257 # Note that the ip given in `addr` needs to be relative to sshserver
257 # Note that the ip given in `addr` needs to be relative to sshserver
258 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
258 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
259 # and set sshserver as the same machine the Controller is on. However,
259 # and set sshserver as the same machine the Controller is on. However,
260 # the only requirement is that sshserver is able to see the Controller
260 # the only requirement is that sshserver is able to see the Controller
261 # (i.e. is within the same trusted network).
261 # (i.e. is within the same trusted network).
262
262
263 sshserver : str
263 sshserver : str
264 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
264 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
265 If keyfile or password is specified, and this is not, it will default to
265 If keyfile or password is specified, and this is not, it will default to
266 the ip given in addr.
266 the ip given in addr.
267 sshkey : str; path to ssh private key file
267 sshkey : str; path to ssh private key file
268 This specifies a key to be used in ssh login, default None.
268 This specifies a key to be used in ssh login, default None.
269 Regular default ssh keys will be used without specifying this argument.
269 Regular default ssh keys will be used without specifying this argument.
270 password : str
270 password : str
271 Your ssh password to sshserver. Note that if this is left None,
271 Your ssh password to sshserver. Note that if this is left None,
272 you will be prompted for it if passwordless key based login is unavailable.
272 you will be prompted for it if passwordless key based login is unavailable.
273 paramiko : bool
273 paramiko : bool
274 flag for whether to use paramiko instead of shell ssh for tunneling.
274 flag for whether to use paramiko instead of shell ssh for tunneling.
275 [default: True on win32, False else]
275 [default: True on win32, False else]
276
276
277
277
278 Attributes
278 Attributes
279 ----------
279 ----------
280
280
281 ids : list of int engine IDs
281 ids : list of int engine IDs
282 requesting the ids attribute always synchronizes
282 requesting the ids attribute always synchronizes
283 the registration state. To request ids without synchronization,
283 the registration state. To request ids without synchronization,
284 use semi-private _ids attributes.
284 use semi-private _ids attributes.
285
285
286 history : list of msg_ids
286 history : list of msg_ids
287 a list of msg_ids, keeping track of all the execution
287 a list of msg_ids, keeping track of all the execution
288 messages you have submitted in order.
288 messages you have submitted in order.
289
289
290 outstanding : set of msg_ids
290 outstanding : set of msg_ids
291 a set of msg_ids that have been submitted, but whose
291 a set of msg_ids that have been submitted, but whose
292 results have not yet been received.
292 results have not yet been received.
293
293
294 results : dict
294 results : dict
295 a dict of all our results, keyed by msg_id
295 a dict of all our results, keyed by msg_id
296
296
297 block : bool
297 block : bool
298 determines default behavior when block not specified
298 determines default behavior when block not specified
299 in execution methods
299 in execution methods
300
300
301 Methods
301 Methods
302 -------
302 -------
303
303
304 spin
304 spin
305 flushes incoming results and registration state changes
305 flushes incoming results and registration state changes
306 control methods spin, and requesting `ids` also ensures up to date
306 control methods spin, and requesting `ids` also ensures up to date
307
307
308 wait
308 wait
309 wait on one or more msg_ids
309 wait on one or more msg_ids
310
310
311 execution methods
311 execution methods
312 apply
312 apply
313 legacy: execute, run
313 legacy: execute, run
314
314
315 data movement
315 data movement
316 push, pull, scatter, gather
316 push, pull, scatter, gather
317
317
318 query methods
318 query methods
319 queue_status, get_result, purge, result_status
319 queue_status, get_result, purge, result_status
320
320
321 control methods
321 control methods
322 abort, shutdown
322 abort, shutdown
323
323
324 """
324 """
325
325
326
326
327 block = Bool(False)
327 block = Bool(False)
328 outstanding = Set()
328 outstanding = Set()
329 results = Instance('collections.defaultdict', (dict,))
329 results = Instance('collections.defaultdict', (dict,))
330 metadata = Instance('collections.defaultdict', (Metadata,))
330 metadata = Instance('collections.defaultdict', (Metadata,))
331 history = List()
331 history = List()
332 debug = Bool(False)
332 debug = Bool(False)
333 _spin_thread = Any()
333 _spin_thread = Any()
334 _stop_spinning = Any()
334 _stop_spinning = Any()
335
335
336 profile=Unicode()
336 profile=Unicode()
337 def _profile_default(self):
337 def _profile_default(self):
338 if BaseIPythonApplication.initialized():
338 if BaseIPythonApplication.initialized():
339 # an IPython app *might* be running, try to get its profile
339 # an IPython app *might* be running, try to get its profile
340 try:
340 try:
341 return BaseIPythonApplication.instance().profile
341 return BaseIPythonApplication.instance().profile
342 except (AttributeError, MultipleInstanceError):
342 except (AttributeError, MultipleInstanceError):
343 # could be a *different* subclass of config.Application,
343 # could be a *different* subclass of config.Application,
344 # which would raise one of these two errors.
344 # which would raise one of these two errors.
345 return u'default'
345 return u'default'
346 else:
346 else:
347 return u'default'
347 return u'default'
348
348
349
349
350 _outstanding_dict = Instance('collections.defaultdict', (set,))
350 _outstanding_dict = Instance('collections.defaultdict', (set,))
351 _ids = List()
351 _ids = List()
352 _connected=Bool(False)
352 _connected=Bool(False)
353 _ssh=Bool(False)
353 _ssh=Bool(False)
354 _context = Instance('zmq.Context')
354 _context = Instance('zmq.Context')
355 _config = Dict()
355 _config = Dict()
356 _engines=Instance(util.ReverseDict, (), {})
356 _engines=Instance(util.ReverseDict, (), {})
357 # _hub_socket=Instance('zmq.Socket')
357 # _hub_socket=Instance('zmq.Socket')
358 _query_socket=Instance('zmq.Socket')
358 _query_socket=Instance('zmq.Socket')
359 _control_socket=Instance('zmq.Socket')
359 _control_socket=Instance('zmq.Socket')
360 _iopub_socket=Instance('zmq.Socket')
360 _iopub_socket=Instance('zmq.Socket')
361 _notification_socket=Instance('zmq.Socket')
361 _notification_socket=Instance('zmq.Socket')
362 _mux_socket=Instance('zmq.Socket')
362 _mux_socket=Instance('zmq.Socket')
363 _task_socket=Instance('zmq.Socket')
363 _task_socket=Instance('zmq.Socket')
364 _task_scheme=Unicode()
364 _task_scheme=Unicode()
365 _closed = False
365 _closed = False
366 _ignored_control_replies=Integer(0)
366 _ignored_control_replies=Integer(0)
367 _ignored_hub_replies=Integer(0)
367 _ignored_hub_replies=Integer(0)
368
368
369 def __new__(self, *args, **kw):
369 def __new__(self, *args, **kw):
370 # don't raise on positional args
370 # don't raise on positional args
371 return HasTraits.__new__(self, **kw)
371 return HasTraits.__new__(self, **kw)
372
372
373 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
373 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
374 context=None, debug=False,
374 context=None, debug=False,
375 sshserver=None, sshkey=None, password=None, paramiko=None,
375 sshserver=None, sshkey=None, password=None, paramiko=None,
376 timeout=10, cluster_id=None, **extra_args
376 timeout=10, cluster_id=None, **extra_args
377 ):
377 ):
378 if profile:
378 if profile:
379 super(Client, self).__init__(debug=debug, profile=profile)
379 super(Client, self).__init__(debug=debug, profile=profile)
380 else:
380 else:
381 super(Client, self).__init__(debug=debug)
381 super(Client, self).__init__(debug=debug)
382 if context is None:
382 if context is None:
383 context = zmq.Context.instance()
383 context = zmq.Context.instance()
384 self._context = context
384 self._context = context
385 self._stop_spinning = Event()
385 self._stop_spinning = Event()
386
386
387 if 'url_or_file' in extra_args:
387 if 'url_or_file' in extra_args:
388 url_file = extra_args['url_or_file']
388 url_file = extra_args['url_or_file']
389 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
389 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
390
390
391 if url_file and util.is_url(url_file):
391 if url_file and util.is_url(url_file):
392 raise ValueError("single urls cannot be specified, url-files must be used.")
392 raise ValueError("single urls cannot be specified, url-files must be used.")
393
393
394 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
394 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
395
395
396 if self._cd is not None:
396 if self._cd is not None:
397 if url_file is None:
397 if url_file is None:
398 if not cluster_id:
398 if not cluster_id:
399 client_json = 'ipcontroller-client.json'
399 client_json = 'ipcontroller-client.json'
400 else:
400 else:
401 client_json = 'ipcontroller-%s-client.json' % cluster_id
401 client_json = 'ipcontroller-%s-client.json' % cluster_id
402 url_file = pjoin(self._cd.security_dir, client_json)
402 url_file = pjoin(self._cd.security_dir, client_json)
403 if url_file is None:
403 if url_file is None:
404 raise ValueError(
404 raise ValueError(
405 "I can't find enough information to connect to a hub!"
405 "I can't find enough information to connect to a hub!"
406 " Please specify at least one of url_file or profile."
406 " Please specify at least one of url_file or profile."
407 )
407 )
408
408
409 with open(url_file) as f:
409 with open(url_file) as f:
410 cfg = json.load(f)
410 cfg = json.load(f)
411
411
412 self._task_scheme = cfg['task_scheme']
412 self._task_scheme = cfg['task_scheme']
413
413
414 # sync defaults from args, json:
414 # sync defaults from args, json:
415 if sshserver:
415 if sshserver:
416 cfg['ssh'] = sshserver
416 cfg['ssh'] = sshserver
417
417
418 location = cfg.setdefault('location', None)
418 location = cfg.setdefault('location', None)
419
419
420 proto,addr = cfg['interface'].split('://')
420 proto,addr = cfg['interface'].split('://')
421 addr = util.disambiguate_ip_address(addr, location)
421 addr = util.disambiguate_ip_address(addr, location)
422 cfg['interface'] = "%s://%s" % (proto, addr)
422 cfg['interface'] = "%s://%s" % (proto, addr)
423
423
424 # turn interface,port into full urls:
424 # turn interface,port into full urls:
425 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
425 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
426 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
426 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
427
427
428 url = cfg['registration']
428 url = cfg['registration']
429
429
430 if location is not None and addr == '127.0.0.1':
430 if location is not None and addr == '127.0.0.1':
431 # location specified, and connection is expected to be local
431 # location specified, and connection is expected to be local
432 if location not in LOCAL_IPS and not sshserver:
432 if location not in LOCAL_IPS and not sshserver:
433 # load ssh from JSON *only* if the controller is not on
433 # load ssh from JSON *only* if the controller is not on
434 # this machine
434 # this machine
435 sshserver=cfg['ssh']
435 sshserver=cfg['ssh']
436 if location not in LOCAL_IPS and not sshserver:
436 if location not in LOCAL_IPS and not sshserver:
437 # warn if no ssh specified, but SSH is probably needed
437 # warn if no ssh specified, but SSH is probably needed
438 # This is only a warning, because the most likely cause
438 # This is only a warning, because the most likely cause
439 # is a local Controller on a laptop whose IP is dynamic
439 # is a local Controller on a laptop whose IP is dynamic
440 warnings.warn("""
440 warnings.warn("""
441 Controller appears to be listening on localhost, but not on this machine.
441 Controller appears to be listening on localhost, but not on this machine.
442 If this is true, you should specify Client(...,sshserver='you@%s')
442 If this is true, you should specify Client(...,sshserver='you@%s')
443 or instruct your controller to listen on an external IP."""%location,
443 or instruct your controller to listen on an external IP."""%location,
444 RuntimeWarning)
444 RuntimeWarning)
445 elif not sshserver:
445 elif not sshserver:
446 # otherwise sync with cfg
446 # otherwise sync with cfg
447 sshserver = cfg['ssh']
447 sshserver = cfg['ssh']
448
448
449 self._config = cfg
449 self._config = cfg
450
450
451 self._ssh = bool(sshserver or sshkey or password)
451 self._ssh = bool(sshserver or sshkey or password)
452 if self._ssh and sshserver is None:
452 if self._ssh and sshserver is None:
453 # default to ssh via localhost
453 # default to ssh via localhost
454 sshserver = addr
454 sshserver = addr
455 if self._ssh and password is None:
455 if self._ssh and password is None:
456 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
456 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
457 password=False
457 password=False
458 else:
458 else:
459 password = getpass("SSH Password for %s: "%sshserver)
459 password = getpass("SSH Password for %s: "%sshserver)
460 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
460 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
461
461
462 # configure and construct the session
462 # configure and construct the session
463 extra_args['packer'] = cfg['pack']
463 extra_args['packer'] = cfg['pack']
464 extra_args['unpacker'] = cfg['unpack']
464 extra_args['unpacker'] = cfg['unpack']
465 extra_args['key'] = cast_bytes(cfg['exec_key'])
465 extra_args['key'] = cast_bytes(cfg['exec_key'])
466
466
467 self.session = Session(**extra_args)
467 self.session = Session(**extra_args)
468
468
469 self._query_socket = self._context.socket(zmq.DEALER)
469 self._query_socket = self._context.socket(zmq.DEALER)
470
470
471 if self._ssh:
471 if self._ssh:
472 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
472 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
473 else:
473 else:
474 self._query_socket.connect(cfg['registration'])
474 self._query_socket.connect(cfg['registration'])
475
475
476 self.session.debug = self.debug
476 self.session.debug = self.debug
477
477
478 self._notification_handlers = {'registration_notification' : self._register_engine,
478 self._notification_handlers = {'registration_notification' : self._register_engine,
479 'unregistration_notification' : self._unregister_engine,
479 'unregistration_notification' : self._unregister_engine,
480 'shutdown_notification' : lambda msg: self.close(),
480 'shutdown_notification' : lambda msg: self.close(),
481 }
481 }
482 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
482 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
483 'apply_reply' : self._handle_apply_reply}
483 'apply_reply' : self._handle_apply_reply}
484 self._connect(sshserver, ssh_kwargs, timeout)
484 self._connect(sshserver, ssh_kwargs, timeout)
485
485
486 # last step: setup magics, if we are in IPython:
486 # last step: setup magics, if we are in IPython:
487
487
488 try:
488 try:
489 ip = get_ipython()
489 ip = get_ipython()
490 except NameError:
490 except NameError:
491 return
491 return
492 else:
492 else:
493 if 'px' not in ip.magics_manager.magics:
493 if 'px' not in ip.magics_manager.magics:
494 # in IPython but we are the first Client.
494 # in IPython but we are the first Client.
495 # activate a default view for parallel magics.
495 # activate a default view for parallel magics.
496 self.activate()
496 self.activate()
497
497
498 def __del__(self):
498 def __del__(self):
499 """cleanup sockets, but _not_ context."""
499 """cleanup sockets, but _not_ context."""
500 self.close()
500 self.close()
501
501
502 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
502 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
503 if ipython_dir is None:
503 if ipython_dir is None:
504 ipython_dir = get_ipython_dir()
504 ipython_dir = get_ipython_dir()
505 if profile_dir is not None:
505 if profile_dir is not None:
506 try:
506 try:
507 self._cd = ProfileDir.find_profile_dir(profile_dir)
507 self._cd = ProfileDir.find_profile_dir(profile_dir)
508 return
508 return
509 except ProfileDirError:
509 except ProfileDirError:
510 pass
510 pass
511 elif profile is not None:
511 elif profile is not None:
512 try:
512 try:
513 self._cd = ProfileDir.find_profile_dir_by_name(
513 self._cd = ProfileDir.find_profile_dir_by_name(
514 ipython_dir, profile)
514 ipython_dir, profile)
515 return
515 return
516 except ProfileDirError:
516 except ProfileDirError:
517 pass
517 pass
518 self._cd = None
518 self._cd = None
519
519
520 def _update_engines(self, engines):
520 def _update_engines(self, engines):
521 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
521 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
522 for k,v in engines.iteritems():
522 for k,v in engines.iteritems():
523 eid = int(k)
523 eid = int(k)
524 if eid not in self._engines:
524 if eid not in self._engines:
525 self._ids.append(eid)
525 self._ids.append(eid)
526 self._engines[eid] = v
526 self._engines[eid] = v
527 self._ids = sorted(self._ids)
527 self._ids = sorted(self._ids)
528 if sorted(self._engines.keys()) != range(len(self._engines)) and \
528 if sorted(self._engines.keys()) != range(len(self._engines)) and \
529 self._task_scheme == 'pure' and self._task_socket:
529 self._task_scheme == 'pure' and self._task_socket:
530 self._stop_scheduling_tasks()
530 self._stop_scheduling_tasks()
531
531
532 def _stop_scheduling_tasks(self):
532 def _stop_scheduling_tasks(self):
533 """Stop scheduling tasks because an engine has been unregistered
533 """Stop scheduling tasks because an engine has been unregistered
534 from a pure ZMQ scheduler.
534 from a pure ZMQ scheduler.
535 """
535 """
536 self._task_socket.close()
536 self._task_socket.close()
537 self._task_socket = None
537 self._task_socket = None
538 msg = "An engine has been unregistered, and we are using pure " +\
538 msg = "An engine has been unregistered, and we are using pure " +\
539 "ZMQ task scheduling. Task farming will be disabled."
539 "ZMQ task scheduling. Task farming will be disabled."
540 if self.outstanding:
540 if self.outstanding:
541 msg += " If you were running tasks when this happened, " +\
541 msg += " If you were running tasks when this happened, " +\
542 "some `outstanding` msg_ids may never resolve."
542 "some `outstanding` msg_ids may never resolve."
543 warnings.warn(msg, RuntimeWarning)
543 warnings.warn(msg, RuntimeWarning)
544
544
545 def _build_targets(self, targets):
545 def _build_targets(self, targets):
546 """Turn valid target IDs or 'all' into two lists:
546 """Turn valid target IDs or 'all' into two lists:
547 (int_ids, uuids).
547 (int_ids, uuids).
548 """
548 """
549 if not self._ids:
549 if not self._ids:
550 # flush notification socket if no engines yet, just in case
550 # flush notification socket if no engines yet, just in case
551 if not self.ids:
551 if not self.ids:
552 raise error.NoEnginesRegistered("Can't build targets without any engines")
552 raise error.NoEnginesRegistered("Can't build targets without any engines")
553
553
554 if targets is None:
554 if targets is None:
555 targets = self._ids
555 targets = self._ids
556 elif isinstance(targets, basestring):
556 elif isinstance(targets, basestring):
557 if targets.lower() == 'all':
557 if targets.lower() == 'all':
558 targets = self._ids
558 targets = self._ids
559 else:
559 else:
560 raise TypeError("%r not valid str target, must be 'all'"%(targets))
560 raise TypeError("%r not valid str target, must be 'all'"%(targets))
561 elif isinstance(targets, int):
561 elif isinstance(targets, int):
562 if targets < 0:
562 if targets < 0:
563 targets = self.ids[targets]
563 targets = self.ids[targets]
564 if targets not in self._ids:
564 if targets not in self._ids:
565 raise IndexError("No such engine: %i"%targets)
565 raise IndexError("No such engine: %i"%targets)
566 targets = [targets]
566 targets = [targets]
567
567
568 if isinstance(targets, slice):
568 if isinstance(targets, slice):
569 indices = range(len(self._ids))[targets]
569 indices = range(len(self._ids))[targets]
570 ids = self.ids
570 ids = self.ids
571 targets = [ ids[i] for i in indices ]
571 targets = [ ids[i] for i in indices ]
572
572
573 if not isinstance(targets, (tuple, list, xrange)):
573 if not isinstance(targets, (tuple, list, xrange)):
574 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
574 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
575
575
576 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
576 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
577
577
578 def _connect(self, sshserver, ssh_kwargs, timeout):
578 def _connect(self, sshserver, ssh_kwargs, timeout):
579 """setup all our socket connections to the cluster. This is called from
579 """setup all our socket connections to the cluster. This is called from
580 __init__."""
580 __init__."""
581
581
582 # Maybe allow reconnecting?
582 # Maybe allow reconnecting?
583 if self._connected:
583 if self._connected:
584 return
584 return
585 self._connected=True
585 self._connected=True
586
586
587 def connect_socket(s, url):
587 def connect_socket(s, url):
588 # url = util.disambiguate_url(url, self._config['location'])
588 # url = util.disambiguate_url(url, self._config['location'])
589 if self._ssh:
589 if self._ssh:
590 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
590 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
591 else:
591 else:
592 return s.connect(url)
592 return s.connect(url)
593
593
594 self.session.send(self._query_socket, 'connection_request')
594 self.session.send(self._query_socket, 'connection_request')
595 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
595 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
596 poller = zmq.Poller()
596 poller = zmq.Poller()
597 poller.register(self._query_socket, zmq.POLLIN)
597 poller.register(self._query_socket, zmq.POLLIN)
598 # poll expects milliseconds, timeout is seconds
598 # poll expects milliseconds, timeout is seconds
599 evts = poller.poll(timeout*1000)
599 evts = poller.poll(timeout*1000)
600 if not evts:
600 if not evts:
601 raise error.TimeoutError("Hub connection request timed out")
601 raise error.TimeoutError("Hub connection request timed out")
602 idents,msg = self.session.recv(self._query_socket,mode=0)
602 idents,msg = self.session.recv(self._query_socket,mode=0)
603 if self.debug:
603 if self.debug:
604 pprint(msg)
604 pprint(msg)
605 content = msg['content']
605 content = msg['content']
606 # self._config['registration'] = dict(content)
606 # self._config['registration'] = dict(content)
607 cfg = self._config
607 cfg = self._config
608 if content['status'] == 'ok':
608 if content['status'] == 'ok':
609 self._mux_socket = self._context.socket(zmq.DEALER)
609 self._mux_socket = self._context.socket(zmq.DEALER)
610 connect_socket(self._mux_socket, cfg['mux'])
610 connect_socket(self._mux_socket, cfg['mux'])
611
611
612 self._task_socket = self._context.socket(zmq.DEALER)
612 self._task_socket = self._context.socket(zmq.DEALER)
613 connect_socket(self._task_socket, cfg['task'])
613 connect_socket(self._task_socket, cfg['task'])
614
614
615 self._notification_socket = self._context.socket(zmq.SUB)
615 self._notification_socket = self._context.socket(zmq.SUB)
616 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
616 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
617 connect_socket(self._notification_socket, cfg['notification'])
617 connect_socket(self._notification_socket, cfg['notification'])
618
618
619 self._control_socket = self._context.socket(zmq.DEALER)
619 self._control_socket = self._context.socket(zmq.DEALER)
620 connect_socket(self._control_socket, cfg['control'])
620 connect_socket(self._control_socket, cfg['control'])
621
621
622 self._iopub_socket = self._context.socket(zmq.SUB)
622 self._iopub_socket = self._context.socket(zmq.SUB)
623 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
623 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
624 connect_socket(self._iopub_socket, cfg['iopub'])
624 connect_socket(self._iopub_socket, cfg['iopub'])
625
625
626 self._update_engines(dict(content['engines']))
626 self._update_engines(dict(content['engines']))
627 else:
627 else:
628 self._connected = False
628 self._connected = False
629 raise Exception("Failed to connect!")
629 raise Exception("Failed to connect!")
630
630
631 #--------------------------------------------------------------------------
631 #--------------------------------------------------------------------------
632 # handlers and callbacks for incoming messages
632 # handlers and callbacks for incoming messages
633 #--------------------------------------------------------------------------
633 #--------------------------------------------------------------------------
634
634
635 def _unwrap_exception(self, content):
635 def _unwrap_exception(self, content):
636 """unwrap exception, and remap engine_id to int."""
636 """unwrap exception, and remap engine_id to int."""
637 e = error.unwrap_exception(content)
637 e = error.unwrap_exception(content)
638 # print e.traceback
638 # print e.traceback
639 if e.engine_info:
639 if e.engine_info:
640 e_uuid = e.engine_info['engine_uuid']
640 e_uuid = e.engine_info['engine_uuid']
641 eid = self._engines[e_uuid]
641 eid = self._engines[e_uuid]
642 e.engine_info['engine_id'] = eid
642 e.engine_info['engine_id'] = eid
643 return e
643 return e
644
644
645 def _extract_metadata(self, msg):
645 def _extract_metadata(self, msg):
646 header = msg['header']
646 header = msg['header']
647 parent = msg['parent_header']
647 parent = msg['parent_header']
648 msg_meta = msg['metadata']
648 msg_meta = msg['metadata']
649 content = msg['content']
649 content = msg['content']
650 md = {'msg_id' : parent['msg_id'],
650 md = {'msg_id' : parent['msg_id'],
651 'received' : datetime.now(),
651 'received' : datetime.now(),
652 'engine_uuid' : msg_meta.get('engine', None),
652 'engine_uuid' : msg_meta.get('engine', None),
653 'follow' : msg_meta.get('follow', []),
653 'follow' : msg_meta.get('follow', []),
654 'after' : msg_meta.get('after', []),
654 'after' : msg_meta.get('after', []),
655 'status' : content['status'],
655 'status' : content['status'],
656 }
656 }
657
657
658 if md['engine_uuid'] is not None:
658 if md['engine_uuid'] is not None:
659 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
659 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
660
660
661 if 'date' in parent:
661 if 'date' in parent:
662 md['submitted'] = parent['date']
662 md['submitted'] = parent['date']
663 if 'started' in msg_meta:
663 if 'started' in msg_meta:
664 md['started'] = msg_meta['started']
664 md['started'] = msg_meta['started']
665 if 'date' in header:
665 if 'date' in header:
666 md['completed'] = header['date']
666 md['completed'] = header['date']
667 return md
667 return md
668
668
669 def _register_engine(self, msg):
669 def _register_engine(self, msg):
670 """Register a new engine, and update our connection info."""
670 """Register a new engine, and update our connection info."""
671 content = msg['content']
671 content = msg['content']
672 eid = content['id']
672 eid = content['id']
673 d = {eid : content['uuid']}
673 d = {eid : content['uuid']}
674 self._update_engines(d)
674 self._update_engines(d)
675
675
676 def _unregister_engine(self, msg):
676 def _unregister_engine(self, msg):
677 """Unregister an engine that has died."""
677 """Unregister an engine that has died."""
678 content = msg['content']
678 content = msg['content']
679 eid = int(content['id'])
679 eid = int(content['id'])
680 if eid in self._ids:
680 if eid in self._ids:
681 self._ids.remove(eid)
681 self._ids.remove(eid)
682 uuid = self._engines.pop(eid)
682 uuid = self._engines.pop(eid)
683
683
684 self._handle_stranded_msgs(eid, uuid)
684 self._handle_stranded_msgs(eid, uuid)
685
685
686 if self._task_socket and self._task_scheme == 'pure':
686 if self._task_socket and self._task_scheme == 'pure':
687 self._stop_scheduling_tasks()
687 self._stop_scheduling_tasks()
688
688
689 def _handle_stranded_msgs(self, eid, uuid):
689 def _handle_stranded_msgs(self, eid, uuid):
690 """Handle messages known to be on an engine when the engine unregisters.
690 """Handle messages known to be on an engine when the engine unregisters.
691
691
692 It is possible that this will fire prematurely - that is, an engine will
692 It is possible that this will fire prematurely - that is, an engine will
693 go down after completing a result, and the client will be notified
693 go down after completing a result, and the client will be notified
694 of the unregistration and later receive the successful result.
694 of the unregistration and later receive the successful result.
695 """
695 """
696
696
697 outstanding = self._outstanding_dict[uuid]
697 outstanding = self._outstanding_dict[uuid]
698
698
699 for msg_id in list(outstanding):
699 for msg_id in list(outstanding):
700 if msg_id in self.results:
700 if msg_id in self.results:
701 # we already
701 # we already
702 continue
702 continue
703 try:
703 try:
704 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
704 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
705 except:
705 except:
706 content = error.wrap_exception()
706 content = error.wrap_exception()
707 # build a fake message:
707 # build a fake message:
708 msg = self.session.msg('apply_reply', content=content)
708 msg = self.session.msg('apply_reply', content=content)
709 msg['parent_header']['msg_id'] = msg_id
709 msg['parent_header']['msg_id'] = msg_id
710 msg['metadata']['engine'] = uuid
710 msg['metadata']['engine'] = uuid
711 self._handle_apply_reply(msg)
711 self._handle_apply_reply(msg)
712
712
713 def _handle_execute_reply(self, msg):
713 def _handle_execute_reply(self, msg):
714 """Save the reply to an execute_request into our results.
714 """Save the reply to an execute_request into our results.
715
715
716 execute messages are never actually used. apply is used instead.
716 execute messages are never actually used. apply is used instead.
717 """
717 """
718
718
719 parent = msg['parent_header']
719 parent = msg['parent_header']
720 msg_id = parent['msg_id']
720 msg_id = parent['msg_id']
721 if msg_id not in self.outstanding:
721 if msg_id not in self.outstanding:
722 if msg_id in self.history:
722 if msg_id in self.history:
723 print ("got stale result: %s"%msg_id)
723 print ("got stale result: %s"%msg_id)
724 else:
724 else:
725 print ("got unknown result: %s"%msg_id)
725 print ("got unknown result: %s"%msg_id)
726 else:
726 else:
727 self.outstanding.remove(msg_id)
727 self.outstanding.remove(msg_id)
728
728
729 content = msg['content']
729 content = msg['content']
730 header = msg['header']
730 header = msg['header']
731
731
732 # construct metadata:
732 # construct metadata:
733 md = self.metadata[msg_id]
733 md = self.metadata[msg_id]
734 md.update(self._extract_metadata(msg))
734 md.update(self._extract_metadata(msg))
735 # is this redundant?
735 # is this redundant?
736 self.metadata[msg_id] = md
736 self.metadata[msg_id] = md
737
737
738 e_outstanding = self._outstanding_dict[md['engine_uuid']]
738 e_outstanding = self._outstanding_dict[md['engine_uuid']]
739 if msg_id in e_outstanding:
739 if msg_id in e_outstanding:
740 e_outstanding.remove(msg_id)
740 e_outstanding.remove(msg_id)
741
741
742 # construct result:
742 # construct result:
743 if content['status'] == 'ok':
743 if content['status'] == 'ok':
744 self.results[msg_id] = ExecuteReply(msg_id, content, md)
744 self.results[msg_id] = ExecuteReply(msg_id, content, md)
745 elif content['status'] == 'aborted':
745 elif content['status'] == 'aborted':
746 self.results[msg_id] = error.TaskAborted(msg_id)
746 self.results[msg_id] = error.TaskAborted(msg_id)
747 elif content['status'] == 'resubmitted':
747 elif content['status'] == 'resubmitted':
748 # TODO: handle resubmission
748 # TODO: handle resubmission
749 pass
749 pass
750 else:
750 else:
751 self.results[msg_id] = self._unwrap_exception(content)
751 self.results[msg_id] = self._unwrap_exception(content)
752
752
753 def _handle_apply_reply(self, msg):
753 def _handle_apply_reply(self, msg):
754 """Save the reply to an apply_request into our results."""
754 """Save the reply to an apply_request into our results."""
755 parent = msg['parent_header']
755 parent = msg['parent_header']
756 msg_id = parent['msg_id']
756 msg_id = parent['msg_id']
757 if msg_id not in self.outstanding:
757 if msg_id not in self.outstanding:
758 if msg_id in self.history:
758 if msg_id in self.history:
759 print ("got stale result: %s"%msg_id)
759 print ("got stale result: %s"%msg_id)
760 print self.results[msg_id]
760 print self.results[msg_id]
761 print msg
761 print msg
762 else:
762 else:
763 print ("got unknown result: %s"%msg_id)
763 print ("got unknown result: %s"%msg_id)
764 else:
764 else:
765 self.outstanding.remove(msg_id)
765 self.outstanding.remove(msg_id)
766 content = msg['content']
766 content = msg['content']
767 header = msg['header']
767 header = msg['header']
768
768
769 # construct metadata:
769 # construct metadata:
770 md = self.metadata[msg_id]
770 md = self.metadata[msg_id]
771 md.update(self._extract_metadata(msg))
771 md.update(self._extract_metadata(msg))
772 # is this redundant?
772 # is this redundant?
773 self.metadata[msg_id] = md
773 self.metadata[msg_id] = md
774
774
775 e_outstanding = self._outstanding_dict[md['engine_uuid']]
775 e_outstanding = self._outstanding_dict[md['engine_uuid']]
776 if msg_id in e_outstanding:
776 if msg_id in e_outstanding:
777 e_outstanding.remove(msg_id)
777 e_outstanding.remove(msg_id)
778
778
779 # construct result:
779 # construct result:
780 if content['status'] == 'ok':
780 if content['status'] == 'ok':
781 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
781 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
782 elif content['status'] == 'aborted':
782 elif content['status'] == 'aborted':
783 self.results[msg_id] = error.TaskAborted(msg_id)
783 self.results[msg_id] = error.TaskAborted(msg_id)
784 elif content['status'] == 'resubmitted':
784 elif content['status'] == 'resubmitted':
785 # TODO: handle resubmission
785 # TODO: handle resubmission
786 pass
786 pass
787 else:
787 else:
788 self.results[msg_id] = self._unwrap_exception(content)
788 self.results[msg_id] = self._unwrap_exception(content)
789
789
790 def _flush_notifications(self):
790 def _flush_notifications(self):
791 """Flush notifications of engine registrations waiting
791 """Flush notifications of engine registrations waiting
792 in ZMQ queue."""
792 in ZMQ queue."""
793 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
793 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
794 while msg is not None:
794 while msg is not None:
795 if self.debug:
795 if self.debug:
796 pprint(msg)
796 pprint(msg)
797 msg_type = msg['header']['msg_type']
797 msg_type = msg['header']['msg_type']
798 handler = self._notification_handlers.get(msg_type, None)
798 handler = self._notification_handlers.get(msg_type, None)
799 if handler is None:
799 if handler is None:
800 raise Exception("Unhandled message type: %s"%msg.msg_type)
800 raise Exception("Unhandled message type: %s"%msg.msg_type)
801 else:
801 else:
802 handler(msg)
802 handler(msg)
803 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
803 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
804
804
805 def _flush_results(self, sock):
805 def _flush_results(self, sock):
806 """Flush task or queue results waiting in ZMQ queue."""
806 """Flush task or queue results waiting in ZMQ queue."""
807 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
807 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
808 while msg is not None:
808 while msg is not None:
809 if self.debug:
809 if self.debug:
810 pprint(msg)
810 pprint(msg)
811 msg_type = msg['header']['msg_type']
811 msg_type = msg['header']['msg_type']
812 handler = self._queue_handlers.get(msg_type, None)
812 handler = self._queue_handlers.get(msg_type, None)
813 if handler is None:
813 if handler is None:
814 raise Exception("Unhandled message type: %s"%msg.msg_type)
814 raise Exception("Unhandled message type: %s"%msg.msg_type)
815 else:
815 else:
816 handler(msg)
816 handler(msg)
817 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
817 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
818
818
819 def _flush_control(self, sock):
819 def _flush_control(self, sock):
820 """Flush replies from the control channel waiting
820 """Flush replies from the control channel waiting
821 in the ZMQ queue.
821 in the ZMQ queue.
822
822
823 Currently: ignore them."""
823 Currently: ignore them."""
824 if self._ignored_control_replies <= 0:
824 if self._ignored_control_replies <= 0:
825 return
825 return
826 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
826 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
827 while msg is not None:
827 while msg is not None:
828 self._ignored_control_replies -= 1
828 self._ignored_control_replies -= 1
829 if self.debug:
829 if self.debug:
830 pprint(msg)
830 pprint(msg)
831 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
831 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
832
832
833 def _flush_ignored_control(self):
833 def _flush_ignored_control(self):
834 """flush ignored control replies"""
834 """flush ignored control replies"""
835 while self._ignored_control_replies > 0:
835 while self._ignored_control_replies > 0:
836 self.session.recv(self._control_socket)
836 self.session.recv(self._control_socket)
837 self._ignored_control_replies -= 1
837 self._ignored_control_replies -= 1
838
838
839 def _flush_ignored_hub_replies(self):
839 def _flush_ignored_hub_replies(self):
840 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
840 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
841 while msg is not None:
841 while msg is not None:
842 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
842 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
843
843
844 def _flush_iopub(self, sock):
844 def _flush_iopub(self, sock):
845 """Flush replies from the iopub channel waiting
845 """Flush replies from the iopub channel waiting
846 in the ZMQ queue.
846 in the ZMQ queue.
847 """
847 """
848 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
848 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
849 while msg is not None:
849 while msg is not None:
850 if self.debug:
850 if self.debug:
851 pprint(msg)
851 pprint(msg)
852 parent = msg['parent_header']
852 parent = msg['parent_header']
853 # ignore IOPub messages with no parent.
853 # ignore IOPub messages with no parent.
854 # Caused by print statements or warnings from before the first execution.
854 # Caused by print statements or warnings from before the first execution.
855 if not parent:
855 if not parent:
856 continue
856 continue
857 msg_id = parent['msg_id']
857 msg_id = parent['msg_id']
858 content = msg['content']
858 content = msg['content']
859 header = msg['header']
859 header = msg['header']
860 msg_type = msg['header']['msg_type']
860 msg_type = msg['header']['msg_type']
861
861
862 # init metadata:
862 # init metadata:
863 md = self.metadata[msg_id]
863 md = self.metadata[msg_id]
864
864
865 if msg_type == 'stream':
865 if msg_type == 'stream':
866 name = content['name']
866 name = content['name']
867 s = md[name] or ''
867 s = md[name] or ''
868 md[name] = s + content['data']
868 md[name] = s + content['data']
869 elif msg_type == 'pyerr':
869 elif msg_type == 'pyerr':
870 md.update({'pyerr' : self._unwrap_exception(content)})
870 md.update({'pyerr' : self._unwrap_exception(content)})
871 elif msg_type == 'pyin':
871 elif msg_type == 'pyin':
872 md.update({'pyin' : content['code']})
872 md.update({'pyin' : content['code']})
873 elif msg_type == 'display_data':
873 elif msg_type == 'display_data':
874 md['outputs'].append(content)
874 md['outputs'].append(content)
875 elif msg_type == 'pyout':
875 elif msg_type == 'pyout':
876 md['pyout'] = content
876 md['pyout'] = content
877 elif msg_type == 'data_message':
877 elif msg_type == 'data_message':
878 data, remainder = serialize.unserialize_object(msg['buffers'])
878 data, remainder = serialize.unserialize_object(msg['buffers'])
879 md['data'].update(data)
879 md['data'].update(data)
880 elif msg_type == 'status':
880 elif msg_type == 'status':
881 # idle message comes after all outputs
881 # idle message comes after all outputs
882 if content['execution_state'] == 'idle':
882 if content['execution_state'] == 'idle':
883 md['outputs_ready'] = True
883 md['outputs_ready'] = True
884 else:
884 else:
885 # unhandled msg_type (status, etc.)
885 # unhandled msg_type (status, etc.)
886 pass
886 pass
887
887
888 # reduntant?
888 # reduntant?
889 self.metadata[msg_id] = md
889 self.metadata[msg_id] = md
890
890
891 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
891 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
892
892
893 #--------------------------------------------------------------------------
893 #--------------------------------------------------------------------------
894 # len, getitem
894 # len, getitem
895 #--------------------------------------------------------------------------
895 #--------------------------------------------------------------------------
896
896
897 def __len__(self):
897 def __len__(self):
898 """len(client) returns # of engines."""
898 """len(client) returns # of engines."""
899 return len(self.ids)
899 return len(self.ids)
900
900
901 def __getitem__(self, key):
901 def __getitem__(self, key):
902 """index access returns DirectView multiplexer objects
902 """index access returns DirectView multiplexer objects
903
903
904 Must be int, slice, or list/tuple/xrange of ints"""
904 Must be int, slice, or list/tuple/xrange of ints"""
905 if not isinstance(key, (int, slice, tuple, list, xrange)):
905 if not isinstance(key, (int, slice, tuple, list, xrange)):
906 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
906 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
907 else:
907 else:
908 return self.direct_view(key)
908 return self.direct_view(key)
909
909
910 #--------------------------------------------------------------------------
910 #--------------------------------------------------------------------------
911 # Begin public methods
911 # Begin public methods
912 #--------------------------------------------------------------------------
912 #--------------------------------------------------------------------------
913
913
914 @property
914 @property
915 def ids(self):
915 def ids(self):
916 """Always up-to-date ids property."""
916 """Always up-to-date ids property."""
917 self._flush_notifications()
917 self._flush_notifications()
918 # always copy:
918 # always copy:
919 return list(self._ids)
919 return list(self._ids)
920
920
921 def activate(self, targets='all', suffix=''):
921 def activate(self, targets='all', suffix=''):
922 """Create a DirectView and register it with IPython magics
922 """Create a DirectView and register it with IPython magics
923
923
924 Defines the magics `%px, %autopx, %pxresult, %%px`
924 Defines the magics `%px, %autopx, %pxresult, %%px`
925
925
926 Parameters
926 Parameters
927 ----------
927 ----------
928
928
929 targets: int, list of ints, or 'all'
929 targets: int, list of ints, or 'all'
930 The engines on which the view's magics will run
930 The engines on which the view's magics will run
931 suffix: str [default: '']
931 suffix: str [default: '']
932 The suffix, if any, for the magics. This allows you to have
932 The suffix, if any, for the magics. This allows you to have
933 multiple views associated with parallel magics at the same time.
933 multiple views associated with parallel magics at the same time.
934
934
935 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
935 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
936 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
936 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
937 on engine 0.
937 on engine 0.
938 """
938 """
939 view = self.direct_view(targets)
939 view = self.direct_view(targets)
940 view.block = True
940 view.block = True
941 view.activate(suffix)
941 view.activate(suffix)
942 return view
942 return view
943
943
944 def close(self):
944 def close(self):
945 if self._closed:
945 if self._closed:
946 return
946 return
947 self.stop_spin_thread()
947 self.stop_spin_thread()
948 snames = filter(lambda n: n.endswith('socket'), dir(self))
948 snames = filter(lambda n: n.endswith('socket'), dir(self))
949 for socket in map(lambda name: getattr(self, name), snames):
949 for socket in map(lambda name: getattr(self, name), snames):
950 if isinstance(socket, zmq.Socket) and not socket.closed:
950 if isinstance(socket, zmq.Socket) and not socket.closed:
951 socket.close()
951 socket.close()
952 self._closed = True
952 self._closed = True
953
953
954 def _spin_every(self, interval=1):
954 def _spin_every(self, interval=1):
955 """target func for use in spin_thread"""
955 """target func for use in spin_thread"""
956 while True:
956 while True:
957 if self._stop_spinning.is_set():
957 if self._stop_spinning.is_set():
958 return
958 return
959 time.sleep(interval)
959 time.sleep(interval)
960 self.spin()
960 self.spin()
961
961
962 def spin_thread(self, interval=1):
962 def spin_thread(self, interval=1):
963 """call Client.spin() in a background thread on some regular interval
963 """call Client.spin() in a background thread on some regular interval
964
964
965 This helps ensure that messages don't pile up too much in the zmq queue
965 This helps ensure that messages don't pile up too much in the zmq queue
966 while you are working on other things, or just leaving an idle terminal.
966 while you are working on other things, or just leaving an idle terminal.
967
967
968 It also helps limit potential padding of the `received` timestamp
968 It also helps limit potential padding of the `received` timestamp
969 on AsyncResult objects, used for timings.
969 on AsyncResult objects, used for timings.
970
970
971 Parameters
971 Parameters
972 ----------
972 ----------
973
973
974 interval : float, optional
974 interval : float, optional
975 The interval on which to spin the client in the background thread
975 The interval on which to spin the client in the background thread
976 (simply passed to time.sleep).
976 (simply passed to time.sleep).
977
977
978 Notes
978 Notes
979 -----
979 -----
980
980
981 For precision timing, you may want to use this method to put a bound
981 For precision timing, you may want to use this method to put a bound
982 on the jitter (in seconds) in `received` timestamps used
982 on the jitter (in seconds) in `received` timestamps used
983 in AsyncResult.wall_time.
983 in AsyncResult.wall_time.
984
984
985 """
985 """
986 if self._spin_thread is not None:
986 if self._spin_thread is not None:
987 self.stop_spin_thread()
987 self.stop_spin_thread()
988 self._stop_spinning.clear()
988 self._stop_spinning.clear()
989 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
989 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
990 self._spin_thread.daemon = True
990 self._spin_thread.daemon = True
991 self._spin_thread.start()
991 self._spin_thread.start()
992
992
993 def stop_spin_thread(self):
993 def stop_spin_thread(self):
994 """stop background spin_thread, if any"""
994 """stop background spin_thread, if any"""
995 if self._spin_thread is not None:
995 if self._spin_thread is not None:
996 self._stop_spinning.set()
996 self._stop_spinning.set()
997 self._spin_thread.join()
997 self._spin_thread.join()
998 self._spin_thread = None
998 self._spin_thread = None
999
999
1000 def spin(self):
1000 def spin(self):
1001 """Flush any registration notifications and execution results
1001 """Flush any registration notifications and execution results
1002 waiting in the ZMQ queue.
1002 waiting in the ZMQ queue.
1003 """
1003 """
1004 if self._notification_socket:
1004 if self._notification_socket:
1005 self._flush_notifications()
1005 self._flush_notifications()
1006 if self._iopub_socket:
1006 if self._iopub_socket:
1007 self._flush_iopub(self._iopub_socket)
1007 self._flush_iopub(self._iopub_socket)
1008 if self._mux_socket:
1008 if self._mux_socket:
1009 self._flush_results(self._mux_socket)
1009 self._flush_results(self._mux_socket)
1010 if self._task_socket:
1010 if self._task_socket:
1011 self._flush_results(self._task_socket)
1011 self._flush_results(self._task_socket)
1012 if self._control_socket:
1012 if self._control_socket:
1013 self._flush_control(self._control_socket)
1013 self._flush_control(self._control_socket)
1014 if self._query_socket:
1014 if self._query_socket:
1015 self._flush_ignored_hub_replies()
1015 self._flush_ignored_hub_replies()
1016
1016
1017 def wait(self, jobs=None, timeout=-1):
1017 def wait(self, jobs=None, timeout=-1):
1018 """waits on one or more `jobs`, for up to `timeout` seconds.
1018 """waits on one or more `jobs`, for up to `timeout` seconds.
1019
1019
1020 Parameters
1020 Parameters
1021 ----------
1021 ----------
1022
1022
1023 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1023 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1024 ints are indices to self.history
1024 ints are indices to self.history
1025 strs are msg_ids
1025 strs are msg_ids
1026 default: wait on all outstanding messages
1026 default: wait on all outstanding messages
1027 timeout : float
1027 timeout : float
1028 a time in seconds, after which to give up.
1028 a time in seconds, after which to give up.
1029 default is -1, which means no timeout
1029 default is -1, which means no timeout
1030
1030
1031 Returns
1031 Returns
1032 -------
1032 -------
1033
1033
1034 True : when all msg_ids are done
1034 True : when all msg_ids are done
1035 False : timeout reached, some msg_ids still outstanding
1035 False : timeout reached, some msg_ids still outstanding
1036 """
1036 """
1037 tic = time.time()
1037 tic = time.time()
1038 if jobs is None:
1038 if jobs is None:
1039 theids = self.outstanding
1039 theids = self.outstanding
1040 else:
1040 else:
1041 if isinstance(jobs, (int, basestring, AsyncResult)):
1041 if isinstance(jobs, (int, basestring, AsyncResult)):
1042 jobs = [jobs]
1042 jobs = [jobs]
1043 theids = set()
1043 theids = set()
1044 for job in jobs:
1044 for job in jobs:
1045 if isinstance(job, int):
1045 if isinstance(job, int):
1046 # index access
1046 # index access
1047 job = self.history[job]
1047 job = self.history[job]
1048 elif isinstance(job, AsyncResult):
1048 elif isinstance(job, AsyncResult):
1049 map(theids.add, job.msg_ids)
1049 map(theids.add, job.msg_ids)
1050 continue
1050 continue
1051 theids.add(job)
1051 theids.add(job)
1052 if not theids.intersection(self.outstanding):
1052 if not theids.intersection(self.outstanding):
1053 return True
1053 return True
1054 self.spin()
1054 self.spin()
1055 while theids.intersection(self.outstanding):
1055 while theids.intersection(self.outstanding):
1056 if timeout >= 0 and ( time.time()-tic ) > timeout:
1056 if timeout >= 0 and ( time.time()-tic ) > timeout:
1057 break
1057 break
1058 time.sleep(1e-3)
1058 time.sleep(1e-3)
1059 self.spin()
1059 self.spin()
1060 return len(theids.intersection(self.outstanding)) == 0
1060 return len(theids.intersection(self.outstanding)) == 0
1061
1061
1062 #--------------------------------------------------------------------------
1062 #--------------------------------------------------------------------------
1063 # Control methods
1063 # Control methods
1064 #--------------------------------------------------------------------------
1064 #--------------------------------------------------------------------------
1065
1065
1066 @spin_first
1066 @spin_first
1067 def clear(self, targets=None, block=None):
1067 def clear(self, targets=None, block=None):
1068 """Clear the namespace in target(s)."""
1068 """Clear the namespace in target(s)."""
1069 block = self.block if block is None else block
1069 block = self.block if block is None else block
1070 targets = self._build_targets(targets)[0]
1070 targets = self._build_targets(targets)[0]
1071 for t in targets:
1071 for t in targets:
1072 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1072 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1073 error = False
1073 error = False
1074 if block:
1074 if block:
1075 self._flush_ignored_control()
1075 self._flush_ignored_control()
1076 for i in range(len(targets)):
1076 for i in range(len(targets)):
1077 idents,msg = self.session.recv(self._control_socket,0)
1077 idents,msg = self.session.recv(self._control_socket,0)
1078 if self.debug:
1078 if self.debug:
1079 pprint(msg)
1079 pprint(msg)
1080 if msg['content']['status'] != 'ok':
1080 if msg['content']['status'] != 'ok':
1081 error = self._unwrap_exception(msg['content'])
1081 error = self._unwrap_exception(msg['content'])
1082 else:
1082 else:
1083 self._ignored_control_replies += len(targets)
1083 self._ignored_control_replies += len(targets)
1084 if error:
1084 if error:
1085 raise error
1085 raise error
1086
1086
1087
1087
1088 @spin_first
1088 @spin_first
1089 def abort(self, jobs=None, targets=None, block=None):
1089 def abort(self, jobs=None, targets=None, block=None):
1090 """Abort specific jobs from the execution queues of target(s).
1090 """Abort specific jobs from the execution queues of target(s).
1091
1091
1092 This is a mechanism to prevent jobs that have already been submitted
1092 This is a mechanism to prevent jobs that have already been submitted
1093 from executing.
1093 from executing.
1094
1094
1095 Parameters
1095 Parameters
1096 ----------
1096 ----------
1097
1097
1098 jobs : msg_id, list of msg_ids, or AsyncResult
1098 jobs : msg_id, list of msg_ids, or AsyncResult
1099 The jobs to be aborted
1099 The jobs to be aborted
1100
1100
1101 If unspecified/None: abort all outstanding jobs.
1101 If unspecified/None: abort all outstanding jobs.
1102
1102
1103 """
1103 """
1104 block = self.block if block is None else block
1104 block = self.block if block is None else block
1105 jobs = jobs if jobs is not None else list(self.outstanding)
1105 jobs = jobs if jobs is not None else list(self.outstanding)
1106 targets = self._build_targets(targets)[0]
1106 targets = self._build_targets(targets)[0]
1107
1107
1108 msg_ids = []
1108 msg_ids = []
1109 if isinstance(jobs, (basestring,AsyncResult)):
1109 if isinstance(jobs, (basestring,AsyncResult)):
1110 jobs = [jobs]
1110 jobs = [jobs]
1111 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1111 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1112 if bad_ids:
1112 if bad_ids:
1113 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1113 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1114 for j in jobs:
1114 for j in jobs:
1115 if isinstance(j, AsyncResult):
1115 if isinstance(j, AsyncResult):
1116 msg_ids.extend(j.msg_ids)
1116 msg_ids.extend(j.msg_ids)
1117 else:
1117 else:
1118 msg_ids.append(j)
1118 msg_ids.append(j)
1119 content = dict(msg_ids=msg_ids)
1119 content = dict(msg_ids=msg_ids)
1120 for t in targets:
1120 for t in targets:
1121 self.session.send(self._control_socket, 'abort_request',
1121 self.session.send(self._control_socket, 'abort_request',
1122 content=content, ident=t)
1122 content=content, ident=t)
1123 error = False
1123 error = False
1124 if block:
1124 if block:
1125 self._flush_ignored_control()
1125 self._flush_ignored_control()
1126 for i in range(len(targets)):
1126 for i in range(len(targets)):
1127 idents,msg = self.session.recv(self._control_socket,0)
1127 idents,msg = self.session.recv(self._control_socket,0)
1128 if self.debug:
1128 if self.debug:
1129 pprint(msg)
1129 pprint(msg)
1130 if msg['content']['status'] != 'ok':
1130 if msg['content']['status'] != 'ok':
1131 error = self._unwrap_exception(msg['content'])
1131 error = self._unwrap_exception(msg['content'])
1132 else:
1132 else:
1133 self._ignored_control_replies += len(targets)
1133 self._ignored_control_replies += len(targets)
1134 if error:
1134 if error:
1135 raise error
1135 raise error
1136
1136
1137 @spin_first
1137 @spin_first
1138 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1138 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1139 """Terminates one or more engine processes, optionally including the hub.
1139 """Terminates one or more engine processes, optionally including the hub.
1140
1140
1141 Parameters
1141 Parameters
1142 ----------
1142 ----------
1143
1143
1144 targets: list of ints or 'all' [default: all]
1144 targets: list of ints or 'all' [default: all]
1145 Which engines to shutdown.
1145 Which engines to shutdown.
1146 hub: bool [default: False]
1146 hub: bool [default: False]
1147 Whether to include the Hub. hub=True implies targets='all'.
1147 Whether to include the Hub. hub=True implies targets='all'.
1148 block: bool [default: self.block]
1148 block: bool [default: self.block]
1149 Whether to wait for clean shutdown replies or not.
1149 Whether to wait for clean shutdown replies or not.
1150 restart: bool [default: False]
1150 restart: bool [default: False]
1151 NOT IMPLEMENTED
1151 NOT IMPLEMENTED
1152 whether to restart engines after shutting them down.
1152 whether to restart engines after shutting them down.
1153 """
1153 """
1154
1154 from IPython.parallel.error import NoEnginesRegistered
1155 if restart:
1155 if restart:
1156 raise NotImplementedError("Engine restart is not yet implemented")
1156 raise NotImplementedError("Engine restart is not yet implemented")
1157
1157
1158 block = self.block if block is None else block
1158 block = self.block if block is None else block
1159 if hub:
1159 if hub:
1160 targets = 'all'
1160 targets = 'all'
1161 targets = self._build_targets(targets)[0]
1161 try:
1162 targets = self._build_targets(targets)[0]
1163 except NoEnginesRegistered:
1164 targets = []
1162 for t in targets:
1165 for t in targets:
1163 self.session.send(self._control_socket, 'shutdown_request',
1166 self.session.send(self._control_socket, 'shutdown_request',
1164 content={'restart':restart},ident=t)
1167 content={'restart':restart},ident=t)
1165 error = False
1168 error = False
1166 if block or hub:
1169 if block or hub:
1167 self._flush_ignored_control()
1170 self._flush_ignored_control()
1168 for i in range(len(targets)):
1171 for i in range(len(targets)):
1169 idents,msg = self.session.recv(self._control_socket, 0)
1172 idents,msg = self.session.recv(self._control_socket, 0)
1170 if self.debug:
1173 if self.debug:
1171 pprint(msg)
1174 pprint(msg)
1172 if msg['content']['status'] != 'ok':
1175 if msg['content']['status'] != 'ok':
1173 error = self._unwrap_exception(msg['content'])
1176 error = self._unwrap_exception(msg['content'])
1174 else:
1177 else:
1175 self._ignored_control_replies += len(targets)
1178 self._ignored_control_replies += len(targets)
1176
1179
1177 if hub:
1180 if hub:
1178 time.sleep(0.25)
1181 time.sleep(0.25)
1179 self.session.send(self._query_socket, 'shutdown_request')
1182 self.session.send(self._query_socket, 'shutdown_request')
1180 idents,msg = self.session.recv(self._query_socket, 0)
1183 idents,msg = self.session.recv(self._query_socket, 0)
1181 if self.debug:
1184 if self.debug:
1182 pprint(msg)
1185 pprint(msg)
1183 if msg['content']['status'] != 'ok':
1186 if msg['content']['status'] != 'ok':
1184 error = self._unwrap_exception(msg['content'])
1187 error = self._unwrap_exception(msg['content'])
1185
1188
1186 if error:
1189 if error:
1187 raise error
1190 raise error
1188
1191
1189 #--------------------------------------------------------------------------
1192 #--------------------------------------------------------------------------
1190 # Execution related methods
1193 # Execution related methods
1191 #--------------------------------------------------------------------------
1194 #--------------------------------------------------------------------------
1192
1195
1193 def _maybe_raise(self, result):
1196 def _maybe_raise(self, result):
1194 """wrapper for maybe raising an exception if apply failed."""
1197 """wrapper for maybe raising an exception if apply failed."""
1195 if isinstance(result, error.RemoteError):
1198 if isinstance(result, error.RemoteError):
1196 raise result
1199 raise result
1197
1200
1198 return result
1201 return result
1199
1202
1200 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1203 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1201 ident=None):
1204 ident=None):
1202 """construct and send an apply message via a socket.
1205 """construct and send an apply message via a socket.
1203
1206
1204 This is the principal method with which all engine execution is performed by views.
1207 This is the principal method with which all engine execution is performed by views.
1205 """
1208 """
1206
1209
1207 if self._closed:
1210 if self._closed:
1208 raise RuntimeError("Client cannot be used after its sockets have been closed")
1211 raise RuntimeError("Client cannot be used after its sockets have been closed")
1209
1212
1210 # defaults:
1213 # defaults:
1211 args = args if args is not None else []
1214 args = args if args is not None else []
1212 kwargs = kwargs if kwargs is not None else {}
1215 kwargs = kwargs if kwargs is not None else {}
1213 metadata = metadata if metadata is not None else {}
1216 metadata = metadata if metadata is not None else {}
1214
1217
1215 # validate arguments
1218 # validate arguments
1216 if not callable(f) and not isinstance(f, Reference):
1219 if not callable(f) and not isinstance(f, Reference):
1217 raise TypeError("f must be callable, not %s"%type(f))
1220 raise TypeError("f must be callable, not %s"%type(f))
1218 if not isinstance(args, (tuple, list)):
1221 if not isinstance(args, (tuple, list)):
1219 raise TypeError("args must be tuple or list, not %s"%type(args))
1222 raise TypeError("args must be tuple or list, not %s"%type(args))
1220 if not isinstance(kwargs, dict):
1223 if not isinstance(kwargs, dict):
1221 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1224 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1222 if not isinstance(metadata, dict):
1225 if not isinstance(metadata, dict):
1223 raise TypeError("metadata must be dict, not %s"%type(metadata))
1226 raise TypeError("metadata must be dict, not %s"%type(metadata))
1224
1227
1225 bufs = serialize.pack_apply_message(f, args, kwargs,
1228 bufs = serialize.pack_apply_message(f, args, kwargs,
1226 buffer_threshold=self.session.buffer_threshold,
1229 buffer_threshold=self.session.buffer_threshold,
1227 item_threshold=self.session.item_threshold,
1230 item_threshold=self.session.item_threshold,
1228 )
1231 )
1229
1232
1230 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1233 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1231 metadata=metadata, track=track)
1234 metadata=metadata, track=track)
1232
1235
1233 msg_id = msg['header']['msg_id']
1236 msg_id = msg['header']['msg_id']
1234 self.outstanding.add(msg_id)
1237 self.outstanding.add(msg_id)
1235 if ident:
1238 if ident:
1236 # possibly routed to a specific engine
1239 # possibly routed to a specific engine
1237 if isinstance(ident, list):
1240 if isinstance(ident, list):
1238 ident = ident[-1]
1241 ident = ident[-1]
1239 if ident in self._engines.values():
1242 if ident in self._engines.values():
1240 # save for later, in case of engine death
1243 # save for later, in case of engine death
1241 self._outstanding_dict[ident].add(msg_id)
1244 self._outstanding_dict[ident].add(msg_id)
1242 self.history.append(msg_id)
1245 self.history.append(msg_id)
1243 self.metadata[msg_id]['submitted'] = datetime.now()
1246 self.metadata[msg_id]['submitted'] = datetime.now()
1244
1247
1245 return msg
1248 return msg
1246
1249
1247 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1250 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1248 """construct and send an execute request via a socket.
1251 """construct and send an execute request via a socket.
1249
1252
1250 """
1253 """
1251
1254
1252 if self._closed:
1255 if self._closed:
1253 raise RuntimeError("Client cannot be used after its sockets have been closed")
1256 raise RuntimeError("Client cannot be used after its sockets have been closed")
1254
1257
1255 # defaults:
1258 # defaults:
1256 metadata = metadata if metadata is not None else {}
1259 metadata = metadata if metadata is not None else {}
1257
1260
1258 # validate arguments
1261 # validate arguments
1259 if not isinstance(code, basestring):
1262 if not isinstance(code, basestring):
1260 raise TypeError("code must be text, not %s" % type(code))
1263 raise TypeError("code must be text, not %s" % type(code))
1261 if not isinstance(metadata, dict):
1264 if not isinstance(metadata, dict):
1262 raise TypeError("metadata must be dict, not %s" % type(metadata))
1265 raise TypeError("metadata must be dict, not %s" % type(metadata))
1263
1266
1264 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1267 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1265
1268
1266
1269
1267 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1270 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1268 metadata=metadata)
1271 metadata=metadata)
1269
1272
1270 msg_id = msg['header']['msg_id']
1273 msg_id = msg['header']['msg_id']
1271 self.outstanding.add(msg_id)
1274 self.outstanding.add(msg_id)
1272 if ident:
1275 if ident:
1273 # possibly routed to a specific engine
1276 # possibly routed to a specific engine
1274 if isinstance(ident, list):
1277 if isinstance(ident, list):
1275 ident = ident[-1]
1278 ident = ident[-1]
1276 if ident in self._engines.values():
1279 if ident in self._engines.values():
1277 # save for later, in case of engine death
1280 # save for later, in case of engine death
1278 self._outstanding_dict[ident].add(msg_id)
1281 self._outstanding_dict[ident].add(msg_id)
1279 self.history.append(msg_id)
1282 self.history.append(msg_id)
1280 self.metadata[msg_id]['submitted'] = datetime.now()
1283 self.metadata[msg_id]['submitted'] = datetime.now()
1281
1284
1282 return msg
1285 return msg
1283
1286
1284 #--------------------------------------------------------------------------
1287 #--------------------------------------------------------------------------
1285 # construct a View object
1288 # construct a View object
1286 #--------------------------------------------------------------------------
1289 #--------------------------------------------------------------------------
1287
1290
1288 def load_balanced_view(self, targets=None):
1291 def load_balanced_view(self, targets=None):
1289 """construct a DirectView object.
1292 """construct a DirectView object.
1290
1293
1291 If no arguments are specified, create a LoadBalancedView
1294 If no arguments are specified, create a LoadBalancedView
1292 using all engines.
1295 using all engines.
1293
1296
1294 Parameters
1297 Parameters
1295 ----------
1298 ----------
1296
1299
1297 targets: list,slice,int,etc. [default: use all engines]
1300 targets: list,slice,int,etc. [default: use all engines]
1298 The subset of engines across which to load-balance
1301 The subset of engines across which to load-balance
1299 """
1302 """
1300 if targets == 'all':
1303 if targets == 'all':
1301 targets = None
1304 targets = None
1302 if targets is not None:
1305 if targets is not None:
1303 targets = self._build_targets(targets)[1]
1306 targets = self._build_targets(targets)[1]
1304 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1307 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1305
1308
1306 def direct_view(self, targets='all'):
1309 def direct_view(self, targets='all'):
1307 """construct a DirectView object.
1310 """construct a DirectView object.
1308
1311
1309 If no targets are specified, create a DirectView using all engines.
1312 If no targets are specified, create a DirectView using all engines.
1310
1313
1311 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1314 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1312 evaluate the target engines at each execution, whereas rc[:] will connect to
1315 evaluate the target engines at each execution, whereas rc[:] will connect to
1313 all *current* engines, and that list will not change.
1316 all *current* engines, and that list will not change.
1314
1317
1315 That is, 'all' will always use all engines, whereas rc[:] will not use
1318 That is, 'all' will always use all engines, whereas rc[:] will not use
1316 engines added after the DirectView is constructed.
1319 engines added after the DirectView is constructed.
1317
1320
1318 Parameters
1321 Parameters
1319 ----------
1322 ----------
1320
1323
1321 targets: list,slice,int,etc. [default: use all engines]
1324 targets: list,slice,int,etc. [default: use all engines]
1322 The engines to use for the View
1325 The engines to use for the View
1323 """
1326 """
1324 single = isinstance(targets, int)
1327 single = isinstance(targets, int)
1325 # allow 'all' to be lazily evaluated at each execution
1328 # allow 'all' to be lazily evaluated at each execution
1326 if targets != 'all':
1329 if targets != 'all':
1327 targets = self._build_targets(targets)[1]
1330 targets = self._build_targets(targets)[1]
1328 if single:
1331 if single:
1329 targets = targets[0]
1332 targets = targets[0]
1330 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1333 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1331
1334
1332 #--------------------------------------------------------------------------
1335 #--------------------------------------------------------------------------
1333 # Query methods
1336 # Query methods
1334 #--------------------------------------------------------------------------
1337 #--------------------------------------------------------------------------
1335
1338
1336 @spin_first
1339 @spin_first
1337 def get_result(self, indices_or_msg_ids=None, block=None):
1340 def get_result(self, indices_or_msg_ids=None, block=None):
1338 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1341 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1339
1342
1340 If the client already has the results, no request to the Hub will be made.
1343 If the client already has the results, no request to the Hub will be made.
1341
1344
1342 This is a convenient way to construct AsyncResult objects, which are wrappers
1345 This is a convenient way to construct AsyncResult objects, which are wrappers
1343 that include metadata about execution, and allow for awaiting results that
1346 that include metadata about execution, and allow for awaiting results that
1344 were not submitted by this Client.
1347 were not submitted by this Client.
1345
1348
1346 It can also be a convenient way to retrieve the metadata associated with
1349 It can also be a convenient way to retrieve the metadata associated with
1347 blocking execution, since it always retrieves
1350 blocking execution, since it always retrieves
1348
1351
1349 Examples
1352 Examples
1350 --------
1353 --------
1351 ::
1354 ::
1352
1355
1353 In [10]: r = client.apply()
1356 In [10]: r = client.apply()
1354
1357
1355 Parameters
1358 Parameters
1356 ----------
1359 ----------
1357
1360
1358 indices_or_msg_ids : integer history index, str msg_id, or list of either
1361 indices_or_msg_ids : integer history index, str msg_id, or list of either
1359 The indices or msg_ids of indices to be retrieved
1362 The indices or msg_ids of indices to be retrieved
1360
1363
1361 block : bool
1364 block : bool
1362 Whether to wait for the result to be done
1365 Whether to wait for the result to be done
1363
1366
1364 Returns
1367 Returns
1365 -------
1368 -------
1366
1369
1367 AsyncResult
1370 AsyncResult
1368 A single AsyncResult object will always be returned.
1371 A single AsyncResult object will always be returned.
1369
1372
1370 AsyncHubResult
1373 AsyncHubResult
1371 A subclass of AsyncResult that retrieves results from the Hub
1374 A subclass of AsyncResult that retrieves results from the Hub
1372
1375
1373 """
1376 """
1374 block = self.block if block is None else block
1377 block = self.block if block is None else block
1375 if indices_or_msg_ids is None:
1378 if indices_or_msg_ids is None:
1376 indices_or_msg_ids = -1
1379 indices_or_msg_ids = -1
1377
1380
1378 if not isinstance(indices_or_msg_ids, (list,tuple)):
1381 if not isinstance(indices_or_msg_ids, (list,tuple)):
1379 indices_or_msg_ids = [indices_or_msg_ids]
1382 indices_or_msg_ids = [indices_or_msg_ids]
1380
1383
1381 theids = []
1384 theids = []
1382 for id in indices_or_msg_ids:
1385 for id in indices_or_msg_ids:
1383 if isinstance(id, int):
1386 if isinstance(id, int):
1384 id = self.history[id]
1387 id = self.history[id]
1385 if not isinstance(id, basestring):
1388 if not isinstance(id, basestring):
1386 raise TypeError("indices must be str or int, not %r"%id)
1389 raise TypeError("indices must be str or int, not %r"%id)
1387 theids.append(id)
1390 theids.append(id)
1388
1391
1389 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1392 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1390 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1393 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1391
1394
1392 if remote_ids:
1395 if remote_ids:
1393 ar = AsyncHubResult(self, msg_ids=theids)
1396 ar = AsyncHubResult(self, msg_ids=theids)
1394 else:
1397 else:
1395 ar = AsyncResult(self, msg_ids=theids)
1398 ar = AsyncResult(self, msg_ids=theids)
1396
1399
1397 if block:
1400 if block:
1398 ar.wait()
1401 ar.wait()
1399
1402
1400 return ar
1403 return ar
1401
1404
1402 @spin_first
1405 @spin_first
1403 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1406 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1404 """Resubmit one or more tasks.
1407 """Resubmit one or more tasks.
1405
1408
1406 in-flight tasks may not be resubmitted.
1409 in-flight tasks may not be resubmitted.
1407
1410
1408 Parameters
1411 Parameters
1409 ----------
1412 ----------
1410
1413
1411 indices_or_msg_ids : integer history index, str msg_id, or list of either
1414 indices_or_msg_ids : integer history index, str msg_id, or list of either
1412 The indices or msg_ids of indices to be retrieved
1415 The indices or msg_ids of indices to be retrieved
1413
1416
1414 block : bool
1417 block : bool
1415 Whether to wait for the result to be done
1418 Whether to wait for the result to be done
1416
1419
1417 Returns
1420 Returns
1418 -------
1421 -------
1419
1422
1420 AsyncHubResult
1423 AsyncHubResult
1421 A subclass of AsyncResult that retrieves results from the Hub
1424 A subclass of AsyncResult that retrieves results from the Hub
1422
1425
1423 """
1426 """
1424 block = self.block if block is None else block
1427 block = self.block if block is None else block
1425 if indices_or_msg_ids is None:
1428 if indices_or_msg_ids is None:
1426 indices_or_msg_ids = -1
1429 indices_or_msg_ids = -1
1427
1430
1428 if not isinstance(indices_or_msg_ids, (list,tuple)):
1431 if not isinstance(indices_or_msg_ids, (list,tuple)):
1429 indices_or_msg_ids = [indices_or_msg_ids]
1432 indices_or_msg_ids = [indices_or_msg_ids]
1430
1433
1431 theids = []
1434 theids = []
1432 for id in indices_or_msg_ids:
1435 for id in indices_or_msg_ids:
1433 if isinstance(id, int):
1436 if isinstance(id, int):
1434 id = self.history[id]
1437 id = self.history[id]
1435 if not isinstance(id, basestring):
1438 if not isinstance(id, basestring):
1436 raise TypeError("indices must be str or int, not %r"%id)
1439 raise TypeError("indices must be str or int, not %r"%id)
1437 theids.append(id)
1440 theids.append(id)
1438
1441
1439 content = dict(msg_ids = theids)
1442 content = dict(msg_ids = theids)
1440
1443
1441 self.session.send(self._query_socket, 'resubmit_request', content)
1444 self.session.send(self._query_socket, 'resubmit_request', content)
1442
1445
1443 zmq.select([self._query_socket], [], [])
1446 zmq.select([self._query_socket], [], [])
1444 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1447 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1445 if self.debug:
1448 if self.debug:
1446 pprint(msg)
1449 pprint(msg)
1447 content = msg['content']
1450 content = msg['content']
1448 if content['status'] != 'ok':
1451 if content['status'] != 'ok':
1449 raise self._unwrap_exception(content)
1452 raise self._unwrap_exception(content)
1450 mapping = content['resubmitted']
1453 mapping = content['resubmitted']
1451 new_ids = [ mapping[msg_id] for msg_id in theids ]
1454 new_ids = [ mapping[msg_id] for msg_id in theids ]
1452
1455
1453 ar = AsyncHubResult(self, msg_ids=new_ids)
1456 ar = AsyncHubResult(self, msg_ids=new_ids)
1454
1457
1455 if block:
1458 if block:
1456 ar.wait()
1459 ar.wait()
1457
1460
1458 return ar
1461 return ar
1459
1462
1460 @spin_first
1463 @spin_first
1461 def result_status(self, msg_ids, status_only=True):
1464 def result_status(self, msg_ids, status_only=True):
1462 """Check on the status of the result(s) of the apply request with `msg_ids`.
1465 """Check on the status of the result(s) of the apply request with `msg_ids`.
1463
1466
1464 If status_only is False, then the actual results will be retrieved, else
1467 If status_only is False, then the actual results will be retrieved, else
1465 only the status of the results will be checked.
1468 only the status of the results will be checked.
1466
1469
1467 Parameters
1470 Parameters
1468 ----------
1471 ----------
1469
1472
1470 msg_ids : list of msg_ids
1473 msg_ids : list of msg_ids
1471 if int:
1474 if int:
1472 Passed as index to self.history for convenience.
1475 Passed as index to self.history for convenience.
1473 status_only : bool (default: True)
1476 status_only : bool (default: True)
1474 if False:
1477 if False:
1475 Retrieve the actual results of completed tasks.
1478 Retrieve the actual results of completed tasks.
1476
1479
1477 Returns
1480 Returns
1478 -------
1481 -------
1479
1482
1480 results : dict
1483 results : dict
1481 There will always be the keys 'pending' and 'completed', which will
1484 There will always be the keys 'pending' and 'completed', which will
1482 be lists of msg_ids that are incomplete or complete. If `status_only`
1485 be lists of msg_ids that are incomplete or complete. If `status_only`
1483 is False, then completed results will be keyed by their `msg_id`.
1486 is False, then completed results will be keyed by their `msg_id`.
1484 """
1487 """
1485 if not isinstance(msg_ids, (list,tuple)):
1488 if not isinstance(msg_ids, (list,tuple)):
1486 msg_ids = [msg_ids]
1489 msg_ids = [msg_ids]
1487
1490
1488 theids = []
1491 theids = []
1489 for msg_id in msg_ids:
1492 for msg_id in msg_ids:
1490 if isinstance(msg_id, int):
1493 if isinstance(msg_id, int):
1491 msg_id = self.history[msg_id]
1494 msg_id = self.history[msg_id]
1492 if not isinstance(msg_id, basestring):
1495 if not isinstance(msg_id, basestring):
1493 raise TypeError("msg_ids must be str, not %r"%msg_id)
1496 raise TypeError("msg_ids must be str, not %r"%msg_id)
1494 theids.append(msg_id)
1497 theids.append(msg_id)
1495
1498
1496 completed = []
1499 completed = []
1497 local_results = {}
1500 local_results = {}
1498
1501
1499 # comment this block out to temporarily disable local shortcut:
1502 # comment this block out to temporarily disable local shortcut:
1500 for msg_id in theids:
1503 for msg_id in theids:
1501 if msg_id in self.results:
1504 if msg_id in self.results:
1502 completed.append(msg_id)
1505 completed.append(msg_id)
1503 local_results[msg_id] = self.results[msg_id]
1506 local_results[msg_id] = self.results[msg_id]
1504 theids.remove(msg_id)
1507 theids.remove(msg_id)
1505
1508
1506 if theids: # some not locally cached
1509 if theids: # some not locally cached
1507 content = dict(msg_ids=theids, status_only=status_only)
1510 content = dict(msg_ids=theids, status_only=status_only)
1508 msg = self.session.send(self._query_socket, "result_request", content=content)
1511 msg = self.session.send(self._query_socket, "result_request", content=content)
1509 zmq.select([self._query_socket], [], [])
1512 zmq.select([self._query_socket], [], [])
1510 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1513 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1511 if self.debug:
1514 if self.debug:
1512 pprint(msg)
1515 pprint(msg)
1513 content = msg['content']
1516 content = msg['content']
1514 if content['status'] != 'ok':
1517 if content['status'] != 'ok':
1515 raise self._unwrap_exception(content)
1518 raise self._unwrap_exception(content)
1516 buffers = msg['buffers']
1519 buffers = msg['buffers']
1517 else:
1520 else:
1518 content = dict(completed=[],pending=[])
1521 content = dict(completed=[],pending=[])
1519
1522
1520 content['completed'].extend(completed)
1523 content['completed'].extend(completed)
1521
1524
1522 if status_only:
1525 if status_only:
1523 return content
1526 return content
1524
1527
1525 failures = []
1528 failures = []
1526 # load cached results into result:
1529 # load cached results into result:
1527 content.update(local_results)
1530 content.update(local_results)
1528
1531
1529 # update cache with results:
1532 # update cache with results:
1530 for msg_id in sorted(theids):
1533 for msg_id in sorted(theids):
1531 if msg_id in content['completed']:
1534 if msg_id in content['completed']:
1532 rec = content[msg_id]
1535 rec = content[msg_id]
1533 parent = rec['header']
1536 parent = rec['header']
1534 header = rec['result_header']
1537 header = rec['result_header']
1535 rcontent = rec['result_content']
1538 rcontent = rec['result_content']
1536 iodict = rec['io']
1539 iodict = rec['io']
1537 if isinstance(rcontent, str):
1540 if isinstance(rcontent, str):
1538 rcontent = self.session.unpack(rcontent)
1541 rcontent = self.session.unpack(rcontent)
1539
1542
1540 md = self.metadata[msg_id]
1543 md = self.metadata[msg_id]
1541 md_msg = dict(
1544 md_msg = dict(
1542 content=rcontent,
1545 content=rcontent,
1543 parent_header=parent,
1546 parent_header=parent,
1544 header=header,
1547 header=header,
1545 metadata=rec['result_metadata'],
1548 metadata=rec['result_metadata'],
1546 )
1549 )
1547 md.update(self._extract_metadata(md_msg))
1550 md.update(self._extract_metadata(md_msg))
1548 if rec.get('received'):
1551 if rec.get('received'):
1549 md['received'] = rec['received']
1552 md['received'] = rec['received']
1550 md.update(iodict)
1553 md.update(iodict)
1551
1554
1552 if rcontent['status'] == 'ok':
1555 if rcontent['status'] == 'ok':
1553 if header['msg_type'] == 'apply_reply':
1556 if header['msg_type'] == 'apply_reply':
1554 res,buffers = serialize.unserialize_object(buffers)
1557 res,buffers = serialize.unserialize_object(buffers)
1555 elif header['msg_type'] == 'execute_reply':
1558 elif header['msg_type'] == 'execute_reply':
1556 res = ExecuteReply(msg_id, rcontent, md)
1559 res = ExecuteReply(msg_id, rcontent, md)
1557 else:
1560 else:
1558 raise KeyError("unhandled msg type: %r" % header[msg_type])
1561 raise KeyError("unhandled msg type: %r" % header[msg_type])
1559 else:
1562 else:
1560 res = self._unwrap_exception(rcontent)
1563 res = self._unwrap_exception(rcontent)
1561 failures.append(res)
1564 failures.append(res)
1562
1565
1563 self.results[msg_id] = res
1566 self.results[msg_id] = res
1564 content[msg_id] = res
1567 content[msg_id] = res
1565
1568
1566 if len(theids) == 1 and failures:
1569 if len(theids) == 1 and failures:
1567 raise failures[0]
1570 raise failures[0]
1568
1571
1569 error.collect_exceptions(failures, "result_status")
1572 error.collect_exceptions(failures, "result_status")
1570 return content
1573 return content
1571
1574
1572 @spin_first
1575 @spin_first
1573 def queue_status(self, targets='all', verbose=False):
1576 def queue_status(self, targets='all', verbose=False):
1574 """Fetch the status of engine queues.
1577 """Fetch the status of engine queues.
1575
1578
1576 Parameters
1579 Parameters
1577 ----------
1580 ----------
1578
1581
1579 targets : int/str/list of ints/strs
1582 targets : int/str/list of ints/strs
1580 the engines whose states are to be queried.
1583 the engines whose states are to be queried.
1581 default : all
1584 default : all
1582 verbose : bool
1585 verbose : bool
1583 Whether to return lengths only, or lists of ids for each element
1586 Whether to return lengths only, or lists of ids for each element
1584 """
1587 """
1585 if targets == 'all':
1588 if targets == 'all':
1586 # allow 'all' to be evaluated on the engine
1589 # allow 'all' to be evaluated on the engine
1587 engine_ids = None
1590 engine_ids = None
1588 else:
1591 else:
1589 engine_ids = self._build_targets(targets)[1]
1592 engine_ids = self._build_targets(targets)[1]
1590 content = dict(targets=engine_ids, verbose=verbose)
1593 content = dict(targets=engine_ids, verbose=verbose)
1591 self.session.send(self._query_socket, "queue_request", content=content)
1594 self.session.send(self._query_socket, "queue_request", content=content)
1592 idents,msg = self.session.recv(self._query_socket, 0)
1595 idents,msg = self.session.recv(self._query_socket, 0)
1593 if self.debug:
1596 if self.debug:
1594 pprint(msg)
1597 pprint(msg)
1595 content = msg['content']
1598 content = msg['content']
1596 status = content.pop('status')
1599 status = content.pop('status')
1597 if status != 'ok':
1600 if status != 'ok':
1598 raise self._unwrap_exception(content)
1601 raise self._unwrap_exception(content)
1599 content = rekey(content)
1602 content = rekey(content)
1600 if isinstance(targets, int):
1603 if isinstance(targets, int):
1601 return content[targets]
1604 return content[targets]
1602 else:
1605 else:
1603 return content
1606 return content
1604
1607
1605 @spin_first
1608 @spin_first
1606 def purge_results(self, jobs=[], targets=[]):
1609 def purge_results(self, jobs=[], targets=[]):
1607 """Tell the Hub to forget results.
1610 """Tell the Hub to forget results.
1608
1611
1609 Individual results can be purged by msg_id, or the entire
1612 Individual results can be purged by msg_id, or the entire
1610 history of specific targets can be purged.
1613 history of specific targets can be purged.
1611
1614
1612 Use `purge_results('all')` to scrub everything from the Hub's db.
1615 Use `purge_results('all')` to scrub everything from the Hub's db.
1613
1616
1614 Parameters
1617 Parameters
1615 ----------
1618 ----------
1616
1619
1617 jobs : str or list of str or AsyncResult objects
1620 jobs : str or list of str or AsyncResult objects
1618 the msg_ids whose results should be forgotten.
1621 the msg_ids whose results should be forgotten.
1619 targets : int/str/list of ints/strs
1622 targets : int/str/list of ints/strs
1620 The targets, by int_id, whose entire history is to be purged.
1623 The targets, by int_id, whose entire history is to be purged.
1621
1624
1622 default : None
1625 default : None
1623 """
1626 """
1624 if not targets and not jobs:
1627 if not targets and not jobs:
1625 raise ValueError("Must specify at least one of `targets` and `jobs`")
1628 raise ValueError("Must specify at least one of `targets` and `jobs`")
1626 if targets:
1629 if targets:
1627 targets = self._build_targets(targets)[1]
1630 targets = self._build_targets(targets)[1]
1628
1631
1629 # construct msg_ids from jobs
1632 # construct msg_ids from jobs
1630 if jobs == 'all':
1633 if jobs == 'all':
1631 msg_ids = jobs
1634 msg_ids = jobs
1632 else:
1635 else:
1633 msg_ids = []
1636 msg_ids = []
1634 if isinstance(jobs, (basestring,AsyncResult)):
1637 if isinstance(jobs, (basestring,AsyncResult)):
1635 jobs = [jobs]
1638 jobs = [jobs]
1636 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1639 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1637 if bad_ids:
1640 if bad_ids:
1638 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1641 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1639 for j in jobs:
1642 for j in jobs:
1640 if isinstance(j, AsyncResult):
1643 if isinstance(j, AsyncResult):
1641 msg_ids.extend(j.msg_ids)
1644 msg_ids.extend(j.msg_ids)
1642 else:
1645 else:
1643 msg_ids.append(j)
1646 msg_ids.append(j)
1644
1647
1645 content = dict(engine_ids=targets, msg_ids=msg_ids)
1648 content = dict(engine_ids=targets, msg_ids=msg_ids)
1646 self.session.send(self._query_socket, "purge_request", content=content)
1649 self.session.send(self._query_socket, "purge_request", content=content)
1647 idents, msg = self.session.recv(self._query_socket, 0)
1650 idents, msg = self.session.recv(self._query_socket, 0)
1648 if self.debug:
1651 if self.debug:
1649 pprint(msg)
1652 pprint(msg)
1650 content = msg['content']
1653 content = msg['content']
1651 if content['status'] != 'ok':
1654 if content['status'] != 'ok':
1652 raise self._unwrap_exception(content)
1655 raise self._unwrap_exception(content)
1653
1656
1654 @spin_first
1657 @spin_first
1655 def hub_history(self):
1658 def hub_history(self):
1656 """Get the Hub's history
1659 """Get the Hub's history
1657
1660
1658 Just like the Client, the Hub has a history, which is a list of msg_ids.
1661 Just like the Client, the Hub has a history, which is a list of msg_ids.
1659 This will contain the history of all clients, and, depending on configuration,
1662 This will contain the history of all clients, and, depending on configuration,
1660 may contain history across multiple cluster sessions.
1663 may contain history across multiple cluster sessions.
1661
1664
1662 Any msg_id returned here is a valid argument to `get_result`.
1665 Any msg_id returned here is a valid argument to `get_result`.
1663
1666
1664 Returns
1667 Returns
1665 -------
1668 -------
1666
1669
1667 msg_ids : list of strs
1670 msg_ids : list of strs
1668 list of all msg_ids, ordered by task submission time.
1671 list of all msg_ids, ordered by task submission time.
1669 """
1672 """
1670
1673
1671 self.session.send(self._query_socket, "history_request", content={})
1674 self.session.send(self._query_socket, "history_request", content={})
1672 idents, msg = self.session.recv(self._query_socket, 0)
1675 idents, msg = self.session.recv(self._query_socket, 0)
1673
1676
1674 if self.debug:
1677 if self.debug:
1675 pprint(msg)
1678 pprint(msg)
1676 content = msg['content']
1679 content = msg['content']
1677 if content['status'] != 'ok':
1680 if content['status'] != 'ok':
1678 raise self._unwrap_exception(content)
1681 raise self._unwrap_exception(content)
1679 else:
1682 else:
1680 return content['history']
1683 return content['history']
1681
1684
1682 @spin_first
1685 @spin_first
1683 def db_query(self, query, keys=None):
1686 def db_query(self, query, keys=None):
1684 """Query the Hub's TaskRecord database
1687 """Query the Hub's TaskRecord database
1685
1688
1686 This will return a list of task record dicts that match `query`
1689 This will return a list of task record dicts that match `query`
1687
1690
1688 Parameters
1691 Parameters
1689 ----------
1692 ----------
1690
1693
1691 query : mongodb query dict
1694 query : mongodb query dict
1692 The search dict. See mongodb query docs for details.
1695 The search dict. See mongodb query docs for details.
1693 keys : list of strs [optional]
1696 keys : list of strs [optional]
1694 The subset of keys to be returned. The default is to fetch everything but buffers.
1697 The subset of keys to be returned. The default is to fetch everything but buffers.
1695 'msg_id' will *always* be included.
1698 'msg_id' will *always* be included.
1696 """
1699 """
1697 if isinstance(keys, basestring):
1700 if isinstance(keys, basestring):
1698 keys = [keys]
1701 keys = [keys]
1699 content = dict(query=query, keys=keys)
1702 content = dict(query=query, keys=keys)
1700 self.session.send(self._query_socket, "db_request", content=content)
1703 self.session.send(self._query_socket, "db_request", content=content)
1701 idents, msg = self.session.recv(self._query_socket, 0)
1704 idents, msg = self.session.recv(self._query_socket, 0)
1702 if self.debug:
1705 if self.debug:
1703 pprint(msg)
1706 pprint(msg)
1704 content = msg['content']
1707 content = msg['content']
1705 if content['status'] != 'ok':
1708 if content['status'] != 'ok':
1706 raise self._unwrap_exception(content)
1709 raise self._unwrap_exception(content)
1707
1710
1708 records = content['records']
1711 records = content['records']
1709
1712
1710 buffer_lens = content['buffer_lens']
1713 buffer_lens = content['buffer_lens']
1711 result_buffer_lens = content['result_buffer_lens']
1714 result_buffer_lens = content['result_buffer_lens']
1712 buffers = msg['buffers']
1715 buffers = msg['buffers']
1713 has_bufs = buffer_lens is not None
1716 has_bufs = buffer_lens is not None
1714 has_rbufs = result_buffer_lens is not None
1717 has_rbufs = result_buffer_lens is not None
1715 for i,rec in enumerate(records):
1718 for i,rec in enumerate(records):
1716 # relink buffers
1719 # relink buffers
1717 if has_bufs:
1720 if has_bufs:
1718 blen = buffer_lens[i]
1721 blen = buffer_lens[i]
1719 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1722 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1720 if has_rbufs:
1723 if has_rbufs:
1721 blen = result_buffer_lens[i]
1724 blen = result_buffer_lens[i]
1722 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1725 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1723
1726
1724 return records
1727 return records
1725
1728
1726 __all__ = [ 'Client' ]
1729 __all__ = [ 'Client' ]
General Comments 0
You need to be logged in to leave comments. Login now