##// END OF EJS Templates
cleanup Client.close...
MinRK -
Show More
@@ -1,1845 +1,1853 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
36
36
37 from IPython.utils.capture import RichOutput
37 from IPython.utils.capture import RichOutput
38 from IPython.utils.coloransi import TermColors
38 from IPython.utils.coloransi import TermColors
39 from IPython.utils.jsonutil import rekey
39 from IPython.utils.jsonutil import rekey
40 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
40 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
41 from IPython.utils.path import get_ipython_dir
41 from IPython.utils.path import get_ipython_dir
42 from IPython.utils.py3compat import cast_bytes
42 from IPython.utils.py3compat import cast_bytes
43 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
43 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
44 Dict, List, Bool, Set, Any)
44 Dict, List, Bool, Set, Any)
45 from IPython.external.decorator import decorator
45 from IPython.external.decorator import decorator
46 from IPython.external.ssh import tunnel
46 from IPython.external.ssh import tunnel
47
47
48 from IPython.parallel import Reference
48 from IPython.parallel import Reference
49 from IPython.parallel import error
49 from IPython.parallel import error
50 from IPython.parallel import util
50 from IPython.parallel import util
51
51
52 from IPython.kernel.zmq.session import Session, Message
52 from IPython.kernel.zmq.session import Session, Message
53 from IPython.kernel.zmq import serialize
53 from IPython.kernel.zmq import serialize
54
54
55 from .asyncresult import AsyncResult, AsyncHubResult
55 from .asyncresult import AsyncResult, AsyncHubResult
56 from .view import DirectView, LoadBalancedView
56 from .view import DirectView, LoadBalancedView
57
57
58 if sys.version_info[0] >= 3:
58 if sys.version_info[0] >= 3:
59 # xrange is used in a couple 'isinstance' tests in py2
59 # xrange is used in a couple 'isinstance' tests in py2
60 # should be just 'range' in 3k
60 # should be just 'range' in 3k
61 xrange = range
61 xrange = range
62
62
63 #--------------------------------------------------------------------------
63 #--------------------------------------------------------------------------
64 # Decorators for Client methods
64 # Decorators for Client methods
65 #--------------------------------------------------------------------------
65 #--------------------------------------------------------------------------
66
66
@decorator
def spin_first(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method.

    Decorator for Client methods that read registration/result state:
    spinning first flushes any pending incoming messages so the method
    sees up-to-date state.
    """
    self.spin()
    return f(self, *args, **kwargs)
72
72
73
73
74 #--------------------------------------------------------------------------
74 #--------------------------------------------------------------------------
75 # Classes
75 # Classes
76 #--------------------------------------------------------------------------
76 #--------------------------------------------------------------------------
77
77
78
78
class ExecuteReply(RichOutput):
    """Wrapper for finished Execute results.

    Wraps the content/metadata of an ``execute_reply`` message so that the
    rich display data captured in ``metadata['pyout']`` can be redisplayed
    locally via the RichOutput machinery.  Metadata keys are also exposed
    via both item and attribute access.
    """
    def __init__(self, msg_id, content, metadata):
        self.msg_id = msg_id
        self._content = content
        self.execution_count = content['execution_count']
        self.metadata = metadata

    # RichOutput overrides

    @property
    def source(self):
        """Source of the pyout display data, if any (None when no pyout)."""
        pyout = self.metadata['pyout']
        if pyout:
            return pyout.get('source', '')

    @property
    def data(self):
        """mime-type keyed dict of display data (None when no pyout)."""
        pyout = self.metadata['pyout']
        if pyout:
            return pyout.get('data', {})

    @property
    def _metadata(self):
        """mime-type keyed dict of display metadata (None when no pyout)."""
        pyout = self.metadata['pyout']
        if pyout:
            return pyout.get('metadata', {})

    def display(self):
        """Republish this reply's display data in the local frontend."""
        from IPython.display import publish_display_data
        publish_display_data(self.source, self.data, self.metadata)

    def _repr_mime_(self, mime):
        # return (data, metadata) when mime-specific metadata exists,
        # bare data otherwise; None when we have nothing for this mime type
        if mime not in self.data:
            return
        data = self.data[mime]
        if mime in self._metadata:
            return data, self._metadata[mime]
        else:
            return data

    def __getitem__(self, key):
        return self.metadata[key]

    def __getattr__(self, key):
        # attribute access falls back to metadata keys
        if key not in self.metadata:
            raise AttributeError(key)
        return self.metadata[key]

    def __repr__(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        text_out = pyout['data'].get('text/plain', '')
        if len(text_out) > 32:
            # truncate long reprs to keep the summary on one line
            text_out = text_out[:29] + '...'

        return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)

    def _repr_pretty_(self, p, cycle):
        """IPython pretty-print hook: mimic an ``Out[engine:count]`` prompt."""
        pyout = self.metadata['pyout'] or {'data':{}}
        text_out = pyout['data'].get('text/plain', '')

        if not text_out:
            return

        try:
            ip = get_ipython()
        except NameError:
            # not running inside IPython; no color configuration available
            colors = "NoColor"
        else:
            colors = ip.colors

        if colors == "NoColor":
            out = normal = ""
        else:
            out = TermColors.Red
            normal = TermColors.Normal

        if '\n' in text_out and not text_out.startswith('\n'):
            # add newline for multiline reprs
            text_out = '\n' + text_out

        p.text(
            out + u'Out[%i:%i]: ' % (
                self.metadata['engine_id'], self.execution_count
            ) + normal + text_out
        )
165
165
166
166
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # the fixed set of allowed keys, with their default values
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
              'outputs' : [],
              'data': {},
              'outputs_ready' : False,
        }
        self.update(md)
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # `key in self` works on both Python 2 and 3
        # (the original `self.iterkeys()` is Python-2-only)
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
220
220
221
221
class Client(HasTraits):
    """A semi-synchronous client to the IPython ZMQ cluster

    Parameters
    ----------

    url_file : str/unicode; path to ipcontroller-client.json
        This JSON file should contain all the information needed to connect to a cluster,
        and is likely the only argument needed.
        Connection information for the Hub's registration.  If a json connector
        file is given, then likely no further configuration is necessary.
        [Default: use profile]
    profile : bytes
        The name of the Cluster profile to be used to find connector information.
        If run from an IPython application, the default profile will be the same
        as the running application, otherwise it will be 'default'.
    cluster_id : str
        String id to added to runtime files, to prevent name collisions when using
        multiple clusters with a single profile simultaneously.
        When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
        Since this is text inserted into filenames, typical recommendations apply:
        Simple character strings are ideal, and spaces are not recommended (but
        should generally work)
    context : zmq.Context
        Pass an existing zmq.Context instance, otherwise the client will create its own.
    debug : bool
        flag for lots of message printing for debug purposes
    timeout : int/float
        time (in seconds) to wait for connection replies from the Hub
        [Default: 10]

    #-------------- session related args ----------------

    config : Config object
        If specified, this will be relayed to the Session for configuration
    username : str
        set username for the session object

    #-------------- ssh related args ----------------
    # These are args for configuring the ssh tunnel to be used
    # credentials are used to forward connections over ssh to the Controller
    # Note that the ip given in `addr` needs to be relative to sshserver
    # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
    # and set sshserver as the same machine the Controller is on. However,
    # the only requirement is that sshserver is able to see the Controller
    # (i.e. is within the same trusted network).

    sshserver : str
        A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
        If keyfile or password is specified, and this is not, it will default to
        the ip given in addr.
    sshkey : str; path to ssh private key file
        This specifies a key to be used in ssh login, default None.
        Regular default ssh keys will be used without specifying this argument.
    password : str
        Your ssh password to sshserver. Note that if this is left None,
        you will be prompted for it if passwordless key based login is unavailable.
    paramiko : bool
        flag for whether to use paramiko instead of shell ssh for tunneling.
        [default: True on win32, False else]


    Attributes
    ----------

    ids : list of int engine IDs
        requesting the ids attribute always synchronizes
        the registration state. To request ids without synchronization,
        use semi-private _ids attributes.

    history : list of msg_ids
        a list of msg_ids, keeping track of all the execution
        messages you have submitted in order.

    outstanding : set of msg_ids
        a set of msg_ids that have been submitted, but whose
        results have not yet been received.

    results : dict
        a dict of all our results, keyed by msg_id

    block : bool
        determines default behavior when block not specified
        in execution methods

    Methods
    -------

    spin
        flushes incoming results and registration state changes
        control methods spin, and requesting `ids` also ensures up to date

    wait
        wait on one or more msg_ids

    execution methods
        apply
        legacy: execute, run

    data movement
        push, pull, scatter, gather

    query methods
        queue_status, get_result, purge, result_status

    control methods
        abort, shutdown

    """
331
331
332
332
333 block = Bool(False)
333 block = Bool(False)
334 outstanding = Set()
334 outstanding = Set()
335 results = Instance('collections.defaultdict', (dict,))
335 results = Instance('collections.defaultdict', (dict,))
336 metadata = Instance('collections.defaultdict', (Metadata,))
336 metadata = Instance('collections.defaultdict', (Metadata,))
337 history = List()
337 history = List()
338 debug = Bool(False)
338 debug = Bool(False)
339 _spin_thread = Any()
339 _spin_thread = Any()
340 _stop_spinning = Any()
340 _stop_spinning = Any()
341
341
342 profile=Unicode()
342 profile=Unicode()
343 def _profile_default(self):
343 def _profile_default(self):
344 if BaseIPythonApplication.initialized():
344 if BaseIPythonApplication.initialized():
345 # an IPython app *might* be running, try to get its profile
345 # an IPython app *might* be running, try to get its profile
346 try:
346 try:
347 return BaseIPythonApplication.instance().profile
347 return BaseIPythonApplication.instance().profile
348 except (AttributeError, MultipleInstanceError):
348 except (AttributeError, MultipleInstanceError):
349 # could be a *different* subclass of config.Application,
349 # could be a *different* subclass of config.Application,
350 # which would raise one of these two errors.
350 # which would raise one of these two errors.
351 return u'default'
351 return u'default'
352 else:
352 else:
353 return u'default'
353 return u'default'
354
354
355
355
356 _outstanding_dict = Instance('collections.defaultdict', (set,))
356 _outstanding_dict = Instance('collections.defaultdict', (set,))
357 _ids = List()
357 _ids = List()
358 _connected=Bool(False)
358 _connected=Bool(False)
359 _ssh=Bool(False)
359 _ssh=Bool(False)
360 _context = Instance('zmq.Context')
360 _context = Instance('zmq.Context')
361 _config = Dict()
361 _config = Dict()
362 _engines=Instance(util.ReverseDict, (), {})
362 _engines=Instance(util.ReverseDict, (), {})
363 # _hub_socket=Instance('zmq.Socket')
363 # _hub_socket=Instance('zmq.Socket')
364 _query_socket=Instance('zmq.Socket')
364 _query_socket=Instance('zmq.Socket')
365 _control_socket=Instance('zmq.Socket')
365 _control_socket=Instance('zmq.Socket')
366 _iopub_socket=Instance('zmq.Socket')
366 _iopub_socket=Instance('zmq.Socket')
367 _notification_socket=Instance('zmq.Socket')
367 _notification_socket=Instance('zmq.Socket')
368 _mux_socket=Instance('zmq.Socket')
368 _mux_socket=Instance('zmq.Socket')
369 _task_socket=Instance('zmq.Socket')
369 _task_socket=Instance('zmq.Socket')
370 _task_scheme=Unicode()
370 _task_scheme=Unicode()
371 _closed = False
371 _closed = False
372 _ignored_control_replies=Integer(0)
372 _ignored_control_replies=Integer(0)
373 _ignored_hub_replies=Integer(0)
373 _ignored_hub_replies=Integer(0)
374
374
375 def __new__(self, *args, **kw):
375 def __new__(self, *args, **kw):
376 # don't raise on positional args
376 # don't raise on positional args
377 return HasTraits.__new__(self, **kw)
377 return HasTraits.__new__(self, **kw)
378
378
379 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
379 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
380 context=None, debug=False,
380 context=None, debug=False,
381 sshserver=None, sshkey=None, password=None, paramiko=None,
381 sshserver=None, sshkey=None, password=None, paramiko=None,
382 timeout=10, cluster_id=None, **extra_args
382 timeout=10, cluster_id=None, **extra_args
383 ):
383 ):
384 if profile:
384 if profile:
385 super(Client, self).__init__(debug=debug, profile=profile)
385 super(Client, self).__init__(debug=debug, profile=profile)
386 else:
386 else:
387 super(Client, self).__init__(debug=debug)
387 super(Client, self).__init__(debug=debug)
388 if context is None:
388 if context is None:
389 context = zmq.Context.instance()
389 context = zmq.Context.instance()
390 self._context = context
390 self._context = context
391 self._stop_spinning = Event()
391 self._stop_spinning = Event()
392
392
393 if 'url_or_file' in extra_args:
393 if 'url_or_file' in extra_args:
394 url_file = extra_args['url_or_file']
394 url_file = extra_args['url_or_file']
395 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
395 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
396
396
397 if url_file and util.is_url(url_file):
397 if url_file and util.is_url(url_file):
398 raise ValueError("single urls cannot be specified, url-files must be used.")
398 raise ValueError("single urls cannot be specified, url-files must be used.")
399
399
400 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
400 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
401
401
402 if self._cd is not None:
402 if self._cd is not None:
403 if url_file is None:
403 if url_file is None:
404 if not cluster_id:
404 if not cluster_id:
405 client_json = 'ipcontroller-client.json'
405 client_json = 'ipcontroller-client.json'
406 else:
406 else:
407 client_json = 'ipcontroller-%s-client.json' % cluster_id
407 client_json = 'ipcontroller-%s-client.json' % cluster_id
408 url_file = pjoin(self._cd.security_dir, client_json)
408 url_file = pjoin(self._cd.security_dir, client_json)
409 if url_file is None:
409 if url_file is None:
410 raise ValueError(
410 raise ValueError(
411 "I can't find enough information to connect to a hub!"
411 "I can't find enough information to connect to a hub!"
412 " Please specify at least one of url_file or profile."
412 " Please specify at least one of url_file or profile."
413 )
413 )
414
414
415 with open(url_file) as f:
415 with open(url_file) as f:
416 cfg = json.load(f)
416 cfg = json.load(f)
417
417
418 self._task_scheme = cfg['task_scheme']
418 self._task_scheme = cfg['task_scheme']
419
419
420 # sync defaults from args, json:
420 # sync defaults from args, json:
421 if sshserver:
421 if sshserver:
422 cfg['ssh'] = sshserver
422 cfg['ssh'] = sshserver
423
423
424 location = cfg.setdefault('location', None)
424 location = cfg.setdefault('location', None)
425
425
426 proto,addr = cfg['interface'].split('://')
426 proto,addr = cfg['interface'].split('://')
427 addr = util.disambiguate_ip_address(addr, location)
427 addr = util.disambiguate_ip_address(addr, location)
428 cfg['interface'] = "%s://%s" % (proto, addr)
428 cfg['interface'] = "%s://%s" % (proto, addr)
429
429
430 # turn interface,port into full urls:
430 # turn interface,port into full urls:
431 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
431 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
432 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
432 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
433
433
434 url = cfg['registration']
434 url = cfg['registration']
435
435
436 if location is not None and addr == LOCALHOST:
436 if location is not None and addr == LOCALHOST:
437 # location specified, and connection is expected to be local
437 # location specified, and connection is expected to be local
438 if location not in LOCAL_IPS and not sshserver:
438 if location not in LOCAL_IPS and not sshserver:
439 # load ssh from JSON *only* if the controller is not on
439 # load ssh from JSON *only* if the controller is not on
440 # this machine
440 # this machine
441 sshserver=cfg['ssh']
441 sshserver=cfg['ssh']
442 if location not in LOCAL_IPS and not sshserver:
442 if location not in LOCAL_IPS and not sshserver:
443 # warn if no ssh specified, but SSH is probably needed
443 # warn if no ssh specified, but SSH is probably needed
444 # This is only a warning, because the most likely cause
444 # This is only a warning, because the most likely cause
445 # is a local Controller on a laptop whose IP is dynamic
445 # is a local Controller on a laptop whose IP is dynamic
446 warnings.warn("""
446 warnings.warn("""
447 Controller appears to be listening on localhost, but not on this machine.
447 Controller appears to be listening on localhost, but not on this machine.
448 If this is true, you should specify Client(...,sshserver='you@%s')
448 If this is true, you should specify Client(...,sshserver='you@%s')
449 or instruct your controller to listen on an external IP."""%location,
449 or instruct your controller to listen on an external IP."""%location,
450 RuntimeWarning)
450 RuntimeWarning)
451 elif not sshserver:
451 elif not sshserver:
452 # otherwise sync with cfg
452 # otherwise sync with cfg
453 sshserver = cfg['ssh']
453 sshserver = cfg['ssh']
454
454
455 self._config = cfg
455 self._config = cfg
456
456
457 self._ssh = bool(sshserver or sshkey or password)
457 self._ssh = bool(sshserver or sshkey or password)
458 if self._ssh and sshserver is None:
458 if self._ssh and sshserver is None:
459 # default to ssh via localhost
459 # default to ssh via localhost
460 sshserver = addr
460 sshserver = addr
461 if self._ssh and password is None:
461 if self._ssh and password is None:
462 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
462 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
463 password=False
463 password=False
464 else:
464 else:
465 password = getpass("SSH Password for %s: "%sshserver)
465 password = getpass("SSH Password for %s: "%sshserver)
466 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
466 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
467
467
468 # configure and construct the session
468 # configure and construct the session
469 try:
469 try:
470 extra_args['packer'] = cfg['pack']
470 extra_args['packer'] = cfg['pack']
471 extra_args['unpacker'] = cfg['unpack']
471 extra_args['unpacker'] = cfg['unpack']
472 extra_args['key'] = cast_bytes(cfg['key'])
472 extra_args['key'] = cast_bytes(cfg['key'])
473 extra_args['signature_scheme'] = cfg['signature_scheme']
473 extra_args['signature_scheme'] = cfg['signature_scheme']
474 except KeyError as exc:
474 except KeyError as exc:
475 msg = '\n'.join([
475 msg = '\n'.join([
476 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
476 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
477 "If you are reusing connection files, remove them and start ipcontroller again."
477 "If you are reusing connection files, remove them and start ipcontroller again."
478 ])
478 ])
479 raise ValueError(msg.format(exc.message))
479 raise ValueError(msg.format(exc.message))
480
480
481 self.session = Session(**extra_args)
481 self.session = Session(**extra_args)
482
482
483 self._query_socket = self._context.socket(zmq.DEALER)
483 self._query_socket = self._context.socket(zmq.DEALER)
484
484
485 if self._ssh:
485 if self._ssh:
486 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
486 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
487 else:
487 else:
488 self._query_socket.connect(cfg['registration'])
488 self._query_socket.connect(cfg['registration'])
489
489
490 self.session.debug = self.debug
490 self.session.debug = self.debug
491
491
492 self._notification_handlers = {'registration_notification' : self._register_engine,
492 self._notification_handlers = {'registration_notification' : self._register_engine,
493 'unregistration_notification' : self._unregister_engine,
493 'unregistration_notification' : self._unregister_engine,
494 'shutdown_notification' : lambda msg: self.close(),
494 'shutdown_notification' : lambda msg: self.close(),
495 }
495 }
496 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
496 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
497 'apply_reply' : self._handle_apply_reply}
497 'apply_reply' : self._handle_apply_reply}
498 self._connect(sshserver, ssh_kwargs, timeout)
498 self._connect(sshserver, ssh_kwargs, timeout)
499
499
500 # last step: setup magics, if we are in IPython:
500 # last step: setup magics, if we are in IPython:
501
501
502 try:
502 try:
503 ip = get_ipython()
503 ip = get_ipython()
504 except NameError:
504 except NameError:
505 return
505 return
506 else:
506 else:
507 if 'px' not in ip.magics_manager.magics:
507 if 'px' not in ip.magics_manager.magics:
508 # in IPython but we are the first Client.
508 # in IPython but we are the first Client.
509 # activate a default view for parallel magics.
509 # activate a default view for parallel magics.
510 self.activate()
510 self.activate()
511
511
    def __del__(self):
        """cleanup sockets, but _not_ context.

        The shared zmq Context may be in use by other Clients/objects,
        so only our own sockets are closed (via :meth:`close`).
        """
        self.close()
515
515
516 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
516 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
517 if ipython_dir is None:
517 if ipython_dir is None:
518 ipython_dir = get_ipython_dir()
518 ipython_dir = get_ipython_dir()
519 if profile_dir is not None:
519 if profile_dir is not None:
520 try:
520 try:
521 self._cd = ProfileDir.find_profile_dir(profile_dir)
521 self._cd = ProfileDir.find_profile_dir(profile_dir)
522 return
522 return
523 except ProfileDirError:
523 except ProfileDirError:
524 pass
524 pass
525 elif profile is not None:
525 elif profile is not None:
526 try:
526 try:
527 self._cd = ProfileDir.find_profile_dir_by_name(
527 self._cd = ProfileDir.find_profile_dir_by_name(
528 ipython_dir, profile)
528 ipython_dir, profile)
529 return
529 return
530 except ProfileDirError:
530 except ProfileDirError:
531 pass
531 pass
532 self._cd = None
532 self._cd = None
533
533
    def _update_engines(self, engines):
        """Update our engines dict and _ids from a dict of the form: {id:uuid}.

        Keys arrive as strings (having passed through JSON), so they are
        normalized to ints before being stored.
        """
        for k,v in engines.iteritems():
            eid = int(k)
            if eid not in self._engines:
                self._ids.append(eid)
            self._engines[eid] = v
        self._ids = sorted(self._ids)
        # A 'pure' ZMQ scheduler requires a contiguous 0..N-1 engine set
        # (Py2: `range` returns a list, so this equality check works).
        # If there are gaps in the ids, task farming must be disabled.
        if sorted(self._engines.keys()) != range(len(self._engines)) and \
                        self._task_scheme == 'pure' and self._task_socket:
            self._stop_scheduling_tasks()
545
545
546 def _stop_scheduling_tasks(self):
546 def _stop_scheduling_tasks(self):
547 """Stop scheduling tasks because an engine has been unregistered
547 """Stop scheduling tasks because an engine has been unregistered
548 from a pure ZMQ scheduler.
548 from a pure ZMQ scheduler.
549 """
549 """
550 self._task_socket.close()
550 self._task_socket.close()
551 self._task_socket = None
551 self._task_socket = None
552 msg = "An engine has been unregistered, and we are using pure " +\
552 msg = "An engine has been unregistered, and we are using pure " +\
553 "ZMQ task scheduling. Task farming will be disabled."
553 "ZMQ task scheduling. Task farming will be disabled."
554 if self.outstanding:
554 if self.outstanding:
555 msg += " If you were running tasks when this happened, " +\
555 msg += " If you were running tasks when this happened, " +\
556 "some `outstanding` msg_ids may never resolve."
556 "some `outstanding` msg_ids may never resolve."
557 warnings.warn(msg, RuntimeWarning)
557 warnings.warn(msg, RuntimeWarning)
558
558
    def _build_targets(self, targets):
        """Turn valid target IDs or 'all' into two lists:
        (int_ids, uuids).

        Accepts None (all engines), the string 'all', a single int
        (negative ints index from the end), a slice, or a collection
        of ints.  Raises NoEnginesRegistered, TypeError, or IndexError
        on invalid input.
        """
        if not self._ids:
            # flush notification socket if no engines yet, just in case
            if not self.ids:
                raise error.NoEnginesRegistered("Can't build targets without any engines")

        if targets is None:
            targets = self._ids
        elif isinstance(targets, basestring):
            if targets.lower() == 'all':
                targets = self._ids
            else:
                raise TypeError("%r not valid str target, must be 'all'"%(targets))
        elif isinstance(targets, int):
            # negative int indexes from the end, like a list
            if targets < 0:
                targets = self.ids[targets]
            if targets not in self._ids:
                raise IndexError("No such engine: %i"%targets)
            targets = [targets]

        if isinstance(targets, slice):
            # slice over *positions* in the sorted id list, not raw ids
            indices = range(len(self._ids))[targets]
            ids = self.ids
            targets = [ ids[i] for i in indices ]

        if not isinstance(targets, (tuple, list, xrange)):
            raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))

        # first list: engine uuids as bytes (routing identities),
        # second list: the int engine ids
        return [cast_bytes(self._engines[t]) for t in targets], list(targets)
591
591
    def _connect(self, sshserver, ssh_kwargs, timeout):
        """setup all our socket connections to the cluster. This is called from
        __init__.

        Sends a connection_request on the query socket, waits up to
        `timeout` seconds for the reply, then connects the mux, task,
        notification, control, and iopub sockets using the URLs from
        the Hub's reply.  Raises TimeoutError on timeout, or a generic
        Exception if the Hub refuses the connection.
        """

        # Maybe allow reconnecting?
        if self._connected:
            return
        self._connected=True

        def connect_socket(s, url):
            # tunnel through SSH when configured, else connect directly
            if self._ssh:
                return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
            else:
                return s.connect(url)

        self.session.send(self._query_socket, 'connection_request')
        # use Poller because zmq.select has wrong units in pyzmq 2.1.7
        poller = zmq.Poller()
        poller.register(self._query_socket, zmq.POLLIN)
        # poll expects milliseconds, timeout is seconds
        evts = poller.poll(timeout*1000)
        if not evts:
            raise error.TimeoutError("Hub connection request timed out")
        idents,msg = self.session.recv(self._query_socket,mode=0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        cfg = self._config
        if content['status'] == 'ok':
            self._mux_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._mux_socket, cfg['mux'])

            self._task_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._task_socket, cfg['task'])

            # subscribe to everything on the notification channel
            self._notification_socket = self._context.socket(zmq.SUB)
            self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
            connect_socket(self._notification_socket, cfg['notification'])

            self._control_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._control_socket, cfg['control'])

            # subscribe to everything on the iopub channel
            self._iopub_socket = self._context.socket(zmq.SUB)
            self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
            connect_socket(self._iopub_socket, cfg['iopub'])

            self._update_engines(dict(content['engines']))
        else:
            self._connected = False
            raise Exception("Failed to connect!")
644
643
645 #--------------------------------------------------------------------------
644 #--------------------------------------------------------------------------
646 # handlers and callbacks for incoming messages
645 # handlers and callbacks for incoming messages
647 #--------------------------------------------------------------------------
646 #--------------------------------------------------------------------------
648
647
649 def _unwrap_exception(self, content):
648 def _unwrap_exception(self, content):
650 """unwrap exception, and remap engine_id to int."""
649 """unwrap exception, and remap engine_id to int."""
651 e = error.unwrap_exception(content)
650 e = error.unwrap_exception(content)
652 # print e.traceback
651 # print e.traceback
653 if e.engine_info:
652 if e.engine_info:
654 e_uuid = e.engine_info['engine_uuid']
653 e_uuid = e.engine_info['engine_uuid']
655 eid = self._engines[e_uuid]
654 eid = self._engines[e_uuid]
656 e.engine_info['engine_id'] = eid
655 e.engine_info['engine_id'] = eid
657 return e
656 return e
658
657
659 def _extract_metadata(self, msg):
658 def _extract_metadata(self, msg):
660 header = msg['header']
659 header = msg['header']
661 parent = msg['parent_header']
660 parent = msg['parent_header']
662 msg_meta = msg['metadata']
661 msg_meta = msg['metadata']
663 content = msg['content']
662 content = msg['content']
664 md = {'msg_id' : parent['msg_id'],
663 md = {'msg_id' : parent['msg_id'],
665 'received' : datetime.now(),
664 'received' : datetime.now(),
666 'engine_uuid' : msg_meta.get('engine', None),
665 'engine_uuid' : msg_meta.get('engine', None),
667 'follow' : msg_meta.get('follow', []),
666 'follow' : msg_meta.get('follow', []),
668 'after' : msg_meta.get('after', []),
667 'after' : msg_meta.get('after', []),
669 'status' : content['status'],
668 'status' : content['status'],
670 }
669 }
671
670
672 if md['engine_uuid'] is not None:
671 if md['engine_uuid'] is not None:
673 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
672 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
674
673
675 if 'date' in parent:
674 if 'date' in parent:
676 md['submitted'] = parent['date']
675 md['submitted'] = parent['date']
677 if 'started' in msg_meta:
676 if 'started' in msg_meta:
678 md['started'] = msg_meta['started']
677 md['started'] = msg_meta['started']
679 if 'date' in header:
678 if 'date' in header:
680 md['completed'] = header['date']
679 md['completed'] = header['date']
681 return md
680 return md
682
681
683 def _register_engine(self, msg):
682 def _register_engine(self, msg):
684 """Register a new engine, and update our connection info."""
683 """Register a new engine, and update our connection info."""
685 content = msg['content']
684 content = msg['content']
686 eid = content['id']
685 eid = content['id']
687 d = {eid : content['uuid']}
686 d = {eid : content['uuid']}
688 self._update_engines(d)
687 self._update_engines(d)
689
688
690 def _unregister_engine(self, msg):
689 def _unregister_engine(self, msg):
691 """Unregister an engine that has died."""
690 """Unregister an engine that has died."""
692 content = msg['content']
691 content = msg['content']
693 eid = int(content['id'])
692 eid = int(content['id'])
694 if eid in self._ids:
693 if eid in self._ids:
695 self._ids.remove(eid)
694 self._ids.remove(eid)
696 uuid = self._engines.pop(eid)
695 uuid = self._engines.pop(eid)
697
696
698 self._handle_stranded_msgs(eid, uuid)
697 self._handle_stranded_msgs(eid, uuid)
699
698
700 if self._task_socket and self._task_scheme == 'pure':
699 if self._task_socket and self._task_scheme == 'pure':
701 self._stop_scheduling_tasks()
700 self._stop_scheduling_tasks()
702
701
    def _handle_stranded_msgs(self, eid, uuid):
        """Handle messages known to be on an engine when the engine unregisters.

        It is possible that this will fire prematurely - that is, an engine will
        go down after completing a result, and the client will be notified
        of the unregistration and later receive the successful result.
        """

        outstanding = self._outstanding_dict[uuid]

        # iterate a copy: _handle_apply_reply mutates the outstanding set
        for msg_id in list(outstanding):
            if msg_id in self.results:
                # we already
                continue
            try:
                # raise/except so wrap_exception can capture a real traceback
                raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
            except:
                content = error.wrap_exception()
            # build a fake message:
            msg = self.session.msg('apply_reply', content=content)
            msg['parent_header']['msg_id'] = msg_id
            msg['metadata']['engine'] = uuid
            self._handle_apply_reply(msg)
726
725
727 def _handle_execute_reply(self, msg):
726 def _handle_execute_reply(self, msg):
728 """Save the reply to an execute_request into our results.
727 """Save the reply to an execute_request into our results.
729
728
730 execute messages are never actually used. apply is used instead.
729 execute messages are never actually used. apply is used instead.
731 """
730 """
732
731
733 parent = msg['parent_header']
732 parent = msg['parent_header']
734 msg_id = parent['msg_id']
733 msg_id = parent['msg_id']
735 if msg_id not in self.outstanding:
734 if msg_id not in self.outstanding:
736 if msg_id in self.history:
735 if msg_id in self.history:
737 print ("got stale result: %s"%msg_id)
736 print ("got stale result: %s"%msg_id)
738 else:
737 else:
739 print ("got unknown result: %s"%msg_id)
738 print ("got unknown result: %s"%msg_id)
740 else:
739 else:
741 self.outstanding.remove(msg_id)
740 self.outstanding.remove(msg_id)
742
741
743 content = msg['content']
742 content = msg['content']
744 header = msg['header']
743 header = msg['header']
745
744
746 # construct metadata:
745 # construct metadata:
747 md = self.metadata[msg_id]
746 md = self.metadata[msg_id]
748 md.update(self._extract_metadata(msg))
747 md.update(self._extract_metadata(msg))
749 # is this redundant?
748 # is this redundant?
750 self.metadata[msg_id] = md
749 self.metadata[msg_id] = md
751
750
752 e_outstanding = self._outstanding_dict[md['engine_uuid']]
751 e_outstanding = self._outstanding_dict[md['engine_uuid']]
753 if msg_id in e_outstanding:
752 if msg_id in e_outstanding:
754 e_outstanding.remove(msg_id)
753 e_outstanding.remove(msg_id)
755
754
756 # construct result:
755 # construct result:
757 if content['status'] == 'ok':
756 if content['status'] == 'ok':
758 self.results[msg_id] = ExecuteReply(msg_id, content, md)
757 self.results[msg_id] = ExecuteReply(msg_id, content, md)
759 elif content['status'] == 'aborted':
758 elif content['status'] == 'aborted':
760 self.results[msg_id] = error.TaskAborted(msg_id)
759 self.results[msg_id] = error.TaskAborted(msg_id)
761 elif content['status'] == 'resubmitted':
760 elif content['status'] == 'resubmitted':
762 # TODO: handle resubmission
761 # TODO: handle resubmission
763 pass
762 pass
764 else:
763 else:
765 self.results[msg_id] = self._unwrap_exception(content)
764 self.results[msg_id] = self._unwrap_exception(content)
766
765
    def _handle_apply_reply(self, msg):
        """Save the reply to an apply_request into our results.

        Mirrors _handle_execute_reply, but unpacks the result from the
        message buffers instead of wrapping it in an ExecuteReply.
        """
        parent = msg['parent_header']
        msg_id = parent['msg_id']
        if msg_id not in self.outstanding:
            if msg_id in self.history:
                print ("got stale result: %s"%msg_id)
                print self.results[msg_id]
                print msg
            else:
                print ("got unknown result: %s"%msg_id)
        else:
            self.outstanding.remove(msg_id)
        content = msg['content']
        header = msg['header']

        # construct metadata:
        md = self.metadata[msg_id]
        md.update(self._extract_metadata(msg))
        # is this redundant?
        self.metadata[msg_id] = md

        # clear this msg from the engine's own outstanding set, if present
        e_outstanding = self._outstanding_dict[md['engine_uuid']]
        if msg_id in e_outstanding:
            e_outstanding.remove(msg_id)

        # construct result:
        if content['status'] == 'ok':
            # actual return value travels in the message buffers
            self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
        elif content['status'] == 'aborted':
            self.results[msg_id] = error.TaskAborted(msg_id)
        elif content['status'] == 'resubmitted':
            # TODO: handle resubmission
            pass
        else:
            self.results[msg_id] = self._unwrap_exception(content)
803
802
804 def _flush_notifications(self):
803 def _flush_notifications(self):
805 """Flush notifications of engine registrations waiting
804 """Flush notifications of engine registrations waiting
806 in ZMQ queue."""
805 in ZMQ queue."""
807 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
806 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
808 while msg is not None:
807 while msg is not None:
809 if self.debug:
808 if self.debug:
810 pprint(msg)
809 pprint(msg)
811 msg_type = msg['header']['msg_type']
810 msg_type = msg['header']['msg_type']
812 handler = self._notification_handlers.get(msg_type, None)
811 handler = self._notification_handlers.get(msg_type, None)
813 if handler is None:
812 if handler is None:
814 raise Exception("Unhandled message type: %s" % msg_type)
813 raise Exception("Unhandled message type: %s" % msg_type)
815 else:
814 else:
816 handler(msg)
815 handler(msg)
817 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
816 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
818
817
819 def _flush_results(self, sock):
818 def _flush_results(self, sock):
820 """Flush task or queue results waiting in ZMQ queue."""
819 """Flush task or queue results waiting in ZMQ queue."""
821 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
820 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
822 while msg is not None:
821 while msg is not None:
823 if self.debug:
822 if self.debug:
824 pprint(msg)
823 pprint(msg)
825 msg_type = msg['header']['msg_type']
824 msg_type = msg['header']['msg_type']
826 handler = self._queue_handlers.get(msg_type, None)
825 handler = self._queue_handlers.get(msg_type, None)
827 if handler is None:
826 if handler is None:
828 raise Exception("Unhandled message type: %s" % msg_type)
827 raise Exception("Unhandled message type: %s" % msg_type)
829 else:
828 else:
830 handler(msg)
829 handler(msg)
831 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
830 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
832
831
833 def _flush_control(self, sock):
832 def _flush_control(self, sock):
834 """Flush replies from the control channel waiting
833 """Flush replies from the control channel waiting
835 in the ZMQ queue.
834 in the ZMQ queue.
836
835
837 Currently: ignore them."""
836 Currently: ignore them."""
838 if self._ignored_control_replies <= 0:
837 if self._ignored_control_replies <= 0:
839 return
838 return
840 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
839 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
841 while msg is not None:
840 while msg is not None:
842 self._ignored_control_replies -= 1
841 self._ignored_control_replies -= 1
843 if self.debug:
842 if self.debug:
844 pprint(msg)
843 pprint(msg)
845 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
844 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
846
845
    def _flush_ignored_control(self):
        """flush ignored control replies

        Blocking receive of every reply we previously decided to ignore;
        the counter is decremented only after each recv succeeds.
        """
        while self._ignored_control_replies > 0:
            self.session.recv(self._control_socket)
            self._ignored_control_replies -= 1
852
851
853 def _flush_ignored_hub_replies(self):
852 def _flush_ignored_hub_replies(self):
854 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
853 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
855 while msg is not None:
854 while msg is not None:
856 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
855 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
857
856
    def _flush_iopub(self, sock):
        """Flush replies from the iopub channel waiting
        in the ZMQ queue.

        Accumulates per-task output (streams, pyin/pyout/pyerr,
        display_data, data_message) into self.metadata[msg_id].
        """
        idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        while msg is not None:
            if self.debug:
                pprint(msg)
            parent = msg['parent_header']
            # ignore IOPub messages with no parent.
            # Caused by print statements or warnings from before the first execution.
            if not parent:
                idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
                continue
            msg_id = parent['msg_id']
            content = msg['content']
            header = msg['header']
            msg_type = msg['header']['msg_type']

            # init metadata:
            md = self.metadata[msg_id]

            if msg_type == 'stream':
                # append to any stream text already captured for this task
                name = content['name']
                s = md[name] or ''
                md[name] = s + content['data']
            elif msg_type == 'pyerr':
                md.update({'pyerr' : self._unwrap_exception(content)})
            elif msg_type == 'pyin':
                md.update({'pyin' : content['code']})
            elif msg_type == 'display_data':
                md['outputs'].append(content)
            elif msg_type == 'pyout':
                md['pyout'] = content
            elif msg_type == 'data_message':
                # data publication: merge published dict into md['data']
                data, remainder = serialize.unserialize_object(msg['buffers'])
                md['data'].update(data)
            elif msg_type == 'status':
                # idle message comes after all outputs
                if content['execution_state'] == 'idle':
                    md['outputs_ready'] = True
            else:
                # unhandled msg_type (status, etc.)
                pass

            # reduntant?
            self.metadata[msg_id] = md

            idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
907
906
908 #--------------------------------------------------------------------------
907 #--------------------------------------------------------------------------
909 # len, getitem
908 # len, getitem
910 #--------------------------------------------------------------------------
909 #--------------------------------------------------------------------------
911
910
    def __len__(self):
        """len(client) returns # of engines.

        Uses the `ids` property, which flushes pending registration
        notifications first, so the count is current.
        """
        return len(self.ids)
915
914
916 def __getitem__(self, key):
915 def __getitem__(self, key):
917 """index access returns DirectView multiplexer objects
916 """index access returns DirectView multiplexer objects
918
917
919 Must be int, slice, or list/tuple/xrange of ints"""
918 Must be int, slice, or list/tuple/xrange of ints"""
920 if not isinstance(key, (int, slice, tuple, list, xrange)):
919 if not isinstance(key, (int, slice, tuple, list, xrange)):
921 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
920 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
922 else:
921 else:
923 return self.direct_view(key)
922 return self.direct_view(key)
924
923
925 #--------------------------------------------------------------------------
924 #--------------------------------------------------------------------------
926 # Begin public methods
925 # Begin public methods
927 #--------------------------------------------------------------------------
926 #--------------------------------------------------------------------------
928
927
    @property
    def ids(self):
        """Always up-to-date ids property."""
        # process any pending (un)registration notifications first,
        # so the list reflects engines that have come or gone
        self._flush_notifications()
        # always copy, so callers cannot mutate our internal list:
        return list(self._ids)
935
934
936 def activate(self, targets='all', suffix=''):
935 def activate(self, targets='all', suffix=''):
937 """Create a DirectView and register it with IPython magics
936 """Create a DirectView and register it with IPython magics
938
937
939 Defines the magics `%px, %autopx, %pxresult, %%px`
938 Defines the magics `%px, %autopx, %pxresult, %%px`
940
939
941 Parameters
940 Parameters
942 ----------
941 ----------
943
942
944 targets: int, list of ints, or 'all'
943 targets: int, list of ints, or 'all'
945 The engines on which the view's magics will run
944 The engines on which the view's magics will run
946 suffix: str [default: '']
945 suffix: str [default: '']
947 The suffix, if any, for the magics. This allows you to have
946 The suffix, if any, for the magics. This allows you to have
948 multiple views associated with parallel magics at the same time.
947 multiple views associated with parallel magics at the same time.
949
948
950 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
949 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
951 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
950 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
952 on engine 0.
951 on engine 0.
953 """
952 """
954 view = self.direct_view(targets)
953 view = self.direct_view(targets)
955 view.block = True
954 view.block = True
956 view.activate(suffix)
955 view.activate(suffix)
957 return view
956 return view
958
957
    def close(self, linger=None):
        """Close my zmq Sockets

        If `linger`, set the zmq LINGER socket option,
        which allows discarding of messages.

        Parameters
        ----------
        linger : int, optional
            Passed to ``socket.close(linger=...)`` when given; otherwise
            each socket's default LINGER behavior applies.
        """
        if self._closed:
            return
        self.stop_spin_thread()
        # every socket-holding trait on this class ends with "socket"
        snames = [ trait for trait in self.trait_names() if trait.endswith("socket") ]
        for name in snames:
            socket = getattr(self, name)
            # sockets may be None (e.g. _task_socket after a pure-scheduler
            # shutdown) or already closed
            if socket is not None and not socket.closed:
                if linger is not None:
                    socket.close(linger=linger)
                else:
                    socket.close()
        self._closed = True
968
976
969 def _spin_every(self, interval=1):
977 def _spin_every(self, interval=1):
970 """target func for use in spin_thread"""
978 """target func for use in spin_thread"""
971 while True:
979 while True:
972 if self._stop_spinning.is_set():
980 if self._stop_spinning.is_set():
973 return
981 return
974 time.sleep(interval)
982 time.sleep(interval)
975 self.spin()
983 self.spin()
976
984
977 def spin_thread(self, interval=1):
985 def spin_thread(self, interval=1):
978 """call Client.spin() in a background thread on some regular interval
986 """call Client.spin() in a background thread on some regular interval
979
987
980 This helps ensure that messages don't pile up too much in the zmq queue
988 This helps ensure that messages don't pile up too much in the zmq queue
981 while you are working on other things, or just leaving an idle terminal.
989 while you are working on other things, or just leaving an idle terminal.
982
990
983 It also helps limit potential padding of the `received` timestamp
991 It also helps limit potential padding of the `received` timestamp
984 on AsyncResult objects, used for timings.
992 on AsyncResult objects, used for timings.
985
993
986 Parameters
994 Parameters
987 ----------
995 ----------
988
996
989 interval : float, optional
997 interval : float, optional
990 The interval on which to spin the client in the background thread
998 The interval on which to spin the client in the background thread
991 (simply passed to time.sleep).
999 (simply passed to time.sleep).
992
1000
993 Notes
1001 Notes
994 -----
1002 -----
995
1003
996 For precision timing, you may want to use this method to put a bound
1004 For precision timing, you may want to use this method to put a bound
997 on the jitter (in seconds) in `received` timestamps used
1005 on the jitter (in seconds) in `received` timestamps used
998 in AsyncResult.wall_time.
1006 in AsyncResult.wall_time.
999
1007
1000 """
1008 """
1001 if self._spin_thread is not None:
1009 if self._spin_thread is not None:
1002 self.stop_spin_thread()
1010 self.stop_spin_thread()
1003 self._stop_spinning.clear()
1011 self._stop_spinning.clear()
1004 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
1012 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
1005 self._spin_thread.daemon = True
1013 self._spin_thread.daemon = True
1006 self._spin_thread.start()
1014 self._spin_thread.start()
1007
1015
1008 def stop_spin_thread(self):
1016 def stop_spin_thread(self):
1009 """stop background spin_thread, if any"""
1017 """stop background spin_thread, if any"""
1010 if self._spin_thread is not None:
1018 if self._spin_thread is not None:
1011 self._stop_spinning.set()
1019 self._stop_spinning.set()
1012 self._spin_thread.join()
1020 self._spin_thread.join()
1013 self._spin_thread = None
1021 self._spin_thread = None
1014
1022
1015 def spin(self):
1023 def spin(self):
1016 """Flush any registration notifications and execution results
1024 """Flush any registration notifications and execution results
1017 waiting in the ZMQ queue.
1025 waiting in the ZMQ queue.
1018 """
1026 """
1019 if self._notification_socket:
1027 if self._notification_socket:
1020 self._flush_notifications()
1028 self._flush_notifications()
1021 if self._iopub_socket:
1029 if self._iopub_socket:
1022 self._flush_iopub(self._iopub_socket)
1030 self._flush_iopub(self._iopub_socket)
1023 if self._mux_socket:
1031 if self._mux_socket:
1024 self._flush_results(self._mux_socket)
1032 self._flush_results(self._mux_socket)
1025 if self._task_socket:
1033 if self._task_socket:
1026 self._flush_results(self._task_socket)
1034 self._flush_results(self._task_socket)
1027 if self._control_socket:
1035 if self._control_socket:
1028 self._flush_control(self._control_socket)
1036 self._flush_control(self._control_socket)
1029 if self._query_socket:
1037 if self._query_socket:
1030 self._flush_ignored_hub_replies()
1038 self._flush_ignored_hub_replies()
1031
1039
1032 def wait(self, jobs=None, timeout=-1):
1040 def wait(self, jobs=None, timeout=-1):
1033 """waits on one or more `jobs`, for up to `timeout` seconds.
1041 """waits on one or more `jobs`, for up to `timeout` seconds.
1034
1042
1035 Parameters
1043 Parameters
1036 ----------
1044 ----------
1037
1045
1038 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1046 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1039 ints are indices to self.history
1047 ints are indices to self.history
1040 strs are msg_ids
1048 strs are msg_ids
1041 default: wait on all outstanding messages
1049 default: wait on all outstanding messages
1042 timeout : float
1050 timeout : float
1043 a time in seconds, after which to give up.
1051 a time in seconds, after which to give up.
1044 default is -1, which means no timeout
1052 default is -1, which means no timeout
1045
1053
1046 Returns
1054 Returns
1047 -------
1055 -------
1048
1056
1049 True : when all msg_ids are done
1057 True : when all msg_ids are done
1050 False : timeout reached, some msg_ids still outstanding
1058 False : timeout reached, some msg_ids still outstanding
1051 """
1059 """
1052 tic = time.time()
1060 tic = time.time()
1053 if jobs is None:
1061 if jobs is None:
1054 theids = self.outstanding
1062 theids = self.outstanding
1055 else:
1063 else:
1056 if isinstance(jobs, (int, basestring, AsyncResult)):
1064 if isinstance(jobs, (int, basestring, AsyncResult)):
1057 jobs = [jobs]
1065 jobs = [jobs]
1058 theids = set()
1066 theids = set()
1059 for job in jobs:
1067 for job in jobs:
1060 if isinstance(job, int):
1068 if isinstance(job, int):
1061 # index access
1069 # index access
1062 job = self.history[job]
1070 job = self.history[job]
1063 elif isinstance(job, AsyncResult):
1071 elif isinstance(job, AsyncResult):
1064 map(theids.add, job.msg_ids)
1072 map(theids.add, job.msg_ids)
1065 continue
1073 continue
1066 theids.add(job)
1074 theids.add(job)
1067 if not theids.intersection(self.outstanding):
1075 if not theids.intersection(self.outstanding):
1068 return True
1076 return True
1069 self.spin()
1077 self.spin()
1070 while theids.intersection(self.outstanding):
1078 while theids.intersection(self.outstanding):
1071 if timeout >= 0 and ( time.time()-tic ) > timeout:
1079 if timeout >= 0 and ( time.time()-tic ) > timeout:
1072 break
1080 break
1073 time.sleep(1e-3)
1081 time.sleep(1e-3)
1074 self.spin()
1082 self.spin()
1075 return len(theids.intersection(self.outstanding)) == 0
1083 return len(theids.intersection(self.outstanding)) == 0
1076
1084
1077 #--------------------------------------------------------------------------
1085 #--------------------------------------------------------------------------
1078 # Control methods
1086 # Control methods
1079 #--------------------------------------------------------------------------
1087 #--------------------------------------------------------------------------
1080
1088
1081 @spin_first
1089 @spin_first
1082 def clear(self, targets=None, block=None):
1090 def clear(self, targets=None, block=None):
1083 """Clear the namespace in target(s)."""
1091 """Clear the namespace in target(s)."""
1084 block = self.block if block is None else block
1092 block = self.block if block is None else block
1085 targets = self._build_targets(targets)[0]
1093 targets = self._build_targets(targets)[0]
1086 for t in targets:
1094 for t in targets:
1087 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1095 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1088 error = False
1096 error = False
1089 if block:
1097 if block:
1090 self._flush_ignored_control()
1098 self._flush_ignored_control()
1091 for i in range(len(targets)):
1099 for i in range(len(targets)):
1092 idents,msg = self.session.recv(self._control_socket,0)
1100 idents,msg = self.session.recv(self._control_socket,0)
1093 if self.debug:
1101 if self.debug:
1094 pprint(msg)
1102 pprint(msg)
1095 if msg['content']['status'] != 'ok':
1103 if msg['content']['status'] != 'ok':
1096 error = self._unwrap_exception(msg['content'])
1104 error = self._unwrap_exception(msg['content'])
1097 else:
1105 else:
1098 self._ignored_control_replies += len(targets)
1106 self._ignored_control_replies += len(targets)
1099 if error:
1107 if error:
1100 raise error
1108 raise error
1101
1109
1102
1110
1103 @spin_first
1111 @spin_first
1104 def abort(self, jobs=None, targets=None, block=None):
1112 def abort(self, jobs=None, targets=None, block=None):
1105 """Abort specific jobs from the execution queues of target(s).
1113 """Abort specific jobs from the execution queues of target(s).
1106
1114
1107 This is a mechanism to prevent jobs that have already been submitted
1115 This is a mechanism to prevent jobs that have already been submitted
1108 from executing.
1116 from executing.
1109
1117
1110 Parameters
1118 Parameters
1111 ----------
1119 ----------
1112
1120
1113 jobs : msg_id, list of msg_ids, or AsyncResult
1121 jobs : msg_id, list of msg_ids, or AsyncResult
1114 The jobs to be aborted
1122 The jobs to be aborted
1115
1123
1116 If unspecified/None: abort all outstanding jobs.
1124 If unspecified/None: abort all outstanding jobs.
1117
1125
1118 """
1126 """
1119 block = self.block if block is None else block
1127 block = self.block if block is None else block
1120 jobs = jobs if jobs is not None else list(self.outstanding)
1128 jobs = jobs if jobs is not None else list(self.outstanding)
1121 targets = self._build_targets(targets)[0]
1129 targets = self._build_targets(targets)[0]
1122
1130
1123 msg_ids = []
1131 msg_ids = []
1124 if isinstance(jobs, (basestring,AsyncResult)):
1132 if isinstance(jobs, (basestring,AsyncResult)):
1125 jobs = [jobs]
1133 jobs = [jobs]
1126 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1134 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1127 if bad_ids:
1135 if bad_ids:
1128 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1136 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1129 for j in jobs:
1137 for j in jobs:
1130 if isinstance(j, AsyncResult):
1138 if isinstance(j, AsyncResult):
1131 msg_ids.extend(j.msg_ids)
1139 msg_ids.extend(j.msg_ids)
1132 else:
1140 else:
1133 msg_ids.append(j)
1141 msg_ids.append(j)
1134 content = dict(msg_ids=msg_ids)
1142 content = dict(msg_ids=msg_ids)
1135 for t in targets:
1143 for t in targets:
1136 self.session.send(self._control_socket, 'abort_request',
1144 self.session.send(self._control_socket, 'abort_request',
1137 content=content, ident=t)
1145 content=content, ident=t)
1138 error = False
1146 error = False
1139 if block:
1147 if block:
1140 self._flush_ignored_control()
1148 self._flush_ignored_control()
1141 for i in range(len(targets)):
1149 for i in range(len(targets)):
1142 idents,msg = self.session.recv(self._control_socket,0)
1150 idents,msg = self.session.recv(self._control_socket,0)
1143 if self.debug:
1151 if self.debug:
1144 pprint(msg)
1152 pprint(msg)
1145 if msg['content']['status'] != 'ok':
1153 if msg['content']['status'] != 'ok':
1146 error = self._unwrap_exception(msg['content'])
1154 error = self._unwrap_exception(msg['content'])
1147 else:
1155 else:
1148 self._ignored_control_replies += len(targets)
1156 self._ignored_control_replies += len(targets)
1149 if error:
1157 if error:
1150 raise error
1158 raise error
1151
1159
    @spin_first
    def shutdown(self, targets='all', restart=False, hub=False, block=None):
        """Terminates one or more engine processes, optionally including the hub.

        Parameters
        ----------

        targets: list of ints or 'all' [default: all]
            Which engines to shutdown.
        hub: bool [default: False]
            Whether to include the Hub. hub=True implies targets='all'.
        block: bool [default: self.block]
            Whether to wait for clean shutdown replies or not.
        restart: bool [default: False]
            NOT IMPLEMENTED
            whether to restart engines after shutting them down.

        Raises
        ------
        NotImplementedError : if restart is True.
        The remote error, if any engine or hub reply has a non-ok status.
        """
        # local import to avoid import-time dependency ordering issues
        from IPython.parallel.error import NoEnginesRegistered
        if restart:
            raise NotImplementedError("Engine restart is not yet implemented")

        block = self.block if block is None else block
        if hub:
            # shutting down the hub implies shutting down every engine first
            targets = 'all'
        try:
            targets = self._build_targets(targets)[0]
        except NoEnginesRegistered:
            # no engines registered: nothing to send engine requests to
            targets = []
        for t in targets:
            self.session.send(self._control_socket, 'shutdown_request',
                        content={'restart':restart},ident=t)
        error = False
        # hub=True forces blocking so engines are down before the hub is asked to stop
        if block or hub:
            self._flush_ignored_control()
            for i in range(len(targets)):
                idents,msg = self.session.recv(self._control_socket, 0)
                if self.debug:
                    pprint(msg)
                if msg['content']['status'] != 'ok':
                    # keep the last error seen; raised at the end
                    error = self._unwrap_exception(msg['content'])
        else:
            # non-blocking: remember how many control replies to discard later
            self._ignored_control_replies += len(targets)

        if hub:
            # brief pause to let engine shutdowns land before stopping the hub
            time.sleep(0.25)
            self.session.send(self._query_socket, 'shutdown_request')
            idents,msg = self.session.recv(self._query_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                error = self._unwrap_exception(msg['content'])

        if error:
            raise error
1206
1214
1207 #--------------------------------------------------------------------------
1215 #--------------------------------------------------------------------------
1208 # Execution related methods
1216 # Execution related methods
1209 #--------------------------------------------------------------------------
1217 #--------------------------------------------------------------------------
1210
1218
1211 def _maybe_raise(self, result):
1219 def _maybe_raise(self, result):
1212 """wrapper for maybe raising an exception if apply failed."""
1220 """wrapper for maybe raising an exception if apply failed."""
1213 if isinstance(result, error.RemoteError):
1221 if isinstance(result, error.RemoteError):
1214 raise result
1222 raise result
1215
1223
1216 return result
1224 return result
1217
1225
    def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
                            ident=None):
        """construct and send an apply message via a socket.

        This is the principal method with which all engine execution is performed by views.

        Parameters
        ----------
        socket : the zmq socket (mux or task) on which to send the request
        f : callable (or Reference) to be executed remotely
        args : tuple or list of positional arguments (default: [])
        kwargs : dict of keyword arguments (default: {})
        metadata : dict merged into the message metadata (default: {})
        track : whether to ask zmq to track message delivery
        ident : engine identity (or list of identities) to route to,
            or None for unrouted sends

        Returns
        -------
        msg : the full message dict returned by session.send; its
            header contains the new msg_id.

        Raises
        ------
        RuntimeError : if this client has already been closed.
        TypeError : if f/args/kwargs/metadata have the wrong types.
        """

        if self._closed:
            raise RuntimeError("Client cannot be used after its sockets have been closed")

        # defaults:
        args = args if args is not None else []
        kwargs = kwargs if kwargs is not None else {}
        metadata = metadata if metadata is not None else {}

        # validate arguments
        if not callable(f) and not isinstance(f, Reference):
            raise TypeError("f must be callable, not %s"%type(f))
        if not isinstance(args, (tuple, list)):
            raise TypeError("args must be tuple or list, not %s"%type(args))
        if not isinstance(kwargs, dict):
            raise TypeError("kwargs must be dict, not %s"%type(kwargs))
        if not isinstance(metadata, dict):
            raise TypeError("metadata must be dict, not %s"%type(metadata))

        # serialize f/args/kwargs into message buffers; thresholds control
        # when objects are sent as separate buffers vs inline
        bufs = serialize.pack_apply_message(f, args, kwargs,
            buffer_threshold=self.session.buffer_threshold,
            item_threshold=self.session.item_threshold,
        )

        msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
                            metadata=metadata, track=track)

        # record the submission locally for result/outstanding tracking
        msg_id = msg['header']['msg_id']
        self.outstanding.add(msg_id)
        if ident:
            # possibly routed to a specific engine
            if isinstance(ident, list):
                ident = ident[-1]
            if ident in self._engines.values():
                # save for later, in case of engine death
                self._outstanding_dict[ident].add(msg_id)
        self.history.append(msg_id)
        self.metadata[msg_id]['submitted'] = datetime.now()

        return msg
1264
1272
1265 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1273 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1266 """construct and send an execute request via a socket.
1274 """construct and send an execute request via a socket.
1267
1275
1268 """
1276 """
1269
1277
1270 if self._closed:
1278 if self._closed:
1271 raise RuntimeError("Client cannot be used after its sockets have been closed")
1279 raise RuntimeError("Client cannot be used after its sockets have been closed")
1272
1280
1273 # defaults:
1281 # defaults:
1274 metadata = metadata if metadata is not None else {}
1282 metadata = metadata if metadata is not None else {}
1275
1283
1276 # validate arguments
1284 # validate arguments
1277 if not isinstance(code, basestring):
1285 if not isinstance(code, basestring):
1278 raise TypeError("code must be text, not %s" % type(code))
1286 raise TypeError("code must be text, not %s" % type(code))
1279 if not isinstance(metadata, dict):
1287 if not isinstance(metadata, dict):
1280 raise TypeError("metadata must be dict, not %s" % type(metadata))
1288 raise TypeError("metadata must be dict, not %s" % type(metadata))
1281
1289
1282 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1290 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1283
1291
1284
1292
1285 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1293 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1286 metadata=metadata)
1294 metadata=metadata)
1287
1295
1288 msg_id = msg['header']['msg_id']
1296 msg_id = msg['header']['msg_id']
1289 self.outstanding.add(msg_id)
1297 self.outstanding.add(msg_id)
1290 if ident:
1298 if ident:
1291 # possibly routed to a specific engine
1299 # possibly routed to a specific engine
1292 if isinstance(ident, list):
1300 if isinstance(ident, list):
1293 ident = ident[-1]
1301 ident = ident[-1]
1294 if ident in self._engines.values():
1302 if ident in self._engines.values():
1295 # save for later, in case of engine death
1303 # save for later, in case of engine death
1296 self._outstanding_dict[ident].add(msg_id)
1304 self._outstanding_dict[ident].add(msg_id)
1297 self.history.append(msg_id)
1305 self.history.append(msg_id)
1298 self.metadata[msg_id]['submitted'] = datetime.now()
1306 self.metadata[msg_id]['submitted'] = datetime.now()
1299
1307
1300 return msg
1308 return msg
1301
1309
1302 #--------------------------------------------------------------------------
1310 #--------------------------------------------------------------------------
1303 # construct a View object
1311 # construct a View object
1304 #--------------------------------------------------------------------------
1312 #--------------------------------------------------------------------------
1305
1313
1306 def load_balanced_view(self, targets=None):
1314 def load_balanced_view(self, targets=None):
1307 """construct a DirectView object.
1315 """construct a DirectView object.
1308
1316
1309 If no arguments are specified, create a LoadBalancedView
1317 If no arguments are specified, create a LoadBalancedView
1310 using all engines.
1318 using all engines.
1311
1319
1312 Parameters
1320 Parameters
1313 ----------
1321 ----------
1314
1322
1315 targets: list,slice,int,etc. [default: use all engines]
1323 targets: list,slice,int,etc. [default: use all engines]
1316 The subset of engines across which to load-balance
1324 The subset of engines across which to load-balance
1317 """
1325 """
1318 if targets == 'all':
1326 if targets == 'all':
1319 targets = None
1327 targets = None
1320 if targets is not None:
1328 if targets is not None:
1321 targets = self._build_targets(targets)[1]
1329 targets = self._build_targets(targets)[1]
1322 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1330 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1323
1331
1324 def direct_view(self, targets='all'):
1332 def direct_view(self, targets='all'):
1325 """construct a DirectView object.
1333 """construct a DirectView object.
1326
1334
1327 If no targets are specified, create a DirectView using all engines.
1335 If no targets are specified, create a DirectView using all engines.
1328
1336
1329 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1337 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1330 evaluate the target engines at each execution, whereas rc[:] will connect to
1338 evaluate the target engines at each execution, whereas rc[:] will connect to
1331 all *current* engines, and that list will not change.
1339 all *current* engines, and that list will not change.
1332
1340
1333 That is, 'all' will always use all engines, whereas rc[:] will not use
1341 That is, 'all' will always use all engines, whereas rc[:] will not use
1334 engines added after the DirectView is constructed.
1342 engines added after the DirectView is constructed.
1335
1343
1336 Parameters
1344 Parameters
1337 ----------
1345 ----------
1338
1346
1339 targets: list,slice,int,etc. [default: use all engines]
1347 targets: list,slice,int,etc. [default: use all engines]
1340 The engines to use for the View
1348 The engines to use for the View
1341 """
1349 """
1342 single = isinstance(targets, int)
1350 single = isinstance(targets, int)
1343 # allow 'all' to be lazily evaluated at each execution
1351 # allow 'all' to be lazily evaluated at each execution
1344 if targets != 'all':
1352 if targets != 'all':
1345 targets = self._build_targets(targets)[1]
1353 targets = self._build_targets(targets)[1]
1346 if single:
1354 if single:
1347 targets = targets[0]
1355 targets = targets[0]
1348 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1356 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1349
1357
1350 #--------------------------------------------------------------------------
1358 #--------------------------------------------------------------------------
1351 # Query methods
1359 # Query methods
1352 #--------------------------------------------------------------------------
1360 #--------------------------------------------------------------------------
1353
1361
1354 @spin_first
1362 @spin_first
1355 def get_result(self, indices_or_msg_ids=None, block=None):
1363 def get_result(self, indices_or_msg_ids=None, block=None):
1356 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1364 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1357
1365
1358 If the client already has the results, no request to the Hub will be made.
1366 If the client already has the results, no request to the Hub will be made.
1359
1367
1360 This is a convenient way to construct AsyncResult objects, which are wrappers
1368 This is a convenient way to construct AsyncResult objects, which are wrappers
1361 that include metadata about execution, and allow for awaiting results that
1369 that include metadata about execution, and allow for awaiting results that
1362 were not submitted by this Client.
1370 were not submitted by this Client.
1363
1371
1364 It can also be a convenient way to retrieve the metadata associated with
1372 It can also be a convenient way to retrieve the metadata associated with
1365 blocking execution, since it always retrieves
1373 blocking execution, since it always retrieves
1366
1374
1367 Examples
1375 Examples
1368 --------
1376 --------
1369 ::
1377 ::
1370
1378
1371 In [10]: r = client.apply()
1379 In [10]: r = client.apply()
1372
1380
1373 Parameters
1381 Parameters
1374 ----------
1382 ----------
1375
1383
1376 indices_or_msg_ids : integer history index, str msg_id, or list of either
1384 indices_or_msg_ids : integer history index, str msg_id, or list of either
1377 The indices or msg_ids of indices to be retrieved
1385 The indices or msg_ids of indices to be retrieved
1378
1386
1379 block : bool
1387 block : bool
1380 Whether to wait for the result to be done
1388 Whether to wait for the result to be done
1381
1389
1382 Returns
1390 Returns
1383 -------
1391 -------
1384
1392
1385 AsyncResult
1393 AsyncResult
1386 A single AsyncResult object will always be returned.
1394 A single AsyncResult object will always be returned.
1387
1395
1388 AsyncHubResult
1396 AsyncHubResult
1389 A subclass of AsyncResult that retrieves results from the Hub
1397 A subclass of AsyncResult that retrieves results from the Hub
1390
1398
1391 """
1399 """
1392 block = self.block if block is None else block
1400 block = self.block if block is None else block
1393 if indices_or_msg_ids is None:
1401 if indices_or_msg_ids is None:
1394 indices_or_msg_ids = -1
1402 indices_or_msg_ids = -1
1395
1403
1396 single_result = False
1404 single_result = False
1397 if not isinstance(indices_or_msg_ids, (list,tuple)):
1405 if not isinstance(indices_or_msg_ids, (list,tuple)):
1398 indices_or_msg_ids = [indices_or_msg_ids]
1406 indices_or_msg_ids = [indices_or_msg_ids]
1399 single_result = True
1407 single_result = True
1400
1408
1401 theids = []
1409 theids = []
1402 for id in indices_or_msg_ids:
1410 for id in indices_or_msg_ids:
1403 if isinstance(id, int):
1411 if isinstance(id, int):
1404 id = self.history[id]
1412 id = self.history[id]
1405 if not isinstance(id, basestring):
1413 if not isinstance(id, basestring):
1406 raise TypeError("indices must be str or int, not %r"%id)
1414 raise TypeError("indices must be str or int, not %r"%id)
1407 theids.append(id)
1415 theids.append(id)
1408
1416
1409 local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
1417 local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
1410 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1418 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1411
1419
1412 # given single msg_id initially, get_result shot get the result itself,
1420 # given single msg_id initially, get_result shot get the result itself,
1413 # not a length-one list
1421 # not a length-one list
1414 if single_result:
1422 if single_result:
1415 theids = theids[0]
1423 theids = theids[0]
1416
1424
1417 if remote_ids:
1425 if remote_ids:
1418 ar = AsyncHubResult(self, msg_ids=theids)
1426 ar = AsyncHubResult(self, msg_ids=theids)
1419 else:
1427 else:
1420 ar = AsyncResult(self, msg_ids=theids)
1428 ar = AsyncResult(self, msg_ids=theids)
1421
1429
1422 if block:
1430 if block:
1423 ar.wait()
1431 ar.wait()
1424
1432
1425 return ar
1433 return ar
1426
1434
    @spin_first
    def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
        """Resubmit one or more tasks.

        in-flight tasks may not be resubmitted.

        Parameters
        ----------

        indices_or_msg_ids : integer history index, str msg_id, or list of either
            The indices or msg_ids of indices to be retrieved

        block : bool
            Whether to wait for the result to be done

        Returns
        -------

        AsyncHubResult
            A subclass of AsyncResult that retrieves results from the Hub

        """
        # NOTE(review): `metadata` is accepted but never used in this body —
        # confirm whether it was meant to be forwarded with the request.
        block = self.block if block is None else block
        if indices_or_msg_ids is None:
            # default: resubmit the most recent submission
            indices_or_msg_ids = -1

        if not isinstance(indices_or_msg_ids, (list,tuple)):
            indices_or_msg_ids = [indices_or_msg_ids]

        theids = []
        for id in indices_or_msg_ids:
            if isinstance(id, int):
                # an int is an index into this client's history
                id = self.history[id]
            if not isinstance(id, basestring):
                raise TypeError("indices must be str or int, not %r"%id)
            theids.append(id)

        content = dict(msg_ids = theids)

        self.session.send(self._query_socket, 'resubmit_request', content)

        # block until the hub's reply arrives, then read it without blocking
        zmq.select([self._query_socket], [], [])
        idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)
        # hub returns a mapping of old msg_id -> new msg_id for resubmitted tasks
        mapping = content['resubmitted']
        new_ids = [ mapping[msg_id] for msg_id in theids ]

        ar = AsyncHubResult(self, msg_ids=new_ids)

        if block:
            ar.wait()

        return ar
1484
1492
1485 @spin_first
1493 @spin_first
1486 def result_status(self, msg_ids, status_only=True):
1494 def result_status(self, msg_ids, status_only=True):
1487 """Check on the status of the result(s) of the apply request with `msg_ids`.
1495 """Check on the status of the result(s) of the apply request with `msg_ids`.
1488
1496
1489 If status_only is False, then the actual results will be retrieved, else
1497 If status_only is False, then the actual results will be retrieved, else
1490 only the status of the results will be checked.
1498 only the status of the results will be checked.
1491
1499
1492 Parameters
1500 Parameters
1493 ----------
1501 ----------
1494
1502
1495 msg_ids : list of msg_ids
1503 msg_ids : list of msg_ids
1496 if int:
1504 if int:
1497 Passed as index to self.history for convenience.
1505 Passed as index to self.history for convenience.
1498 status_only : bool (default: True)
1506 status_only : bool (default: True)
1499 if False:
1507 if False:
1500 Retrieve the actual results of completed tasks.
1508 Retrieve the actual results of completed tasks.
1501
1509
1502 Returns
1510 Returns
1503 -------
1511 -------
1504
1512
1505 results : dict
1513 results : dict
1506 There will always be the keys 'pending' and 'completed', which will
1514 There will always be the keys 'pending' and 'completed', which will
1507 be lists of msg_ids that are incomplete or complete. If `status_only`
1515 be lists of msg_ids that are incomplete or complete. If `status_only`
1508 is False, then completed results will be keyed by their `msg_id`.
1516 is False, then completed results will be keyed by their `msg_id`.
1509 """
1517 """
1510 if not isinstance(msg_ids, (list,tuple)):
1518 if not isinstance(msg_ids, (list,tuple)):
1511 msg_ids = [msg_ids]
1519 msg_ids = [msg_ids]
1512
1520
1513 theids = []
1521 theids = []
1514 for msg_id in msg_ids:
1522 for msg_id in msg_ids:
1515 if isinstance(msg_id, int):
1523 if isinstance(msg_id, int):
1516 msg_id = self.history[msg_id]
1524 msg_id = self.history[msg_id]
1517 if not isinstance(msg_id, basestring):
1525 if not isinstance(msg_id, basestring):
1518 raise TypeError("msg_ids must be str, not %r"%msg_id)
1526 raise TypeError("msg_ids must be str, not %r"%msg_id)
1519 theids.append(msg_id)
1527 theids.append(msg_id)
1520
1528
1521 completed = []
1529 completed = []
1522 local_results = {}
1530 local_results = {}
1523
1531
1524 # comment this block out to temporarily disable local shortcut:
1532 # comment this block out to temporarily disable local shortcut:
1525 for msg_id in theids:
1533 for msg_id in theids:
1526 if msg_id in self.results:
1534 if msg_id in self.results:
1527 completed.append(msg_id)
1535 completed.append(msg_id)
1528 local_results[msg_id] = self.results[msg_id]
1536 local_results[msg_id] = self.results[msg_id]
1529 theids.remove(msg_id)
1537 theids.remove(msg_id)
1530
1538
1531 if theids: # some not locally cached
1539 if theids: # some not locally cached
1532 content = dict(msg_ids=theids, status_only=status_only)
1540 content = dict(msg_ids=theids, status_only=status_only)
1533 msg = self.session.send(self._query_socket, "result_request", content=content)
1541 msg = self.session.send(self._query_socket, "result_request", content=content)
1534 zmq.select([self._query_socket], [], [])
1542 zmq.select([self._query_socket], [], [])
1535 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1543 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1536 if self.debug:
1544 if self.debug:
1537 pprint(msg)
1545 pprint(msg)
1538 content = msg['content']
1546 content = msg['content']
1539 if content['status'] != 'ok':
1547 if content['status'] != 'ok':
1540 raise self._unwrap_exception(content)
1548 raise self._unwrap_exception(content)
1541 buffers = msg['buffers']
1549 buffers = msg['buffers']
1542 else:
1550 else:
1543 content = dict(completed=[],pending=[])
1551 content = dict(completed=[],pending=[])
1544
1552
1545 content['completed'].extend(completed)
1553 content['completed'].extend(completed)
1546
1554
1547 if status_only:
1555 if status_only:
1548 return content
1556 return content
1549
1557
1550 failures = []
1558 failures = []
1551 # load cached results into result:
1559 # load cached results into result:
1552 content.update(local_results)
1560 content.update(local_results)
1553
1561
1554 # update cache with results:
1562 # update cache with results:
1555 for msg_id in sorted(theids):
1563 for msg_id in sorted(theids):
1556 if msg_id in content['completed']:
1564 if msg_id in content['completed']:
1557 rec = content[msg_id]
1565 rec = content[msg_id]
1558 parent = rec['header']
1566 parent = rec['header']
1559 header = rec['result_header']
1567 header = rec['result_header']
1560 rcontent = rec['result_content']
1568 rcontent = rec['result_content']
1561 iodict = rec['io']
1569 iodict = rec['io']
1562 if isinstance(rcontent, str):
1570 if isinstance(rcontent, str):
1563 rcontent = self.session.unpack(rcontent)
1571 rcontent = self.session.unpack(rcontent)
1564
1572
1565 md = self.metadata[msg_id]
1573 md = self.metadata[msg_id]
1566 md_msg = dict(
1574 md_msg = dict(
1567 content=rcontent,
1575 content=rcontent,
1568 parent_header=parent,
1576 parent_header=parent,
1569 header=header,
1577 header=header,
1570 metadata=rec['result_metadata'],
1578 metadata=rec['result_metadata'],
1571 )
1579 )
1572 md.update(self._extract_metadata(md_msg))
1580 md.update(self._extract_metadata(md_msg))
1573 if rec.get('received'):
1581 if rec.get('received'):
1574 md['received'] = rec['received']
1582 md['received'] = rec['received']
1575 md.update(iodict)
1583 md.update(iodict)
1576
1584
1577 if rcontent['status'] == 'ok':
1585 if rcontent['status'] == 'ok':
1578 if header['msg_type'] == 'apply_reply':
1586 if header['msg_type'] == 'apply_reply':
1579 res,buffers = serialize.unserialize_object(buffers)
1587 res,buffers = serialize.unserialize_object(buffers)
1580 elif header['msg_type'] == 'execute_reply':
1588 elif header['msg_type'] == 'execute_reply':
1581 res = ExecuteReply(msg_id, rcontent, md)
1589 res = ExecuteReply(msg_id, rcontent, md)
1582 else:
1590 else:
1583 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1591 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1584 else:
1592 else:
1585 res = self._unwrap_exception(rcontent)
1593 res = self._unwrap_exception(rcontent)
1586 failures.append(res)
1594 failures.append(res)
1587
1595
1588 self.results[msg_id] = res
1596 self.results[msg_id] = res
1589 content[msg_id] = res
1597 content[msg_id] = res
1590
1598
1591 if len(theids) == 1 and failures:
1599 if len(theids) == 1 and failures:
1592 raise failures[0]
1600 raise failures[0]
1593
1601
1594 error.collect_exceptions(failures, "result_status")
1602 error.collect_exceptions(failures, "result_status")
1595 return content
1603 return content
1596
1604
1597 @spin_first
1605 @spin_first
1598 def queue_status(self, targets='all', verbose=False):
1606 def queue_status(self, targets='all', verbose=False):
1599 """Fetch the status of engine queues.
1607 """Fetch the status of engine queues.
1600
1608
1601 Parameters
1609 Parameters
1602 ----------
1610 ----------
1603
1611
1604 targets : int/str/list of ints/strs
1612 targets : int/str/list of ints/strs
1605 the engines whose states are to be queried.
1613 the engines whose states are to be queried.
1606 default : all
1614 default : all
1607 verbose : bool
1615 verbose : bool
1608 Whether to return lengths only, or lists of ids for each element
1616 Whether to return lengths only, or lists of ids for each element
1609 """
1617 """
1610 if targets == 'all':
1618 if targets == 'all':
1611 # allow 'all' to be evaluated on the engine
1619 # allow 'all' to be evaluated on the engine
1612 engine_ids = None
1620 engine_ids = None
1613 else:
1621 else:
1614 engine_ids = self._build_targets(targets)[1]
1622 engine_ids = self._build_targets(targets)[1]
1615 content = dict(targets=engine_ids, verbose=verbose)
1623 content = dict(targets=engine_ids, verbose=verbose)
1616 self.session.send(self._query_socket, "queue_request", content=content)
1624 self.session.send(self._query_socket, "queue_request", content=content)
1617 idents,msg = self.session.recv(self._query_socket, 0)
1625 idents,msg = self.session.recv(self._query_socket, 0)
1618 if self.debug:
1626 if self.debug:
1619 pprint(msg)
1627 pprint(msg)
1620 content = msg['content']
1628 content = msg['content']
1621 status = content.pop('status')
1629 status = content.pop('status')
1622 if status != 'ok':
1630 if status != 'ok':
1623 raise self._unwrap_exception(content)
1631 raise self._unwrap_exception(content)
1624 content = rekey(content)
1632 content = rekey(content)
1625 if isinstance(targets, int):
1633 if isinstance(targets, int):
1626 return content[targets]
1634 return content[targets]
1627 else:
1635 else:
1628 return content
1636 return content
1629
1637
1630 def _build_msgids_from_target(self, targets=None):
1638 def _build_msgids_from_target(self, targets=None):
1631 """Build a list of msg_ids from the list of engine targets"""
1639 """Build a list of msg_ids from the list of engine targets"""
1632 if not targets: # needed as _build_targets otherwise uses all engines
1640 if not targets: # needed as _build_targets otherwise uses all engines
1633 return []
1641 return []
1634 target_ids = self._build_targets(targets)[0]
1642 target_ids = self._build_targets(targets)[0]
1635 return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)
1643 return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)
1636
1644
1637 def _build_msgids_from_jobs(self, jobs=None):
1645 def _build_msgids_from_jobs(self, jobs=None):
1638 """Build a list of msg_ids from "jobs" """
1646 """Build a list of msg_ids from "jobs" """
1639 if not jobs:
1647 if not jobs:
1640 return []
1648 return []
1641 msg_ids = []
1649 msg_ids = []
1642 if isinstance(jobs, (basestring,AsyncResult)):
1650 if isinstance(jobs, (basestring,AsyncResult)):
1643 jobs = [jobs]
1651 jobs = [jobs]
1644 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1652 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1645 if bad_ids:
1653 if bad_ids:
1646 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1654 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1647 for j in jobs:
1655 for j in jobs:
1648 if isinstance(j, AsyncResult):
1656 if isinstance(j, AsyncResult):
1649 msg_ids.extend(j.msg_ids)
1657 msg_ids.extend(j.msg_ids)
1650 else:
1658 else:
1651 msg_ids.append(j)
1659 msg_ids.append(j)
1652 return msg_ids
1660 return msg_ids
1653
1661
1654 def purge_local_results(self, jobs=[], targets=[]):
1662 def purge_local_results(self, jobs=[], targets=[]):
1655 """Clears the client caches of results and frees such memory.
1663 """Clears the client caches of results and frees such memory.
1656
1664
1657 Individual results can be purged by msg_id, or the entire
1665 Individual results can be purged by msg_id, or the entire
1658 history of specific targets can be purged.
1666 history of specific targets can be purged.
1659
1667
1660 Use `purge_local_results('all')` to scrub everything from the Clients's db.
1668 Use `purge_local_results('all')` to scrub everything from the Clients's db.
1661
1669
1662 The client must have no outstanding tasks before purging the caches.
1670 The client must have no outstanding tasks before purging the caches.
1663 Raises `AssertionError` if there are still outstanding tasks.
1671 Raises `AssertionError` if there are still outstanding tasks.
1664
1672
1665 After this call all `AsyncResults` are invalid and should be discarded.
1673 After this call all `AsyncResults` are invalid and should be discarded.
1666
1674
1667 If you must "reget" the results, you can still do so by using
1675 If you must "reget" the results, you can still do so by using
1668 `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
1676 `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
1669 redownload the results from the hub if they are still available
1677 redownload the results from the hub if they are still available
1670 (i.e `client.purge_hub_results(...)` has not been called.
1678 (i.e `client.purge_hub_results(...)` has not been called.
1671
1679
1672 Parameters
1680 Parameters
1673 ----------
1681 ----------
1674
1682
1675 jobs : str or list of str or AsyncResult objects
1683 jobs : str or list of str or AsyncResult objects
1676 the msg_ids whose results should be purged.
1684 the msg_ids whose results should be purged.
1677 targets : int/str/list of ints/strs
1685 targets : int/str/list of ints/strs
1678 The targets, by int_id, whose entire results are to be purged.
1686 The targets, by int_id, whose entire results are to be purged.
1679
1687
1680 default : None
1688 default : None
1681 """
1689 """
1682 assert not self.outstanding, "Can't purge a client with outstanding tasks!"
1690 assert not self.outstanding, "Can't purge a client with outstanding tasks!"
1683
1691
1684 if not targets and not jobs:
1692 if not targets and not jobs:
1685 raise ValueError("Must specify at least one of `targets` and `jobs`")
1693 raise ValueError("Must specify at least one of `targets` and `jobs`")
1686
1694
1687 if jobs == 'all':
1695 if jobs == 'all':
1688 self.results.clear()
1696 self.results.clear()
1689 self.metadata.clear()
1697 self.metadata.clear()
1690 return
1698 return
1691 else:
1699 else:
1692 msg_ids = []
1700 msg_ids = []
1693 msg_ids.extend(self._build_msgids_from_target(targets))
1701 msg_ids.extend(self._build_msgids_from_target(targets))
1694 msg_ids.extend(self._build_msgids_from_jobs(jobs))
1702 msg_ids.extend(self._build_msgids_from_jobs(jobs))
1695 map(self.results.pop, msg_ids)
1703 map(self.results.pop, msg_ids)
1696 map(self.metadata.pop, msg_ids)
1704 map(self.metadata.pop, msg_ids)
1697
1705
1698
1706
1699 @spin_first
1707 @spin_first
1700 def purge_hub_results(self, jobs=[], targets=[]):
1708 def purge_hub_results(self, jobs=[], targets=[]):
1701 """Tell the Hub to forget results.
1709 """Tell the Hub to forget results.
1702
1710
1703 Individual results can be purged by msg_id, or the entire
1711 Individual results can be purged by msg_id, or the entire
1704 history of specific targets can be purged.
1712 history of specific targets can be purged.
1705
1713
1706 Use `purge_results('all')` to scrub everything from the Hub's db.
1714 Use `purge_results('all')` to scrub everything from the Hub's db.
1707
1715
1708 Parameters
1716 Parameters
1709 ----------
1717 ----------
1710
1718
1711 jobs : str or list of str or AsyncResult objects
1719 jobs : str or list of str or AsyncResult objects
1712 the msg_ids whose results should be forgotten.
1720 the msg_ids whose results should be forgotten.
1713 targets : int/str/list of ints/strs
1721 targets : int/str/list of ints/strs
1714 The targets, by int_id, whose entire history is to be purged.
1722 The targets, by int_id, whose entire history is to be purged.
1715
1723
1716 default : None
1724 default : None
1717 """
1725 """
1718 if not targets and not jobs:
1726 if not targets and not jobs:
1719 raise ValueError("Must specify at least one of `targets` and `jobs`")
1727 raise ValueError("Must specify at least one of `targets` and `jobs`")
1720 if targets:
1728 if targets:
1721 targets = self._build_targets(targets)[1]
1729 targets = self._build_targets(targets)[1]
1722
1730
1723 # construct msg_ids from jobs
1731 # construct msg_ids from jobs
1724 if jobs == 'all':
1732 if jobs == 'all':
1725 msg_ids = jobs
1733 msg_ids = jobs
1726 else:
1734 else:
1727 msg_ids = self._build_msgids_from_jobs(jobs)
1735 msg_ids = self._build_msgids_from_jobs(jobs)
1728
1736
1729 content = dict(engine_ids=targets, msg_ids=msg_ids)
1737 content = dict(engine_ids=targets, msg_ids=msg_ids)
1730 self.session.send(self._query_socket, "purge_request", content=content)
1738 self.session.send(self._query_socket, "purge_request", content=content)
1731 idents, msg = self.session.recv(self._query_socket, 0)
1739 idents, msg = self.session.recv(self._query_socket, 0)
1732 if self.debug:
1740 if self.debug:
1733 pprint(msg)
1741 pprint(msg)
1734 content = msg['content']
1742 content = msg['content']
1735 if content['status'] != 'ok':
1743 if content['status'] != 'ok':
1736 raise self._unwrap_exception(content)
1744 raise self._unwrap_exception(content)
1737
1745
1738 def purge_results(self, jobs=[], targets=[]):
1746 def purge_results(self, jobs=[], targets=[]):
1739 """Clears the cached results from both the hub and the local client
1747 """Clears the cached results from both the hub and the local client
1740
1748
1741 Individual results can be purged by msg_id, or the entire
1749 Individual results can be purged by msg_id, or the entire
1742 history of specific targets can be purged.
1750 history of specific targets can be purged.
1743
1751
1744 Use `purge_results('all')` to scrub every cached result from both the Hub's and
1752 Use `purge_results('all')` to scrub every cached result from both the Hub's and
1745 the Client's db.
1753 the Client's db.
1746
1754
1747 Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with
1755 Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with
1748 the same arguments.
1756 the same arguments.
1749
1757
1750 Parameters
1758 Parameters
1751 ----------
1759 ----------
1752
1760
1753 jobs : str or list of str or AsyncResult objects
1761 jobs : str or list of str or AsyncResult objects
1754 the msg_ids whose results should be forgotten.
1762 the msg_ids whose results should be forgotten.
1755 targets : int/str/list of ints/strs
1763 targets : int/str/list of ints/strs
1756 The targets, by int_id, whose entire history is to be purged.
1764 The targets, by int_id, whose entire history is to be purged.
1757
1765
1758 default : None
1766 default : None
1759 """
1767 """
1760 self.purge_local_results(jobs=jobs, targets=targets)
1768 self.purge_local_results(jobs=jobs, targets=targets)
1761 self.purge_hub_results(jobs=jobs, targets=targets)
1769 self.purge_hub_results(jobs=jobs, targets=targets)
1762
1770
1763 def purge_everything(self):
1771 def purge_everything(self):
1764 """Clears all content from previous Tasks from both the hub and the local client
1772 """Clears all content from previous Tasks from both the hub and the local client
1765
1773
1766 In addition to calling `purge_results("all")` it also deletes the history and
1774 In addition to calling `purge_results("all")` it also deletes the history and
1767 other bookkeeping lists.
1775 other bookkeeping lists.
1768 """
1776 """
1769 self.purge_results("all")
1777 self.purge_results("all")
1770 self.history = []
1778 self.history = []
1771 self.session.digest_history.clear()
1779 self.session.digest_history.clear()
1772
1780
1773 @spin_first
1781 @spin_first
1774 def hub_history(self):
1782 def hub_history(self):
1775 """Get the Hub's history
1783 """Get the Hub's history
1776
1784
1777 Just like the Client, the Hub has a history, which is a list of msg_ids.
1785 Just like the Client, the Hub has a history, which is a list of msg_ids.
1778 This will contain the history of all clients, and, depending on configuration,
1786 This will contain the history of all clients, and, depending on configuration,
1779 may contain history across multiple cluster sessions.
1787 may contain history across multiple cluster sessions.
1780
1788
1781 Any msg_id returned here is a valid argument to `get_result`.
1789 Any msg_id returned here is a valid argument to `get_result`.
1782
1790
1783 Returns
1791 Returns
1784 -------
1792 -------
1785
1793
1786 msg_ids : list of strs
1794 msg_ids : list of strs
1787 list of all msg_ids, ordered by task submission time.
1795 list of all msg_ids, ordered by task submission time.
1788 """
1796 """
1789
1797
1790 self.session.send(self._query_socket, "history_request", content={})
1798 self.session.send(self._query_socket, "history_request", content={})
1791 idents, msg = self.session.recv(self._query_socket, 0)
1799 idents, msg = self.session.recv(self._query_socket, 0)
1792
1800
1793 if self.debug:
1801 if self.debug:
1794 pprint(msg)
1802 pprint(msg)
1795 content = msg['content']
1803 content = msg['content']
1796 if content['status'] != 'ok':
1804 if content['status'] != 'ok':
1797 raise self._unwrap_exception(content)
1805 raise self._unwrap_exception(content)
1798 else:
1806 else:
1799 return content['history']
1807 return content['history']
1800
1808
1801 @spin_first
1809 @spin_first
1802 def db_query(self, query, keys=None):
1810 def db_query(self, query, keys=None):
1803 """Query the Hub's TaskRecord database
1811 """Query the Hub's TaskRecord database
1804
1812
1805 This will return a list of task record dicts that match `query`
1813 This will return a list of task record dicts that match `query`
1806
1814
1807 Parameters
1815 Parameters
1808 ----------
1816 ----------
1809
1817
1810 query : mongodb query dict
1818 query : mongodb query dict
1811 The search dict. See mongodb query docs for details.
1819 The search dict. See mongodb query docs for details.
1812 keys : list of strs [optional]
1820 keys : list of strs [optional]
1813 The subset of keys to be returned. The default is to fetch everything but buffers.
1821 The subset of keys to be returned. The default is to fetch everything but buffers.
1814 'msg_id' will *always* be included.
1822 'msg_id' will *always* be included.
1815 """
1823 """
1816 if isinstance(keys, basestring):
1824 if isinstance(keys, basestring):
1817 keys = [keys]
1825 keys = [keys]
1818 content = dict(query=query, keys=keys)
1826 content = dict(query=query, keys=keys)
1819 self.session.send(self._query_socket, "db_request", content=content)
1827 self.session.send(self._query_socket, "db_request", content=content)
1820 idents, msg = self.session.recv(self._query_socket, 0)
1828 idents, msg = self.session.recv(self._query_socket, 0)
1821 if self.debug:
1829 if self.debug:
1822 pprint(msg)
1830 pprint(msg)
1823 content = msg['content']
1831 content = msg['content']
1824 if content['status'] != 'ok':
1832 if content['status'] != 'ok':
1825 raise self._unwrap_exception(content)
1833 raise self._unwrap_exception(content)
1826
1834
1827 records = content['records']
1835 records = content['records']
1828
1836
1829 buffer_lens = content['buffer_lens']
1837 buffer_lens = content['buffer_lens']
1830 result_buffer_lens = content['result_buffer_lens']
1838 result_buffer_lens = content['result_buffer_lens']
1831 buffers = msg['buffers']
1839 buffers = msg['buffers']
1832 has_bufs = buffer_lens is not None
1840 has_bufs = buffer_lens is not None
1833 has_rbufs = result_buffer_lens is not None
1841 has_rbufs = result_buffer_lens is not None
1834 for i,rec in enumerate(records):
1842 for i,rec in enumerate(records):
1835 # relink buffers
1843 # relink buffers
1836 if has_bufs:
1844 if has_bufs:
1837 blen = buffer_lens[i]
1845 blen = buffer_lens[i]
1838 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1846 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1839 if has_rbufs:
1847 if has_rbufs:
1840 blen = result_buffer_lens[i]
1848 blen = result_buffer_lens[i]
1841 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1849 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1842
1850
1843 return records
1851 return records
1844
1852
1845 __all__ = [ 'Client' ]
1853 __all__ = [ 'Client' ]
General Comments 0
You need to be logged in to leave comments. Login now