##// END OF EJS Templates
empty string should not be inserted
Robert McGibbon -
Show More
@@ -1,1729 +1,1729 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
36
36
37 from IPython.utils.coloransi import TermColors
37 from IPython.utils.coloransi import TermColors
38 from IPython.utils.jsonutil import rekey
38 from IPython.utils.jsonutil import rekey
39 from IPython.utils.localinterfaces import LOCAL_IPS
39 from IPython.utils.localinterfaces import LOCAL_IPS
40 from IPython.utils.path import get_ipython_dir
40 from IPython.utils.path import get_ipython_dir
41 from IPython.utils.py3compat import cast_bytes
41 from IPython.utils.py3compat import cast_bytes
42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
43 Dict, List, Bool, Set, Any)
43 Dict, List, Bool, Set, Any)
44 from IPython.external.decorator import decorator
44 from IPython.external.decorator import decorator
45 from IPython.external.ssh import tunnel
45 from IPython.external.ssh import tunnel
46
46
47 from IPython.parallel import Reference
47 from IPython.parallel import Reference
48 from IPython.parallel import error
48 from IPython.parallel import error
49 from IPython.parallel import util
49 from IPython.parallel import util
50
50
51 from IPython.zmq.session import Session, Message
51 from IPython.zmq.session import Session, Message
52 from IPython.zmq import serialize
52 from IPython.zmq import serialize
53
53
54 from .asyncresult import AsyncResult, AsyncHubResult
54 from .asyncresult import AsyncResult, AsyncHubResult
55 from .view import DirectView, LoadBalancedView
55 from .view import DirectView, LoadBalancedView
56
56
57 if sys.version_info[0] >= 3:
57 if sys.version_info[0] >= 3:
58 # xrange is used in a couple 'isinstance' tests in py2
58 # xrange is used in a couple 'isinstance' tests in py2
59 # should be just 'range' in 3k
59 # should be just 'range' in 3k
60 xrange = range
60 xrange = range
61
61
62 #--------------------------------------------------------------------------
62 #--------------------------------------------------------------------------
63 # Decorators for Client methods
63 # Decorators for Client methods
64 #--------------------------------------------------------------------------
64 #--------------------------------------------------------------------------
65
65
66 @decorator
66 @decorator
67 def spin_first(f, self, *args, **kwargs):
67 def spin_first(f, self, *args, **kwargs):
68 """Call spin() to sync state prior to calling the method."""
68 """Call spin() to sync state prior to calling the method."""
69 self.spin()
69 self.spin()
70 return f(self, *args, **kwargs)
70 return f(self, *args, **kwargs)
71
71
72
72
73 #--------------------------------------------------------------------------
73 #--------------------------------------------------------------------------
74 # Classes
74 # Classes
75 #--------------------------------------------------------------------------
75 #--------------------------------------------------------------------------
76
76
77
77
78 class ExecuteReply(object):
78 class ExecuteReply(object):
79 """wrapper for finished Execute results"""
79 """wrapper for finished Execute results"""
80 def __init__(self, msg_id, content, metadata):
80 def __init__(self, msg_id, content, metadata):
81 self.msg_id = msg_id
81 self.msg_id = msg_id
82 self._content = content
82 self._content = content
83 self.execution_count = content['execution_count']
83 self.execution_count = content['execution_count']
84 self.metadata = metadata
84 self.metadata = metadata
85
85
86 def __getitem__(self, key):
86 def __getitem__(self, key):
87 return self.metadata[key]
87 return self.metadata[key]
88
88
89 def __getattr__(self, key):
89 def __getattr__(self, key):
90 if key not in self.metadata:
90 if key not in self.metadata:
91 raise AttributeError(key)
91 raise AttributeError(key)
92 return self.metadata[key]
92 return self.metadata[key]
93
93
94 def __repr__(self):
94 def __repr__(self):
95 pyout = self.metadata['pyout'] or {'data':{}}
95 pyout = self.metadata['pyout'] or {'data':{}}
96 text_out = pyout['data'].get('text/plain', '')
96 text_out = pyout['data'].get('text/plain', '')
97 if len(text_out) > 32:
97 if len(text_out) > 32:
98 text_out = text_out[:29] + '...'
98 text_out = text_out[:29] + '...'
99
99
100 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
100 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
101
101
102 def _repr_pretty_(self, p, cycle):
102 def _repr_pretty_(self, p, cycle):
103 pyout = self.metadata['pyout'] or {'data':{}}
103 pyout = self.metadata['pyout'] or {'data':{}}
104 text_out = pyout['data'].get('text/plain', '')
104 text_out = pyout['data'].get('text/plain', '')
105
105
106 if not text_out:
106 if not text_out:
107 return
107 return
108
108
109 try:
109 try:
110 ip = get_ipython()
110 ip = get_ipython()
111 except NameError:
111 except NameError:
112 colors = "NoColor"
112 colors = "NoColor"
113 else:
113 else:
114 colors = ip.colors
114 colors = ip.colors
115
115
116 if colors == "NoColor":
116 if colors == "NoColor":
117 out = normal = ""
117 out = normal = ""
118 else:
118 else:
119 out = TermColors.Red
119 out = TermColors.Red
120 normal = TermColors.Normal
120 normal = TermColors.Normal
121
121
122 if '\n' in text_out and not text_out.startswith('\n'):
122 if '\n' in text_out and not text_out.startswith('\n'):
123 # add newline for multiline reprs
123 # add newline for multiline reprs
124 text_out = '\n' + text_out
124 text_out = '\n' + text_out
125
125
126 p.text(
126 p.text(
127 out + u'Out[%i:%i]: ' % (
127 out + u'Out[%i:%i]: ' % (
128 self.metadata['engine_id'], self.execution_count
128 self.metadata['engine_id'], self.execution_count
129 ) + normal + text_out
129 ) + normal + text_out
130 )
130 )
131
131
132 def _repr_html_(self):
132 def _repr_html_(self):
133 pyout = self.metadata['pyout'] or {'data':{}}
133 pyout = self.metadata['pyout'] or {'data':{}}
134 return pyout['data'].get("text/html")
134 return pyout['data'].get("text/html")
135
135
136 def _repr_latex_(self):
136 def _repr_latex_(self):
137 pyout = self.metadata['pyout'] or {'data':{}}
137 pyout = self.metadata['pyout'] or {'data':{}}
138 return pyout['data'].get("text/latex")
138 return pyout['data'].get("text/latex")
139
139
140 def _repr_json_(self):
140 def _repr_json_(self):
141 pyout = self.metadata['pyout'] or {'data':{}}
141 pyout = self.metadata['pyout'] or {'data':{}}
142 return pyout['data'].get("application/json")
142 return pyout['data'].get("application/json")
143
143
144 def _repr_javascript_(self):
144 def _repr_javascript_(self):
145 pyout = self.metadata['pyout'] or {'data':{}}
145 pyout = self.metadata['pyout'] or {'data':{}}
146 return pyout['data'].get("application/javascript")
146 return pyout['data'].get("application/javascript")
147
147
148 def _repr_png_(self):
148 def _repr_png_(self):
149 pyout = self.metadata['pyout'] or {'data':{}}
149 pyout = self.metadata['pyout'] or {'data':{}}
150 return pyout['data'].get("image/png")
150 return pyout['data'].get("image/png")
151
151
152 def _repr_jpeg_(self):
152 def _repr_jpeg_(self):
153 pyout = self.metadata['pyout'] or {'data':{}}
153 pyout = self.metadata['pyout'] or {'data':{}}
154 return pyout['data'].get("image/jpeg")
154 return pyout['data'].get("image/jpeg")
155
155
156 def _repr_svg_(self):
156 def _repr_svg_(self):
157 pyout = self.metadata['pyout'] or {'data':{}}
157 pyout = self.metadata['pyout'] or {'data':{}}
158 return pyout['data'].get("image/svg+xml")
158 return pyout['data'].get("image/svg+xml")
159
159
160
160
161 class Metadata(dict):
161 class Metadata(dict):
162 """Subclass of dict for initializing metadata values.
162 """Subclass of dict for initializing metadata values.
163
163
164 Attribute access works on keys.
164 Attribute access works on keys.
165
165
166 These objects have a strict set of keys - errors will raise if you try
166 These objects have a strict set of keys - errors will raise if you try
167 to add new keys.
167 to add new keys.
168 """
168 """
169 def __init__(self, *args, **kwargs):
169 def __init__(self, *args, **kwargs):
170 dict.__init__(self)
170 dict.__init__(self)
171 md = {'msg_id' : None,
171 md = {'msg_id' : None,
172 'submitted' : None,
172 'submitted' : None,
173 'started' : None,
173 'started' : None,
174 'completed' : None,
174 'completed' : None,
175 'received' : None,
175 'received' : None,
176 'engine_uuid' : None,
176 'engine_uuid' : None,
177 'engine_id' : None,
177 'engine_id' : None,
178 'follow' : None,
178 'follow' : None,
179 'after' : None,
179 'after' : None,
180 'status' : None,
180 'status' : None,
181
181
182 'pyin' : None,
182 'pyin' : None,
183 'pyout' : None,
183 'pyout' : None,
184 'pyerr' : None,
184 'pyerr' : None,
185 'stdout' : '',
185 'stdout' : '',
186 'stderr' : '',
186 'stderr' : '',
187 'outputs' : [],
187 'outputs' : [],
188 'data': {},
188 'data': {},
189 'outputs_ready' : False,
189 'outputs_ready' : False,
190 }
190 }
191 self.update(md)
191 self.update(md)
192 self.update(dict(*args, **kwargs))
192 self.update(dict(*args, **kwargs))
193
193
194 def __getattr__(self, key):
194 def __getattr__(self, key):
195 """getattr aliased to getitem"""
195 """getattr aliased to getitem"""
196 if key in self.iterkeys():
196 if key in self.iterkeys():
197 return self[key]
197 return self[key]
198 else:
198 else:
199 raise AttributeError(key)
199 raise AttributeError(key)
200
200
201 def __setattr__(self, key, value):
201 def __setattr__(self, key, value):
202 """setattr aliased to setitem, with strict"""
202 """setattr aliased to setitem, with strict"""
203 if key in self.iterkeys():
203 if key in self.iterkeys():
204 self[key] = value
204 self[key] = value
205 else:
205 else:
206 raise AttributeError(key)
206 raise AttributeError(key)
207
207
208 def __setitem__(self, key, value):
208 def __setitem__(self, key, value):
209 """strict static key enforcement"""
209 """strict static key enforcement"""
210 if key in self.iterkeys():
210 if key in self.iterkeys():
211 dict.__setitem__(self, key, value)
211 dict.__setitem__(self, key, value)
212 else:
212 else:
213 raise KeyError(key)
213 raise KeyError(key)
214
214
215
215
216 class Client(HasTraits):
216 class Client(HasTraits):
217 """A semi-synchronous client to the IPython ZMQ cluster
217 """A semi-synchronous client to the IPython ZMQ cluster
218
218
219 Parameters
219 Parameters
220 ----------
220 ----------
221
221
222 url_file : str/unicode; path to ipcontroller-client.json
222 url_file : str/unicode; path to ipcontroller-client.json
223 This JSON file should contain all the information needed to connect to a cluster,
223 This JSON file should contain all the information needed to connect to a cluster,
224 and is likely the only argument needed.
224 and is likely the only argument needed.
225 Connection information for the Hub's registration. If a json connector
225 Connection information for the Hub's registration. If a json connector
226 file is given, then likely no further configuration is necessary.
226 file is given, then likely no further configuration is necessary.
227 [Default: use profile]
227 [Default: use profile]
228 profile : bytes
228 profile : bytes
229 The name of the Cluster profile to be used to find connector information.
229 The name of the Cluster profile to be used to find connector information.
230 If run from an IPython application, the default profile will be the same
230 If run from an IPython application, the default profile will be the same
231 as the running application, otherwise it will be 'default'.
231 as the running application, otherwise it will be 'default'.
232 cluster_id : str
232 cluster_id : str
233 String id to added to runtime files, to prevent name collisions when using
233 String id to added to runtime files, to prevent name collisions when using
234 multiple clusters with a single profile simultaneously.
234 multiple clusters with a single profile simultaneously.
235 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
235 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
236 Since this is text inserted into filenames, typical recommendations apply:
236 Since this is text inserted into filenames, typical recommendations apply:
237 Simple character strings are ideal, and spaces are not recommended (but
237 Simple character strings are ideal, and spaces are not recommended (but
238 should generally work)
238 should generally work)
239 context : zmq.Context
239 context : zmq.Context
240 Pass an existing zmq.Context instance, otherwise the client will create its own.
240 Pass an existing zmq.Context instance, otherwise the client will create its own.
241 debug : bool
241 debug : bool
242 flag for lots of message printing for debug purposes
242 flag for lots of message printing for debug purposes
243 timeout : int/float
243 timeout : int/float
244 time (in seconds) to wait for connection replies from the Hub
244 time (in seconds) to wait for connection replies from the Hub
245 [Default: 10]
245 [Default: 10]
246
246
247 #-------------- session related args ----------------
247 #-------------- session related args ----------------
248
248
249 config : Config object
249 config : Config object
250 If specified, this will be relayed to the Session for configuration
250 If specified, this will be relayed to the Session for configuration
251 username : str
251 username : str
252 set username for the session object
252 set username for the session object
253
253
254 #-------------- ssh related args ----------------
254 #-------------- ssh related args ----------------
255 # These are args for configuring the ssh tunnel to be used
255 # These are args for configuring the ssh tunnel to be used
256 # credentials are used to forward connections over ssh to the Controller
256 # credentials are used to forward connections over ssh to the Controller
257 # Note that the ip given in `addr` needs to be relative to sshserver
257 # Note that the ip given in `addr` needs to be relative to sshserver
258 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
258 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
259 # and set sshserver as the same machine the Controller is on. However,
259 # and set sshserver as the same machine the Controller is on. However,
260 # the only requirement is that sshserver is able to see the Controller
260 # the only requirement is that sshserver is able to see the Controller
261 # (i.e. is within the same trusted network).
261 # (i.e. is within the same trusted network).
262
262
263 sshserver : str
263 sshserver : str
264 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
264 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
265 If keyfile or password is specified, and this is not, it will default to
265 If keyfile or password is specified, and this is not, it will default to
266 the ip given in addr.
266 the ip given in addr.
267 sshkey : str; path to ssh private key file
267 sshkey : str; path to ssh private key file
268 This specifies a key to be used in ssh login, default None.
268 This specifies a key to be used in ssh login, default None.
269 Regular default ssh keys will be used without specifying this argument.
269 Regular default ssh keys will be used without specifying this argument.
270 password : str
270 password : str
271 Your ssh password to sshserver. Note that if this is left None,
271 Your ssh password to sshserver. Note that if this is left None,
272 you will be prompted for it if passwordless key based login is unavailable.
272 you will be prompted for it if passwordless key based login is unavailable.
273 paramiko : bool
273 paramiko : bool
274 flag for whether to use paramiko instead of shell ssh for tunneling.
274 flag for whether to use paramiko instead of shell ssh for tunneling.
275 [default: True on win32, False else]
275 [default: True on win32, False else]
276
276
277
277
278 Attributes
278 Attributes
279 ----------
279 ----------
280
280
281 ids : list of int engine IDs
281 ids : list of int engine IDs
282 requesting the ids attribute always synchronizes
282 requesting the ids attribute always synchronizes
283 the registration state. To request ids without synchronization,
283 the registration state. To request ids without synchronization,
284 use semi-private _ids attributes.
284 use semi-private _ids attributes.
285
285
286 history : list of msg_ids
286 history : list of msg_ids
287 a list of msg_ids, keeping track of all the execution
287 a list of msg_ids, keeping track of all the execution
288 messages you have submitted in order.
288 messages you have submitted in order.
289
289
290 outstanding : set of msg_ids
290 outstanding : set of msg_ids
291 a set of msg_ids that have been submitted, but whose
291 a set of msg_ids that have been submitted, but whose
292 results have not yet been received.
292 results have not yet been received.
293
293
294 results : dict
294 results : dict
295 a dict of all our results, keyed by msg_id
295 a dict of all our results, keyed by msg_id
296
296
297 block : bool
297 block : bool
298 determines default behavior when block not specified
298 determines default behavior when block not specified
299 in execution methods
299 in execution methods
300
300
301 Methods
301 Methods
302 -------
302 -------
303
303
304 spin
304 spin
305 flushes incoming results and registration state changes
305 flushes incoming results and registration state changes
306 control methods spin, and requesting `ids` also ensures up to date
306 control methods spin, and requesting `ids` also ensures up to date
307
307
308 wait
308 wait
309 wait on one or more msg_ids
309 wait on one or more msg_ids
310
310
311 execution methods
311 execution methods
312 apply
312 apply
313 legacy: execute, run
313 legacy: execute, run
314
314
315 data movement
315 data movement
316 push, pull, scatter, gather
316 push, pull, scatter, gather
317
317
318 query methods
318 query methods
319 queue_status, get_result, purge, result_status
319 queue_status, get_result, purge, result_status
320
320
321 control methods
321 control methods
322 abort, shutdown
322 abort, shutdown
323
323
324 """
324 """
325
325
326
326
327 block = Bool(False)
327 block = Bool(False)
328 outstanding = Set()
328 outstanding = Set()
329 results = Instance('collections.defaultdict', (dict,))
329 results = Instance('collections.defaultdict', (dict,))
330 metadata = Instance('collections.defaultdict', (Metadata,))
330 metadata = Instance('collections.defaultdict', (Metadata,))
331 history = List()
331 history = List()
332 debug = Bool(False)
332 debug = Bool(False)
333 _spin_thread = Any()
333 _spin_thread = Any()
334 _stop_spinning = Any()
334 _stop_spinning = Any()
335
335
336 profile=Unicode()
336 profile=Unicode()
337 def _profile_default(self):
337 def _profile_default(self):
338 if BaseIPythonApplication.initialized():
338 if BaseIPythonApplication.initialized():
339 # an IPython app *might* be running, try to get its profile
339 # an IPython app *might* be running, try to get its profile
340 try:
340 try:
341 return BaseIPythonApplication.instance().profile
341 return BaseIPythonApplication.instance().profile
342 except (AttributeError, MultipleInstanceError):
342 except (AttributeError, MultipleInstanceError):
343 # could be a *different* subclass of config.Application,
343 # could be a *different* subclass of config.Application,
344 # which would raise one of these two errors.
344 # which would raise one of these two errors.
345 return u'default'
345 return u'default'
346 else:
346 else:
347 return u'default'
347 return u'default'
348
348
349
349
350 _outstanding_dict = Instance('collections.defaultdict', (set,))
350 _outstanding_dict = Instance('collections.defaultdict', (set,))
351 _ids = List()
351 _ids = List()
352 _connected=Bool(False)
352 _connected=Bool(False)
353 _ssh=Bool(False)
353 _ssh=Bool(False)
354 _context = Instance('zmq.Context')
354 _context = Instance('zmq.Context')
355 _config = Dict()
355 _config = Dict()
356 _engines=Instance(util.ReverseDict, (), {})
356 _engines=Instance(util.ReverseDict, (), {})
357 # _hub_socket=Instance('zmq.Socket')
357 # _hub_socket=Instance('zmq.Socket')
358 _query_socket=Instance('zmq.Socket')
358 _query_socket=Instance('zmq.Socket')
359 _control_socket=Instance('zmq.Socket')
359 _control_socket=Instance('zmq.Socket')
360 _iopub_socket=Instance('zmq.Socket')
360 _iopub_socket=Instance('zmq.Socket')
361 _notification_socket=Instance('zmq.Socket')
361 _notification_socket=Instance('zmq.Socket')
362 _mux_socket=Instance('zmq.Socket')
362 _mux_socket=Instance('zmq.Socket')
363 _task_socket=Instance('zmq.Socket')
363 _task_socket=Instance('zmq.Socket')
364 _task_scheme=Unicode()
364 _task_scheme=Unicode()
365 _closed = False
365 _closed = False
366 _ignored_control_replies=Integer(0)
366 _ignored_control_replies=Integer(0)
367 _ignored_hub_replies=Integer(0)
367 _ignored_hub_replies=Integer(0)
368
368
369 def __new__(self, *args, **kw):
369 def __new__(self, *args, **kw):
370 # don't raise on positional args
370 # don't raise on positional args
371 return HasTraits.__new__(self, **kw)
371 return HasTraits.__new__(self, **kw)
372
372
373 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
373 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
374 context=None, debug=False,
374 context=None, debug=False,
375 sshserver=None, sshkey=None, password=None, paramiko=None,
375 sshserver=None, sshkey=None, password=None, paramiko=None,
376 timeout=10, cluster_id=None, **extra_args
376 timeout=10, cluster_id=None, **extra_args
377 ):
377 ):
378 if profile:
378 if profile:
379 super(Client, self).__init__(debug=debug, profile=profile)
379 super(Client, self).__init__(debug=debug, profile=profile)
380 else:
380 else:
381 super(Client, self).__init__(debug=debug)
381 super(Client, self).__init__(debug=debug)
382 if context is None:
382 if context is None:
383 context = zmq.Context.instance()
383 context = zmq.Context.instance()
384 self._context = context
384 self._context = context
385 self._stop_spinning = Event()
385 self._stop_spinning = Event()
386
386
387 if 'url_or_file' in extra_args:
387 if 'url_or_file' in extra_args:
388 url_file = extra_args['url_or_file']
388 url_file = extra_args['url_or_file']
389 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
389 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
390
390
391 if url_file and util.is_url(url_file):
391 if url_file and util.is_url(url_file):
392 raise ValueError("single urls cannot be specified, url-files must be used.")
392 raise ValueError("single urls cannot be specified, url-files must be used.")
393
393
394 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
394 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
395
395
396 if self._cd is not None:
396 if self._cd is not None:
397 if url_file is None:
397 if url_file is None:
398 if cluster_id is None:
398 if not cluster_id:
399 client_json = 'ipcontroller-client.json'
399 client_json = 'ipcontroller-client.json'
400 else:
400 else:
401 client_json = 'ipcontroller-%s-client.json' % cluster_id
401 client_json = 'ipcontroller-%s-client.json' % cluster_id
402 url_or_file = pjoin(self._cd.security_dir, client_json)
402 url_or_file = pjoin(self._cd.security_dir, client_json)
403 if url_file is None:
403 if url_file is None:
404 raise ValueError(
404 raise ValueError(
405 "I can't find enough information to connect to a hub!"
405 "I can't find enough information to connect to a hub!"
406 " Please specify at least one of url_file or profile."
406 " Please specify at least one of url_file or profile."
407 )
407 )
408
408
409 with open(url_file) as f:
409 with open(url_file) as f:
410 cfg = json.load(f)
410 cfg = json.load(f)
411
411
412 self._task_scheme = cfg['task_scheme']
412 self._task_scheme = cfg['task_scheme']
413
413
414 # sync defaults from args, json:
414 # sync defaults from args, json:
415 if sshserver:
415 if sshserver:
416 cfg['ssh'] = sshserver
416 cfg['ssh'] = sshserver
417
417
418 location = cfg.setdefault('location', None)
418 location = cfg.setdefault('location', None)
419
419
420 proto,addr = cfg['interface'].split('://')
420 proto,addr = cfg['interface'].split('://')
421 addr = util.disambiguate_ip_address(addr)
421 addr = util.disambiguate_ip_address(addr)
422 cfg['interface'] = "%s://%s" % (proto, addr)
422 cfg['interface'] = "%s://%s" % (proto, addr)
423
423
424 # turn interface,port into full urls:
424 # turn interface,port into full urls:
425 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
425 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
426 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
426 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
427
427
428 url = cfg['registration']
428 url = cfg['registration']
429
429
430 if location is not None and addr == '127.0.0.1':
430 if location is not None and addr == '127.0.0.1':
431 # location specified, and connection is expected to be local
431 # location specified, and connection is expected to be local
432 if location not in LOCAL_IPS and not sshserver:
432 if location not in LOCAL_IPS and not sshserver:
433 # load ssh from JSON *only* if the controller is not on
433 # load ssh from JSON *only* if the controller is not on
434 # this machine
434 # this machine
435 sshserver=cfg['ssh']
435 sshserver=cfg['ssh']
436 if location not in LOCAL_IPS and not sshserver:
436 if location not in LOCAL_IPS and not sshserver:
437 # warn if no ssh specified, but SSH is probably needed
437 # warn if no ssh specified, but SSH is probably needed
438 # This is only a warning, because the most likely cause
438 # This is only a warning, because the most likely cause
439 # is a local Controller on a laptop whose IP is dynamic
439 # is a local Controller on a laptop whose IP is dynamic
440 warnings.warn("""
440 warnings.warn("""
441 Controller appears to be listening on localhost, but not on this machine.
441 Controller appears to be listening on localhost, but not on this machine.
442 If this is true, you should specify Client(...,sshserver='you@%s')
442 If this is true, you should specify Client(...,sshserver='you@%s')
443 or instruct your controller to listen on an external IP."""%location,
443 or instruct your controller to listen on an external IP."""%location,
444 RuntimeWarning)
444 RuntimeWarning)
445 elif not sshserver:
445 elif not sshserver:
446 # otherwise sync with cfg
446 # otherwise sync with cfg
447 sshserver = cfg['ssh']
447 sshserver = cfg['ssh']
448
448
449 self._config = cfg
449 self._config = cfg
450
450
451 self._ssh = bool(sshserver or sshkey or password)
451 self._ssh = bool(sshserver or sshkey or password)
452 if self._ssh and sshserver is None:
452 if self._ssh and sshserver is None:
453 # default to ssh via localhost
453 # default to ssh via localhost
454 sshserver = addr
454 sshserver = addr
455 if self._ssh and password is None:
455 if self._ssh and password is None:
456 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
456 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
457 password=False
457 password=False
458 else:
458 else:
459 password = getpass("SSH Password for %s: "%sshserver)
459 password = getpass("SSH Password for %s: "%sshserver)
460 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
460 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
461
461
462 # configure and construct the session
462 # configure and construct the session
463 extra_args['packer'] = cfg['pack']
463 extra_args['packer'] = cfg['pack']
464 extra_args['unpacker'] = cfg['unpack']
464 extra_args['unpacker'] = cfg['unpack']
465 extra_args['key'] = cast_bytes(cfg['exec_key'])
465 extra_args['key'] = cast_bytes(cfg['exec_key'])
466
466
467 self.session = Session(**extra_args)
467 self.session = Session(**extra_args)
468
468
469 self._query_socket = self._context.socket(zmq.DEALER)
469 self._query_socket = self._context.socket(zmq.DEALER)
470
470
471 if self._ssh:
471 if self._ssh:
472 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
472 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
473 else:
473 else:
474 self._query_socket.connect(cfg['registration'])
474 self._query_socket.connect(cfg['registration'])
475
475
476 self.session.debug = self.debug
476 self.session.debug = self.debug
477
477
478 self._notification_handlers = {'registration_notification' : self._register_engine,
478 self._notification_handlers = {'registration_notification' : self._register_engine,
479 'unregistration_notification' : self._unregister_engine,
479 'unregistration_notification' : self._unregister_engine,
480 'shutdown_notification' : lambda msg: self.close(),
480 'shutdown_notification' : lambda msg: self.close(),
481 }
481 }
482 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
482 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
483 'apply_reply' : self._handle_apply_reply}
483 'apply_reply' : self._handle_apply_reply}
484 self._connect(sshserver, ssh_kwargs, timeout)
484 self._connect(sshserver, ssh_kwargs, timeout)
485
485
486 # last step: setup magics, if we are in IPython:
486 # last step: setup magics, if we are in IPython:
487
487
488 try:
488 try:
489 ip = get_ipython()
489 ip = get_ipython()
490 except NameError:
490 except NameError:
491 return
491 return
492 else:
492 else:
493 if 'px' not in ip.magics_manager.magics:
493 if 'px' not in ip.magics_manager.magics:
494 # in IPython but we are the first Client.
494 # in IPython but we are the first Client.
495 # activate a default view for parallel magics.
495 # activate a default view for parallel magics.
496 self.activate()
496 self.activate()
497
497
498 def __del__(self):
498 def __del__(self):
499 """cleanup sockets, but _not_ context."""
499 """cleanup sockets, but _not_ context."""
500 self.close()
500 self.close()
501
501
502 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
502 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
503 if ipython_dir is None:
503 if ipython_dir is None:
504 ipython_dir = get_ipython_dir()
504 ipython_dir = get_ipython_dir()
505 if profile_dir is not None:
505 if profile_dir is not None:
506 try:
506 try:
507 self._cd = ProfileDir.find_profile_dir(profile_dir)
507 self._cd = ProfileDir.find_profile_dir(profile_dir)
508 return
508 return
509 except ProfileDirError:
509 except ProfileDirError:
510 pass
510 pass
511 elif profile is not None:
511 elif profile is not None:
512 try:
512 try:
513 self._cd = ProfileDir.find_profile_dir_by_name(
513 self._cd = ProfileDir.find_profile_dir_by_name(
514 ipython_dir, profile)
514 ipython_dir, profile)
515 return
515 return
516 except ProfileDirError:
516 except ProfileDirError:
517 pass
517 pass
518 self._cd = None
518 self._cd = None
519
519
520 def _update_engines(self, engines):
520 def _update_engines(self, engines):
521 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
521 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
522 for k,v in engines.iteritems():
522 for k,v in engines.iteritems():
523 eid = int(k)
523 eid = int(k)
524 if eid not in self._engines:
524 if eid not in self._engines:
525 self._ids.append(eid)
525 self._ids.append(eid)
526 self._engines[eid] = v
526 self._engines[eid] = v
527 self._ids = sorted(self._ids)
527 self._ids = sorted(self._ids)
528 if sorted(self._engines.keys()) != range(len(self._engines)) and \
528 if sorted(self._engines.keys()) != range(len(self._engines)) and \
529 self._task_scheme == 'pure' and self._task_socket:
529 self._task_scheme == 'pure' and self._task_socket:
530 self._stop_scheduling_tasks()
530 self._stop_scheduling_tasks()
531
531
532 def _stop_scheduling_tasks(self):
532 def _stop_scheduling_tasks(self):
533 """Stop scheduling tasks because an engine has been unregistered
533 """Stop scheduling tasks because an engine has been unregistered
534 from a pure ZMQ scheduler.
534 from a pure ZMQ scheduler.
535 """
535 """
536 self._task_socket.close()
536 self._task_socket.close()
537 self._task_socket = None
537 self._task_socket = None
538 msg = "An engine has been unregistered, and we are using pure " +\
538 msg = "An engine has been unregistered, and we are using pure " +\
539 "ZMQ task scheduling. Task farming will be disabled."
539 "ZMQ task scheduling. Task farming will be disabled."
540 if self.outstanding:
540 if self.outstanding:
541 msg += " If you were running tasks when this happened, " +\
541 msg += " If you were running tasks when this happened, " +\
542 "some `outstanding` msg_ids may never resolve."
542 "some `outstanding` msg_ids may never resolve."
543 warnings.warn(msg, RuntimeWarning)
543 warnings.warn(msg, RuntimeWarning)
544
544
545 def _build_targets(self, targets):
545 def _build_targets(self, targets):
546 """Turn valid target IDs or 'all' into two lists:
546 """Turn valid target IDs or 'all' into two lists:
547 (int_ids, uuids).
547 (int_ids, uuids).
548 """
548 """
549 if not self._ids:
549 if not self._ids:
550 # flush notification socket if no engines yet, just in case
550 # flush notification socket if no engines yet, just in case
551 if not self.ids:
551 if not self.ids:
552 raise error.NoEnginesRegistered("Can't build targets without any engines")
552 raise error.NoEnginesRegistered("Can't build targets without any engines")
553
553
554 if targets is None:
554 if targets is None:
555 targets = self._ids
555 targets = self._ids
556 elif isinstance(targets, basestring):
556 elif isinstance(targets, basestring):
557 if targets.lower() == 'all':
557 if targets.lower() == 'all':
558 targets = self._ids
558 targets = self._ids
559 else:
559 else:
560 raise TypeError("%r not valid str target, must be 'all'"%(targets))
560 raise TypeError("%r not valid str target, must be 'all'"%(targets))
561 elif isinstance(targets, int):
561 elif isinstance(targets, int):
562 if targets < 0:
562 if targets < 0:
563 targets = self.ids[targets]
563 targets = self.ids[targets]
564 if targets not in self._ids:
564 if targets not in self._ids:
565 raise IndexError("No such engine: %i"%targets)
565 raise IndexError("No such engine: %i"%targets)
566 targets = [targets]
566 targets = [targets]
567
567
568 if isinstance(targets, slice):
568 if isinstance(targets, slice):
569 indices = range(len(self._ids))[targets]
569 indices = range(len(self._ids))[targets]
570 ids = self.ids
570 ids = self.ids
571 targets = [ ids[i] for i in indices ]
571 targets = [ ids[i] for i in indices ]
572
572
573 if not isinstance(targets, (tuple, list, xrange)):
573 if not isinstance(targets, (tuple, list, xrange)):
574 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
574 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
575
575
576 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
576 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
577
577
578 def _connect(self, sshserver, ssh_kwargs, timeout):
578 def _connect(self, sshserver, ssh_kwargs, timeout):
579 """setup all our socket connections to the cluster. This is called from
579 """setup all our socket connections to the cluster. This is called from
580 __init__."""
580 __init__."""
581
581
582 # Maybe allow reconnecting?
582 # Maybe allow reconnecting?
583 if self._connected:
583 if self._connected:
584 return
584 return
585 self._connected=True
585 self._connected=True
586
586
587 def connect_socket(s, url):
587 def connect_socket(s, url):
588 # url = util.disambiguate_url(url, self._config['location'])
588 # url = util.disambiguate_url(url, self._config['location'])
589 if self._ssh:
589 if self._ssh:
590 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
590 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
591 else:
591 else:
592 return s.connect(url)
592 return s.connect(url)
593
593
594 self.session.send(self._query_socket, 'connection_request')
594 self.session.send(self._query_socket, 'connection_request')
595 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
595 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
596 poller = zmq.Poller()
596 poller = zmq.Poller()
597 poller.register(self._query_socket, zmq.POLLIN)
597 poller.register(self._query_socket, zmq.POLLIN)
598 # poll expects milliseconds, timeout is seconds
598 # poll expects milliseconds, timeout is seconds
599 evts = poller.poll(timeout*1000)
599 evts = poller.poll(timeout*1000)
600 if not evts:
600 if not evts:
601 raise error.TimeoutError("Hub connection request timed out")
601 raise error.TimeoutError("Hub connection request timed out")
602 idents,msg = self.session.recv(self._query_socket,mode=0)
602 idents,msg = self.session.recv(self._query_socket,mode=0)
603 if self.debug:
603 if self.debug:
604 pprint(msg)
604 pprint(msg)
605 content = msg['content']
605 content = msg['content']
606 # self._config['registration'] = dict(content)
606 # self._config['registration'] = dict(content)
607 cfg = self._config
607 cfg = self._config
608 if content['status'] == 'ok':
608 if content['status'] == 'ok':
609 self._mux_socket = self._context.socket(zmq.DEALER)
609 self._mux_socket = self._context.socket(zmq.DEALER)
610 connect_socket(self._mux_socket, cfg['mux'])
610 connect_socket(self._mux_socket, cfg['mux'])
611
611
612 self._task_socket = self._context.socket(zmq.DEALER)
612 self._task_socket = self._context.socket(zmq.DEALER)
613 connect_socket(self._task_socket, cfg['task'])
613 connect_socket(self._task_socket, cfg['task'])
614
614
615 self._notification_socket = self._context.socket(zmq.SUB)
615 self._notification_socket = self._context.socket(zmq.SUB)
616 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
616 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
617 connect_socket(self._notification_socket, cfg['notification'])
617 connect_socket(self._notification_socket, cfg['notification'])
618
618
619 self._control_socket = self._context.socket(zmq.DEALER)
619 self._control_socket = self._context.socket(zmq.DEALER)
620 connect_socket(self._control_socket, cfg['control'])
620 connect_socket(self._control_socket, cfg['control'])
621
621
622 self._iopub_socket = self._context.socket(zmq.SUB)
622 self._iopub_socket = self._context.socket(zmq.SUB)
623 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
623 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
624 connect_socket(self._iopub_socket, cfg['iopub'])
624 connect_socket(self._iopub_socket, cfg['iopub'])
625
625
626 self._update_engines(dict(content['engines']))
626 self._update_engines(dict(content['engines']))
627 else:
627 else:
628 self._connected = False
628 self._connected = False
629 raise Exception("Failed to connect!")
629 raise Exception("Failed to connect!")
630
630
631 #--------------------------------------------------------------------------
631 #--------------------------------------------------------------------------
632 # handlers and callbacks for incoming messages
632 # handlers and callbacks for incoming messages
633 #--------------------------------------------------------------------------
633 #--------------------------------------------------------------------------
634
634
635 def _unwrap_exception(self, content):
635 def _unwrap_exception(self, content):
636 """unwrap exception, and remap engine_id to int."""
636 """unwrap exception, and remap engine_id to int."""
637 e = error.unwrap_exception(content)
637 e = error.unwrap_exception(content)
638 # print e.traceback
638 # print e.traceback
639 if e.engine_info:
639 if e.engine_info:
640 e_uuid = e.engine_info['engine_uuid']
640 e_uuid = e.engine_info['engine_uuid']
641 eid = self._engines[e_uuid]
641 eid = self._engines[e_uuid]
642 e.engine_info['engine_id'] = eid
642 e.engine_info['engine_id'] = eid
643 return e
643 return e
644
644
645 def _extract_metadata(self, msg):
645 def _extract_metadata(self, msg):
646 header = msg['header']
646 header = msg['header']
647 parent = msg['parent_header']
647 parent = msg['parent_header']
648 msg_meta = msg['metadata']
648 msg_meta = msg['metadata']
649 content = msg['content']
649 content = msg['content']
650 md = {'msg_id' : parent['msg_id'],
650 md = {'msg_id' : parent['msg_id'],
651 'received' : datetime.now(),
651 'received' : datetime.now(),
652 'engine_uuid' : msg_meta.get('engine', None),
652 'engine_uuid' : msg_meta.get('engine', None),
653 'follow' : msg_meta.get('follow', []),
653 'follow' : msg_meta.get('follow', []),
654 'after' : msg_meta.get('after', []),
654 'after' : msg_meta.get('after', []),
655 'status' : content['status'],
655 'status' : content['status'],
656 }
656 }
657
657
658 if md['engine_uuid'] is not None:
658 if md['engine_uuid'] is not None:
659 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
659 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
660
660
661 if 'date' in parent:
661 if 'date' in parent:
662 md['submitted'] = parent['date']
662 md['submitted'] = parent['date']
663 if 'started' in msg_meta:
663 if 'started' in msg_meta:
664 md['started'] = msg_meta['started']
664 md['started'] = msg_meta['started']
665 if 'date' in header:
665 if 'date' in header:
666 md['completed'] = header['date']
666 md['completed'] = header['date']
667 return md
667 return md
668
668
669 def _register_engine(self, msg):
669 def _register_engine(self, msg):
670 """Register a new engine, and update our connection info."""
670 """Register a new engine, and update our connection info."""
671 content = msg['content']
671 content = msg['content']
672 eid = content['id']
672 eid = content['id']
673 d = {eid : content['uuid']}
673 d = {eid : content['uuid']}
674 self._update_engines(d)
674 self._update_engines(d)
675
675
676 def _unregister_engine(self, msg):
676 def _unregister_engine(self, msg):
677 """Unregister an engine that has died."""
677 """Unregister an engine that has died."""
678 content = msg['content']
678 content = msg['content']
679 eid = int(content['id'])
679 eid = int(content['id'])
680 if eid in self._ids:
680 if eid in self._ids:
681 self._ids.remove(eid)
681 self._ids.remove(eid)
682 uuid = self._engines.pop(eid)
682 uuid = self._engines.pop(eid)
683
683
684 self._handle_stranded_msgs(eid, uuid)
684 self._handle_stranded_msgs(eid, uuid)
685
685
686 if self._task_socket and self._task_scheme == 'pure':
686 if self._task_socket and self._task_scheme == 'pure':
687 self._stop_scheduling_tasks()
687 self._stop_scheduling_tasks()
688
688
689 def _handle_stranded_msgs(self, eid, uuid):
689 def _handle_stranded_msgs(self, eid, uuid):
690 """Handle messages known to be on an engine when the engine unregisters.
690 """Handle messages known to be on an engine when the engine unregisters.
691
691
692 It is possible that this will fire prematurely - that is, an engine will
692 It is possible that this will fire prematurely - that is, an engine will
693 go down after completing a result, and the client will be notified
693 go down after completing a result, and the client will be notified
694 of the unregistration and later receive the successful result.
694 of the unregistration and later receive the successful result.
695 """
695 """
696
696
697 outstanding = self._outstanding_dict[uuid]
697 outstanding = self._outstanding_dict[uuid]
698
698
699 for msg_id in list(outstanding):
699 for msg_id in list(outstanding):
700 if msg_id in self.results:
700 if msg_id in self.results:
701 # we already
701 # we already
702 continue
702 continue
703 try:
703 try:
704 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
704 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
705 except:
705 except:
706 content = error.wrap_exception()
706 content = error.wrap_exception()
707 # build a fake message:
707 # build a fake message:
708 parent = {}
708 parent = {}
709 header = {}
709 header = {}
710 parent['msg_id'] = msg_id
710 parent['msg_id'] = msg_id
711 header['engine'] = uuid
711 header['engine'] = uuid
712 header['date'] = datetime.now()
712 header['date'] = datetime.now()
713 msg = dict(parent_header=parent, header=header, content=content)
713 msg = dict(parent_header=parent, header=header, content=content)
714 self._handle_apply_reply(msg)
714 self._handle_apply_reply(msg)
715
715
716 def _handle_execute_reply(self, msg):
716 def _handle_execute_reply(self, msg):
717 """Save the reply to an execute_request into our results.
717 """Save the reply to an execute_request into our results.
718
718
719 execute messages are never actually used. apply is used instead.
719 execute messages are never actually used. apply is used instead.
720 """
720 """
721
721
722 parent = msg['parent_header']
722 parent = msg['parent_header']
723 msg_id = parent['msg_id']
723 msg_id = parent['msg_id']
724 if msg_id not in self.outstanding:
724 if msg_id not in self.outstanding:
725 if msg_id in self.history:
725 if msg_id in self.history:
726 print ("got stale result: %s"%msg_id)
726 print ("got stale result: %s"%msg_id)
727 else:
727 else:
728 print ("got unknown result: %s"%msg_id)
728 print ("got unknown result: %s"%msg_id)
729 else:
729 else:
730 self.outstanding.remove(msg_id)
730 self.outstanding.remove(msg_id)
731
731
732 content = msg['content']
732 content = msg['content']
733 header = msg['header']
733 header = msg['header']
734
734
735 # construct metadata:
735 # construct metadata:
736 md = self.metadata[msg_id]
736 md = self.metadata[msg_id]
737 md.update(self._extract_metadata(msg))
737 md.update(self._extract_metadata(msg))
738 # is this redundant?
738 # is this redundant?
739 self.metadata[msg_id] = md
739 self.metadata[msg_id] = md
740
740
741 e_outstanding = self._outstanding_dict[md['engine_uuid']]
741 e_outstanding = self._outstanding_dict[md['engine_uuid']]
742 if msg_id in e_outstanding:
742 if msg_id in e_outstanding:
743 e_outstanding.remove(msg_id)
743 e_outstanding.remove(msg_id)
744
744
745 # construct result:
745 # construct result:
746 if content['status'] == 'ok':
746 if content['status'] == 'ok':
747 self.results[msg_id] = ExecuteReply(msg_id, content, md)
747 self.results[msg_id] = ExecuteReply(msg_id, content, md)
748 elif content['status'] == 'aborted':
748 elif content['status'] == 'aborted':
749 self.results[msg_id] = error.TaskAborted(msg_id)
749 self.results[msg_id] = error.TaskAborted(msg_id)
750 elif content['status'] == 'resubmitted':
750 elif content['status'] == 'resubmitted':
751 # TODO: handle resubmission
751 # TODO: handle resubmission
752 pass
752 pass
753 else:
753 else:
754 self.results[msg_id] = self._unwrap_exception(content)
754 self.results[msg_id] = self._unwrap_exception(content)
755
755
756 def _handle_apply_reply(self, msg):
756 def _handle_apply_reply(self, msg):
757 """Save the reply to an apply_request into our results."""
757 """Save the reply to an apply_request into our results."""
758 parent = msg['parent_header']
758 parent = msg['parent_header']
759 msg_id = parent['msg_id']
759 msg_id = parent['msg_id']
760 if msg_id not in self.outstanding:
760 if msg_id not in self.outstanding:
761 if msg_id in self.history:
761 if msg_id in self.history:
762 print ("got stale result: %s"%msg_id)
762 print ("got stale result: %s"%msg_id)
763 print self.results[msg_id]
763 print self.results[msg_id]
764 print msg
764 print msg
765 else:
765 else:
766 print ("got unknown result: %s"%msg_id)
766 print ("got unknown result: %s"%msg_id)
767 else:
767 else:
768 self.outstanding.remove(msg_id)
768 self.outstanding.remove(msg_id)
769 content = msg['content']
769 content = msg['content']
770 header = msg['header']
770 header = msg['header']
771
771
772 # construct metadata:
772 # construct metadata:
773 md = self.metadata[msg_id]
773 md = self.metadata[msg_id]
774 md.update(self._extract_metadata(msg))
774 md.update(self._extract_metadata(msg))
775 # is this redundant?
775 # is this redundant?
776 self.metadata[msg_id] = md
776 self.metadata[msg_id] = md
777
777
778 e_outstanding = self._outstanding_dict[md['engine_uuid']]
778 e_outstanding = self._outstanding_dict[md['engine_uuid']]
779 if msg_id in e_outstanding:
779 if msg_id in e_outstanding:
780 e_outstanding.remove(msg_id)
780 e_outstanding.remove(msg_id)
781
781
782 # construct result:
782 # construct result:
783 if content['status'] == 'ok':
783 if content['status'] == 'ok':
784 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
784 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
785 elif content['status'] == 'aborted':
785 elif content['status'] == 'aborted':
786 self.results[msg_id] = error.TaskAborted(msg_id)
786 self.results[msg_id] = error.TaskAborted(msg_id)
787 elif content['status'] == 'resubmitted':
787 elif content['status'] == 'resubmitted':
788 # TODO: handle resubmission
788 # TODO: handle resubmission
789 pass
789 pass
790 else:
790 else:
791 self.results[msg_id] = self._unwrap_exception(content)
791 self.results[msg_id] = self._unwrap_exception(content)
792
792
793 def _flush_notifications(self):
793 def _flush_notifications(self):
794 """Flush notifications of engine registrations waiting
794 """Flush notifications of engine registrations waiting
795 in ZMQ queue."""
795 in ZMQ queue."""
796 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
796 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
797 while msg is not None:
797 while msg is not None:
798 if self.debug:
798 if self.debug:
799 pprint(msg)
799 pprint(msg)
800 msg_type = msg['header']['msg_type']
800 msg_type = msg['header']['msg_type']
801 handler = self._notification_handlers.get(msg_type, None)
801 handler = self._notification_handlers.get(msg_type, None)
802 if handler is None:
802 if handler is None:
803 raise Exception("Unhandled message type: %s"%msg.msg_type)
803 raise Exception("Unhandled message type: %s"%msg.msg_type)
804 else:
804 else:
805 handler(msg)
805 handler(msg)
806 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
806 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
807
807
808 def _flush_results(self, sock):
808 def _flush_results(self, sock):
809 """Flush task or queue results waiting in ZMQ queue."""
809 """Flush task or queue results waiting in ZMQ queue."""
810 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
810 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
811 while msg is not None:
811 while msg is not None:
812 if self.debug:
812 if self.debug:
813 pprint(msg)
813 pprint(msg)
814 msg_type = msg['header']['msg_type']
814 msg_type = msg['header']['msg_type']
815 handler = self._queue_handlers.get(msg_type, None)
815 handler = self._queue_handlers.get(msg_type, None)
816 if handler is None:
816 if handler is None:
817 raise Exception("Unhandled message type: %s"%msg.msg_type)
817 raise Exception("Unhandled message type: %s"%msg.msg_type)
818 else:
818 else:
819 handler(msg)
819 handler(msg)
820 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
820 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
821
821
822 def _flush_control(self, sock):
822 def _flush_control(self, sock):
823 """Flush replies from the control channel waiting
823 """Flush replies from the control channel waiting
824 in the ZMQ queue.
824 in the ZMQ queue.
825
825
826 Currently: ignore them."""
826 Currently: ignore them."""
827 if self._ignored_control_replies <= 0:
827 if self._ignored_control_replies <= 0:
828 return
828 return
829 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
829 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
830 while msg is not None:
830 while msg is not None:
831 self._ignored_control_replies -= 1
831 self._ignored_control_replies -= 1
832 if self.debug:
832 if self.debug:
833 pprint(msg)
833 pprint(msg)
834 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
834 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
835
835
836 def _flush_ignored_control(self):
836 def _flush_ignored_control(self):
837 """flush ignored control replies"""
837 """flush ignored control replies"""
838 while self._ignored_control_replies > 0:
838 while self._ignored_control_replies > 0:
839 self.session.recv(self._control_socket)
839 self.session.recv(self._control_socket)
840 self._ignored_control_replies -= 1
840 self._ignored_control_replies -= 1
841
841
842 def _flush_ignored_hub_replies(self):
842 def _flush_ignored_hub_replies(self):
843 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
843 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
844 while msg is not None:
844 while msg is not None:
845 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
845 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
846
846
847 def _flush_iopub(self, sock):
847 def _flush_iopub(self, sock):
848 """Flush replies from the iopub channel waiting
848 """Flush replies from the iopub channel waiting
849 in the ZMQ queue.
849 in the ZMQ queue.
850 """
850 """
851 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
851 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
852 while msg is not None:
852 while msg is not None:
853 if self.debug:
853 if self.debug:
854 pprint(msg)
854 pprint(msg)
855 parent = msg['parent_header']
855 parent = msg['parent_header']
856 # ignore IOPub messages with no parent.
856 # ignore IOPub messages with no parent.
857 # Caused by print statements or warnings from before the first execution.
857 # Caused by print statements or warnings from before the first execution.
858 if not parent:
858 if not parent:
859 continue
859 continue
860 msg_id = parent['msg_id']
860 msg_id = parent['msg_id']
861 content = msg['content']
861 content = msg['content']
862 header = msg['header']
862 header = msg['header']
863 msg_type = msg['header']['msg_type']
863 msg_type = msg['header']['msg_type']
864
864
865 # init metadata:
865 # init metadata:
866 md = self.metadata[msg_id]
866 md = self.metadata[msg_id]
867
867
868 if msg_type == 'stream':
868 if msg_type == 'stream':
869 name = content['name']
869 name = content['name']
870 s = md[name] or ''
870 s = md[name] or ''
871 md[name] = s + content['data']
871 md[name] = s + content['data']
872 elif msg_type == 'pyerr':
872 elif msg_type == 'pyerr':
873 md.update({'pyerr' : self._unwrap_exception(content)})
873 md.update({'pyerr' : self._unwrap_exception(content)})
874 elif msg_type == 'pyin':
874 elif msg_type == 'pyin':
875 md.update({'pyin' : content['code']})
875 md.update({'pyin' : content['code']})
876 elif msg_type == 'display_data':
876 elif msg_type == 'display_data':
877 md['outputs'].append(content)
877 md['outputs'].append(content)
878 elif msg_type == 'pyout':
878 elif msg_type == 'pyout':
879 md['pyout'] = content
879 md['pyout'] = content
880 elif msg_type == 'data_message':
880 elif msg_type == 'data_message':
881 data, remainder = serialize.unserialize_object(msg['buffers'])
881 data, remainder = serialize.unserialize_object(msg['buffers'])
882 md['data'].update(data)
882 md['data'].update(data)
883 elif msg_type == 'status':
883 elif msg_type == 'status':
884 # idle message comes after all outputs
884 # idle message comes after all outputs
885 if content['execution_state'] == 'idle':
885 if content['execution_state'] == 'idle':
886 md['outputs_ready'] = True
886 md['outputs_ready'] = True
887 else:
887 else:
888 # unhandled msg_type (status, etc.)
888 # unhandled msg_type (status, etc.)
889 pass
889 pass
890
890
891 # reduntant?
891 # reduntant?
892 self.metadata[msg_id] = md
892 self.metadata[msg_id] = md
893
893
894 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
894 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
895
895
896 #--------------------------------------------------------------------------
896 #--------------------------------------------------------------------------
897 # len, getitem
897 # len, getitem
898 #--------------------------------------------------------------------------
898 #--------------------------------------------------------------------------
899
899
900 def __len__(self):
900 def __len__(self):
901 """len(client) returns # of engines."""
901 """len(client) returns # of engines."""
902 return len(self.ids)
902 return len(self.ids)
903
903
904 def __getitem__(self, key):
904 def __getitem__(self, key):
905 """index access returns DirectView multiplexer objects
905 """index access returns DirectView multiplexer objects
906
906
907 Must be int, slice, or list/tuple/xrange of ints"""
907 Must be int, slice, or list/tuple/xrange of ints"""
908 if not isinstance(key, (int, slice, tuple, list, xrange)):
908 if not isinstance(key, (int, slice, tuple, list, xrange)):
909 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
909 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
910 else:
910 else:
911 return self.direct_view(key)
911 return self.direct_view(key)
912
912
913 #--------------------------------------------------------------------------
913 #--------------------------------------------------------------------------
914 # Begin public methods
914 # Begin public methods
915 #--------------------------------------------------------------------------
915 #--------------------------------------------------------------------------
916
916
917 @property
917 @property
918 def ids(self):
918 def ids(self):
919 """Always up-to-date ids property."""
919 """Always up-to-date ids property."""
920 self._flush_notifications()
920 self._flush_notifications()
921 # always copy:
921 # always copy:
922 return list(self._ids)
922 return list(self._ids)
923
923
924 def activate(self, targets='all', suffix=''):
924 def activate(self, targets='all', suffix=''):
925 """Create a DirectView and register it with IPython magics
925 """Create a DirectView and register it with IPython magics
926
926
927 Defines the magics `%px, %autopx, %pxresult, %%px`
927 Defines the magics `%px, %autopx, %pxresult, %%px`
928
928
929 Parameters
929 Parameters
930 ----------
930 ----------
931
931
932 targets: int, list of ints, or 'all'
932 targets: int, list of ints, or 'all'
933 The engines on which the view's magics will run
933 The engines on which the view's magics will run
934 suffix: str [default: '']
934 suffix: str [default: '']
935 The suffix, if any, for the magics. This allows you to have
935 The suffix, if any, for the magics. This allows you to have
936 multiple views associated with parallel magics at the same time.
936 multiple views associated with parallel magics at the same time.
937
937
938 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
938 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
939 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
939 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
940 on engine 0.
940 on engine 0.
941 """
941 """
942 view = self.direct_view(targets)
942 view = self.direct_view(targets)
943 view.block = True
943 view.block = True
944 view.activate(suffix)
944 view.activate(suffix)
945 return view
945 return view
946
946
947 def close(self):
947 def close(self):
948 if self._closed:
948 if self._closed:
949 return
949 return
950 self.stop_spin_thread()
950 self.stop_spin_thread()
951 snames = filter(lambda n: n.endswith('socket'), dir(self))
951 snames = filter(lambda n: n.endswith('socket'), dir(self))
952 for socket in map(lambda name: getattr(self, name), snames):
952 for socket in map(lambda name: getattr(self, name), snames):
953 if isinstance(socket, zmq.Socket) and not socket.closed:
953 if isinstance(socket, zmq.Socket) and not socket.closed:
954 socket.close()
954 socket.close()
955 self._closed = True
955 self._closed = True
956
956
957 def _spin_every(self, interval=1):
957 def _spin_every(self, interval=1):
958 """target func for use in spin_thread"""
958 """target func for use in spin_thread"""
959 while True:
959 while True:
960 if self._stop_spinning.is_set():
960 if self._stop_spinning.is_set():
961 return
961 return
962 time.sleep(interval)
962 time.sleep(interval)
963 self.spin()
963 self.spin()
964
964
965 def spin_thread(self, interval=1):
965 def spin_thread(self, interval=1):
966 """call Client.spin() in a background thread on some regular interval
966 """call Client.spin() in a background thread on some regular interval
967
967
968 This helps ensure that messages don't pile up too much in the zmq queue
968 This helps ensure that messages don't pile up too much in the zmq queue
969 while you are working on other things, or just leaving an idle terminal.
969 while you are working on other things, or just leaving an idle terminal.
970
970
971 It also helps limit potential padding of the `received` timestamp
971 It also helps limit potential padding of the `received` timestamp
972 on AsyncResult objects, used for timings.
972 on AsyncResult objects, used for timings.
973
973
974 Parameters
974 Parameters
975 ----------
975 ----------
976
976
977 interval : float, optional
977 interval : float, optional
978 The interval on which to spin the client in the background thread
978 The interval on which to spin the client in the background thread
979 (simply passed to time.sleep).
979 (simply passed to time.sleep).
980
980
981 Notes
981 Notes
982 -----
982 -----
983
983
984 For precision timing, you may want to use this method to put a bound
984 For precision timing, you may want to use this method to put a bound
985 on the jitter (in seconds) in `received` timestamps used
985 on the jitter (in seconds) in `received` timestamps used
986 in AsyncResult.wall_time.
986 in AsyncResult.wall_time.
987
987
988 """
988 """
989 if self._spin_thread is not None:
989 if self._spin_thread is not None:
990 self.stop_spin_thread()
990 self.stop_spin_thread()
991 self._stop_spinning.clear()
991 self._stop_spinning.clear()
992 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
992 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
993 self._spin_thread.daemon = True
993 self._spin_thread.daemon = True
994 self._spin_thread.start()
994 self._spin_thread.start()
995
995
996 def stop_spin_thread(self):
996 def stop_spin_thread(self):
997 """stop background spin_thread, if any"""
997 """stop background spin_thread, if any"""
998 if self._spin_thread is not None:
998 if self._spin_thread is not None:
999 self._stop_spinning.set()
999 self._stop_spinning.set()
1000 self._spin_thread.join()
1000 self._spin_thread.join()
1001 self._spin_thread = None
1001 self._spin_thread = None
1002
1002
1003 def spin(self):
1003 def spin(self):
1004 """Flush any registration notifications and execution results
1004 """Flush any registration notifications and execution results
1005 waiting in the ZMQ queue.
1005 waiting in the ZMQ queue.
1006 """
1006 """
1007 if self._notification_socket:
1007 if self._notification_socket:
1008 self._flush_notifications()
1008 self._flush_notifications()
1009 if self._iopub_socket:
1009 if self._iopub_socket:
1010 self._flush_iopub(self._iopub_socket)
1010 self._flush_iopub(self._iopub_socket)
1011 if self._mux_socket:
1011 if self._mux_socket:
1012 self._flush_results(self._mux_socket)
1012 self._flush_results(self._mux_socket)
1013 if self._task_socket:
1013 if self._task_socket:
1014 self._flush_results(self._task_socket)
1014 self._flush_results(self._task_socket)
1015 if self._control_socket:
1015 if self._control_socket:
1016 self._flush_control(self._control_socket)
1016 self._flush_control(self._control_socket)
1017 if self._query_socket:
1017 if self._query_socket:
1018 self._flush_ignored_hub_replies()
1018 self._flush_ignored_hub_replies()
1019
1019
1020 def wait(self, jobs=None, timeout=-1):
1020 def wait(self, jobs=None, timeout=-1):
1021 """waits on one or more `jobs`, for up to `timeout` seconds.
1021 """waits on one or more `jobs`, for up to `timeout` seconds.
1022
1022
1023 Parameters
1023 Parameters
1024 ----------
1024 ----------
1025
1025
1026 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1026 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1027 ints are indices to self.history
1027 ints are indices to self.history
1028 strs are msg_ids
1028 strs are msg_ids
1029 default: wait on all outstanding messages
1029 default: wait on all outstanding messages
1030 timeout : float
1030 timeout : float
1031 a time in seconds, after which to give up.
1031 a time in seconds, after which to give up.
1032 default is -1, which means no timeout
1032 default is -1, which means no timeout
1033
1033
1034 Returns
1034 Returns
1035 -------
1035 -------
1036
1036
1037 True : when all msg_ids are done
1037 True : when all msg_ids are done
1038 False : timeout reached, some msg_ids still outstanding
1038 False : timeout reached, some msg_ids still outstanding
1039 """
1039 """
1040 tic = time.time()
1040 tic = time.time()
1041 if jobs is None:
1041 if jobs is None:
1042 theids = self.outstanding
1042 theids = self.outstanding
1043 else:
1043 else:
1044 if isinstance(jobs, (int, basestring, AsyncResult)):
1044 if isinstance(jobs, (int, basestring, AsyncResult)):
1045 jobs = [jobs]
1045 jobs = [jobs]
1046 theids = set()
1046 theids = set()
1047 for job in jobs:
1047 for job in jobs:
1048 if isinstance(job, int):
1048 if isinstance(job, int):
1049 # index access
1049 # index access
1050 job = self.history[job]
1050 job = self.history[job]
1051 elif isinstance(job, AsyncResult):
1051 elif isinstance(job, AsyncResult):
1052 map(theids.add, job.msg_ids)
1052 map(theids.add, job.msg_ids)
1053 continue
1053 continue
1054 theids.add(job)
1054 theids.add(job)
1055 if not theids.intersection(self.outstanding):
1055 if not theids.intersection(self.outstanding):
1056 return True
1056 return True
1057 self.spin()
1057 self.spin()
1058 while theids.intersection(self.outstanding):
1058 while theids.intersection(self.outstanding):
1059 if timeout >= 0 and ( time.time()-tic ) > timeout:
1059 if timeout >= 0 and ( time.time()-tic ) > timeout:
1060 break
1060 break
1061 time.sleep(1e-3)
1061 time.sleep(1e-3)
1062 self.spin()
1062 self.spin()
1063 return len(theids.intersection(self.outstanding)) == 0
1063 return len(theids.intersection(self.outstanding)) == 0
1064
1064
1065 #--------------------------------------------------------------------------
1065 #--------------------------------------------------------------------------
1066 # Control methods
1066 # Control methods
1067 #--------------------------------------------------------------------------
1067 #--------------------------------------------------------------------------
1068
1068
1069 @spin_first
1069 @spin_first
1070 def clear(self, targets=None, block=None):
1070 def clear(self, targets=None, block=None):
1071 """Clear the namespace in target(s)."""
1071 """Clear the namespace in target(s)."""
1072 block = self.block if block is None else block
1072 block = self.block if block is None else block
1073 targets = self._build_targets(targets)[0]
1073 targets = self._build_targets(targets)[0]
1074 for t in targets:
1074 for t in targets:
1075 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1075 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1076 error = False
1076 error = False
1077 if block:
1077 if block:
1078 self._flush_ignored_control()
1078 self._flush_ignored_control()
1079 for i in range(len(targets)):
1079 for i in range(len(targets)):
1080 idents,msg = self.session.recv(self._control_socket,0)
1080 idents,msg = self.session.recv(self._control_socket,0)
1081 if self.debug:
1081 if self.debug:
1082 pprint(msg)
1082 pprint(msg)
1083 if msg['content']['status'] != 'ok':
1083 if msg['content']['status'] != 'ok':
1084 error = self._unwrap_exception(msg['content'])
1084 error = self._unwrap_exception(msg['content'])
1085 else:
1085 else:
1086 self._ignored_control_replies += len(targets)
1086 self._ignored_control_replies += len(targets)
1087 if error:
1087 if error:
1088 raise error
1088 raise error
1089
1089
1090
1090
1091 @spin_first
1091 @spin_first
1092 def abort(self, jobs=None, targets=None, block=None):
1092 def abort(self, jobs=None, targets=None, block=None):
1093 """Abort specific jobs from the execution queues of target(s).
1093 """Abort specific jobs from the execution queues of target(s).
1094
1094
1095 This is a mechanism to prevent jobs that have already been submitted
1095 This is a mechanism to prevent jobs that have already been submitted
1096 from executing.
1096 from executing.
1097
1097
1098 Parameters
1098 Parameters
1099 ----------
1099 ----------
1100
1100
1101 jobs : msg_id, list of msg_ids, or AsyncResult
1101 jobs : msg_id, list of msg_ids, or AsyncResult
1102 The jobs to be aborted
1102 The jobs to be aborted
1103
1103
1104 If unspecified/None: abort all outstanding jobs.
1104 If unspecified/None: abort all outstanding jobs.
1105
1105
1106 """
1106 """
1107 block = self.block if block is None else block
1107 block = self.block if block is None else block
1108 jobs = jobs if jobs is not None else list(self.outstanding)
1108 jobs = jobs if jobs is not None else list(self.outstanding)
1109 targets = self._build_targets(targets)[0]
1109 targets = self._build_targets(targets)[0]
1110
1110
1111 msg_ids = []
1111 msg_ids = []
1112 if isinstance(jobs, (basestring,AsyncResult)):
1112 if isinstance(jobs, (basestring,AsyncResult)):
1113 jobs = [jobs]
1113 jobs = [jobs]
1114 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1114 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1115 if bad_ids:
1115 if bad_ids:
1116 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1116 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1117 for j in jobs:
1117 for j in jobs:
1118 if isinstance(j, AsyncResult):
1118 if isinstance(j, AsyncResult):
1119 msg_ids.extend(j.msg_ids)
1119 msg_ids.extend(j.msg_ids)
1120 else:
1120 else:
1121 msg_ids.append(j)
1121 msg_ids.append(j)
1122 content = dict(msg_ids=msg_ids)
1122 content = dict(msg_ids=msg_ids)
1123 for t in targets:
1123 for t in targets:
1124 self.session.send(self._control_socket, 'abort_request',
1124 self.session.send(self._control_socket, 'abort_request',
1125 content=content, ident=t)
1125 content=content, ident=t)
1126 error = False
1126 error = False
1127 if block:
1127 if block:
1128 self._flush_ignored_control()
1128 self._flush_ignored_control()
1129 for i in range(len(targets)):
1129 for i in range(len(targets)):
1130 idents,msg = self.session.recv(self._control_socket,0)
1130 idents,msg = self.session.recv(self._control_socket,0)
1131 if self.debug:
1131 if self.debug:
1132 pprint(msg)
1132 pprint(msg)
1133 if msg['content']['status'] != 'ok':
1133 if msg['content']['status'] != 'ok':
1134 error = self._unwrap_exception(msg['content'])
1134 error = self._unwrap_exception(msg['content'])
1135 else:
1135 else:
1136 self._ignored_control_replies += len(targets)
1136 self._ignored_control_replies += len(targets)
1137 if error:
1137 if error:
1138 raise error
1138 raise error
1139
1139
1140 @spin_first
1140 @spin_first
1141 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1141 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1142 """Terminates one or more engine processes, optionally including the hub.
1142 """Terminates one or more engine processes, optionally including the hub.
1143
1143
1144 Parameters
1144 Parameters
1145 ----------
1145 ----------
1146
1146
1147 targets: list of ints or 'all' [default: all]
1147 targets: list of ints or 'all' [default: all]
1148 Which engines to shutdown.
1148 Which engines to shutdown.
1149 hub: bool [default: False]
1149 hub: bool [default: False]
1150 Whether to include the Hub. hub=True implies targets='all'.
1150 Whether to include the Hub. hub=True implies targets='all'.
1151 block: bool [default: self.block]
1151 block: bool [default: self.block]
1152 Whether to wait for clean shutdown replies or not.
1152 Whether to wait for clean shutdown replies or not.
1153 restart: bool [default: False]
1153 restart: bool [default: False]
1154 NOT IMPLEMENTED
1154 NOT IMPLEMENTED
1155 whether to restart engines after shutting them down.
1155 whether to restart engines after shutting them down.
1156 """
1156 """
1157
1157
1158 if restart:
1158 if restart:
1159 raise NotImplementedError("Engine restart is not yet implemented")
1159 raise NotImplementedError("Engine restart is not yet implemented")
1160
1160
1161 block = self.block if block is None else block
1161 block = self.block if block is None else block
1162 if hub:
1162 if hub:
1163 targets = 'all'
1163 targets = 'all'
1164 targets = self._build_targets(targets)[0]
1164 targets = self._build_targets(targets)[0]
1165 for t in targets:
1165 for t in targets:
1166 self.session.send(self._control_socket, 'shutdown_request',
1166 self.session.send(self._control_socket, 'shutdown_request',
1167 content={'restart':restart},ident=t)
1167 content={'restart':restart},ident=t)
1168 error = False
1168 error = False
1169 if block or hub:
1169 if block or hub:
1170 self._flush_ignored_control()
1170 self._flush_ignored_control()
1171 for i in range(len(targets)):
1171 for i in range(len(targets)):
1172 idents,msg = self.session.recv(self._control_socket, 0)
1172 idents,msg = self.session.recv(self._control_socket, 0)
1173 if self.debug:
1173 if self.debug:
1174 pprint(msg)
1174 pprint(msg)
1175 if msg['content']['status'] != 'ok':
1175 if msg['content']['status'] != 'ok':
1176 error = self._unwrap_exception(msg['content'])
1176 error = self._unwrap_exception(msg['content'])
1177 else:
1177 else:
1178 self._ignored_control_replies += len(targets)
1178 self._ignored_control_replies += len(targets)
1179
1179
1180 if hub:
1180 if hub:
1181 time.sleep(0.25)
1181 time.sleep(0.25)
1182 self.session.send(self._query_socket, 'shutdown_request')
1182 self.session.send(self._query_socket, 'shutdown_request')
1183 idents,msg = self.session.recv(self._query_socket, 0)
1183 idents,msg = self.session.recv(self._query_socket, 0)
1184 if self.debug:
1184 if self.debug:
1185 pprint(msg)
1185 pprint(msg)
1186 if msg['content']['status'] != 'ok':
1186 if msg['content']['status'] != 'ok':
1187 error = self._unwrap_exception(msg['content'])
1187 error = self._unwrap_exception(msg['content'])
1188
1188
1189 if error:
1189 if error:
1190 raise error
1190 raise error
1191
1191
1192 #--------------------------------------------------------------------------
1192 #--------------------------------------------------------------------------
1193 # Execution related methods
1193 # Execution related methods
1194 #--------------------------------------------------------------------------
1194 #--------------------------------------------------------------------------
1195
1195
1196 def _maybe_raise(self, result):
1196 def _maybe_raise(self, result):
1197 """wrapper for maybe raising an exception if apply failed."""
1197 """wrapper for maybe raising an exception if apply failed."""
1198 if isinstance(result, error.RemoteError):
1198 if isinstance(result, error.RemoteError):
1199 raise result
1199 raise result
1200
1200
1201 return result
1201 return result
1202
1202
1203 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1203 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1204 ident=None):
1204 ident=None):
1205 """construct and send an apply message via a socket.
1205 """construct and send an apply message via a socket.
1206
1206
1207 This is the principal method with which all engine execution is performed by views.
1207 This is the principal method with which all engine execution is performed by views.
1208 """
1208 """
1209
1209
1210 if self._closed:
1210 if self._closed:
1211 raise RuntimeError("Client cannot be used after its sockets have been closed")
1211 raise RuntimeError("Client cannot be used after its sockets have been closed")
1212
1212
1213 # defaults:
1213 # defaults:
1214 args = args if args is not None else []
1214 args = args if args is not None else []
1215 kwargs = kwargs if kwargs is not None else {}
1215 kwargs = kwargs if kwargs is not None else {}
1216 metadata = metadata if metadata is not None else {}
1216 metadata = metadata if metadata is not None else {}
1217
1217
1218 # validate arguments
1218 # validate arguments
1219 if not callable(f) and not isinstance(f, Reference):
1219 if not callable(f) and not isinstance(f, Reference):
1220 raise TypeError("f must be callable, not %s"%type(f))
1220 raise TypeError("f must be callable, not %s"%type(f))
1221 if not isinstance(args, (tuple, list)):
1221 if not isinstance(args, (tuple, list)):
1222 raise TypeError("args must be tuple or list, not %s"%type(args))
1222 raise TypeError("args must be tuple or list, not %s"%type(args))
1223 if not isinstance(kwargs, dict):
1223 if not isinstance(kwargs, dict):
1224 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1224 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1225 if not isinstance(metadata, dict):
1225 if not isinstance(metadata, dict):
1226 raise TypeError("metadata must be dict, not %s"%type(metadata))
1226 raise TypeError("metadata must be dict, not %s"%type(metadata))
1227
1227
1228 bufs = serialize.pack_apply_message(f, args, kwargs,
1228 bufs = serialize.pack_apply_message(f, args, kwargs,
1229 buffer_threshold=self.session.buffer_threshold,
1229 buffer_threshold=self.session.buffer_threshold,
1230 item_threshold=self.session.item_threshold,
1230 item_threshold=self.session.item_threshold,
1231 )
1231 )
1232
1232
1233 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1233 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1234 metadata=metadata, track=track)
1234 metadata=metadata, track=track)
1235
1235
1236 msg_id = msg['header']['msg_id']
1236 msg_id = msg['header']['msg_id']
1237 self.outstanding.add(msg_id)
1237 self.outstanding.add(msg_id)
1238 if ident:
1238 if ident:
1239 # possibly routed to a specific engine
1239 # possibly routed to a specific engine
1240 if isinstance(ident, list):
1240 if isinstance(ident, list):
1241 ident = ident[-1]
1241 ident = ident[-1]
1242 if ident in self._engines.values():
1242 if ident in self._engines.values():
1243 # save for later, in case of engine death
1243 # save for later, in case of engine death
1244 self._outstanding_dict[ident].add(msg_id)
1244 self._outstanding_dict[ident].add(msg_id)
1245 self.history.append(msg_id)
1245 self.history.append(msg_id)
1246 self.metadata[msg_id]['submitted'] = datetime.now()
1246 self.metadata[msg_id]['submitted'] = datetime.now()
1247
1247
1248 return msg
1248 return msg
1249
1249
1250 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1250 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1251 """construct and send an execute request via a socket.
1251 """construct and send an execute request via a socket.
1252
1252
1253 """
1253 """
1254
1254
1255 if self._closed:
1255 if self._closed:
1256 raise RuntimeError("Client cannot be used after its sockets have been closed")
1256 raise RuntimeError("Client cannot be used after its sockets have been closed")
1257
1257
1258 # defaults:
1258 # defaults:
1259 metadata = metadata if metadata is not None else {}
1259 metadata = metadata if metadata is not None else {}
1260
1260
1261 # validate arguments
1261 # validate arguments
1262 if not isinstance(code, basestring):
1262 if not isinstance(code, basestring):
1263 raise TypeError("code must be text, not %s" % type(code))
1263 raise TypeError("code must be text, not %s" % type(code))
1264 if not isinstance(metadata, dict):
1264 if not isinstance(metadata, dict):
1265 raise TypeError("metadata must be dict, not %s" % type(metadata))
1265 raise TypeError("metadata must be dict, not %s" % type(metadata))
1266
1266
1267 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1267 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1268
1268
1269
1269
1270 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1270 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1271 metadata=metadata)
1271 metadata=metadata)
1272
1272
1273 msg_id = msg['header']['msg_id']
1273 msg_id = msg['header']['msg_id']
1274 self.outstanding.add(msg_id)
1274 self.outstanding.add(msg_id)
1275 if ident:
1275 if ident:
1276 # possibly routed to a specific engine
1276 # possibly routed to a specific engine
1277 if isinstance(ident, list):
1277 if isinstance(ident, list):
1278 ident = ident[-1]
1278 ident = ident[-1]
1279 if ident in self._engines.values():
1279 if ident in self._engines.values():
1280 # save for later, in case of engine death
1280 # save for later, in case of engine death
1281 self._outstanding_dict[ident].add(msg_id)
1281 self._outstanding_dict[ident].add(msg_id)
1282 self.history.append(msg_id)
1282 self.history.append(msg_id)
1283 self.metadata[msg_id]['submitted'] = datetime.now()
1283 self.metadata[msg_id]['submitted'] = datetime.now()
1284
1284
1285 return msg
1285 return msg
1286
1286
1287 #--------------------------------------------------------------------------
1287 #--------------------------------------------------------------------------
1288 # construct a View object
1288 # construct a View object
1289 #--------------------------------------------------------------------------
1289 #--------------------------------------------------------------------------
1290
1290
1291 def load_balanced_view(self, targets=None):
1291 def load_balanced_view(self, targets=None):
1292 """construct a DirectView object.
1292 """construct a DirectView object.
1293
1293
1294 If no arguments are specified, create a LoadBalancedView
1294 If no arguments are specified, create a LoadBalancedView
1295 using all engines.
1295 using all engines.
1296
1296
1297 Parameters
1297 Parameters
1298 ----------
1298 ----------
1299
1299
1300 targets: list,slice,int,etc. [default: use all engines]
1300 targets: list,slice,int,etc. [default: use all engines]
1301 The subset of engines across which to load-balance
1301 The subset of engines across which to load-balance
1302 """
1302 """
1303 if targets == 'all':
1303 if targets == 'all':
1304 targets = None
1304 targets = None
1305 if targets is not None:
1305 if targets is not None:
1306 targets = self._build_targets(targets)[1]
1306 targets = self._build_targets(targets)[1]
1307 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1307 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1308
1308
1309 def direct_view(self, targets='all'):
1309 def direct_view(self, targets='all'):
1310 """construct a DirectView object.
1310 """construct a DirectView object.
1311
1311
1312 If no targets are specified, create a DirectView using all engines.
1312 If no targets are specified, create a DirectView using all engines.
1313
1313
1314 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1314 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1315 evaluate the target engines at each execution, whereas rc[:] will connect to
1315 evaluate the target engines at each execution, whereas rc[:] will connect to
1316 all *current* engines, and that list will not change.
1316 all *current* engines, and that list will not change.
1317
1317
1318 That is, 'all' will always use all engines, whereas rc[:] will not use
1318 That is, 'all' will always use all engines, whereas rc[:] will not use
1319 engines added after the DirectView is constructed.
1319 engines added after the DirectView is constructed.
1320
1320
1321 Parameters
1321 Parameters
1322 ----------
1322 ----------
1323
1323
1324 targets: list,slice,int,etc. [default: use all engines]
1324 targets: list,slice,int,etc. [default: use all engines]
1325 The engines to use for the View
1325 The engines to use for the View
1326 """
1326 """
1327 single = isinstance(targets, int)
1327 single = isinstance(targets, int)
1328 # allow 'all' to be lazily evaluated at each execution
1328 # allow 'all' to be lazily evaluated at each execution
1329 if targets != 'all':
1329 if targets != 'all':
1330 targets = self._build_targets(targets)[1]
1330 targets = self._build_targets(targets)[1]
1331 if single:
1331 if single:
1332 targets = targets[0]
1332 targets = targets[0]
1333 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1333 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1334
1334
1335 #--------------------------------------------------------------------------
1335 #--------------------------------------------------------------------------
1336 # Query methods
1336 # Query methods
1337 #--------------------------------------------------------------------------
1337 #--------------------------------------------------------------------------
1338
1338
1339 @spin_first
1339 @spin_first
1340 def get_result(self, indices_or_msg_ids=None, block=None):
1340 def get_result(self, indices_or_msg_ids=None, block=None):
1341 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1341 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1342
1342
1343 If the client already has the results, no request to the Hub will be made.
1343 If the client already has the results, no request to the Hub will be made.
1344
1344
1345 This is a convenient way to construct AsyncResult objects, which are wrappers
1345 This is a convenient way to construct AsyncResult objects, which are wrappers
1346 that include metadata about execution, and allow for awaiting results that
1346 that include metadata about execution, and allow for awaiting results that
1347 were not submitted by this Client.
1347 were not submitted by this Client.
1348
1348
1349 It can also be a convenient way to retrieve the metadata associated with
1349 It can also be a convenient way to retrieve the metadata associated with
1350 blocking execution, since it always retrieves
1350 blocking execution, since it always retrieves
1351
1351
1352 Examples
1352 Examples
1353 --------
1353 --------
1354 ::
1354 ::
1355
1355
1356 In [10]: r = client.apply()
1356 In [10]: r = client.apply()
1357
1357
1358 Parameters
1358 Parameters
1359 ----------
1359 ----------
1360
1360
1361 indices_or_msg_ids : integer history index, str msg_id, or list of either
1361 indices_or_msg_ids : integer history index, str msg_id, or list of either
1362 The indices or msg_ids of indices to be retrieved
1362 The indices or msg_ids of indices to be retrieved
1363
1363
1364 block : bool
1364 block : bool
1365 Whether to wait for the result to be done
1365 Whether to wait for the result to be done
1366
1366
1367 Returns
1367 Returns
1368 -------
1368 -------
1369
1369
1370 AsyncResult
1370 AsyncResult
1371 A single AsyncResult object will always be returned.
1371 A single AsyncResult object will always be returned.
1372
1372
1373 AsyncHubResult
1373 AsyncHubResult
1374 A subclass of AsyncResult that retrieves results from the Hub
1374 A subclass of AsyncResult that retrieves results from the Hub
1375
1375
1376 """
1376 """
1377 block = self.block if block is None else block
1377 block = self.block if block is None else block
1378 if indices_or_msg_ids is None:
1378 if indices_or_msg_ids is None:
1379 indices_or_msg_ids = -1
1379 indices_or_msg_ids = -1
1380
1380
1381 if not isinstance(indices_or_msg_ids, (list,tuple)):
1381 if not isinstance(indices_or_msg_ids, (list,tuple)):
1382 indices_or_msg_ids = [indices_or_msg_ids]
1382 indices_or_msg_ids = [indices_or_msg_ids]
1383
1383
1384 theids = []
1384 theids = []
1385 for id in indices_or_msg_ids:
1385 for id in indices_or_msg_ids:
1386 if isinstance(id, int):
1386 if isinstance(id, int):
1387 id = self.history[id]
1387 id = self.history[id]
1388 if not isinstance(id, basestring):
1388 if not isinstance(id, basestring):
1389 raise TypeError("indices must be str or int, not %r"%id)
1389 raise TypeError("indices must be str or int, not %r"%id)
1390 theids.append(id)
1390 theids.append(id)
1391
1391
1392 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1392 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1393 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1393 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1394
1394
1395 if remote_ids:
1395 if remote_ids:
1396 ar = AsyncHubResult(self, msg_ids=theids)
1396 ar = AsyncHubResult(self, msg_ids=theids)
1397 else:
1397 else:
1398 ar = AsyncResult(self, msg_ids=theids)
1398 ar = AsyncResult(self, msg_ids=theids)
1399
1399
1400 if block:
1400 if block:
1401 ar.wait()
1401 ar.wait()
1402
1402
1403 return ar
1403 return ar
1404
1404
1405 @spin_first
1405 @spin_first
1406 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1406 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1407 """Resubmit one or more tasks.
1407 """Resubmit one or more tasks.
1408
1408
1409 in-flight tasks may not be resubmitted.
1409 in-flight tasks may not be resubmitted.
1410
1410
1411 Parameters
1411 Parameters
1412 ----------
1412 ----------
1413
1413
1414 indices_or_msg_ids : integer history index, str msg_id, or list of either
1414 indices_or_msg_ids : integer history index, str msg_id, or list of either
1415 The indices or msg_ids of indices to be retrieved
1415 The indices or msg_ids of indices to be retrieved
1416
1416
1417 block : bool
1417 block : bool
1418 Whether to wait for the result to be done
1418 Whether to wait for the result to be done
1419
1419
1420 Returns
1420 Returns
1421 -------
1421 -------
1422
1422
1423 AsyncHubResult
1423 AsyncHubResult
1424 A subclass of AsyncResult that retrieves results from the Hub
1424 A subclass of AsyncResult that retrieves results from the Hub
1425
1425
1426 """
1426 """
1427 block = self.block if block is None else block
1427 block = self.block if block is None else block
1428 if indices_or_msg_ids is None:
1428 if indices_or_msg_ids is None:
1429 indices_or_msg_ids = -1
1429 indices_or_msg_ids = -1
1430
1430
1431 if not isinstance(indices_or_msg_ids, (list,tuple)):
1431 if not isinstance(indices_or_msg_ids, (list,tuple)):
1432 indices_or_msg_ids = [indices_or_msg_ids]
1432 indices_or_msg_ids = [indices_or_msg_ids]
1433
1433
1434 theids = []
1434 theids = []
1435 for id in indices_or_msg_ids:
1435 for id in indices_or_msg_ids:
1436 if isinstance(id, int):
1436 if isinstance(id, int):
1437 id = self.history[id]
1437 id = self.history[id]
1438 if not isinstance(id, basestring):
1438 if not isinstance(id, basestring):
1439 raise TypeError("indices must be str or int, not %r"%id)
1439 raise TypeError("indices must be str or int, not %r"%id)
1440 theids.append(id)
1440 theids.append(id)
1441
1441
1442 content = dict(msg_ids = theids)
1442 content = dict(msg_ids = theids)
1443
1443
1444 self.session.send(self._query_socket, 'resubmit_request', content)
1444 self.session.send(self._query_socket, 'resubmit_request', content)
1445
1445
1446 zmq.select([self._query_socket], [], [])
1446 zmq.select([self._query_socket], [], [])
1447 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1447 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1448 if self.debug:
1448 if self.debug:
1449 pprint(msg)
1449 pprint(msg)
1450 content = msg['content']
1450 content = msg['content']
1451 if content['status'] != 'ok':
1451 if content['status'] != 'ok':
1452 raise self._unwrap_exception(content)
1452 raise self._unwrap_exception(content)
1453 mapping = content['resubmitted']
1453 mapping = content['resubmitted']
1454 new_ids = [ mapping[msg_id] for msg_id in theids ]
1454 new_ids = [ mapping[msg_id] for msg_id in theids ]
1455
1455
1456 ar = AsyncHubResult(self, msg_ids=new_ids)
1456 ar = AsyncHubResult(self, msg_ids=new_ids)
1457
1457
1458 if block:
1458 if block:
1459 ar.wait()
1459 ar.wait()
1460
1460
1461 return ar
1461 return ar
1462
1462
1463 @spin_first
1463 @spin_first
1464 def result_status(self, msg_ids, status_only=True):
1464 def result_status(self, msg_ids, status_only=True):
1465 """Check on the status of the result(s) of the apply request with `msg_ids`.
1465 """Check on the status of the result(s) of the apply request with `msg_ids`.
1466
1466
1467 If status_only is False, then the actual results will be retrieved, else
1467 If status_only is False, then the actual results will be retrieved, else
1468 only the status of the results will be checked.
1468 only the status of the results will be checked.
1469
1469
1470 Parameters
1470 Parameters
1471 ----------
1471 ----------
1472
1472
1473 msg_ids : list of msg_ids
1473 msg_ids : list of msg_ids
1474 if int:
1474 if int:
1475 Passed as index to self.history for convenience.
1475 Passed as index to self.history for convenience.
1476 status_only : bool (default: True)
1476 status_only : bool (default: True)
1477 if False:
1477 if False:
1478 Retrieve the actual results of completed tasks.
1478 Retrieve the actual results of completed tasks.
1479
1479
1480 Returns
1480 Returns
1481 -------
1481 -------
1482
1482
1483 results : dict
1483 results : dict
1484 There will always be the keys 'pending' and 'completed', which will
1484 There will always be the keys 'pending' and 'completed', which will
1485 be lists of msg_ids that are incomplete or complete. If `status_only`
1485 be lists of msg_ids that are incomplete or complete. If `status_only`
1486 is False, then completed results will be keyed by their `msg_id`.
1486 is False, then completed results will be keyed by their `msg_id`.
1487 """
1487 """
1488 if not isinstance(msg_ids, (list,tuple)):
1488 if not isinstance(msg_ids, (list,tuple)):
1489 msg_ids = [msg_ids]
1489 msg_ids = [msg_ids]
1490
1490
1491 theids = []
1491 theids = []
1492 for msg_id in msg_ids:
1492 for msg_id in msg_ids:
1493 if isinstance(msg_id, int):
1493 if isinstance(msg_id, int):
1494 msg_id = self.history[msg_id]
1494 msg_id = self.history[msg_id]
1495 if not isinstance(msg_id, basestring):
1495 if not isinstance(msg_id, basestring):
1496 raise TypeError("msg_ids must be str, not %r"%msg_id)
1496 raise TypeError("msg_ids must be str, not %r"%msg_id)
1497 theids.append(msg_id)
1497 theids.append(msg_id)
1498
1498
1499 completed = []
1499 completed = []
1500 local_results = {}
1500 local_results = {}
1501
1501
1502 # comment this block out to temporarily disable local shortcut:
1502 # comment this block out to temporarily disable local shortcut:
1503 for msg_id in theids:
1503 for msg_id in theids:
1504 if msg_id in self.results:
1504 if msg_id in self.results:
1505 completed.append(msg_id)
1505 completed.append(msg_id)
1506 local_results[msg_id] = self.results[msg_id]
1506 local_results[msg_id] = self.results[msg_id]
1507 theids.remove(msg_id)
1507 theids.remove(msg_id)
1508
1508
1509 if theids: # some not locally cached
1509 if theids: # some not locally cached
1510 content = dict(msg_ids=theids, status_only=status_only)
1510 content = dict(msg_ids=theids, status_only=status_only)
1511 msg = self.session.send(self._query_socket, "result_request", content=content)
1511 msg = self.session.send(self._query_socket, "result_request", content=content)
1512 zmq.select([self._query_socket], [], [])
1512 zmq.select([self._query_socket], [], [])
1513 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1513 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1514 if self.debug:
1514 if self.debug:
1515 pprint(msg)
1515 pprint(msg)
1516 content = msg['content']
1516 content = msg['content']
1517 if content['status'] != 'ok':
1517 if content['status'] != 'ok':
1518 raise self._unwrap_exception(content)
1518 raise self._unwrap_exception(content)
1519 buffers = msg['buffers']
1519 buffers = msg['buffers']
1520 else:
1520 else:
1521 content = dict(completed=[],pending=[])
1521 content = dict(completed=[],pending=[])
1522
1522
1523 content['completed'].extend(completed)
1523 content['completed'].extend(completed)
1524
1524
1525 if status_only:
1525 if status_only:
1526 return content
1526 return content
1527
1527
1528 failures = []
1528 failures = []
1529 # load cached results into result:
1529 # load cached results into result:
1530 content.update(local_results)
1530 content.update(local_results)
1531
1531
1532 # update cache with results:
1532 # update cache with results:
1533 for msg_id in sorted(theids):
1533 for msg_id in sorted(theids):
1534 if msg_id in content['completed']:
1534 if msg_id in content['completed']:
1535 rec = content[msg_id]
1535 rec = content[msg_id]
1536 parent = rec['header']
1536 parent = rec['header']
1537 header = rec['result_header']
1537 header = rec['result_header']
1538 rcontent = rec['result_content']
1538 rcontent = rec['result_content']
1539 iodict = rec['io']
1539 iodict = rec['io']
1540 if isinstance(rcontent, str):
1540 if isinstance(rcontent, str):
1541 rcontent = self.session.unpack(rcontent)
1541 rcontent = self.session.unpack(rcontent)
1542
1542
1543 md = self.metadata[msg_id]
1543 md = self.metadata[msg_id]
1544 md_msg = dict(
1544 md_msg = dict(
1545 content=rcontent,
1545 content=rcontent,
1546 parent_header=parent,
1546 parent_header=parent,
1547 header=header,
1547 header=header,
1548 metadata=rec['result_metadata'],
1548 metadata=rec['result_metadata'],
1549 )
1549 )
1550 md.update(self._extract_metadata(md_msg))
1550 md.update(self._extract_metadata(md_msg))
1551 if rec.get('received'):
1551 if rec.get('received'):
1552 md['received'] = rec['received']
1552 md['received'] = rec['received']
1553 md.update(iodict)
1553 md.update(iodict)
1554
1554
1555 if rcontent['status'] == 'ok':
1555 if rcontent['status'] == 'ok':
1556 if header['msg_type'] == 'apply_reply':
1556 if header['msg_type'] == 'apply_reply':
1557 res,buffers = serialize.unserialize_object(buffers)
1557 res,buffers = serialize.unserialize_object(buffers)
1558 elif header['msg_type'] == 'execute_reply':
1558 elif header['msg_type'] == 'execute_reply':
1559 res = ExecuteReply(msg_id, rcontent, md)
1559 res = ExecuteReply(msg_id, rcontent, md)
1560 else:
1560 else:
1561 raise KeyError("unhandled msg type: %r" % header[msg_type])
1561 raise KeyError("unhandled msg type: %r" % header[msg_type])
1562 else:
1562 else:
1563 res = self._unwrap_exception(rcontent)
1563 res = self._unwrap_exception(rcontent)
1564 failures.append(res)
1564 failures.append(res)
1565
1565
1566 self.results[msg_id] = res
1566 self.results[msg_id] = res
1567 content[msg_id] = res
1567 content[msg_id] = res
1568
1568
1569 if len(theids) == 1 and failures:
1569 if len(theids) == 1 and failures:
1570 raise failures[0]
1570 raise failures[0]
1571
1571
1572 error.collect_exceptions(failures, "result_status")
1572 error.collect_exceptions(failures, "result_status")
1573 return content
1573 return content
1574
1574
1575 @spin_first
1575 @spin_first
1576 def queue_status(self, targets='all', verbose=False):
1576 def queue_status(self, targets='all', verbose=False):
1577 """Fetch the status of engine queues.
1577 """Fetch the status of engine queues.
1578
1578
1579 Parameters
1579 Parameters
1580 ----------
1580 ----------
1581
1581
1582 targets : int/str/list of ints/strs
1582 targets : int/str/list of ints/strs
1583 the engines whose states are to be queried.
1583 the engines whose states are to be queried.
1584 default : all
1584 default : all
1585 verbose : bool
1585 verbose : bool
1586 Whether to return lengths only, or lists of ids for each element
1586 Whether to return lengths only, or lists of ids for each element
1587 """
1587 """
1588 if targets == 'all':
1588 if targets == 'all':
1589 # allow 'all' to be evaluated on the engine
1589 # allow 'all' to be evaluated on the engine
1590 engine_ids = None
1590 engine_ids = None
1591 else:
1591 else:
1592 engine_ids = self._build_targets(targets)[1]
1592 engine_ids = self._build_targets(targets)[1]
1593 content = dict(targets=engine_ids, verbose=verbose)
1593 content = dict(targets=engine_ids, verbose=verbose)
1594 self.session.send(self._query_socket, "queue_request", content=content)
1594 self.session.send(self._query_socket, "queue_request", content=content)
1595 idents,msg = self.session.recv(self._query_socket, 0)
1595 idents,msg = self.session.recv(self._query_socket, 0)
1596 if self.debug:
1596 if self.debug:
1597 pprint(msg)
1597 pprint(msg)
1598 content = msg['content']
1598 content = msg['content']
1599 status = content.pop('status')
1599 status = content.pop('status')
1600 if status != 'ok':
1600 if status != 'ok':
1601 raise self._unwrap_exception(content)
1601 raise self._unwrap_exception(content)
1602 content = rekey(content)
1602 content = rekey(content)
1603 if isinstance(targets, int):
1603 if isinstance(targets, int):
1604 return content[targets]
1604 return content[targets]
1605 else:
1605 else:
1606 return content
1606 return content
1607
1607
1608 @spin_first
1608 @spin_first
1609 def purge_results(self, jobs=[], targets=[]):
1609 def purge_results(self, jobs=[], targets=[]):
1610 """Tell the Hub to forget results.
1610 """Tell the Hub to forget results.
1611
1611
1612 Individual results can be purged by msg_id, or the entire
1612 Individual results can be purged by msg_id, or the entire
1613 history of specific targets can be purged.
1613 history of specific targets can be purged.
1614
1614
1615 Use `purge_results('all')` to scrub everything from the Hub's db.
1615 Use `purge_results('all')` to scrub everything from the Hub's db.
1616
1616
1617 Parameters
1617 Parameters
1618 ----------
1618 ----------
1619
1619
1620 jobs : str or list of str or AsyncResult objects
1620 jobs : str or list of str or AsyncResult objects
1621 the msg_ids whose results should be forgotten.
1621 the msg_ids whose results should be forgotten.
1622 targets : int/str/list of ints/strs
1622 targets : int/str/list of ints/strs
1623 The targets, by int_id, whose entire history is to be purged.
1623 The targets, by int_id, whose entire history is to be purged.
1624
1624
1625 default : None
1625 default : None
1626 """
1626 """
1627 if not targets and not jobs:
1627 if not targets and not jobs:
1628 raise ValueError("Must specify at least one of `targets` and `jobs`")
1628 raise ValueError("Must specify at least one of `targets` and `jobs`")
1629 if targets:
1629 if targets:
1630 targets = self._build_targets(targets)[1]
1630 targets = self._build_targets(targets)[1]
1631
1631
1632 # construct msg_ids from jobs
1632 # construct msg_ids from jobs
1633 if jobs == 'all':
1633 if jobs == 'all':
1634 msg_ids = jobs
1634 msg_ids = jobs
1635 else:
1635 else:
1636 msg_ids = []
1636 msg_ids = []
1637 if isinstance(jobs, (basestring,AsyncResult)):
1637 if isinstance(jobs, (basestring,AsyncResult)):
1638 jobs = [jobs]
1638 jobs = [jobs]
1639 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1639 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1640 if bad_ids:
1640 if bad_ids:
1641 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1641 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1642 for j in jobs:
1642 for j in jobs:
1643 if isinstance(j, AsyncResult):
1643 if isinstance(j, AsyncResult):
1644 msg_ids.extend(j.msg_ids)
1644 msg_ids.extend(j.msg_ids)
1645 else:
1645 else:
1646 msg_ids.append(j)
1646 msg_ids.append(j)
1647
1647
1648 content = dict(engine_ids=targets, msg_ids=msg_ids)
1648 content = dict(engine_ids=targets, msg_ids=msg_ids)
1649 self.session.send(self._query_socket, "purge_request", content=content)
1649 self.session.send(self._query_socket, "purge_request", content=content)
1650 idents, msg = self.session.recv(self._query_socket, 0)
1650 idents, msg = self.session.recv(self._query_socket, 0)
1651 if self.debug:
1651 if self.debug:
1652 pprint(msg)
1652 pprint(msg)
1653 content = msg['content']
1653 content = msg['content']
1654 if content['status'] != 'ok':
1654 if content['status'] != 'ok':
1655 raise self._unwrap_exception(content)
1655 raise self._unwrap_exception(content)
1656
1656
1657 @spin_first
1657 @spin_first
1658 def hub_history(self):
1658 def hub_history(self):
1659 """Get the Hub's history
1659 """Get the Hub's history
1660
1660
1661 Just like the Client, the Hub has a history, which is a list of msg_ids.
1661 Just like the Client, the Hub has a history, which is a list of msg_ids.
1662 This will contain the history of all clients, and, depending on configuration,
1662 This will contain the history of all clients, and, depending on configuration,
1663 may contain history across multiple cluster sessions.
1663 may contain history across multiple cluster sessions.
1664
1664
1665 Any msg_id returned here is a valid argument to `get_result`.
1665 Any msg_id returned here is a valid argument to `get_result`.
1666
1666
1667 Returns
1667 Returns
1668 -------
1668 -------
1669
1669
1670 msg_ids : list of strs
1670 msg_ids : list of strs
1671 list of all msg_ids, ordered by task submission time.
1671 list of all msg_ids, ordered by task submission time.
1672 """
1672 """
1673
1673
1674 self.session.send(self._query_socket, "history_request", content={})
1674 self.session.send(self._query_socket, "history_request", content={})
1675 idents, msg = self.session.recv(self._query_socket, 0)
1675 idents, msg = self.session.recv(self._query_socket, 0)
1676
1676
1677 if self.debug:
1677 if self.debug:
1678 pprint(msg)
1678 pprint(msg)
1679 content = msg['content']
1679 content = msg['content']
1680 if content['status'] != 'ok':
1680 if content['status'] != 'ok':
1681 raise self._unwrap_exception(content)
1681 raise self._unwrap_exception(content)
1682 else:
1682 else:
1683 return content['history']
1683 return content['history']
1684
1684
1685 @spin_first
1685 @spin_first
1686 def db_query(self, query, keys=None):
1686 def db_query(self, query, keys=None):
1687 """Query the Hub's TaskRecord database
1687 """Query the Hub's TaskRecord database
1688
1688
1689 This will return a list of task record dicts that match `query`
1689 This will return a list of task record dicts that match `query`
1690
1690
1691 Parameters
1691 Parameters
1692 ----------
1692 ----------
1693
1693
1694 query : mongodb query dict
1694 query : mongodb query dict
1695 The search dict. See mongodb query docs for details.
1695 The search dict. See mongodb query docs for details.
1696 keys : list of strs [optional]
1696 keys : list of strs [optional]
1697 The subset of keys to be returned. The default is to fetch everything but buffers.
1697 The subset of keys to be returned. The default is to fetch everything but buffers.
1698 'msg_id' will *always* be included.
1698 'msg_id' will *always* be included.
1699 """
1699 """
1700 if isinstance(keys, basestring):
1700 if isinstance(keys, basestring):
1701 keys = [keys]
1701 keys = [keys]
1702 content = dict(query=query, keys=keys)
1702 content = dict(query=query, keys=keys)
1703 self.session.send(self._query_socket, "db_request", content=content)
1703 self.session.send(self._query_socket, "db_request", content=content)
1704 idents, msg = self.session.recv(self._query_socket, 0)
1704 idents, msg = self.session.recv(self._query_socket, 0)
1705 if self.debug:
1705 if self.debug:
1706 pprint(msg)
1706 pprint(msg)
1707 content = msg['content']
1707 content = msg['content']
1708 if content['status'] != 'ok':
1708 if content['status'] != 'ok':
1709 raise self._unwrap_exception(content)
1709 raise self._unwrap_exception(content)
1710
1710
1711 records = content['records']
1711 records = content['records']
1712
1712
1713 buffer_lens = content['buffer_lens']
1713 buffer_lens = content['buffer_lens']
1714 result_buffer_lens = content['result_buffer_lens']
1714 result_buffer_lens = content['result_buffer_lens']
1715 buffers = msg['buffers']
1715 buffers = msg['buffers']
1716 has_bufs = buffer_lens is not None
1716 has_bufs = buffer_lens is not None
1717 has_rbufs = result_buffer_lens is not None
1717 has_rbufs = result_buffer_lens is not None
1718 for i,rec in enumerate(records):
1718 for i,rec in enumerate(records):
1719 # relink buffers
1719 # relink buffers
1720 if has_bufs:
1720 if has_bufs:
1721 blen = buffer_lens[i]
1721 blen = buffer_lens[i]
1722 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1722 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1723 if has_rbufs:
1723 if has_rbufs:
1724 blen = result_buffer_lens[i]
1724 blen = result_buffer_lens[i]
1725 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1725 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1726
1726
1727 return records
1727 return records
1728
1728
1729 __all__ = [ 'Client' ]
1729 __all__ = [ 'Client' ]
General Comments 0
You need to be logged in to leave comments. Login now