##// END OF EJS Templates
close Client sockets if connection fails...
MinRK -
Show More
@@ -1,1853 +1,1858 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
36
36
37 from IPython.utils.capture import RichOutput
37 from IPython.utils.capture import RichOutput
38 from IPython.utils.coloransi import TermColors
38 from IPython.utils.coloransi import TermColors
39 from IPython.utils.jsonutil import rekey
39 from IPython.utils.jsonutil import rekey
40 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
40 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
41 from IPython.utils.path import get_ipython_dir
41 from IPython.utils.path import get_ipython_dir
42 from IPython.utils.py3compat import cast_bytes
42 from IPython.utils.py3compat import cast_bytes
43 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
43 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
44 Dict, List, Bool, Set, Any)
44 Dict, List, Bool, Set, Any)
45 from IPython.external.decorator import decorator
45 from IPython.external.decorator import decorator
46 from IPython.external.ssh import tunnel
46 from IPython.external.ssh import tunnel
47
47
48 from IPython.parallel import Reference
48 from IPython.parallel import Reference
49 from IPython.parallel import error
49 from IPython.parallel import error
50 from IPython.parallel import util
50 from IPython.parallel import util
51
51
52 from IPython.kernel.zmq.session import Session, Message
52 from IPython.kernel.zmq.session import Session, Message
53 from IPython.kernel.zmq import serialize
53 from IPython.kernel.zmq import serialize
54
54
55 from .asyncresult import AsyncResult, AsyncHubResult
55 from .asyncresult import AsyncResult, AsyncHubResult
56 from .view import DirectView, LoadBalancedView
56 from .view import DirectView, LoadBalancedView
57
57
58 if sys.version_info[0] >= 3:
58 if sys.version_info[0] >= 3:
59 # xrange is used in a couple 'isinstance' tests in py2
59 # xrange is used in a couple 'isinstance' tests in py2
60 # should be just 'range' in 3k
60 # should be just 'range' in 3k
61 xrange = range
61 xrange = range
62
62
63 #--------------------------------------------------------------------------
63 #--------------------------------------------------------------------------
64 # Decorators for Client methods
64 # Decorators for Client methods
65 #--------------------------------------------------------------------------
65 #--------------------------------------------------------------------------
66
66
67 @decorator
67 @decorator
68 def spin_first(f, self, *args, **kwargs):
68 def spin_first(f, self, *args, **kwargs):
69 """Call spin() to sync state prior to calling the method."""
69 """Call spin() to sync state prior to calling the method."""
70 self.spin()
70 self.spin()
71 return f(self, *args, **kwargs)
71 return f(self, *args, **kwargs)
72
72
73
73
74 #--------------------------------------------------------------------------
74 #--------------------------------------------------------------------------
75 # Classes
75 # Classes
76 #--------------------------------------------------------------------------
76 #--------------------------------------------------------------------------
77
77
78
78
79 class ExecuteReply(RichOutput):
79 class ExecuteReply(RichOutput):
80 """wrapper for finished Execute results"""
80 """wrapper for finished Execute results"""
81 def __init__(self, msg_id, content, metadata):
81 def __init__(self, msg_id, content, metadata):
82 self.msg_id = msg_id
82 self.msg_id = msg_id
83 self._content = content
83 self._content = content
84 self.execution_count = content['execution_count']
84 self.execution_count = content['execution_count']
85 self.metadata = metadata
85 self.metadata = metadata
86
86
87 # RichOutput overrides
87 # RichOutput overrides
88
88
89 @property
89 @property
90 def source(self):
90 def source(self):
91 pyout = self.metadata['pyout']
91 pyout = self.metadata['pyout']
92 if pyout:
92 if pyout:
93 return pyout.get('source', '')
93 return pyout.get('source', '')
94
94
95 @property
95 @property
96 def data(self):
96 def data(self):
97 pyout = self.metadata['pyout']
97 pyout = self.metadata['pyout']
98 if pyout:
98 if pyout:
99 return pyout.get('data', {})
99 return pyout.get('data', {})
100
100
101 @property
101 @property
102 def _metadata(self):
102 def _metadata(self):
103 pyout = self.metadata['pyout']
103 pyout = self.metadata['pyout']
104 if pyout:
104 if pyout:
105 return pyout.get('metadata', {})
105 return pyout.get('metadata', {})
106
106
107 def display(self):
107 def display(self):
108 from IPython.display import publish_display_data
108 from IPython.display import publish_display_data
109 publish_display_data(self.source, self.data, self.metadata)
109 publish_display_data(self.source, self.data, self.metadata)
110
110
111 def _repr_mime_(self, mime):
111 def _repr_mime_(self, mime):
112 if mime not in self.data:
112 if mime not in self.data:
113 return
113 return
114 data = self.data[mime]
114 data = self.data[mime]
115 if mime in self._metadata:
115 if mime in self._metadata:
116 return data, self._metadata[mime]
116 return data, self._metadata[mime]
117 else:
117 else:
118 return data
118 return data
119
119
120 def __getitem__(self, key):
120 def __getitem__(self, key):
121 return self.metadata[key]
121 return self.metadata[key]
122
122
123 def __getattr__(self, key):
123 def __getattr__(self, key):
124 if key not in self.metadata:
124 if key not in self.metadata:
125 raise AttributeError(key)
125 raise AttributeError(key)
126 return self.metadata[key]
126 return self.metadata[key]
127
127
128 def __repr__(self):
128 def __repr__(self):
129 pyout = self.metadata['pyout'] or {'data':{}}
129 pyout = self.metadata['pyout'] or {'data':{}}
130 text_out = pyout['data'].get('text/plain', '')
130 text_out = pyout['data'].get('text/plain', '')
131 if len(text_out) > 32:
131 if len(text_out) > 32:
132 text_out = text_out[:29] + '...'
132 text_out = text_out[:29] + '...'
133
133
134 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
134 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
135
135
136 def _repr_pretty_(self, p, cycle):
136 def _repr_pretty_(self, p, cycle):
137 pyout = self.metadata['pyout'] or {'data':{}}
137 pyout = self.metadata['pyout'] or {'data':{}}
138 text_out = pyout['data'].get('text/plain', '')
138 text_out = pyout['data'].get('text/plain', '')
139
139
140 if not text_out:
140 if not text_out:
141 return
141 return
142
142
143 try:
143 try:
144 ip = get_ipython()
144 ip = get_ipython()
145 except NameError:
145 except NameError:
146 colors = "NoColor"
146 colors = "NoColor"
147 else:
147 else:
148 colors = ip.colors
148 colors = ip.colors
149
149
150 if colors == "NoColor":
150 if colors == "NoColor":
151 out = normal = ""
151 out = normal = ""
152 else:
152 else:
153 out = TermColors.Red
153 out = TermColors.Red
154 normal = TermColors.Normal
154 normal = TermColors.Normal
155
155
156 if '\n' in text_out and not text_out.startswith('\n'):
156 if '\n' in text_out and not text_out.startswith('\n'):
157 # add newline for multiline reprs
157 # add newline for multiline reprs
158 text_out = '\n' + text_out
158 text_out = '\n' + text_out
159
159
160 p.text(
160 p.text(
161 out + u'Out[%i:%i]: ' % (
161 out + u'Out[%i:%i]: ' % (
162 self.metadata['engine_id'], self.execution_count
162 self.metadata['engine_id'], self.execution_count
163 ) + normal + text_out
163 ) + normal + text_out
164 )
164 )
165
165
166
166
167 class Metadata(dict):
167 class Metadata(dict):
168 """Subclass of dict for initializing metadata values.
168 """Subclass of dict for initializing metadata values.
169
169
170 Attribute access works on keys.
170 Attribute access works on keys.
171
171
172 These objects have a strict set of keys - errors will raise if you try
172 These objects have a strict set of keys - errors will raise if you try
173 to add new keys.
173 to add new keys.
174 """
174 """
175 def __init__(self, *args, **kwargs):
175 def __init__(self, *args, **kwargs):
176 dict.__init__(self)
176 dict.__init__(self)
177 md = {'msg_id' : None,
177 md = {'msg_id' : None,
178 'submitted' : None,
178 'submitted' : None,
179 'started' : None,
179 'started' : None,
180 'completed' : None,
180 'completed' : None,
181 'received' : None,
181 'received' : None,
182 'engine_uuid' : None,
182 'engine_uuid' : None,
183 'engine_id' : None,
183 'engine_id' : None,
184 'follow' : None,
184 'follow' : None,
185 'after' : None,
185 'after' : None,
186 'status' : None,
186 'status' : None,
187
187
188 'pyin' : None,
188 'pyin' : None,
189 'pyout' : None,
189 'pyout' : None,
190 'pyerr' : None,
190 'pyerr' : None,
191 'stdout' : '',
191 'stdout' : '',
192 'stderr' : '',
192 'stderr' : '',
193 'outputs' : [],
193 'outputs' : [],
194 'data': {},
194 'data': {},
195 'outputs_ready' : False,
195 'outputs_ready' : False,
196 }
196 }
197 self.update(md)
197 self.update(md)
198 self.update(dict(*args, **kwargs))
198 self.update(dict(*args, **kwargs))
199
199
200 def __getattr__(self, key):
200 def __getattr__(self, key):
201 """getattr aliased to getitem"""
201 """getattr aliased to getitem"""
202 if key in self.iterkeys():
202 if key in self.iterkeys():
203 return self[key]
203 return self[key]
204 else:
204 else:
205 raise AttributeError(key)
205 raise AttributeError(key)
206
206
207 def __setattr__(self, key, value):
207 def __setattr__(self, key, value):
208 """setattr aliased to setitem, with strict"""
208 """setattr aliased to setitem, with strict"""
209 if key in self.iterkeys():
209 if key in self.iterkeys():
210 self[key] = value
210 self[key] = value
211 else:
211 else:
212 raise AttributeError(key)
212 raise AttributeError(key)
213
213
214 def __setitem__(self, key, value):
214 def __setitem__(self, key, value):
215 """strict static key enforcement"""
215 """strict static key enforcement"""
216 if key in self.iterkeys():
216 if key in self.iterkeys():
217 dict.__setitem__(self, key, value)
217 dict.__setitem__(self, key, value)
218 else:
218 else:
219 raise KeyError(key)
219 raise KeyError(key)
220
220
221
221
222 class Client(HasTraits):
222 class Client(HasTraits):
223 """A semi-synchronous client to the IPython ZMQ cluster
223 """A semi-synchronous client to the IPython ZMQ cluster
224
224
225 Parameters
225 Parameters
226 ----------
226 ----------
227
227
228 url_file : str/unicode; path to ipcontroller-client.json
228 url_file : str/unicode; path to ipcontroller-client.json
229 This JSON file should contain all the information needed to connect to a cluster,
229 This JSON file should contain all the information needed to connect to a cluster,
230 and is likely the only argument needed.
230 and is likely the only argument needed.
231 Connection information for the Hub's registration. If a json connector
231 Connection information for the Hub's registration. If a json connector
232 file is given, then likely no further configuration is necessary.
232 file is given, then likely no further configuration is necessary.
233 [Default: use profile]
233 [Default: use profile]
234 profile : bytes
234 profile : bytes
235 The name of the Cluster profile to be used to find connector information.
235 The name of the Cluster profile to be used to find connector information.
236 If run from an IPython application, the default profile will be the same
236 If run from an IPython application, the default profile will be the same
237 as the running application, otherwise it will be 'default'.
237 as the running application, otherwise it will be 'default'.
238 cluster_id : str
238 cluster_id : str
239 String id to added to runtime files, to prevent name collisions when using
239 String id to added to runtime files, to prevent name collisions when using
240 multiple clusters with a single profile simultaneously.
240 multiple clusters with a single profile simultaneously.
241 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
241 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
242 Since this is text inserted into filenames, typical recommendations apply:
242 Since this is text inserted into filenames, typical recommendations apply:
243 Simple character strings are ideal, and spaces are not recommended (but
243 Simple character strings are ideal, and spaces are not recommended (but
244 should generally work)
244 should generally work)
245 context : zmq.Context
245 context : zmq.Context
246 Pass an existing zmq.Context instance, otherwise the client will create its own.
246 Pass an existing zmq.Context instance, otherwise the client will create its own.
247 debug : bool
247 debug : bool
248 flag for lots of message printing for debug purposes
248 flag for lots of message printing for debug purposes
249 timeout : int/float
249 timeout : int/float
250 time (in seconds) to wait for connection replies from the Hub
250 time (in seconds) to wait for connection replies from the Hub
251 [Default: 10]
251 [Default: 10]
252
252
253 #-------------- session related args ----------------
253 #-------------- session related args ----------------
254
254
255 config : Config object
255 config : Config object
256 If specified, this will be relayed to the Session for configuration
256 If specified, this will be relayed to the Session for configuration
257 username : str
257 username : str
258 set username for the session object
258 set username for the session object
259
259
260 #-------------- ssh related args ----------------
260 #-------------- ssh related args ----------------
261 # These are args for configuring the ssh tunnel to be used
261 # These are args for configuring the ssh tunnel to be used
262 # credentials are used to forward connections over ssh to the Controller
262 # credentials are used to forward connections over ssh to the Controller
263 # Note that the ip given in `addr` needs to be relative to sshserver
263 # Note that the ip given in `addr` needs to be relative to sshserver
264 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
264 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
265 # and set sshserver as the same machine the Controller is on. However,
265 # and set sshserver as the same machine the Controller is on. However,
266 # the only requirement is that sshserver is able to see the Controller
266 # the only requirement is that sshserver is able to see the Controller
267 # (i.e. is within the same trusted network).
267 # (i.e. is within the same trusted network).
268
268
269 sshserver : str
269 sshserver : str
270 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
270 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
271 If keyfile or password is specified, and this is not, it will default to
271 If keyfile or password is specified, and this is not, it will default to
272 the ip given in addr.
272 the ip given in addr.
273 sshkey : str; path to ssh private key file
273 sshkey : str; path to ssh private key file
274 This specifies a key to be used in ssh login, default None.
274 This specifies a key to be used in ssh login, default None.
275 Regular default ssh keys will be used without specifying this argument.
275 Regular default ssh keys will be used without specifying this argument.
276 password : str
276 password : str
277 Your ssh password to sshserver. Note that if this is left None,
277 Your ssh password to sshserver. Note that if this is left None,
278 you will be prompted for it if passwordless key based login is unavailable.
278 you will be prompted for it if passwordless key based login is unavailable.
279 paramiko : bool
279 paramiko : bool
280 flag for whether to use paramiko instead of shell ssh for tunneling.
280 flag for whether to use paramiko instead of shell ssh for tunneling.
281 [default: True on win32, False else]
281 [default: True on win32, False else]
282
282
283
283
284 Attributes
284 Attributes
285 ----------
285 ----------
286
286
287 ids : list of int engine IDs
287 ids : list of int engine IDs
288 requesting the ids attribute always synchronizes
288 requesting the ids attribute always synchronizes
289 the registration state. To request ids without synchronization,
289 the registration state. To request ids without synchronization,
290 use semi-private _ids attributes.
290 use semi-private _ids attributes.
291
291
292 history : list of msg_ids
292 history : list of msg_ids
293 a list of msg_ids, keeping track of all the execution
293 a list of msg_ids, keeping track of all the execution
294 messages you have submitted in order.
294 messages you have submitted in order.
295
295
296 outstanding : set of msg_ids
296 outstanding : set of msg_ids
297 a set of msg_ids that have been submitted, but whose
297 a set of msg_ids that have been submitted, but whose
298 results have not yet been received.
298 results have not yet been received.
299
299
300 results : dict
300 results : dict
301 a dict of all our results, keyed by msg_id
301 a dict of all our results, keyed by msg_id
302
302
303 block : bool
303 block : bool
304 determines default behavior when block not specified
304 determines default behavior when block not specified
305 in execution methods
305 in execution methods
306
306
307 Methods
307 Methods
308 -------
308 -------
309
309
310 spin
310 spin
311 flushes incoming results and registration state changes
311 flushes incoming results and registration state changes
312 control methods spin, and requesting `ids` also ensures up to date
312 control methods spin, and requesting `ids` also ensures up to date
313
313
314 wait
314 wait
315 wait on one or more msg_ids
315 wait on one or more msg_ids
316
316
317 execution methods
317 execution methods
318 apply
318 apply
319 legacy: execute, run
319 legacy: execute, run
320
320
321 data movement
321 data movement
322 push, pull, scatter, gather
322 push, pull, scatter, gather
323
323
324 query methods
324 query methods
325 queue_status, get_result, purge, result_status
325 queue_status, get_result, purge, result_status
326
326
327 control methods
327 control methods
328 abort, shutdown
328 abort, shutdown
329
329
330 """
330 """
331
331
332
332
333 block = Bool(False)
333 block = Bool(False)
334 outstanding = Set()
334 outstanding = Set()
335 results = Instance('collections.defaultdict', (dict,))
335 results = Instance('collections.defaultdict', (dict,))
336 metadata = Instance('collections.defaultdict', (Metadata,))
336 metadata = Instance('collections.defaultdict', (Metadata,))
337 history = List()
337 history = List()
338 debug = Bool(False)
338 debug = Bool(False)
339 _spin_thread = Any()
339 _spin_thread = Any()
340 _stop_spinning = Any()
340 _stop_spinning = Any()
341
341
342 profile=Unicode()
342 profile=Unicode()
343 def _profile_default(self):
343 def _profile_default(self):
344 if BaseIPythonApplication.initialized():
344 if BaseIPythonApplication.initialized():
345 # an IPython app *might* be running, try to get its profile
345 # an IPython app *might* be running, try to get its profile
346 try:
346 try:
347 return BaseIPythonApplication.instance().profile
347 return BaseIPythonApplication.instance().profile
348 except (AttributeError, MultipleInstanceError):
348 except (AttributeError, MultipleInstanceError):
349 # could be a *different* subclass of config.Application,
349 # could be a *different* subclass of config.Application,
350 # which would raise one of these two errors.
350 # which would raise one of these two errors.
351 return u'default'
351 return u'default'
352 else:
352 else:
353 return u'default'
353 return u'default'
354
354
355
355
356 _outstanding_dict = Instance('collections.defaultdict', (set,))
356 _outstanding_dict = Instance('collections.defaultdict', (set,))
357 _ids = List()
357 _ids = List()
358 _connected=Bool(False)
358 _connected=Bool(False)
359 _ssh=Bool(False)
359 _ssh=Bool(False)
360 _context = Instance('zmq.Context')
360 _context = Instance('zmq.Context')
361 _config = Dict()
361 _config = Dict()
362 _engines=Instance(util.ReverseDict, (), {})
362 _engines=Instance(util.ReverseDict, (), {})
363 # _hub_socket=Instance('zmq.Socket')
363 # _hub_socket=Instance('zmq.Socket')
364 _query_socket=Instance('zmq.Socket')
364 _query_socket=Instance('zmq.Socket')
365 _control_socket=Instance('zmq.Socket')
365 _control_socket=Instance('zmq.Socket')
366 _iopub_socket=Instance('zmq.Socket')
366 _iopub_socket=Instance('zmq.Socket')
367 _notification_socket=Instance('zmq.Socket')
367 _notification_socket=Instance('zmq.Socket')
368 _mux_socket=Instance('zmq.Socket')
368 _mux_socket=Instance('zmq.Socket')
369 _task_socket=Instance('zmq.Socket')
369 _task_socket=Instance('zmq.Socket')
370 _task_scheme=Unicode()
370 _task_scheme=Unicode()
371 _closed = False
371 _closed = False
372 _ignored_control_replies=Integer(0)
372 _ignored_control_replies=Integer(0)
373 _ignored_hub_replies=Integer(0)
373 _ignored_hub_replies=Integer(0)
374
374
375 def __new__(self, *args, **kw):
375 def __new__(self, *args, **kw):
376 # don't raise on positional args
376 # don't raise on positional args
377 return HasTraits.__new__(self, **kw)
377 return HasTraits.__new__(self, **kw)
378
378
379 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
379 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
380 context=None, debug=False,
380 context=None, debug=False,
381 sshserver=None, sshkey=None, password=None, paramiko=None,
381 sshserver=None, sshkey=None, password=None, paramiko=None,
382 timeout=10, cluster_id=None, **extra_args
382 timeout=10, cluster_id=None, **extra_args
383 ):
383 ):
384 if profile:
384 if profile:
385 super(Client, self).__init__(debug=debug, profile=profile)
385 super(Client, self).__init__(debug=debug, profile=profile)
386 else:
386 else:
387 super(Client, self).__init__(debug=debug)
387 super(Client, self).__init__(debug=debug)
388 if context is None:
388 if context is None:
389 context = zmq.Context.instance()
389 context = zmq.Context.instance()
390 self._context = context
390 self._context = context
391 self._stop_spinning = Event()
391 self._stop_spinning = Event()
392
392
393 if 'url_or_file' in extra_args:
393 if 'url_or_file' in extra_args:
394 url_file = extra_args['url_or_file']
394 url_file = extra_args['url_or_file']
395 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
395 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
396
396
397 if url_file and util.is_url(url_file):
397 if url_file and util.is_url(url_file):
398 raise ValueError("single urls cannot be specified, url-files must be used.")
398 raise ValueError("single urls cannot be specified, url-files must be used.")
399
399
400 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
400 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
401
401
402 if self._cd is not None:
402 if self._cd is not None:
403 if url_file is None:
403 if url_file is None:
404 if not cluster_id:
404 if not cluster_id:
405 client_json = 'ipcontroller-client.json'
405 client_json = 'ipcontroller-client.json'
406 else:
406 else:
407 client_json = 'ipcontroller-%s-client.json' % cluster_id
407 client_json = 'ipcontroller-%s-client.json' % cluster_id
408 url_file = pjoin(self._cd.security_dir, client_json)
408 url_file = pjoin(self._cd.security_dir, client_json)
409 if url_file is None:
409 if url_file is None:
410 raise ValueError(
410 raise ValueError(
411 "I can't find enough information to connect to a hub!"
411 "I can't find enough information to connect to a hub!"
412 " Please specify at least one of url_file or profile."
412 " Please specify at least one of url_file or profile."
413 )
413 )
414
414
415 with open(url_file) as f:
415 with open(url_file) as f:
416 cfg = json.load(f)
416 cfg = json.load(f)
417
417
418 self._task_scheme = cfg['task_scheme']
418 self._task_scheme = cfg['task_scheme']
419
419
420 # sync defaults from args, json:
420 # sync defaults from args, json:
421 if sshserver:
421 if sshserver:
422 cfg['ssh'] = sshserver
422 cfg['ssh'] = sshserver
423
423
424 location = cfg.setdefault('location', None)
424 location = cfg.setdefault('location', None)
425
425
426 proto,addr = cfg['interface'].split('://')
426 proto,addr = cfg['interface'].split('://')
427 addr = util.disambiguate_ip_address(addr, location)
427 addr = util.disambiguate_ip_address(addr, location)
428 cfg['interface'] = "%s://%s" % (proto, addr)
428 cfg['interface'] = "%s://%s" % (proto, addr)
429
429
430 # turn interface,port into full urls:
430 # turn interface,port into full urls:
431 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
431 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
432 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
432 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
433
433
434 url = cfg['registration']
434 url = cfg['registration']
435
435
436 if location is not None and addr == LOCALHOST:
436 if location is not None and addr == LOCALHOST:
437 # location specified, and connection is expected to be local
437 # location specified, and connection is expected to be local
438 if location not in LOCAL_IPS and not sshserver:
438 if location not in LOCAL_IPS and not sshserver:
439 # load ssh from JSON *only* if the controller is not on
439 # load ssh from JSON *only* if the controller is not on
440 # this machine
440 # this machine
441 sshserver=cfg['ssh']
441 sshserver=cfg['ssh']
442 if location not in LOCAL_IPS and not sshserver:
442 if location not in LOCAL_IPS and not sshserver:
443 # warn if no ssh specified, but SSH is probably needed
443 # warn if no ssh specified, but SSH is probably needed
444 # This is only a warning, because the most likely cause
444 # This is only a warning, because the most likely cause
445 # is a local Controller on a laptop whose IP is dynamic
445 # is a local Controller on a laptop whose IP is dynamic
446 warnings.warn("""
446 warnings.warn("""
447 Controller appears to be listening on localhost, but not on this machine.
447 Controller appears to be listening on localhost, but not on this machine.
448 If this is true, you should specify Client(...,sshserver='you@%s')
448 If this is true, you should specify Client(...,sshserver='you@%s')
449 or instruct your controller to listen on an external IP."""%location,
449 or instruct your controller to listen on an external IP."""%location,
450 RuntimeWarning)
450 RuntimeWarning)
451 elif not sshserver:
451 elif not sshserver:
452 # otherwise sync with cfg
452 # otherwise sync with cfg
453 sshserver = cfg['ssh']
453 sshserver = cfg['ssh']
454
454
455 self._config = cfg
455 self._config = cfg
456
456
457 self._ssh = bool(sshserver or sshkey or password)
457 self._ssh = bool(sshserver or sshkey or password)
458 if self._ssh and sshserver is None:
458 if self._ssh and sshserver is None:
459 # default to ssh via localhost
459 # default to ssh via localhost
460 sshserver = addr
460 sshserver = addr
461 if self._ssh and password is None:
461 if self._ssh and password is None:
462 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
462 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
463 password=False
463 password=False
464 else:
464 else:
465 password = getpass("SSH Password for %s: "%sshserver)
465 password = getpass("SSH Password for %s: "%sshserver)
466 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
466 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
467
467
468 # configure and construct the session
468 # configure and construct the session
469 try:
469 try:
470 extra_args['packer'] = cfg['pack']
470 extra_args['packer'] = cfg['pack']
471 extra_args['unpacker'] = cfg['unpack']
471 extra_args['unpacker'] = cfg['unpack']
472 extra_args['key'] = cast_bytes(cfg['key'])
472 extra_args['key'] = cast_bytes(cfg['key'])
473 extra_args['signature_scheme'] = cfg['signature_scheme']
473 extra_args['signature_scheme'] = cfg['signature_scheme']
474 except KeyError as exc:
474 except KeyError as exc:
475 msg = '\n'.join([
475 msg = '\n'.join([
476 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
476 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
477 "If you are reusing connection files, remove them and start ipcontroller again."
477 "If you are reusing connection files, remove them and start ipcontroller again."
478 ])
478 ])
479 raise ValueError(msg.format(exc.message))
479 raise ValueError(msg.format(exc.message))
480
480
481 self.session = Session(**extra_args)
481 self.session = Session(**extra_args)
482
482
483 self._query_socket = self._context.socket(zmq.DEALER)
483 self._query_socket = self._context.socket(zmq.DEALER)
484
484
485 if self._ssh:
485 if self._ssh:
486 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
486 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
487 else:
487 else:
488 self._query_socket.connect(cfg['registration'])
488 self._query_socket.connect(cfg['registration'])
489
489
490 self.session.debug = self.debug
490 self.session.debug = self.debug
491
491
492 self._notification_handlers = {'registration_notification' : self._register_engine,
492 self._notification_handlers = {'registration_notification' : self._register_engine,
493 'unregistration_notification' : self._unregister_engine,
493 'unregistration_notification' : self._unregister_engine,
494 'shutdown_notification' : lambda msg: self.close(),
494 'shutdown_notification' : lambda msg: self.close(),
495 }
495 }
496 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
496 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
497 'apply_reply' : self._handle_apply_reply}
497 'apply_reply' : self._handle_apply_reply}
498 self._connect(sshserver, ssh_kwargs, timeout)
498
499 try:
500 self._connect(sshserver, ssh_kwargs, timeout)
501 except:
502 self.close(linger=0)
503 raise
499
504
500 # last step: setup magics, if we are in IPython:
505 # last step: setup magics, if we are in IPython:
501
506
502 try:
507 try:
503 ip = get_ipython()
508 ip = get_ipython()
504 except NameError:
509 except NameError:
505 return
510 return
506 else:
511 else:
507 if 'px' not in ip.magics_manager.magics:
512 if 'px' not in ip.magics_manager.magics:
508 # in IPython but we are the first Client.
513 # in IPython but we are the first Client.
509 # activate a default view for parallel magics.
514 # activate a default view for parallel magics.
510 self.activate()
515 self.activate()
511
516
512 def __del__(self):
517 def __del__(self):
513 """cleanup sockets, but _not_ context."""
518 """cleanup sockets, but _not_ context."""
514 self.close()
519 self.close()
515
520
516 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
521 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
517 if ipython_dir is None:
522 if ipython_dir is None:
518 ipython_dir = get_ipython_dir()
523 ipython_dir = get_ipython_dir()
519 if profile_dir is not None:
524 if profile_dir is not None:
520 try:
525 try:
521 self._cd = ProfileDir.find_profile_dir(profile_dir)
526 self._cd = ProfileDir.find_profile_dir(profile_dir)
522 return
527 return
523 except ProfileDirError:
528 except ProfileDirError:
524 pass
529 pass
525 elif profile is not None:
530 elif profile is not None:
526 try:
531 try:
527 self._cd = ProfileDir.find_profile_dir_by_name(
532 self._cd = ProfileDir.find_profile_dir_by_name(
528 ipython_dir, profile)
533 ipython_dir, profile)
529 return
534 return
530 except ProfileDirError:
535 except ProfileDirError:
531 pass
536 pass
532 self._cd = None
537 self._cd = None
533
538
534 def _update_engines(self, engines):
539 def _update_engines(self, engines):
535 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
540 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
536 for k,v in engines.iteritems():
541 for k,v in engines.iteritems():
537 eid = int(k)
542 eid = int(k)
538 if eid not in self._engines:
543 if eid not in self._engines:
539 self._ids.append(eid)
544 self._ids.append(eid)
540 self._engines[eid] = v
545 self._engines[eid] = v
541 self._ids = sorted(self._ids)
546 self._ids = sorted(self._ids)
542 if sorted(self._engines.keys()) != range(len(self._engines)) and \
547 if sorted(self._engines.keys()) != range(len(self._engines)) and \
543 self._task_scheme == 'pure' and self._task_socket:
548 self._task_scheme == 'pure' and self._task_socket:
544 self._stop_scheduling_tasks()
549 self._stop_scheduling_tasks()
545
550
546 def _stop_scheduling_tasks(self):
551 def _stop_scheduling_tasks(self):
547 """Stop scheduling tasks because an engine has been unregistered
552 """Stop scheduling tasks because an engine has been unregistered
548 from a pure ZMQ scheduler.
553 from a pure ZMQ scheduler.
549 """
554 """
550 self._task_socket.close()
555 self._task_socket.close()
551 self._task_socket = None
556 self._task_socket = None
552 msg = "An engine has been unregistered, and we are using pure " +\
557 msg = "An engine has been unregistered, and we are using pure " +\
553 "ZMQ task scheduling. Task farming will be disabled."
558 "ZMQ task scheduling. Task farming will be disabled."
554 if self.outstanding:
559 if self.outstanding:
555 msg += " If you were running tasks when this happened, " +\
560 msg += " If you were running tasks when this happened, " +\
556 "some `outstanding` msg_ids may never resolve."
561 "some `outstanding` msg_ids may never resolve."
557 warnings.warn(msg, RuntimeWarning)
562 warnings.warn(msg, RuntimeWarning)
558
563
559 def _build_targets(self, targets):
564 def _build_targets(self, targets):
560 """Turn valid target IDs or 'all' into two lists:
565 """Turn valid target IDs or 'all' into two lists:
561 (int_ids, uuids).
566 (int_ids, uuids).
562 """
567 """
563 if not self._ids:
568 if not self._ids:
564 # flush notification socket if no engines yet, just in case
569 # flush notification socket if no engines yet, just in case
565 if not self.ids:
570 if not self.ids:
566 raise error.NoEnginesRegistered("Can't build targets without any engines")
571 raise error.NoEnginesRegistered("Can't build targets without any engines")
567
572
568 if targets is None:
573 if targets is None:
569 targets = self._ids
574 targets = self._ids
570 elif isinstance(targets, basestring):
575 elif isinstance(targets, basestring):
571 if targets.lower() == 'all':
576 if targets.lower() == 'all':
572 targets = self._ids
577 targets = self._ids
573 else:
578 else:
574 raise TypeError("%r not valid str target, must be 'all'"%(targets))
579 raise TypeError("%r not valid str target, must be 'all'"%(targets))
575 elif isinstance(targets, int):
580 elif isinstance(targets, int):
576 if targets < 0:
581 if targets < 0:
577 targets = self.ids[targets]
582 targets = self.ids[targets]
578 if targets not in self._ids:
583 if targets not in self._ids:
579 raise IndexError("No such engine: %i"%targets)
584 raise IndexError("No such engine: %i"%targets)
580 targets = [targets]
585 targets = [targets]
581
586
582 if isinstance(targets, slice):
587 if isinstance(targets, slice):
583 indices = range(len(self._ids))[targets]
588 indices = range(len(self._ids))[targets]
584 ids = self.ids
589 ids = self.ids
585 targets = [ ids[i] for i in indices ]
590 targets = [ ids[i] for i in indices ]
586
591
587 if not isinstance(targets, (tuple, list, xrange)):
592 if not isinstance(targets, (tuple, list, xrange)):
588 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
593 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
589
594
590 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
595 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
591
596
592 def _connect(self, sshserver, ssh_kwargs, timeout):
597 def _connect(self, sshserver, ssh_kwargs, timeout):
593 """setup all our socket connections to the cluster. This is called from
598 """setup all our socket connections to the cluster. This is called from
594 __init__."""
599 __init__."""
595
600
596 # Maybe allow reconnecting?
601 # Maybe allow reconnecting?
597 if self._connected:
602 if self._connected:
598 return
603 return
599 self._connected=True
604 self._connected=True
600
605
601 def connect_socket(s, url):
606 def connect_socket(s, url):
602 if self._ssh:
607 if self._ssh:
603 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
608 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
604 else:
609 else:
605 return s.connect(url)
610 return s.connect(url)
606
611
607 self.session.send(self._query_socket, 'connection_request')
612 self.session.send(self._query_socket, 'connection_request')
608 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
613 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
609 poller = zmq.Poller()
614 poller = zmq.Poller()
610 poller.register(self._query_socket, zmq.POLLIN)
615 poller.register(self._query_socket, zmq.POLLIN)
611 # poll expects milliseconds, timeout is seconds
616 # poll expects milliseconds, timeout is seconds
612 evts = poller.poll(timeout*1000)
617 evts = poller.poll(timeout*1000)
613 if not evts:
618 if not evts:
614 raise error.TimeoutError("Hub connection request timed out")
619 raise error.TimeoutError("Hub connection request timed out")
615 idents,msg = self.session.recv(self._query_socket,mode=0)
620 idents,msg = self.session.recv(self._query_socket,mode=0)
616 if self.debug:
621 if self.debug:
617 pprint(msg)
622 pprint(msg)
618 content = msg['content']
623 content = msg['content']
619 # self._config['registration'] = dict(content)
624 # self._config['registration'] = dict(content)
620 cfg = self._config
625 cfg = self._config
621 if content['status'] == 'ok':
626 if content['status'] == 'ok':
622 self._mux_socket = self._context.socket(zmq.DEALER)
627 self._mux_socket = self._context.socket(zmq.DEALER)
623 connect_socket(self._mux_socket, cfg['mux'])
628 connect_socket(self._mux_socket, cfg['mux'])
624
629
625 self._task_socket = self._context.socket(zmq.DEALER)
630 self._task_socket = self._context.socket(zmq.DEALER)
626 connect_socket(self._task_socket, cfg['task'])
631 connect_socket(self._task_socket, cfg['task'])
627
632
628 self._notification_socket = self._context.socket(zmq.SUB)
633 self._notification_socket = self._context.socket(zmq.SUB)
629 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
634 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
630 connect_socket(self._notification_socket, cfg['notification'])
635 connect_socket(self._notification_socket, cfg['notification'])
631
636
632 self._control_socket = self._context.socket(zmq.DEALER)
637 self._control_socket = self._context.socket(zmq.DEALER)
633 connect_socket(self._control_socket, cfg['control'])
638 connect_socket(self._control_socket, cfg['control'])
634
639
635 self._iopub_socket = self._context.socket(zmq.SUB)
640 self._iopub_socket = self._context.socket(zmq.SUB)
636 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
641 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
637 connect_socket(self._iopub_socket, cfg['iopub'])
642 connect_socket(self._iopub_socket, cfg['iopub'])
638
643
639 self._update_engines(dict(content['engines']))
644 self._update_engines(dict(content['engines']))
640 else:
645 else:
641 self._connected = False
646 self._connected = False
642 raise Exception("Failed to connect!")
647 raise Exception("Failed to connect!")
643
648
644 #--------------------------------------------------------------------------
649 #--------------------------------------------------------------------------
645 # handlers and callbacks for incoming messages
650 # handlers and callbacks for incoming messages
646 #--------------------------------------------------------------------------
651 #--------------------------------------------------------------------------
647
652
648 def _unwrap_exception(self, content):
653 def _unwrap_exception(self, content):
649 """unwrap exception, and remap engine_id to int."""
654 """unwrap exception, and remap engine_id to int."""
650 e = error.unwrap_exception(content)
655 e = error.unwrap_exception(content)
651 # print e.traceback
656 # print e.traceback
652 if e.engine_info:
657 if e.engine_info:
653 e_uuid = e.engine_info['engine_uuid']
658 e_uuid = e.engine_info['engine_uuid']
654 eid = self._engines[e_uuid]
659 eid = self._engines[e_uuid]
655 e.engine_info['engine_id'] = eid
660 e.engine_info['engine_id'] = eid
656 return e
661 return e
657
662
658 def _extract_metadata(self, msg):
663 def _extract_metadata(self, msg):
659 header = msg['header']
664 header = msg['header']
660 parent = msg['parent_header']
665 parent = msg['parent_header']
661 msg_meta = msg['metadata']
666 msg_meta = msg['metadata']
662 content = msg['content']
667 content = msg['content']
663 md = {'msg_id' : parent['msg_id'],
668 md = {'msg_id' : parent['msg_id'],
664 'received' : datetime.now(),
669 'received' : datetime.now(),
665 'engine_uuid' : msg_meta.get('engine', None),
670 'engine_uuid' : msg_meta.get('engine', None),
666 'follow' : msg_meta.get('follow', []),
671 'follow' : msg_meta.get('follow', []),
667 'after' : msg_meta.get('after', []),
672 'after' : msg_meta.get('after', []),
668 'status' : content['status'],
673 'status' : content['status'],
669 }
674 }
670
675
671 if md['engine_uuid'] is not None:
676 if md['engine_uuid'] is not None:
672 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
677 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
673
678
674 if 'date' in parent:
679 if 'date' in parent:
675 md['submitted'] = parent['date']
680 md['submitted'] = parent['date']
676 if 'started' in msg_meta:
681 if 'started' in msg_meta:
677 md['started'] = msg_meta['started']
682 md['started'] = msg_meta['started']
678 if 'date' in header:
683 if 'date' in header:
679 md['completed'] = header['date']
684 md['completed'] = header['date']
680 return md
685 return md
681
686
682 def _register_engine(self, msg):
687 def _register_engine(self, msg):
683 """Register a new engine, and update our connection info."""
688 """Register a new engine, and update our connection info."""
684 content = msg['content']
689 content = msg['content']
685 eid = content['id']
690 eid = content['id']
686 d = {eid : content['uuid']}
691 d = {eid : content['uuid']}
687 self._update_engines(d)
692 self._update_engines(d)
688
693
689 def _unregister_engine(self, msg):
694 def _unregister_engine(self, msg):
690 """Unregister an engine that has died."""
695 """Unregister an engine that has died."""
691 content = msg['content']
696 content = msg['content']
692 eid = int(content['id'])
697 eid = int(content['id'])
693 if eid in self._ids:
698 if eid in self._ids:
694 self._ids.remove(eid)
699 self._ids.remove(eid)
695 uuid = self._engines.pop(eid)
700 uuid = self._engines.pop(eid)
696
701
697 self._handle_stranded_msgs(eid, uuid)
702 self._handle_stranded_msgs(eid, uuid)
698
703
699 if self._task_socket and self._task_scheme == 'pure':
704 if self._task_socket and self._task_scheme == 'pure':
700 self._stop_scheduling_tasks()
705 self._stop_scheduling_tasks()
701
706
702 def _handle_stranded_msgs(self, eid, uuid):
707 def _handle_stranded_msgs(self, eid, uuid):
703 """Handle messages known to be on an engine when the engine unregisters.
708 """Handle messages known to be on an engine when the engine unregisters.
704
709
705 It is possible that this will fire prematurely - that is, an engine will
710 It is possible that this will fire prematurely - that is, an engine will
706 go down after completing a result, and the client will be notified
711 go down after completing a result, and the client will be notified
707 of the unregistration and later receive the successful result.
712 of the unregistration and later receive the successful result.
708 """
713 """
709
714
710 outstanding = self._outstanding_dict[uuid]
715 outstanding = self._outstanding_dict[uuid]
711
716
712 for msg_id in list(outstanding):
717 for msg_id in list(outstanding):
713 if msg_id in self.results:
718 if msg_id in self.results:
714 # we already
719 # we already
715 continue
720 continue
716 try:
721 try:
717 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
722 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
718 except:
723 except:
719 content = error.wrap_exception()
724 content = error.wrap_exception()
720 # build a fake message:
725 # build a fake message:
721 msg = self.session.msg('apply_reply', content=content)
726 msg = self.session.msg('apply_reply', content=content)
722 msg['parent_header']['msg_id'] = msg_id
727 msg['parent_header']['msg_id'] = msg_id
723 msg['metadata']['engine'] = uuid
728 msg['metadata']['engine'] = uuid
724 self._handle_apply_reply(msg)
729 self._handle_apply_reply(msg)
725
730
726 def _handle_execute_reply(self, msg):
731 def _handle_execute_reply(self, msg):
727 """Save the reply to an execute_request into our results.
732 """Save the reply to an execute_request into our results.
728
733
729 execute messages are never actually used. apply is used instead.
734 execute messages are never actually used. apply is used instead.
730 """
735 """
731
736
732 parent = msg['parent_header']
737 parent = msg['parent_header']
733 msg_id = parent['msg_id']
738 msg_id = parent['msg_id']
734 if msg_id not in self.outstanding:
739 if msg_id not in self.outstanding:
735 if msg_id in self.history:
740 if msg_id in self.history:
736 print ("got stale result: %s"%msg_id)
741 print ("got stale result: %s"%msg_id)
737 else:
742 else:
738 print ("got unknown result: %s"%msg_id)
743 print ("got unknown result: %s"%msg_id)
739 else:
744 else:
740 self.outstanding.remove(msg_id)
745 self.outstanding.remove(msg_id)
741
746
742 content = msg['content']
747 content = msg['content']
743 header = msg['header']
748 header = msg['header']
744
749
745 # construct metadata:
750 # construct metadata:
746 md = self.metadata[msg_id]
751 md = self.metadata[msg_id]
747 md.update(self._extract_metadata(msg))
752 md.update(self._extract_metadata(msg))
748 # is this redundant?
753 # is this redundant?
749 self.metadata[msg_id] = md
754 self.metadata[msg_id] = md
750
755
751 e_outstanding = self._outstanding_dict[md['engine_uuid']]
756 e_outstanding = self._outstanding_dict[md['engine_uuid']]
752 if msg_id in e_outstanding:
757 if msg_id in e_outstanding:
753 e_outstanding.remove(msg_id)
758 e_outstanding.remove(msg_id)
754
759
755 # construct result:
760 # construct result:
756 if content['status'] == 'ok':
761 if content['status'] == 'ok':
757 self.results[msg_id] = ExecuteReply(msg_id, content, md)
762 self.results[msg_id] = ExecuteReply(msg_id, content, md)
758 elif content['status'] == 'aborted':
763 elif content['status'] == 'aborted':
759 self.results[msg_id] = error.TaskAborted(msg_id)
764 self.results[msg_id] = error.TaskAborted(msg_id)
760 elif content['status'] == 'resubmitted':
765 elif content['status'] == 'resubmitted':
761 # TODO: handle resubmission
766 # TODO: handle resubmission
762 pass
767 pass
763 else:
768 else:
764 self.results[msg_id] = self._unwrap_exception(content)
769 self.results[msg_id] = self._unwrap_exception(content)
765
770
766 def _handle_apply_reply(self, msg):
771 def _handle_apply_reply(self, msg):
767 """Save the reply to an apply_request into our results."""
772 """Save the reply to an apply_request into our results."""
768 parent = msg['parent_header']
773 parent = msg['parent_header']
769 msg_id = parent['msg_id']
774 msg_id = parent['msg_id']
770 if msg_id not in self.outstanding:
775 if msg_id not in self.outstanding:
771 if msg_id in self.history:
776 if msg_id in self.history:
772 print ("got stale result: %s"%msg_id)
777 print ("got stale result: %s"%msg_id)
773 print self.results[msg_id]
778 print self.results[msg_id]
774 print msg
779 print msg
775 else:
780 else:
776 print ("got unknown result: %s"%msg_id)
781 print ("got unknown result: %s"%msg_id)
777 else:
782 else:
778 self.outstanding.remove(msg_id)
783 self.outstanding.remove(msg_id)
779 content = msg['content']
784 content = msg['content']
780 header = msg['header']
785 header = msg['header']
781
786
782 # construct metadata:
787 # construct metadata:
783 md = self.metadata[msg_id]
788 md = self.metadata[msg_id]
784 md.update(self._extract_metadata(msg))
789 md.update(self._extract_metadata(msg))
785 # is this redundant?
790 # is this redundant?
786 self.metadata[msg_id] = md
791 self.metadata[msg_id] = md
787
792
788 e_outstanding = self._outstanding_dict[md['engine_uuid']]
793 e_outstanding = self._outstanding_dict[md['engine_uuid']]
789 if msg_id in e_outstanding:
794 if msg_id in e_outstanding:
790 e_outstanding.remove(msg_id)
795 e_outstanding.remove(msg_id)
791
796
792 # construct result:
797 # construct result:
793 if content['status'] == 'ok':
798 if content['status'] == 'ok':
794 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
799 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
795 elif content['status'] == 'aborted':
800 elif content['status'] == 'aborted':
796 self.results[msg_id] = error.TaskAborted(msg_id)
801 self.results[msg_id] = error.TaskAborted(msg_id)
797 elif content['status'] == 'resubmitted':
802 elif content['status'] == 'resubmitted':
798 # TODO: handle resubmission
803 # TODO: handle resubmission
799 pass
804 pass
800 else:
805 else:
801 self.results[msg_id] = self._unwrap_exception(content)
806 self.results[msg_id] = self._unwrap_exception(content)
802
807
803 def _flush_notifications(self):
808 def _flush_notifications(self):
804 """Flush notifications of engine registrations waiting
809 """Flush notifications of engine registrations waiting
805 in ZMQ queue."""
810 in ZMQ queue."""
806 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
811 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
807 while msg is not None:
812 while msg is not None:
808 if self.debug:
813 if self.debug:
809 pprint(msg)
814 pprint(msg)
810 msg_type = msg['header']['msg_type']
815 msg_type = msg['header']['msg_type']
811 handler = self._notification_handlers.get(msg_type, None)
816 handler = self._notification_handlers.get(msg_type, None)
812 if handler is None:
817 if handler is None:
813 raise Exception("Unhandled message type: %s" % msg_type)
818 raise Exception("Unhandled message type: %s" % msg_type)
814 else:
819 else:
815 handler(msg)
820 handler(msg)
816 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
821 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
817
822
818 def _flush_results(self, sock):
823 def _flush_results(self, sock):
819 """Flush task or queue results waiting in ZMQ queue."""
824 """Flush task or queue results waiting in ZMQ queue."""
820 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
825 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
821 while msg is not None:
826 while msg is not None:
822 if self.debug:
827 if self.debug:
823 pprint(msg)
828 pprint(msg)
824 msg_type = msg['header']['msg_type']
829 msg_type = msg['header']['msg_type']
825 handler = self._queue_handlers.get(msg_type, None)
830 handler = self._queue_handlers.get(msg_type, None)
826 if handler is None:
831 if handler is None:
827 raise Exception("Unhandled message type: %s" % msg_type)
832 raise Exception("Unhandled message type: %s" % msg_type)
828 else:
833 else:
829 handler(msg)
834 handler(msg)
830 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
835 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
831
836
832 def _flush_control(self, sock):
837 def _flush_control(self, sock):
833 """Flush replies from the control channel waiting
838 """Flush replies from the control channel waiting
834 in the ZMQ queue.
839 in the ZMQ queue.
835
840
836 Currently: ignore them."""
841 Currently: ignore them."""
837 if self._ignored_control_replies <= 0:
842 if self._ignored_control_replies <= 0:
838 return
843 return
839 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
844 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
840 while msg is not None:
845 while msg is not None:
841 self._ignored_control_replies -= 1
846 self._ignored_control_replies -= 1
842 if self.debug:
847 if self.debug:
843 pprint(msg)
848 pprint(msg)
844 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
849 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
845
850
846 def _flush_ignored_control(self):
851 def _flush_ignored_control(self):
847 """flush ignored control replies"""
852 """flush ignored control replies"""
848 while self._ignored_control_replies > 0:
853 while self._ignored_control_replies > 0:
849 self.session.recv(self._control_socket)
854 self.session.recv(self._control_socket)
850 self._ignored_control_replies -= 1
855 self._ignored_control_replies -= 1
851
856
852 def _flush_ignored_hub_replies(self):
857 def _flush_ignored_hub_replies(self):
853 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
858 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
854 while msg is not None:
859 while msg is not None:
855 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
860 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
856
861
857 def _flush_iopub(self, sock):
862 def _flush_iopub(self, sock):
858 """Flush replies from the iopub channel waiting
863 """Flush replies from the iopub channel waiting
859 in the ZMQ queue.
864 in the ZMQ queue.
860 """
865 """
861 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
866 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
862 while msg is not None:
867 while msg is not None:
863 if self.debug:
868 if self.debug:
864 pprint(msg)
869 pprint(msg)
865 parent = msg['parent_header']
870 parent = msg['parent_header']
866 # ignore IOPub messages with no parent.
871 # ignore IOPub messages with no parent.
867 # Caused by print statements or warnings from before the first execution.
872 # Caused by print statements or warnings from before the first execution.
868 if not parent:
873 if not parent:
869 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
874 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
870 continue
875 continue
871 msg_id = parent['msg_id']
876 msg_id = parent['msg_id']
872 content = msg['content']
877 content = msg['content']
873 header = msg['header']
878 header = msg['header']
874 msg_type = msg['header']['msg_type']
879 msg_type = msg['header']['msg_type']
875
880
876 # init metadata:
881 # init metadata:
877 md = self.metadata[msg_id]
882 md = self.metadata[msg_id]
878
883
879 if msg_type == 'stream':
884 if msg_type == 'stream':
880 name = content['name']
885 name = content['name']
881 s = md[name] or ''
886 s = md[name] or ''
882 md[name] = s + content['data']
887 md[name] = s + content['data']
883 elif msg_type == 'pyerr':
888 elif msg_type == 'pyerr':
884 md.update({'pyerr' : self._unwrap_exception(content)})
889 md.update({'pyerr' : self._unwrap_exception(content)})
885 elif msg_type == 'pyin':
890 elif msg_type == 'pyin':
886 md.update({'pyin' : content['code']})
891 md.update({'pyin' : content['code']})
887 elif msg_type == 'display_data':
892 elif msg_type == 'display_data':
888 md['outputs'].append(content)
893 md['outputs'].append(content)
889 elif msg_type == 'pyout':
894 elif msg_type == 'pyout':
890 md['pyout'] = content
895 md['pyout'] = content
891 elif msg_type == 'data_message':
896 elif msg_type == 'data_message':
892 data, remainder = serialize.unserialize_object(msg['buffers'])
897 data, remainder = serialize.unserialize_object(msg['buffers'])
893 md['data'].update(data)
898 md['data'].update(data)
894 elif msg_type == 'status':
899 elif msg_type == 'status':
895 # idle message comes after all outputs
900 # idle message comes after all outputs
896 if content['execution_state'] == 'idle':
901 if content['execution_state'] == 'idle':
897 md['outputs_ready'] = True
902 md['outputs_ready'] = True
898 else:
903 else:
899 # unhandled msg_type (status, etc.)
904 # unhandled msg_type (status, etc.)
900 pass
905 pass
901
906
902 # reduntant?
907 # reduntant?
903 self.metadata[msg_id] = md
908 self.metadata[msg_id] = md
904
909
905 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
910 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
906
911
907 #--------------------------------------------------------------------------
912 #--------------------------------------------------------------------------
908 # len, getitem
913 # len, getitem
909 #--------------------------------------------------------------------------
914 #--------------------------------------------------------------------------
910
915
911 def __len__(self):
916 def __len__(self):
912 """len(client) returns # of engines."""
917 """len(client) returns # of engines."""
913 return len(self.ids)
918 return len(self.ids)
914
919
915 def __getitem__(self, key):
920 def __getitem__(self, key):
916 """index access returns DirectView multiplexer objects
921 """index access returns DirectView multiplexer objects
917
922
918 Must be int, slice, or list/tuple/xrange of ints"""
923 Must be int, slice, or list/tuple/xrange of ints"""
919 if not isinstance(key, (int, slice, tuple, list, xrange)):
924 if not isinstance(key, (int, slice, tuple, list, xrange)):
920 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
925 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
921 else:
926 else:
922 return self.direct_view(key)
927 return self.direct_view(key)
923
928
924 #--------------------------------------------------------------------------
929 #--------------------------------------------------------------------------
925 # Begin public methods
930 # Begin public methods
926 #--------------------------------------------------------------------------
931 #--------------------------------------------------------------------------
927
932
928 @property
933 @property
929 def ids(self):
934 def ids(self):
930 """Always up-to-date ids property."""
935 """Always up-to-date ids property."""
931 self._flush_notifications()
936 self._flush_notifications()
932 # always copy:
937 # always copy:
933 return list(self._ids)
938 return list(self._ids)
934
939
935 def activate(self, targets='all', suffix=''):
940 def activate(self, targets='all', suffix=''):
936 """Create a DirectView and register it with IPython magics
941 """Create a DirectView and register it with IPython magics
937
942
938 Defines the magics `%px, %autopx, %pxresult, %%px`
943 Defines the magics `%px, %autopx, %pxresult, %%px`
939
944
940 Parameters
945 Parameters
941 ----------
946 ----------
942
947
943 targets: int, list of ints, or 'all'
948 targets: int, list of ints, or 'all'
944 The engines on which the view's magics will run
949 The engines on which the view's magics will run
945 suffix: str [default: '']
950 suffix: str [default: '']
946 The suffix, if any, for the magics. This allows you to have
951 The suffix, if any, for the magics. This allows you to have
947 multiple views associated with parallel magics at the same time.
952 multiple views associated with parallel magics at the same time.
948
953
949 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
954 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
950 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
955 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
951 on engine 0.
956 on engine 0.
952 """
957 """
953 view = self.direct_view(targets)
958 view = self.direct_view(targets)
954 view.block = True
959 view.block = True
955 view.activate(suffix)
960 view.activate(suffix)
956 return view
961 return view
957
962
958 def close(self, linger=None):
963 def close(self, linger=None):
959 """Close my zmq Sockets
964 """Close my zmq Sockets
960
965
961 If `linger`, set the zmq LINGER socket option,
966 If `linger`, set the zmq LINGER socket option,
962 which allows discarding of messages.
967 which allows discarding of messages.
963 """
968 """
964 if self._closed:
969 if self._closed:
965 return
970 return
966 self.stop_spin_thread()
971 self.stop_spin_thread()
967 snames = [ trait for trait in self.trait_names() if trait.endswith("socket") ]
972 snames = [ trait for trait in self.trait_names() if trait.endswith("socket") ]
968 for name in snames:
973 for name in snames:
969 socket = getattr(self, name)
974 socket = getattr(self, name)
970 if socket is not None and not socket.closed:
975 if socket is not None and not socket.closed:
971 if linger is not None:
976 if linger is not None:
972 socket.close(linger=linger)
977 socket.close(linger=linger)
973 else:
978 else:
974 socket.close()
979 socket.close()
975 self._closed = True
980 self._closed = True
976
981
977 def _spin_every(self, interval=1):
982 def _spin_every(self, interval=1):
978 """target func for use in spin_thread"""
983 """target func for use in spin_thread"""
979 while True:
984 while True:
980 if self._stop_spinning.is_set():
985 if self._stop_spinning.is_set():
981 return
986 return
982 time.sleep(interval)
987 time.sleep(interval)
983 self.spin()
988 self.spin()
984
989
985 def spin_thread(self, interval=1):
990 def spin_thread(self, interval=1):
986 """call Client.spin() in a background thread on some regular interval
991 """call Client.spin() in a background thread on some regular interval
987
992
988 This helps ensure that messages don't pile up too much in the zmq queue
993 This helps ensure that messages don't pile up too much in the zmq queue
989 while you are working on other things, or just leaving an idle terminal.
994 while you are working on other things, or just leaving an idle terminal.
990
995
991 It also helps limit potential padding of the `received` timestamp
996 It also helps limit potential padding of the `received` timestamp
992 on AsyncResult objects, used for timings.
997 on AsyncResult objects, used for timings.
993
998
994 Parameters
999 Parameters
995 ----------
1000 ----------
996
1001
997 interval : float, optional
1002 interval : float, optional
998 The interval on which to spin the client in the background thread
1003 The interval on which to spin the client in the background thread
999 (simply passed to time.sleep).
1004 (simply passed to time.sleep).
1000
1005
1001 Notes
1006 Notes
1002 -----
1007 -----
1003
1008
1004 For precision timing, you may want to use this method to put a bound
1009 For precision timing, you may want to use this method to put a bound
1005 on the jitter (in seconds) in `received` timestamps used
1010 on the jitter (in seconds) in `received` timestamps used
1006 in AsyncResult.wall_time.
1011 in AsyncResult.wall_time.
1007
1012
1008 """
1013 """
1009 if self._spin_thread is not None:
1014 if self._spin_thread is not None:
1010 self.stop_spin_thread()
1015 self.stop_spin_thread()
1011 self._stop_spinning.clear()
1016 self._stop_spinning.clear()
1012 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
1017 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
1013 self._spin_thread.daemon = True
1018 self._spin_thread.daemon = True
1014 self._spin_thread.start()
1019 self._spin_thread.start()
1015
1020
1016 def stop_spin_thread(self):
1021 def stop_spin_thread(self):
1017 """stop background spin_thread, if any"""
1022 """stop background spin_thread, if any"""
1018 if self._spin_thread is not None:
1023 if self._spin_thread is not None:
1019 self._stop_spinning.set()
1024 self._stop_spinning.set()
1020 self._spin_thread.join()
1025 self._spin_thread.join()
1021 self._spin_thread = None
1026 self._spin_thread = None
1022
1027
1023 def spin(self):
1028 def spin(self):
1024 """Flush any registration notifications and execution results
1029 """Flush any registration notifications and execution results
1025 waiting in the ZMQ queue.
1030 waiting in the ZMQ queue.
1026 """
1031 """
1027 if self._notification_socket:
1032 if self._notification_socket:
1028 self._flush_notifications()
1033 self._flush_notifications()
1029 if self._iopub_socket:
1034 if self._iopub_socket:
1030 self._flush_iopub(self._iopub_socket)
1035 self._flush_iopub(self._iopub_socket)
1031 if self._mux_socket:
1036 if self._mux_socket:
1032 self._flush_results(self._mux_socket)
1037 self._flush_results(self._mux_socket)
1033 if self._task_socket:
1038 if self._task_socket:
1034 self._flush_results(self._task_socket)
1039 self._flush_results(self._task_socket)
1035 if self._control_socket:
1040 if self._control_socket:
1036 self._flush_control(self._control_socket)
1041 self._flush_control(self._control_socket)
1037 if self._query_socket:
1042 if self._query_socket:
1038 self._flush_ignored_hub_replies()
1043 self._flush_ignored_hub_replies()
1039
1044
1040 def wait(self, jobs=None, timeout=-1):
1045 def wait(self, jobs=None, timeout=-1):
1041 """waits on one or more `jobs`, for up to `timeout` seconds.
1046 """waits on one or more `jobs`, for up to `timeout` seconds.
1042
1047
1043 Parameters
1048 Parameters
1044 ----------
1049 ----------
1045
1050
1046 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1051 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1047 ints are indices to self.history
1052 ints are indices to self.history
1048 strs are msg_ids
1053 strs are msg_ids
1049 default: wait on all outstanding messages
1054 default: wait on all outstanding messages
1050 timeout : float
1055 timeout : float
1051 a time in seconds, after which to give up.
1056 a time in seconds, after which to give up.
1052 default is -1, which means no timeout
1057 default is -1, which means no timeout
1053
1058
1054 Returns
1059 Returns
1055 -------
1060 -------
1056
1061
1057 True : when all msg_ids are done
1062 True : when all msg_ids are done
1058 False : timeout reached, some msg_ids still outstanding
1063 False : timeout reached, some msg_ids still outstanding
1059 """
1064 """
1060 tic = time.time()
1065 tic = time.time()
1061 if jobs is None:
1066 if jobs is None:
1062 theids = self.outstanding
1067 theids = self.outstanding
1063 else:
1068 else:
1064 if isinstance(jobs, (int, basestring, AsyncResult)):
1069 if isinstance(jobs, (int, basestring, AsyncResult)):
1065 jobs = [jobs]
1070 jobs = [jobs]
1066 theids = set()
1071 theids = set()
1067 for job in jobs:
1072 for job in jobs:
1068 if isinstance(job, int):
1073 if isinstance(job, int):
1069 # index access
1074 # index access
1070 job = self.history[job]
1075 job = self.history[job]
1071 elif isinstance(job, AsyncResult):
1076 elif isinstance(job, AsyncResult):
1072 map(theids.add, job.msg_ids)
1077 map(theids.add, job.msg_ids)
1073 continue
1078 continue
1074 theids.add(job)
1079 theids.add(job)
1075 if not theids.intersection(self.outstanding):
1080 if not theids.intersection(self.outstanding):
1076 return True
1081 return True
1077 self.spin()
1082 self.spin()
1078 while theids.intersection(self.outstanding):
1083 while theids.intersection(self.outstanding):
1079 if timeout >= 0 and ( time.time()-tic ) > timeout:
1084 if timeout >= 0 and ( time.time()-tic ) > timeout:
1080 break
1085 break
1081 time.sleep(1e-3)
1086 time.sleep(1e-3)
1082 self.spin()
1087 self.spin()
1083 return len(theids.intersection(self.outstanding)) == 0
1088 return len(theids.intersection(self.outstanding)) == 0
1084
1089
1085 #--------------------------------------------------------------------------
1090 #--------------------------------------------------------------------------
1086 # Control methods
1091 # Control methods
1087 #--------------------------------------------------------------------------
1092 #--------------------------------------------------------------------------
1088
1093
1089 @spin_first
1094 @spin_first
1090 def clear(self, targets=None, block=None):
1095 def clear(self, targets=None, block=None):
1091 """Clear the namespace in target(s)."""
1096 """Clear the namespace in target(s)."""
1092 block = self.block if block is None else block
1097 block = self.block if block is None else block
1093 targets = self._build_targets(targets)[0]
1098 targets = self._build_targets(targets)[0]
1094 for t in targets:
1099 for t in targets:
1095 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1100 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1096 error = False
1101 error = False
1097 if block:
1102 if block:
1098 self._flush_ignored_control()
1103 self._flush_ignored_control()
1099 for i in range(len(targets)):
1104 for i in range(len(targets)):
1100 idents,msg = self.session.recv(self._control_socket,0)
1105 idents,msg = self.session.recv(self._control_socket,0)
1101 if self.debug:
1106 if self.debug:
1102 pprint(msg)
1107 pprint(msg)
1103 if msg['content']['status'] != 'ok':
1108 if msg['content']['status'] != 'ok':
1104 error = self._unwrap_exception(msg['content'])
1109 error = self._unwrap_exception(msg['content'])
1105 else:
1110 else:
1106 self._ignored_control_replies += len(targets)
1111 self._ignored_control_replies += len(targets)
1107 if error:
1112 if error:
1108 raise error
1113 raise error
1109
1114
1110
1115
1111 @spin_first
1116 @spin_first
1112 def abort(self, jobs=None, targets=None, block=None):
1117 def abort(self, jobs=None, targets=None, block=None):
1113 """Abort specific jobs from the execution queues of target(s).
1118 """Abort specific jobs from the execution queues of target(s).
1114
1119
1115 This is a mechanism to prevent jobs that have already been submitted
1120 This is a mechanism to prevent jobs that have already been submitted
1116 from executing.
1121 from executing.
1117
1122
1118 Parameters
1123 Parameters
1119 ----------
1124 ----------
1120
1125
1121 jobs : msg_id, list of msg_ids, or AsyncResult
1126 jobs : msg_id, list of msg_ids, or AsyncResult
1122 The jobs to be aborted
1127 The jobs to be aborted
1123
1128
1124 If unspecified/None: abort all outstanding jobs.
1129 If unspecified/None: abort all outstanding jobs.
1125
1130
1126 """
1131 """
1127 block = self.block if block is None else block
1132 block = self.block if block is None else block
1128 jobs = jobs if jobs is not None else list(self.outstanding)
1133 jobs = jobs if jobs is not None else list(self.outstanding)
1129 targets = self._build_targets(targets)[0]
1134 targets = self._build_targets(targets)[0]
1130
1135
1131 msg_ids = []
1136 msg_ids = []
1132 if isinstance(jobs, (basestring,AsyncResult)):
1137 if isinstance(jobs, (basestring,AsyncResult)):
1133 jobs = [jobs]
1138 jobs = [jobs]
1134 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1139 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1135 if bad_ids:
1140 if bad_ids:
1136 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1141 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1137 for j in jobs:
1142 for j in jobs:
1138 if isinstance(j, AsyncResult):
1143 if isinstance(j, AsyncResult):
1139 msg_ids.extend(j.msg_ids)
1144 msg_ids.extend(j.msg_ids)
1140 else:
1145 else:
1141 msg_ids.append(j)
1146 msg_ids.append(j)
1142 content = dict(msg_ids=msg_ids)
1147 content = dict(msg_ids=msg_ids)
1143 for t in targets:
1148 for t in targets:
1144 self.session.send(self._control_socket, 'abort_request',
1149 self.session.send(self._control_socket, 'abort_request',
1145 content=content, ident=t)
1150 content=content, ident=t)
1146 error = False
1151 error = False
1147 if block:
1152 if block:
1148 self._flush_ignored_control()
1153 self._flush_ignored_control()
1149 for i in range(len(targets)):
1154 for i in range(len(targets)):
1150 idents,msg = self.session.recv(self._control_socket,0)
1155 idents,msg = self.session.recv(self._control_socket,0)
1151 if self.debug:
1156 if self.debug:
1152 pprint(msg)
1157 pprint(msg)
1153 if msg['content']['status'] != 'ok':
1158 if msg['content']['status'] != 'ok':
1154 error = self._unwrap_exception(msg['content'])
1159 error = self._unwrap_exception(msg['content'])
1155 else:
1160 else:
1156 self._ignored_control_replies += len(targets)
1161 self._ignored_control_replies += len(targets)
1157 if error:
1162 if error:
1158 raise error
1163 raise error
1159
1164
1160 @spin_first
1165 @spin_first
1161 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1166 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1162 """Terminates one or more engine processes, optionally including the hub.
1167 """Terminates one or more engine processes, optionally including the hub.
1163
1168
1164 Parameters
1169 Parameters
1165 ----------
1170 ----------
1166
1171
1167 targets: list of ints or 'all' [default: all]
1172 targets: list of ints or 'all' [default: all]
1168 Which engines to shutdown.
1173 Which engines to shutdown.
1169 hub: bool [default: False]
1174 hub: bool [default: False]
1170 Whether to include the Hub. hub=True implies targets='all'.
1175 Whether to include the Hub. hub=True implies targets='all'.
1171 block: bool [default: self.block]
1176 block: bool [default: self.block]
1172 Whether to wait for clean shutdown replies or not.
1177 Whether to wait for clean shutdown replies or not.
1173 restart: bool [default: False]
1178 restart: bool [default: False]
1174 NOT IMPLEMENTED
1179 NOT IMPLEMENTED
1175 whether to restart engines after shutting them down.
1180 whether to restart engines after shutting them down.
1176 """
1181 """
1177 from IPython.parallel.error import NoEnginesRegistered
1182 from IPython.parallel.error import NoEnginesRegistered
1178 if restart:
1183 if restart:
1179 raise NotImplementedError("Engine restart is not yet implemented")
1184 raise NotImplementedError("Engine restart is not yet implemented")
1180
1185
1181 block = self.block if block is None else block
1186 block = self.block if block is None else block
1182 if hub:
1187 if hub:
1183 targets = 'all'
1188 targets = 'all'
1184 try:
1189 try:
1185 targets = self._build_targets(targets)[0]
1190 targets = self._build_targets(targets)[0]
1186 except NoEnginesRegistered:
1191 except NoEnginesRegistered:
1187 targets = []
1192 targets = []
1188 for t in targets:
1193 for t in targets:
1189 self.session.send(self._control_socket, 'shutdown_request',
1194 self.session.send(self._control_socket, 'shutdown_request',
1190 content={'restart':restart},ident=t)
1195 content={'restart':restart},ident=t)
1191 error = False
1196 error = False
1192 if block or hub:
1197 if block or hub:
1193 self._flush_ignored_control()
1198 self._flush_ignored_control()
1194 for i in range(len(targets)):
1199 for i in range(len(targets)):
1195 idents,msg = self.session.recv(self._control_socket, 0)
1200 idents,msg = self.session.recv(self._control_socket, 0)
1196 if self.debug:
1201 if self.debug:
1197 pprint(msg)
1202 pprint(msg)
1198 if msg['content']['status'] != 'ok':
1203 if msg['content']['status'] != 'ok':
1199 error = self._unwrap_exception(msg['content'])
1204 error = self._unwrap_exception(msg['content'])
1200 else:
1205 else:
1201 self._ignored_control_replies += len(targets)
1206 self._ignored_control_replies += len(targets)
1202
1207
1203 if hub:
1208 if hub:
1204 time.sleep(0.25)
1209 time.sleep(0.25)
1205 self.session.send(self._query_socket, 'shutdown_request')
1210 self.session.send(self._query_socket, 'shutdown_request')
1206 idents,msg = self.session.recv(self._query_socket, 0)
1211 idents,msg = self.session.recv(self._query_socket, 0)
1207 if self.debug:
1212 if self.debug:
1208 pprint(msg)
1213 pprint(msg)
1209 if msg['content']['status'] != 'ok':
1214 if msg['content']['status'] != 'ok':
1210 error = self._unwrap_exception(msg['content'])
1215 error = self._unwrap_exception(msg['content'])
1211
1216
1212 if error:
1217 if error:
1213 raise error
1218 raise error
1214
1219
1215 #--------------------------------------------------------------------------
1220 #--------------------------------------------------------------------------
1216 # Execution related methods
1221 # Execution related methods
1217 #--------------------------------------------------------------------------
1222 #--------------------------------------------------------------------------
1218
1223
1219 def _maybe_raise(self, result):
1224 def _maybe_raise(self, result):
1220 """wrapper for maybe raising an exception if apply failed."""
1225 """wrapper for maybe raising an exception if apply failed."""
1221 if isinstance(result, error.RemoteError):
1226 if isinstance(result, error.RemoteError):
1222 raise result
1227 raise result
1223
1228
1224 return result
1229 return result
1225
1230
1226 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1231 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1227 ident=None):
1232 ident=None):
1228 """construct and send an apply message via a socket.
1233 """construct and send an apply message via a socket.
1229
1234
1230 This is the principal method with which all engine execution is performed by views.
1235 This is the principal method with which all engine execution is performed by views.
1231 """
1236 """
1232
1237
1233 if self._closed:
1238 if self._closed:
1234 raise RuntimeError("Client cannot be used after its sockets have been closed")
1239 raise RuntimeError("Client cannot be used after its sockets have been closed")
1235
1240
1236 # defaults:
1241 # defaults:
1237 args = args if args is not None else []
1242 args = args if args is not None else []
1238 kwargs = kwargs if kwargs is not None else {}
1243 kwargs = kwargs if kwargs is not None else {}
1239 metadata = metadata if metadata is not None else {}
1244 metadata = metadata if metadata is not None else {}
1240
1245
1241 # validate arguments
1246 # validate arguments
1242 if not callable(f) and not isinstance(f, Reference):
1247 if not callable(f) and not isinstance(f, Reference):
1243 raise TypeError("f must be callable, not %s"%type(f))
1248 raise TypeError("f must be callable, not %s"%type(f))
1244 if not isinstance(args, (tuple, list)):
1249 if not isinstance(args, (tuple, list)):
1245 raise TypeError("args must be tuple or list, not %s"%type(args))
1250 raise TypeError("args must be tuple or list, not %s"%type(args))
1246 if not isinstance(kwargs, dict):
1251 if not isinstance(kwargs, dict):
1247 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1252 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1248 if not isinstance(metadata, dict):
1253 if not isinstance(metadata, dict):
1249 raise TypeError("metadata must be dict, not %s"%type(metadata))
1254 raise TypeError("metadata must be dict, not %s"%type(metadata))
1250
1255
1251 bufs = serialize.pack_apply_message(f, args, kwargs,
1256 bufs = serialize.pack_apply_message(f, args, kwargs,
1252 buffer_threshold=self.session.buffer_threshold,
1257 buffer_threshold=self.session.buffer_threshold,
1253 item_threshold=self.session.item_threshold,
1258 item_threshold=self.session.item_threshold,
1254 )
1259 )
1255
1260
1256 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1261 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1257 metadata=metadata, track=track)
1262 metadata=metadata, track=track)
1258
1263
1259 msg_id = msg['header']['msg_id']
1264 msg_id = msg['header']['msg_id']
1260 self.outstanding.add(msg_id)
1265 self.outstanding.add(msg_id)
1261 if ident:
1266 if ident:
1262 # possibly routed to a specific engine
1267 # possibly routed to a specific engine
1263 if isinstance(ident, list):
1268 if isinstance(ident, list):
1264 ident = ident[-1]
1269 ident = ident[-1]
1265 if ident in self._engines.values():
1270 if ident in self._engines.values():
1266 # save for later, in case of engine death
1271 # save for later, in case of engine death
1267 self._outstanding_dict[ident].add(msg_id)
1272 self._outstanding_dict[ident].add(msg_id)
1268 self.history.append(msg_id)
1273 self.history.append(msg_id)
1269 self.metadata[msg_id]['submitted'] = datetime.now()
1274 self.metadata[msg_id]['submitted'] = datetime.now()
1270
1275
1271 return msg
1276 return msg
1272
1277
1273 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1278 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1274 """construct and send an execute request via a socket.
1279 """construct and send an execute request via a socket.
1275
1280
1276 """
1281 """
1277
1282
1278 if self._closed:
1283 if self._closed:
1279 raise RuntimeError("Client cannot be used after its sockets have been closed")
1284 raise RuntimeError("Client cannot be used after its sockets have been closed")
1280
1285
1281 # defaults:
1286 # defaults:
1282 metadata = metadata if metadata is not None else {}
1287 metadata = metadata if metadata is not None else {}
1283
1288
1284 # validate arguments
1289 # validate arguments
1285 if not isinstance(code, basestring):
1290 if not isinstance(code, basestring):
1286 raise TypeError("code must be text, not %s" % type(code))
1291 raise TypeError("code must be text, not %s" % type(code))
1287 if not isinstance(metadata, dict):
1292 if not isinstance(metadata, dict):
1288 raise TypeError("metadata must be dict, not %s" % type(metadata))
1293 raise TypeError("metadata must be dict, not %s" % type(metadata))
1289
1294
1290 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1295 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1291
1296
1292
1297
1293 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1298 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1294 metadata=metadata)
1299 metadata=metadata)
1295
1300
1296 msg_id = msg['header']['msg_id']
1301 msg_id = msg['header']['msg_id']
1297 self.outstanding.add(msg_id)
1302 self.outstanding.add(msg_id)
1298 if ident:
1303 if ident:
1299 # possibly routed to a specific engine
1304 # possibly routed to a specific engine
1300 if isinstance(ident, list):
1305 if isinstance(ident, list):
1301 ident = ident[-1]
1306 ident = ident[-1]
1302 if ident in self._engines.values():
1307 if ident in self._engines.values():
1303 # save for later, in case of engine death
1308 # save for later, in case of engine death
1304 self._outstanding_dict[ident].add(msg_id)
1309 self._outstanding_dict[ident].add(msg_id)
1305 self.history.append(msg_id)
1310 self.history.append(msg_id)
1306 self.metadata[msg_id]['submitted'] = datetime.now()
1311 self.metadata[msg_id]['submitted'] = datetime.now()
1307
1312
1308 return msg
1313 return msg
1309
1314
1310 #--------------------------------------------------------------------------
1315 #--------------------------------------------------------------------------
1311 # construct a View object
1316 # construct a View object
1312 #--------------------------------------------------------------------------
1317 #--------------------------------------------------------------------------
1313
1318
1314 def load_balanced_view(self, targets=None):
1319 def load_balanced_view(self, targets=None):
1315 """construct a DirectView object.
1320 """construct a DirectView object.
1316
1321
1317 If no arguments are specified, create a LoadBalancedView
1322 If no arguments are specified, create a LoadBalancedView
1318 using all engines.
1323 using all engines.
1319
1324
1320 Parameters
1325 Parameters
1321 ----------
1326 ----------
1322
1327
1323 targets: list,slice,int,etc. [default: use all engines]
1328 targets: list,slice,int,etc. [default: use all engines]
1324 The subset of engines across which to load-balance
1329 The subset of engines across which to load-balance
1325 """
1330 """
1326 if targets == 'all':
1331 if targets == 'all':
1327 targets = None
1332 targets = None
1328 if targets is not None:
1333 if targets is not None:
1329 targets = self._build_targets(targets)[1]
1334 targets = self._build_targets(targets)[1]
1330 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1335 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1331
1336
1332 def direct_view(self, targets='all'):
1337 def direct_view(self, targets='all'):
1333 """construct a DirectView object.
1338 """construct a DirectView object.
1334
1339
1335 If no targets are specified, create a DirectView using all engines.
1340 If no targets are specified, create a DirectView using all engines.
1336
1341
1337 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1342 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1338 evaluate the target engines at each execution, whereas rc[:] will connect to
1343 evaluate the target engines at each execution, whereas rc[:] will connect to
1339 all *current* engines, and that list will not change.
1344 all *current* engines, and that list will not change.
1340
1345
1341 That is, 'all' will always use all engines, whereas rc[:] will not use
1346 That is, 'all' will always use all engines, whereas rc[:] will not use
1342 engines added after the DirectView is constructed.
1347 engines added after the DirectView is constructed.
1343
1348
1344 Parameters
1349 Parameters
1345 ----------
1350 ----------
1346
1351
1347 targets: list,slice,int,etc. [default: use all engines]
1352 targets: list,slice,int,etc. [default: use all engines]
1348 The engines to use for the View
1353 The engines to use for the View
1349 """
1354 """
1350 single = isinstance(targets, int)
1355 single = isinstance(targets, int)
1351 # allow 'all' to be lazily evaluated at each execution
1356 # allow 'all' to be lazily evaluated at each execution
1352 if targets != 'all':
1357 if targets != 'all':
1353 targets = self._build_targets(targets)[1]
1358 targets = self._build_targets(targets)[1]
1354 if single:
1359 if single:
1355 targets = targets[0]
1360 targets = targets[0]
1356 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1361 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1357
1362
1358 #--------------------------------------------------------------------------
1363 #--------------------------------------------------------------------------
1359 # Query methods
1364 # Query methods
1360 #--------------------------------------------------------------------------
1365 #--------------------------------------------------------------------------
1361
1366
1362 @spin_first
1367 @spin_first
1363 def get_result(self, indices_or_msg_ids=None, block=None):
1368 def get_result(self, indices_or_msg_ids=None, block=None):
1364 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1369 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1365
1370
1366 If the client already has the results, no request to the Hub will be made.
1371 If the client already has the results, no request to the Hub will be made.
1367
1372
1368 This is a convenient way to construct AsyncResult objects, which are wrappers
1373 This is a convenient way to construct AsyncResult objects, which are wrappers
1369 that include metadata about execution, and allow for awaiting results that
1374 that include metadata about execution, and allow for awaiting results that
1370 were not submitted by this Client.
1375 were not submitted by this Client.
1371
1376
1372 It can also be a convenient way to retrieve the metadata associated with
1377 It can also be a convenient way to retrieve the metadata associated with
1373 blocking execution, since it always retrieves
1378 blocking execution, since it always retrieves
1374
1379
1375 Examples
1380 Examples
1376 --------
1381 --------
1377 ::
1382 ::
1378
1383
1379 In [10]: r = client.apply()
1384 In [10]: r = client.apply()
1380
1385
1381 Parameters
1386 Parameters
1382 ----------
1387 ----------
1383
1388
1384 indices_or_msg_ids : integer history index, str msg_id, or list of either
1389 indices_or_msg_ids : integer history index, str msg_id, or list of either
1385 The indices or msg_ids of indices to be retrieved
1390 The indices or msg_ids of indices to be retrieved
1386
1391
1387 block : bool
1392 block : bool
1388 Whether to wait for the result to be done
1393 Whether to wait for the result to be done
1389
1394
1390 Returns
1395 Returns
1391 -------
1396 -------
1392
1397
1393 AsyncResult
1398 AsyncResult
1394 A single AsyncResult object will always be returned.
1399 A single AsyncResult object will always be returned.
1395
1400
1396 AsyncHubResult
1401 AsyncHubResult
1397 A subclass of AsyncResult that retrieves results from the Hub
1402 A subclass of AsyncResult that retrieves results from the Hub
1398
1403
1399 """
1404 """
1400 block = self.block if block is None else block
1405 block = self.block if block is None else block
1401 if indices_or_msg_ids is None:
1406 if indices_or_msg_ids is None:
1402 indices_or_msg_ids = -1
1407 indices_or_msg_ids = -1
1403
1408
1404 single_result = False
1409 single_result = False
1405 if not isinstance(indices_or_msg_ids, (list,tuple)):
1410 if not isinstance(indices_or_msg_ids, (list,tuple)):
1406 indices_or_msg_ids = [indices_or_msg_ids]
1411 indices_or_msg_ids = [indices_or_msg_ids]
1407 single_result = True
1412 single_result = True
1408
1413
1409 theids = []
1414 theids = []
1410 for id in indices_or_msg_ids:
1415 for id in indices_or_msg_ids:
1411 if isinstance(id, int):
1416 if isinstance(id, int):
1412 id = self.history[id]
1417 id = self.history[id]
1413 if not isinstance(id, basestring):
1418 if not isinstance(id, basestring):
1414 raise TypeError("indices must be str or int, not %r"%id)
1419 raise TypeError("indices must be str or int, not %r"%id)
1415 theids.append(id)
1420 theids.append(id)
1416
1421
1417 local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
1422 local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
1418 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1423 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1419
1424
1420 # given single msg_id initially, get_result shot get the result itself,
1425 # given single msg_id initially, get_result shot get the result itself,
1421 # not a length-one list
1426 # not a length-one list
1422 if single_result:
1427 if single_result:
1423 theids = theids[0]
1428 theids = theids[0]
1424
1429
1425 if remote_ids:
1430 if remote_ids:
1426 ar = AsyncHubResult(self, msg_ids=theids)
1431 ar = AsyncHubResult(self, msg_ids=theids)
1427 else:
1432 else:
1428 ar = AsyncResult(self, msg_ids=theids)
1433 ar = AsyncResult(self, msg_ids=theids)
1429
1434
1430 if block:
1435 if block:
1431 ar.wait()
1436 ar.wait()
1432
1437
1433 return ar
1438 return ar
1434
1439
1435 @spin_first
1440 @spin_first
1436 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1441 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1437 """Resubmit one or more tasks.
1442 """Resubmit one or more tasks.
1438
1443
1439 in-flight tasks may not be resubmitted.
1444 in-flight tasks may not be resubmitted.
1440
1445
1441 Parameters
1446 Parameters
1442 ----------
1447 ----------
1443
1448
1444 indices_or_msg_ids : integer history index, str msg_id, or list of either
1449 indices_or_msg_ids : integer history index, str msg_id, or list of either
1445 The indices or msg_ids of indices to be retrieved
1450 The indices or msg_ids of indices to be retrieved
1446
1451
1447 block : bool
1452 block : bool
1448 Whether to wait for the result to be done
1453 Whether to wait for the result to be done
1449
1454
1450 Returns
1455 Returns
1451 -------
1456 -------
1452
1457
1453 AsyncHubResult
1458 AsyncHubResult
1454 A subclass of AsyncResult that retrieves results from the Hub
1459 A subclass of AsyncResult that retrieves results from the Hub
1455
1460
1456 """
1461 """
1457 block = self.block if block is None else block
1462 block = self.block if block is None else block
1458 if indices_or_msg_ids is None:
1463 if indices_or_msg_ids is None:
1459 indices_or_msg_ids = -1
1464 indices_or_msg_ids = -1
1460
1465
1461 if not isinstance(indices_or_msg_ids, (list,tuple)):
1466 if not isinstance(indices_or_msg_ids, (list,tuple)):
1462 indices_or_msg_ids = [indices_or_msg_ids]
1467 indices_or_msg_ids = [indices_or_msg_ids]
1463
1468
1464 theids = []
1469 theids = []
1465 for id in indices_or_msg_ids:
1470 for id in indices_or_msg_ids:
1466 if isinstance(id, int):
1471 if isinstance(id, int):
1467 id = self.history[id]
1472 id = self.history[id]
1468 if not isinstance(id, basestring):
1473 if not isinstance(id, basestring):
1469 raise TypeError("indices must be str or int, not %r"%id)
1474 raise TypeError("indices must be str or int, not %r"%id)
1470 theids.append(id)
1475 theids.append(id)
1471
1476
1472 content = dict(msg_ids = theids)
1477 content = dict(msg_ids = theids)
1473
1478
1474 self.session.send(self._query_socket, 'resubmit_request', content)
1479 self.session.send(self._query_socket, 'resubmit_request', content)
1475
1480
1476 zmq.select([self._query_socket], [], [])
1481 zmq.select([self._query_socket], [], [])
1477 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1482 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1478 if self.debug:
1483 if self.debug:
1479 pprint(msg)
1484 pprint(msg)
1480 content = msg['content']
1485 content = msg['content']
1481 if content['status'] != 'ok':
1486 if content['status'] != 'ok':
1482 raise self._unwrap_exception(content)
1487 raise self._unwrap_exception(content)
1483 mapping = content['resubmitted']
1488 mapping = content['resubmitted']
1484 new_ids = [ mapping[msg_id] for msg_id in theids ]
1489 new_ids = [ mapping[msg_id] for msg_id in theids ]
1485
1490
1486 ar = AsyncHubResult(self, msg_ids=new_ids)
1491 ar = AsyncHubResult(self, msg_ids=new_ids)
1487
1492
1488 if block:
1493 if block:
1489 ar.wait()
1494 ar.wait()
1490
1495
1491 return ar
1496 return ar
1492
1497
@spin_first
def result_status(self, msg_ids, status_only=True):
    """Check on the status of the result(s) of the apply request with `msg_ids`.

    If status_only is False, then the actual results will be retrieved, else
    only the status of the results will be checked.

    Parameters
    ----------

    msg_ids : list of msg_ids
        if int:
            Passed as index to self.history for convenience.
    status_only : bool (default: True)
        if False:
            Retrieve the actual results of completed tasks.

    Returns
    -------

    results : dict
        There will always be the keys 'pending' and 'completed', which will
        be lists of msg_ids that are incomplete or complete. If `status_only`
        is False, then completed results will be keyed by their `msg_id`.
    """
    if not isinstance(msg_ids, (list, tuple)):
        msg_ids = [msg_ids]

    # normalize everything to msg_id strings; ints index the local history
    theids = []
    for msg_id in msg_ids:
        if isinstance(msg_id, int):
            msg_id = self.history[msg_id]
        if not isinstance(msg_id, basestring):
            raise TypeError("msg_ids must be str, not %r"%msg_id)
        theids.append(msg_id)

    completed = []
    local_results = {}

    # comment this block out to temporarily disable local shortcut:
    # BUG FIX: iterate over a snapshot of `theids`.  The original removed
    # elements from `theids` while iterating over it, which skips the
    # element immediately following every cache hit and causes those
    # already-cached results to be re-requested from the Hub.
    for msg_id in list(theids):
        if msg_id in self.results:
            completed.append(msg_id)
            local_results[msg_id] = self.results[msg_id]
            theids.remove(msg_id)

    if theids: # some not locally cached
        content = dict(msg_ids=theids, status_only=status_only)
        msg = self.session.send(self._query_socket, "result_request", content=content)
        zmq.select([self._query_socket], [], [])
        idents, msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)
        buffers = msg['buffers']
    else:
        content = dict(completed=[], pending=[])

    content['completed'].extend(completed)

    if status_only:
        return content

    failures = []
    # load cached results into result:
    content.update(local_results)

    # update cache with results:
    for msg_id in sorted(theids):
        if msg_id in content['completed']:
            rec = content[msg_id]
            parent = rec['header']
            header = rec['result_header']
            rcontent = rec['result_content']
            iodict = rec['io']
            if isinstance(rcontent, str):
                rcontent = self.session.unpack(rcontent)

            md = self.metadata[msg_id]
            md_msg = dict(
                content=rcontent,
                parent_header=parent,
                header=header,
                metadata=rec['result_metadata'],
            )
            md.update(self._extract_metadata(md_msg))
            if rec.get('received'):
                md['received'] = rec['received']
            md.update(iodict)

            if rcontent['status'] == 'ok':
                # apply_reply results travel in the buffers; execute_reply
                # results are wrapped in an ExecuteReply object
                if header['msg_type'] == 'apply_reply':
                    res, buffers = serialize.unserialize_object(buffers)
                elif header['msg_type'] == 'execute_reply':
                    res = ExecuteReply(msg_id, rcontent, md)
                else:
                    raise KeyError("unhandled msg type: %r" % header['msg_type'])
            else:
                res = self._unwrap_exception(rcontent)
                failures.append(res)

            self.results[msg_id] = res
            content[msg_id] = res

    # a single failed request raises directly, for convenience
    if len(theids) == 1 and failures:
        raise failures[0]

    error.collect_exceptions(failures, "result_status")
    return content
1604
1609
@spin_first
def queue_status(self, targets='all', verbose=False):
    """Ask the Hub for the state of the engine queues.

    Parameters
    ----------

    targets : int/str/list of ints/strs
        the engines whose states are to be queried.
        default : all
    verbose : bool
        Whether to return lengths only, or lists of ids for each element
    """
    # 'all' is forwarded untouched so the Hub evaluates it on its side
    engine_ids = None if targets == 'all' else self._build_targets(targets)[1]
    self.session.send(self._query_socket, "queue_request",
                      content=dict(targets=engine_ids, verbose=verbose))
    idents, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)
    content = reply['content']
    status = content.pop('status')
    if status != 'ok':
        raise self._unwrap_exception(content)
    content = rekey(content)
    # a single int target gets only its own entry rather than the full dict
    return content[targets] if isinstance(targets, int) else content
1637
1642
1638 def _build_msgids_from_target(self, targets=None):
1643 def _build_msgids_from_target(self, targets=None):
1639 """Build a list of msg_ids from the list of engine targets"""
1644 """Build a list of msg_ids from the list of engine targets"""
1640 if not targets: # needed as _build_targets otherwise uses all engines
1645 if not targets: # needed as _build_targets otherwise uses all engines
1641 return []
1646 return []
1642 target_ids = self._build_targets(targets)[0]
1647 target_ids = self._build_targets(targets)[0]
1643 return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)
1648 return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)
1644
1649
1645 def _build_msgids_from_jobs(self, jobs=None):
1650 def _build_msgids_from_jobs(self, jobs=None):
1646 """Build a list of msg_ids from "jobs" """
1651 """Build a list of msg_ids from "jobs" """
1647 if not jobs:
1652 if not jobs:
1648 return []
1653 return []
1649 msg_ids = []
1654 msg_ids = []
1650 if isinstance(jobs, (basestring,AsyncResult)):
1655 if isinstance(jobs, (basestring,AsyncResult)):
1651 jobs = [jobs]
1656 jobs = [jobs]
1652 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1657 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1653 if bad_ids:
1658 if bad_ids:
1654 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1659 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1655 for j in jobs:
1660 for j in jobs:
1656 if isinstance(j, AsyncResult):
1661 if isinstance(j, AsyncResult):
1657 msg_ids.extend(j.msg_ids)
1662 msg_ids.extend(j.msg_ids)
1658 else:
1663 else:
1659 msg_ids.append(j)
1664 msg_ids.append(j)
1660 return msg_ids
1665 return msg_ids
1661
1666
def purge_local_results(self, jobs=[], targets=[]):
    """Clears the client caches of results and frees such memory.

    Individual results can be purged by msg_id, or the entire
    history of specific targets can be purged.

    Use `purge_local_results('all')` to scrub everything from the Clients's db.

    The client must have no outstanding tasks before purging the caches.
    Raises `AssertionError` if there are still outstanding tasks.

    After this call all `AsyncResults` are invalid and should be discarded.

    If you must "reget" the results, you can still do so by using
    `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
    redownload the results from the hub if they are still available
    (i.e `client.purge_hub_results(...)` has not been called.

    Parameters
    ----------

    jobs : str or list of str or AsyncResult objects
        the msg_ids whose results should be purged.
    targets : int/str/list of ints/strs
        The targets, by int_id, whose entire results are to be purged.

        default : None
    """
    assert not self.outstanding, "Can't purge a client with outstanding tasks!"

    if not targets and not jobs:
        raise ValueError("Must specify at least one of `targets` and `jobs`")

    if jobs == 'all':
        # no need to compute individual ids: drop both caches wholesale
        self.results.clear()
        self.metadata.clear()
        return
    else:
        msg_ids = []
        msg_ids.extend(self._build_msgids_from_target(targets))
        msg_ids.extend(self._build_msgids_from_jobs(jobs))
        # BUG FIX: the original used map() purely for its side effect, which
        # silently does nothing under a lazy (Python 3) map, and an unguarded
        # dict.pop would raise KeyError for an id present in only one cache.
        for msg_id in msg_ids:
            self.results.pop(msg_id, None)
            self.metadata.pop(msg_id, None)
1705
1710
1706
1711
@spin_first
def purge_hub_results(self, jobs=[], targets=[]):
    """Tell the Hub to forget results.

    Individual results can be purged by msg_id, or the entire
    history of specific targets can be purged.

    Use `purge_results('all')` to scrub everything from the Hub's db.

    Parameters
    ----------

    jobs : str or list of str or AsyncResult objects
        the msg_ids whose results should be forgotten.
    targets : int/str/list of ints/strs
        The targets, by int_id, whose entire history is to be purged.

        default : None
    """
    if not targets and not jobs:
        raise ValueError("Must specify at least one of `targets` and `jobs`")
    if targets:
        targets = self._build_targets(targets)[1]

    # 'all' is understood by the Hub directly; anything else becomes msg_ids
    msg_ids = jobs if jobs == 'all' else self._build_msgids_from_jobs(jobs)

    self.session.send(self._query_socket, "purge_request",
                      content=dict(engine_ids=targets, msg_ids=msg_ids))
    idents, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)
    content = reply['content']
    if content['status'] != 'ok':
        raise self._unwrap_exception(content)
1745
1750
def purge_results(self, jobs=[], targets=[]):
    """Drop cached results from both the local client and the Hub.

    Convenience wrapper: equivalent to calling `purge_local_results()`
    followed by `purge_hub_results()` with the same arguments, so both
    caches forget the same msg_ids/targets.  `purge_results('all')`
    scrubs every cached result from both sides.

    Parameters
    ----------

    jobs : str or list of str or AsyncResult objects
        the msg_ids whose results should be forgotten.
    targets : int/str/list of ints/strs
        The targets, by int_id, whose entire history is to be purged.

        default : None
    """
    # local first, then the Hub, matching the original call order
    self.purge_local_results(jobs=jobs, targets=targets)
    self.purge_hub_results(jobs=jobs, targets=targets)
1770
1775
def purge_everything(self):
    """Forget all prior-task state on both the Hub and this client.

    Beyond `purge_results("all")`, this also resets the local submission
    history and the session's message-digest history.
    """
    self.purge_results("all")
    self.history = []
    self.session.digest_history.clear()
1780
1785
@spin_first
def hub_history(self):
    """Get the Hub's history

    Just like the Client, the Hub has a history, which is a list of msg_ids.
    This will contain the history of all clients, and, depending on configuration,
    may contain history across multiple cluster sessions.

    Any msg_id returned here is a valid argument to `get_result`.

    Returns
    -------

    msg_ids : list of strs
        list of all msg_ids, ordered by task submission time.
    """
    self.session.send(self._query_socket, "history_request", content={})
    idents, reply = self.session.recv(self._query_socket, 0)

    if self.debug:
        pprint(reply)
    content = reply['content']
    if content['status'] != 'ok':
        raise self._unwrap_exception(content)
    return content['history']
1808
1813
@spin_first
def db_query(self, query, keys=None):
    """Query the Hub's TaskRecord database

    This will return a list of task record dicts that match `query`

    Parameters
    ----------

    query : mongodb query dict
        The search dict. See mongodb query docs for details.
    keys : list of strs [optional]
        The subset of keys to be returned. The default is to fetch everything but buffers.
        'msg_id' will *always* be included.
    """
    # a single key may be passed bare; normalize to a list
    if isinstance(keys, basestring):
        keys = [keys]
    self.session.send(self._query_socket, "db_request",
                      content=dict(query=query, keys=keys))
    idents, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)
    content = reply['content']
    if content['status'] != 'ok':
        raise self._unwrap_exception(content)

    records = content['records']

    # the buffers arrive as one flat list; hand each record back its slice
    buffer_lens = content['buffer_lens']
    result_buffer_lens = content['result_buffer_lens']
    buffers = reply['buffers']
    for i, rec in enumerate(records):
        if buffer_lens is not None:
            n = buffer_lens[i]
            rec['buffers'], buffers = buffers[:n], buffers[n:]
        if result_buffer_lens is not None:
            n = result_buffer_lens[i]
            rec['result_buffers'], buffers = buffers[:n], buffers[n:]

    return records
1852
1857
# the public API of this module is just the Client class
__all__ = ['Client']
General Comments 0
You need to be logged in to leave comments. Login now