handle data_pub in parallel Client
MinRK
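This commit teaches the parallel Client to handle `data_message` frames published by engines on the IOPub channel: a new `'data'` entry is added to each task's `Metadata`, and incoming `data_message` buffers are deserialized with `IPython.zmq.serialize` and merged into it. A minimal usage sketch (illustrative only; the engine-side `publish_data` helper and the task function are assumptions, not part of this diff):

    # on an engine, inside a task (assumed engine-side API):
    from IPython.zmq.datapub import publish_data
    publish_data({'progress': 0.5})

    # on the client:
    from IPython.parallel import Client
    rc = Client()
    ar = rc[:].apply_async(some_task)      # some_task is a placeholder
    rc.spin()                              # flush iopub, including data_message frames
    rc.metadata[ar.msg_ids[0]]['data']     # -> e.g. {'progress': 0.5}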
@@ -1,1713 +1,1718 @@
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
36
36
37 from IPython.utils.coloransi import TermColors
37 from IPython.utils.coloransi import TermColors
38 from IPython.utils.jsonutil import rekey
38 from IPython.utils.jsonutil import rekey
39 from IPython.utils.localinterfaces import LOCAL_IPS
39 from IPython.utils.localinterfaces import LOCAL_IPS
40 from IPython.utils.path import get_ipython_dir
40 from IPython.utils.path import get_ipython_dir
41 from IPython.utils.py3compat import cast_bytes
41 from IPython.utils.py3compat import cast_bytes
42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
43 Dict, List, Bool, Set, Any)
43 Dict, List, Bool, Set, Any)
44 from IPython.external.decorator import decorator
44 from IPython.external.decorator import decorator
45 from IPython.external.ssh import tunnel
45 from IPython.external.ssh import tunnel
46
46
47 from IPython.parallel import Reference
47 from IPython.parallel import Reference
48 from IPython.parallel import error
48 from IPython.parallel import error
49 from IPython.parallel import util
49 from IPython.parallel import util
50
50
51 from IPython.zmq.session import Session, Message
51 from IPython.zmq.session import Session, Message
52 from IPython.zmq import serialize
52
53
53 from .asyncresult import AsyncResult, AsyncHubResult
54 from .asyncresult import AsyncResult, AsyncHubResult
54 from .view import DirectView, LoadBalancedView
55 from .view import DirectView, LoadBalancedView
55
56
56 if sys.version_info[0] >= 3:
57 if sys.version_info[0] >= 3:
57 # xrange is used in a couple 'isinstance' tests in py2
58 # xrange is used in a couple 'isinstance' tests in py2
58 # should be just 'range' in 3k
59 # should be just 'range' in 3k
59 xrange = range
60 xrange = range

#--------------------------------------------------------------------------
# Decorators for Client methods
#--------------------------------------------------------------------------

@decorator
def spin_first(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method."""
    self.spin()
    return f(self, *args, **kwargs)


#--------------------------------------------------------------------------
# Classes
#--------------------------------------------------------------------------


class ExecuteReply(object):
    """wrapper for finished Execute results"""
    def __init__(self, msg_id, content, metadata):
        self.msg_id = msg_id
        self._content = content
        self.execution_count = content['execution_count']
        self.metadata = metadata

    def __getitem__(self, key):
        return self.metadata[key]

    def __getattr__(self, key):
        if key not in self.metadata:
            raise AttributeError(key)
        return self.metadata[key]

    def __repr__(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        text_out = pyout['data'].get('text/plain', '')
        if len(text_out) > 32:
            text_out = text_out[:29] + '...'

        return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)

    def _repr_pretty_(self, p, cycle):
        pyout = self.metadata['pyout'] or {'data':{}}
        text_out = pyout['data'].get('text/plain', '')

        if not text_out:
            return

        try:
            ip = get_ipython()
        except NameError:
            colors = "NoColor"
        else:
            colors = ip.colors

        if colors == "NoColor":
            out = normal = ""
        else:
            out = TermColors.Red
            normal = TermColors.Normal

        if '\n' in text_out and not text_out.startswith('\n'):
            # add newline for multiline reprs
            text_out = '\n' + text_out

        p.text(
            out + u'Out[%i:%i]: ' % (
                self.metadata['engine_id'], self.execution_count
            ) + normal + text_out
        )

    def _repr_html_(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get("text/html")

    def _repr_latex_(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get("text/latex")

    def _repr_json_(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get("application/json")

    def _repr_javascript_(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get("application/javascript")

    def _repr_png_(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get("image/png")

    def _repr_jpeg_(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get("image/jpeg")

    def _repr_svg_(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get("image/svg+xml")


class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
              'outputs' : [],
+             'data': {},
              'outputs_ready' : False,
              }
        self.update(md)
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        if key in self.iterkeys():
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self.iterkeys():
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self.iterkeys():
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)


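# Usage sketch (illustrative): Metadata behaves like a dict with a fixed key set,
# and attribute access mirrors item access:
#
#     md = Metadata()
#     md.engine_id = 3        # same as md['engine_id'] = 3
#     md['data']              # -> {} (new in this change)
#     md['bogus_key'] = 1     # raises KeyError: unknown keys are rejected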
class Client(HasTraits):
    """A semi-synchronous client to the IPython ZMQ cluster

    Parameters
    ----------

    url_file : str/unicode; path to ipcontroller-client.json
        This JSON file should contain all the information needed to connect to a cluster,
        and is likely the only argument needed.
        Connection information for the Hub's registration. If a json connector
        file is given, then likely no further configuration is necessary.
        [Default: use profile]
    profile : bytes
        The name of the Cluster profile to be used to find connector information.
        If run from an IPython application, the default profile will be the same
        as the running application, otherwise it will be 'default'.
    context : zmq.Context
        Pass an existing zmq.Context instance, otherwise the client will create its own.
    debug : bool
        flag for lots of message printing for debug purposes
    timeout : int/float
        time (in seconds) to wait for connection replies from the Hub
        [Default: 10]

    #-------------- session related args ----------------

    config : Config object
        If specified, this will be relayed to the Session for configuration
    username : str
        set username for the session object

    #-------------- ssh related args ----------------
    # These are args for configuring the ssh tunnel to be used
    # credentials are used to forward connections over ssh to the Controller
    # Note that the ip given in `addr` needs to be relative to sshserver
    # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
    # and set sshserver as the same machine the Controller is on. However,
    # the only requirement is that sshserver is able to see the Controller
    # (i.e. is within the same trusted network).

    sshserver : str
        A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
        If keyfile or password is specified, and this is not, it will default to
        the ip given in addr.
    sshkey : str; path to ssh private key file
        This specifies a key to be used in ssh login, default None.
        Regular default ssh keys will be used without specifying this argument.
    password : str
        Your ssh password to sshserver. Note that if this is left None,
        you will be prompted for it if passwordless key based login is unavailable.
    paramiko : bool
        flag for whether to use paramiko instead of shell ssh for tunneling.
        [default: True on win32, False else]


    Attributes
    ----------

    ids : list of int engine IDs
        requesting the ids attribute always synchronizes
        the registration state. To request ids without synchronization,
        use semi-private _ids attributes.

    history : list of msg_ids
        a list of msg_ids, keeping track of all the execution
        messages you have submitted in order.

    outstanding : set of msg_ids
        a set of msg_ids that have been submitted, but whose
        results have not yet been received.

    results : dict
        a dict of all our results, keyed by msg_id

    block : bool
        determines default behavior when block not specified
        in execution methods

    Methods
    -------

    spin
        flushes incoming results and registration state changes
        control methods spin, and requesting `ids` also ensures up to date

    wait
        wait on one or more msg_ids

    execution methods
        apply
        legacy: execute, run

    data movement
        push, pull, scatter, gather

    query methods
        queue_status, get_result, purge, result_status

    control methods
        abort, shutdown

    """


    block = Bool(False)
    outstanding = Set()
    results = Instance('collections.defaultdict', (dict,))
    metadata = Instance('collections.defaultdict', (Metadata,))
    history = List()
    debug = Bool(False)
    _spin_thread = Any()
    _stop_spinning = Any()

    profile=Unicode()
    def _profile_default(self):
        if BaseIPythonApplication.initialized():
            # an IPython app *might* be running, try to get its profile
            try:
                return BaseIPythonApplication.instance().profile
            except (AttributeError, MultipleInstanceError):
                # could be a *different* subclass of config.Application,
                # which would raise one of these two errors.
                return u'default'
        else:
            return u'default'


    _outstanding_dict = Instance('collections.defaultdict', (set,))
    _ids = List()
    _connected=Bool(False)
    _ssh=Bool(False)
    _context = Instance('zmq.Context')
    _config = Dict()
    _engines=Instance(util.ReverseDict, (), {})
    # _hub_socket=Instance('zmq.Socket')
    _query_socket=Instance('zmq.Socket')
    _control_socket=Instance('zmq.Socket')
    _iopub_socket=Instance('zmq.Socket')
    _notification_socket=Instance('zmq.Socket')
    _mux_socket=Instance('zmq.Socket')
    _task_socket=Instance('zmq.Socket')
    _task_scheme=Unicode()
    _closed = False
    _ignored_control_replies=Integer(0)
    _ignored_hub_replies=Integer(0)

    def __new__(self, *args, **kw):
        # don't raise on positional args
        return HasTraits.__new__(self, **kw)

    def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
            context=None, debug=False,
            sshserver=None, sshkey=None, password=None, paramiko=None,
            timeout=10, **extra_args
            ):
        if profile:
            super(Client, self).__init__(debug=debug, profile=profile)
        else:
            super(Client, self).__init__(debug=debug)
        if context is None:
            context = zmq.Context.instance()
        self._context = context
        self._stop_spinning = Event()

        if 'url_or_file' in extra_args:
            url_file = extra_args['url_or_file']
            warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)

        if url_file and util.is_url(url_file):
            raise ValueError("single urls cannot be specified, url-files must be used.")

        self._setup_profile_dir(self.profile, profile_dir, ipython_dir)

        if self._cd is not None:
            if url_file is None:
                url_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
        if url_file is None:
            raise ValueError(
                "I can't find enough information to connect to a hub!"
                " Please specify at least one of url_file or profile."
            )

        with open(url_file) as f:
            cfg = json.load(f)

        self._task_scheme = cfg['task_scheme']

        # sync defaults from args, json:
        if sshserver:
            cfg['ssh'] = sshserver

        location = cfg.setdefault('location', None)

        proto,addr = cfg['interface'].split('://')
        addr = util.disambiguate_ip_address(addr)
        cfg['interface'] = "%s://%s" % (proto, addr)

        # turn interface,port into full urls:
        for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
            cfg[key] = cfg['interface'] + ':%i' % cfg[key]

        url = cfg['registration']

        if location is not None and addr == '127.0.0.1':
            # location specified, and connection is expected to be local
            if location not in LOCAL_IPS and not sshserver:
                # load ssh from JSON *only* if the controller is not on
                # this machine
                sshserver=cfg['ssh']
            if location not in LOCAL_IPS and not sshserver:
                # warn if no ssh specified, but SSH is probably needed
                # This is only a warning, because the most likely cause
                # is a local Controller on a laptop whose IP is dynamic
                warnings.warn("""
            Controller appears to be listening on localhost, but not on this machine.
            If this is true, you should specify Client(...,sshserver='you@%s')
            or instruct your controller to listen on an external IP."""%location,
                RuntimeWarning)
        elif not sshserver:
            # otherwise sync with cfg
            sshserver = cfg['ssh']

        self._config = cfg

        self._ssh = bool(sshserver or sshkey or password)
        if self._ssh and sshserver is None:
            # default to ssh via localhost
            sshserver = addr
        if self._ssh and password is None:
            if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
                password=False
            else:
                password = getpass("SSH Password for %s: "%sshserver)
        ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)

        # configure and construct the session
        extra_args['packer'] = cfg['pack']
        extra_args['unpacker'] = cfg['unpack']
        extra_args['key'] = cast_bytes(cfg['exec_key'])

        self.session = Session(**extra_args)

        self._query_socket = self._context.socket(zmq.DEALER)

        if self._ssh:
            tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
        else:
            self._query_socket.connect(cfg['registration'])

        self.session.debug = self.debug

        self._notification_handlers = {'registration_notification' : self._register_engine,
                                       'unregistration_notification' : self._unregister_engine,
                                       'shutdown_notification' : lambda msg: self.close(),
                                       }
        self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                                'apply_reply' : self._handle_apply_reply}
        self._connect(sshserver, ssh_kwargs, timeout)

        # last step: setup magics, if we are in IPython:

        try:
            ip = get_ipython()
        except NameError:
            return
        else:
            if 'px' not in ip.magics_manager.magics:
                # in IPython but we are the first Client.
                # activate a default view for parallel magics.
                self.activate()

    def __del__(self):
        """cleanup sockets, but _not_ context."""
        self.close()

    def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
        if ipython_dir is None:
            ipython_dir = get_ipython_dir()
        if profile_dir is not None:
            try:
                self._cd = ProfileDir.find_profile_dir(profile_dir)
                return
            except ProfileDirError:
                pass
        elif profile is not None:
            try:
                self._cd = ProfileDir.find_profile_dir_by_name(
                    ipython_dir, profile)
                return
            except ProfileDirError:
                pass
        self._cd = None

    def _update_engines(self, engines):
        """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
        for k,v in engines.iteritems():
            eid = int(k)
            if eid not in self._engines:
                self._ids.append(eid)
            self._engines[eid] = v
        self._ids = sorted(self._ids)
        if sorted(self._engines.keys()) != range(len(self._engines)) and \
                        self._task_scheme == 'pure' and self._task_socket:
            self._stop_scheduling_tasks()

    def _stop_scheduling_tasks(self):
        """Stop scheduling tasks because an engine has been unregistered
        from a pure ZMQ scheduler.
        """
        self._task_socket.close()
        self._task_socket = None
        msg = "An engine has been unregistered, and we are using pure " +\
              "ZMQ task scheduling. Task farming will be disabled."
        if self.outstanding:
            msg += " If you were running tasks when this happened, " +\
                   "some `outstanding` msg_ids may never resolve."
        warnings.warn(msg, RuntimeWarning)

    def _build_targets(self, targets):
        """Turn valid target IDs or 'all' into two lists:
        (int_ids, uuids).
        """
        if not self._ids:
            # flush notification socket if no engines yet, just in case
            if not self.ids:
                raise error.NoEnginesRegistered("Can't build targets without any engines")

        if targets is None:
            targets = self._ids
        elif isinstance(targets, basestring):
            if targets.lower() == 'all':
                targets = self._ids
            else:
                raise TypeError("%r not valid str target, must be 'all'"%(targets))
        elif isinstance(targets, int):
            if targets < 0:
                targets = self.ids[targets]
            if targets not in self._ids:
                raise IndexError("No such engine: %i"%targets)
            targets = [targets]

        if isinstance(targets, slice):
            indices = range(len(self._ids))[targets]
            ids = self.ids
            targets = [ ids[i] for i in indices ]

        if not isinstance(targets, (tuple, list, xrange)):
            raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))

        return [cast_bytes(self._engines[t]) for t in targets], list(targets)

    def _connect(self, sshserver, ssh_kwargs, timeout):
        """setup all our socket connections to the cluster. This is called from
        __init__."""

        # Maybe allow reconnecting?
        if self._connected:
            return
        self._connected=True

        def connect_socket(s, url):
            # url = util.disambiguate_url(url, self._config['location'])
            if self._ssh:
                return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
            else:
                return s.connect(url)

        self.session.send(self._query_socket, 'connection_request')
        # use Poller because zmq.select has wrong units in pyzmq 2.1.7
        poller = zmq.Poller()
        poller.register(self._query_socket, zmq.POLLIN)
        # poll expects milliseconds, timeout is seconds
        evts = poller.poll(timeout*1000)
        if not evts:
            raise error.TimeoutError("Hub connection request timed out")
        idents,msg = self.session.recv(self._query_socket,mode=0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        # self._config['registration'] = dict(content)
        cfg = self._config
        if content['status'] == 'ok':
            self._mux_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._mux_socket, cfg['mux'])

            self._task_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._task_socket, cfg['task'])

            self._notification_socket = self._context.socket(zmq.SUB)
            self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
            connect_socket(self._notification_socket, cfg['notification'])

            self._control_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._control_socket, cfg['control'])

            self._iopub_socket = self._context.socket(zmq.SUB)
            self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
            connect_socket(self._iopub_socket, cfg['iopub'])

            self._update_engines(dict(content['engines']))
        else:
            self._connected = False
            raise Exception("Failed to connect!")

    #--------------------------------------------------------------------------
    # handlers and callbacks for incoming messages
    #--------------------------------------------------------------------------

    def _unwrap_exception(self, content):
        """unwrap exception, and remap engine_id to int."""
        e = error.unwrap_exception(content)
        # print e.traceback
        if e.engine_info:
            e_uuid = e.engine_info['engine_uuid']
            eid = self._engines[e_uuid]
            e.engine_info['engine_id'] = eid
        return e

    def _extract_metadata(self, msg):
        header = msg['header']
        parent = msg['parent_header']
        msg_meta = msg['metadata']
        content = msg['content']
        md = {'msg_id' : parent['msg_id'],
              'received' : datetime.now(),
              'engine_uuid' : msg_meta.get('engine', None),
              'follow' : msg_meta.get('follow', []),
              'after' : msg_meta.get('after', []),
              'status' : content['status'],
              }

        if md['engine_uuid'] is not None:
            md['engine_id'] = self._engines.get(md['engine_uuid'], None)

        if 'date' in parent:
            md['submitted'] = parent['date']
        if 'started' in msg_meta:
            md['started'] = msg_meta['started']
        if 'date' in header:
            md['completed'] = header['date']
        return md

    def _register_engine(self, msg):
        """Register a new engine, and update our connection info."""
        content = msg['content']
        eid = content['id']
        d = {eid : content['uuid']}
        self._update_engines(d)

    def _unregister_engine(self, msg):
        """Unregister an engine that has died."""
        content = msg['content']
        eid = int(content['id'])
        if eid in self._ids:
            self._ids.remove(eid)
            uuid = self._engines.pop(eid)

            self._handle_stranded_msgs(eid, uuid)

        if self._task_socket and self._task_scheme == 'pure':
            self._stop_scheduling_tasks()

    def _handle_stranded_msgs(self, eid, uuid):
        """Handle messages known to be on an engine when the engine unregisters.

        It is possible that this will fire prematurely - that is, an engine will
        go down after completing a result, and the client will be notified
        of the unregistration and later receive the successful result.
        """

        outstanding = self._outstanding_dict[uuid]

        for msg_id in list(outstanding):
            if msg_id in self.results:
                # we already
                continue
            try:
                raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
            except:
                content = error.wrap_exception()
            # build a fake message:
            parent = {}
            header = {}
            parent['msg_id'] = msg_id
            header['engine'] = uuid
            header['date'] = datetime.now()
            msg = dict(parent_header=parent, header=header, content=content)
            self._handle_apply_reply(msg)

    def _handle_execute_reply(self, msg):
        """Save the reply to an execute_request into our results.

        execute messages are never actually used. apply is used instead.
        """

        parent = msg['parent_header']
        msg_id = parent['msg_id']
        if msg_id not in self.outstanding:
            if msg_id in self.history:
                print ("got stale result: %s"%msg_id)
            else:
                print ("got unknown result: %s"%msg_id)
        else:
            self.outstanding.remove(msg_id)

        content = msg['content']
        header = msg['header']

        # construct metadata:
        md = self.metadata[msg_id]
        md.update(self._extract_metadata(msg))
        # is this redundant?
        self.metadata[msg_id] = md

        e_outstanding = self._outstanding_dict[md['engine_uuid']]
        if msg_id in e_outstanding:
            e_outstanding.remove(msg_id)

        # construct result:
        if content['status'] == 'ok':
            self.results[msg_id] = ExecuteReply(msg_id, content, md)
        elif content['status'] == 'aborted':
            self.results[msg_id] = error.TaskAborted(msg_id)
        elif content['status'] == 'resubmitted':
            # TODO: handle resubmission
            pass
        else:
            self.results[msg_id] = self._unwrap_exception(content)

    def _handle_apply_reply(self, msg):
        """Save the reply to an apply_request into our results."""
        parent = msg['parent_header']
        msg_id = parent['msg_id']
        if msg_id not in self.outstanding:
            if msg_id in self.history:
                print ("got stale result: %s"%msg_id)
                print self.results[msg_id]
                print msg
            else:
                print ("got unknown result: %s"%msg_id)
        else:
            self.outstanding.remove(msg_id)
        content = msg['content']
        header = msg['header']

        # construct metadata:
        md = self.metadata[msg_id]
        md.update(self._extract_metadata(msg))
        # is this redundant?
        self.metadata[msg_id] = md

        e_outstanding = self._outstanding_dict[md['engine_uuid']]
        if msg_id in e_outstanding:
            e_outstanding.remove(msg_id)

        # construct result:
        if content['status'] == 'ok':
-           self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
+           self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
        elif content['status'] == 'aborted':
            self.results[msg_id] = error.TaskAborted(msg_id)
        elif content['status'] == 'resubmitted':
            # TODO: handle resubmission
            pass
        else:
            self.results[msg_id] = self._unwrap_exception(content)

    def _flush_notifications(self):
        """Flush notifications of engine registrations waiting
        in ZMQ queue."""
        idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
        while msg is not None:
            if self.debug:
                pprint(msg)
            msg_type = msg['header']['msg_type']
            handler = self._notification_handlers.get(msg_type, None)
            if handler is None:
                raise Exception("Unhandled message type: %s"%msg.msg_type)
            else:
                handler(msg)
            idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)

    def _flush_results(self, sock):
        """Flush task or queue results waiting in ZMQ queue."""
        idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        while msg is not None:
            if self.debug:
                pprint(msg)
            msg_type = msg['header']['msg_type']
            handler = self._queue_handlers.get(msg_type, None)
            if handler is None:
                raise Exception("Unhandled message type: %s"%msg.msg_type)
            else:
                handler(msg)
            idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)

    def _flush_control(self, sock):
        """Flush replies from the control channel waiting
        in the ZMQ queue.

        Currently: ignore them."""
        if self._ignored_control_replies <= 0:
            return
        idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        while msg is not None:
            self._ignored_control_replies -= 1
            if self.debug:
                pprint(msg)
            idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)

    def _flush_ignored_control(self):
        """flush ignored control replies"""
        while self._ignored_control_replies > 0:
            self.session.recv(self._control_socket)
            self._ignored_control_replies -= 1

    def _flush_ignored_hub_replies(self):
        ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
        while msg is not None:
            ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)

834 def _flush_iopub(self, sock):
836 def _flush_iopub(self, sock):
835 """Flush replies from the iopub channel waiting
837 """Flush replies from the iopub channel waiting
836 in the ZMQ queue.
838 in the ZMQ queue.
837 """
839 """
838 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
840 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
839 while msg is not None:
841 while msg is not None:
840 if self.debug:
842 if self.debug:
841 pprint(msg)
843 pprint(msg)
842 parent = msg['parent_header']
844 parent = msg['parent_header']
843 # ignore IOPub messages with no parent.
845 # ignore IOPub messages with no parent.
844 # Caused by print statements or warnings from before the first execution.
846 # Caused by print statements or warnings from before the first execution.
845 if not parent:
847 if not parent:
846 continue
848 continue
847 msg_id = parent['msg_id']
849 msg_id = parent['msg_id']
848 content = msg['content']
850 content = msg['content']
849 header = msg['header']
851 header = msg['header']
850 msg_type = msg['header']['msg_type']
852 msg_type = msg['header']['msg_type']
851
853
852 # init metadata:
854 # init metadata:
853 md = self.metadata[msg_id]
855 md = self.metadata[msg_id]
854
856
855 if msg_type == 'stream':
857 if msg_type == 'stream':
856 name = content['name']
858 name = content['name']
857 s = md[name] or ''
859 s = md[name] or ''
858 md[name] = s + content['data']
860 md[name] = s + content['data']
859 elif msg_type == 'pyerr':
861 elif msg_type == 'pyerr':
860 md.update({'pyerr' : self._unwrap_exception(content)})
862 md.update({'pyerr' : self._unwrap_exception(content)})
861 elif msg_type == 'pyin':
863 elif msg_type == 'pyin':
862 md.update({'pyin' : content['code']})
864 md.update({'pyin' : content['code']})
863 elif msg_type == 'display_data':
865 elif msg_type == 'display_data':
864 md['outputs'].append(content)
866 md['outputs'].append(content)
865 elif msg_type == 'pyout':
867 elif msg_type == 'pyout':
866 md['pyout'] = content
868 md['pyout'] = content
869 elif msg_type == 'data_message':
870 data, remainder = serialize.unserialize_object(msg['buffers'])
871 md['data'].update(data)
867 elif msg_type == 'status':
872 elif msg_type == 'status':
868 # idle message comes after all outputs
873 # idle message comes after all outputs
869 if content['execution_state'] == 'idle':
874 if content['execution_state'] == 'idle':
870 md['outputs_ready'] = True
875 md['outputs_ready'] = True
871 else:
876 else:
872 # unhandled msg_type (status, etc.)
877 # unhandled msg_type (status, etc.)
873 pass
878 pass
874
879
875 # redundant?
880 # redundant?
876 self.metadata[msg_id] = md
881 self.metadata[msg_id] = md
877
882
878 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
883 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
879
884
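For context on the new `data_message` branch above: an engine can publish arbitrary Python objects over IOPub while a task runs, and this handler unserializes each such message and folds it into the `data` dict of that task's metadata record. A minimal client-side sketch, assuming a running cluster and assuming the engine-side helper is `publish_data` in `IPython.zmq.datapub` (the datapub work this commit accompanies)::

    from IPython.parallel import Client

    rc = Client()
    view = rc[:]

    def report_progress():
        # engine-side: publish a dict over the data_pub channel
        from IPython.zmq.datapub import publish_data
        publish_data({'progress': 0.5})
        return 'done'

    ar = view.apply_async(report_progress)
    ar.get()
    rc.spin()   # flush iopub, exercising the data_message branch above
    for msg_id in ar.msg_ids:
        print rc.metadata[msg_id]['data']   # e.g. {'progress': 0.5}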
880 #--------------------------------------------------------------------------
885 #--------------------------------------------------------------------------
881 # len, getitem
886 # len, getitem
882 #--------------------------------------------------------------------------
887 #--------------------------------------------------------------------------
883
888
884 def __len__(self):
889 def __len__(self):
885 """len(client) returns # of engines."""
890 """len(client) returns # of engines."""
886 return len(self.ids)
891 return len(self.ids)
887
892
888 def __getitem__(self, key):
893 def __getitem__(self, key):
889 """index access returns DirectView multiplexer objects
894 """index access returns DirectView multiplexer objects
890
895
891 Must be int, slice, or list/tuple/xrange of ints"""
896 Must be int, slice, or list/tuple/xrange of ints"""
892 if not isinstance(key, (int, slice, tuple, list, xrange)):
897 if not isinstance(key, (int, slice, tuple, list, xrange)):
893 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
898 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
894 else:
899 else:
895 return self.direct_view(key)
900 return self.direct_view(key)
896
901
897 #--------------------------------------------------------------------------
902 #--------------------------------------------------------------------------
898 # Begin public methods
903 # Begin public methods
899 #--------------------------------------------------------------------------
904 #--------------------------------------------------------------------------
900
905
901 @property
906 @property
902 def ids(self):
907 def ids(self):
903 """Always up-to-date ids property."""
908 """Always up-to-date ids property."""
904 self._flush_notifications()
909 self._flush_notifications()
905 # always copy:
910 # always copy:
906 return list(self._ids)
911 return list(self._ids)
907
912
908 def activate(self, targets='all', suffix=''):
913 def activate(self, targets='all', suffix=''):
909 """Create a DirectView and register it with IPython magics
914 """Create a DirectView and register it with IPython magics
910
915
911 Defines the magics `%px, %autopx, %pxresult, %%px`
916 Defines the magics `%px, %autopx, %pxresult, %%px`
912
917
913 Parameters
918 Parameters
914 ----------
919 ----------
915
920
916 targets: int, list of ints, or 'all'
921 targets: int, list of ints, or 'all'
917 The engines on which the view's magics will run
922 The engines on which the view's magics will run
918 suffix: str [default: '']
923 suffix: str [default: '']
919 The suffix, if any, for the magics. This allows you to have
924 The suffix, if any, for the magics. This allows you to have
920 multiple views associated with parallel magics at the same time.
925 multiple views associated with parallel magics at the same time.
921
926
922 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
927 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
923 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
928 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
924 on engine 0.
929 on engine 0.
925 """
930 """
926 view = self.direct_view(targets)
931 view = self.direct_view(targets)
927 view.block = True
932 view.block = True
928 view.activate(suffix)
933 view.activate(suffix)
929 return view
934 return view
930
935
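A short usage sketch for `activate` (assumes an ipcluster with at least one engine is already running)::

    from IPython.parallel import Client

    rc = Client()
    rc.activate()                        # %px, %autopx, %pxresult, %%px for all engines
    rc.activate(targets=0, suffix='0')   # %px0, %pxresult0, etc. for engine 0 only

Inside an interactive IPython session you could then run, for example, `%px import numpy` on every engine.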
931 def close(self):
936 def close(self):
932 if self._closed:
937 if self._closed:
933 return
938 return
934 self.stop_spin_thread()
939 self.stop_spin_thread()
935 snames = filter(lambda n: n.endswith('socket'), dir(self))
940 snames = filter(lambda n: n.endswith('socket'), dir(self))
936 for socket in map(lambda name: getattr(self, name), snames):
941 for socket in map(lambda name: getattr(self, name), snames):
937 if isinstance(socket, zmq.Socket) and not socket.closed:
942 if isinstance(socket, zmq.Socket) and not socket.closed:
938 socket.close()
943 socket.close()
939 self._closed = True
944 self._closed = True
940
945
941 def _spin_every(self, interval=1):
946 def _spin_every(self, interval=1):
942 """target func for use in spin_thread"""
947 """target func for use in spin_thread"""
943 while True:
948 while True:
944 if self._stop_spinning.is_set():
949 if self._stop_spinning.is_set():
945 return
950 return
946 time.sleep(interval)
951 time.sleep(interval)
947 self.spin()
952 self.spin()
948
953
949 def spin_thread(self, interval=1):
954 def spin_thread(self, interval=1):
950 """call Client.spin() in a background thread on some regular interval
955 """call Client.spin() in a background thread on some regular interval
951
956
952 This helps ensure that messages don't pile up too much in the zmq queue
957 This helps ensure that messages don't pile up too much in the zmq queue
953 while you are working on other things, or just leaving an idle terminal.
958 while you are working on other things, or just leaving an idle terminal.
954
959
955 It also helps limit potential padding of the `received` timestamp
960 It also helps limit potential padding of the `received` timestamp
956 on AsyncResult objects, used for timings.
961 on AsyncResult objects, used for timings.
957
962
958 Parameters
963 Parameters
959 ----------
964 ----------
960
965
961 interval : float, optional
966 interval : float, optional
962 The interval on which to spin the client in the background thread
967 The interval on which to spin the client in the background thread
963 (simply passed to time.sleep).
968 (simply passed to time.sleep).
964
969
965 Notes
970 Notes
966 -----
971 -----
967
972
968 For precision timing, you may want to use this method to put a bound
973 For precision timing, you may want to use this method to put a bound
969 on the jitter (in seconds) in `received` timestamps used
974 on the jitter (in seconds) in `received` timestamps used
970 in AsyncResult.wall_time.
975 in AsyncResult.wall_time.
971
976
972 """
977 """
973 if self._spin_thread is not None:
978 if self._spin_thread is not None:
974 self.stop_spin_thread()
979 self.stop_spin_thread()
975 self._stop_spinning.clear()
980 self._stop_spinning.clear()
976 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
981 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
977 self._spin_thread.daemon = True
982 self._spin_thread.daemon = True
978 self._spin_thread.start()
983 self._spin_thread.start()
979
984
980 def stop_spin_thread(self):
985 def stop_spin_thread(self):
981 """stop background spin_thread, if any"""
986 """stop background spin_thread, if any"""
982 if self._spin_thread is not None:
987 if self._spin_thread is not None:
983 self._stop_spinning.set()
988 self._stop_spinning.set()
984 self._spin_thread.join()
989 self._spin_thread.join()
985 self._spin_thread = None
990 self._spin_thread = None
986
991
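A sketch of using the background spin thread to bound the jitter on `received` timestamps; the 0.1 s interval is only an illustrative choice::

    import os
    from IPython.parallel import Client

    rc = Client()
    rc.spin_thread(interval=0.1)        # flush incoming messages every 100 ms
    ar = rc[:].apply_async(os.getpid)
    ar.wait()
    print ar.wall_time                  # timing now bounded by the spin interval
    rc.stop_spin_thread()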
987 def spin(self):
992 def spin(self):
988 """Flush any registration notifications and execution results
993 """Flush any registration notifications and execution results
989 waiting in the ZMQ queue.
994 waiting in the ZMQ queue.
990 """
995 """
991 if self._notification_socket:
996 if self._notification_socket:
992 self._flush_notifications()
997 self._flush_notifications()
993 if self._iopub_socket:
998 if self._iopub_socket:
994 self._flush_iopub(self._iopub_socket)
999 self._flush_iopub(self._iopub_socket)
995 if self._mux_socket:
1000 if self._mux_socket:
996 self._flush_results(self._mux_socket)
1001 self._flush_results(self._mux_socket)
997 if self._task_socket:
1002 if self._task_socket:
998 self._flush_results(self._task_socket)
1003 self._flush_results(self._task_socket)
999 if self._control_socket:
1004 if self._control_socket:
1000 self._flush_control(self._control_socket)
1005 self._flush_control(self._control_socket)
1001 if self._query_socket:
1006 if self._query_socket:
1002 self._flush_ignored_hub_replies()
1007 self._flush_ignored_hub_replies()
1003
1008
1004 def wait(self, jobs=None, timeout=-1):
1009 def wait(self, jobs=None, timeout=-1):
1005 """waits on one or more `jobs`, for up to `timeout` seconds.
1010 """waits on one or more `jobs`, for up to `timeout` seconds.
1006
1011
1007 Parameters
1012 Parameters
1008 ----------
1013 ----------
1009
1014
1010 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1015 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1011 ints are indices to self.history
1016 ints are indices to self.history
1012 strs are msg_ids
1017 strs are msg_ids
1013 default: wait on all outstanding messages
1018 default: wait on all outstanding messages
1014 timeout : float
1019 timeout : float
1015 a time in seconds, after which to give up.
1020 a time in seconds, after which to give up.
1016 default is -1, which means no timeout
1021 default is -1, which means no timeout
1017
1022
1018 Returns
1023 Returns
1019 -------
1024 -------
1020
1025
1021 True : when all msg_ids are done
1026 True : when all msg_ids are done
1022 False : timeout reached, some msg_ids still outstanding
1027 False : timeout reached, some msg_ids still outstanding
1023 """
1028 """
1024 tic = time.time()
1029 tic = time.time()
1025 if jobs is None:
1030 if jobs is None:
1026 theids = self.outstanding
1031 theids = self.outstanding
1027 else:
1032 else:
1028 if isinstance(jobs, (int, basestring, AsyncResult)):
1033 if isinstance(jobs, (int, basestring, AsyncResult)):
1029 jobs = [jobs]
1034 jobs = [jobs]
1030 theids = set()
1035 theids = set()
1031 for job in jobs:
1036 for job in jobs:
1032 if isinstance(job, int):
1037 if isinstance(job, int):
1033 # index access
1038 # index access
1034 job = self.history[job]
1039 job = self.history[job]
1035 elif isinstance(job, AsyncResult):
1040 elif isinstance(job, AsyncResult):
1036 map(theids.add, job.msg_ids)
1041 map(theids.add, job.msg_ids)
1037 continue
1042 continue
1038 theids.add(job)
1043 theids.add(job)
1039 if not theids.intersection(self.outstanding):
1044 if not theids.intersection(self.outstanding):
1040 return True
1045 return True
1041 self.spin()
1046 self.spin()
1042 while theids.intersection(self.outstanding):
1047 while theids.intersection(self.outstanding):
1043 if timeout >= 0 and ( time.time()-tic ) > timeout:
1048 if timeout >= 0 and ( time.time()-tic ) > timeout:
1044 break
1049 break
1045 time.sleep(1e-3)
1050 time.sleep(1e-3)
1046 self.spin()
1051 self.spin()
1047 return len(theids.intersection(self.outstanding)) == 0
1052 return len(theids.intersection(self.outstanding)) == 0
1048
1053
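A usage sketch for `wait`, mixing explicit AsyncResults with the default wait-on-everything form (assumes a running cluster)::

    from IPython.parallel import Client

    rc = Client()
    view = rc[:]
    ars = [view.apply_async(pow, 2, n) for n in range(4)]

    rc.wait(ars, timeout=5)   # True if these jobs finished within 5 seconds
    rc.wait()                 # block until all outstanding messages are done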
1049 #--------------------------------------------------------------------------
1054 #--------------------------------------------------------------------------
1050 # Control methods
1055 # Control methods
1051 #--------------------------------------------------------------------------
1056 #--------------------------------------------------------------------------
1052
1057
1053 @spin_first
1058 @spin_first
1054 def clear(self, targets=None, block=None):
1059 def clear(self, targets=None, block=None):
1055 """Clear the namespace in target(s)."""
1060 """Clear the namespace in target(s)."""
1056 block = self.block if block is None else block
1061 block = self.block if block is None else block
1057 targets = self._build_targets(targets)[0]
1062 targets = self._build_targets(targets)[0]
1058 for t in targets:
1063 for t in targets:
1059 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1064 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1060 error = False
1065 error = False
1061 if block:
1066 if block:
1062 self._flush_ignored_control()
1067 self._flush_ignored_control()
1063 for i in range(len(targets)):
1068 for i in range(len(targets)):
1064 idents,msg = self.session.recv(self._control_socket,0)
1069 idents,msg = self.session.recv(self._control_socket,0)
1065 if self.debug:
1070 if self.debug:
1066 pprint(msg)
1071 pprint(msg)
1067 if msg['content']['status'] != 'ok':
1072 if msg['content']['status'] != 'ok':
1068 error = self._unwrap_exception(msg['content'])
1073 error = self._unwrap_exception(msg['content'])
1069 else:
1074 else:
1070 self._ignored_control_replies += len(targets)
1075 self._ignored_control_replies += len(targets)
1071 if error:
1076 if error:
1072 raise error
1077 raise error
1073
1078
1074
1079
1075 @spin_first
1080 @spin_first
1076 def abort(self, jobs=None, targets=None, block=None):
1081 def abort(self, jobs=None, targets=None, block=None):
1077 """Abort specific jobs from the execution queues of target(s).
1082 """Abort specific jobs from the execution queues of target(s).
1078
1083
1079 This is a mechanism to prevent jobs that have already been submitted
1084 This is a mechanism to prevent jobs that have already been submitted
1080 from executing.
1085 from executing.
1081
1086
1082 Parameters
1087 Parameters
1083 ----------
1088 ----------
1084
1089
1085 jobs : msg_id, list of msg_ids, or AsyncResult
1090 jobs : msg_id, list of msg_ids, or AsyncResult
1086 The jobs to be aborted
1091 The jobs to be aborted
1087
1092
1088 If unspecified/None: abort all outstanding jobs.
1093 If unspecified/None: abort all outstanding jobs.
1089
1094
1090 """
1095 """
1091 block = self.block if block is None else block
1096 block = self.block if block is None else block
1092 jobs = jobs if jobs is not None else list(self.outstanding)
1097 jobs = jobs if jobs is not None else list(self.outstanding)
1093 targets = self._build_targets(targets)[0]
1098 targets = self._build_targets(targets)[0]
1094
1099
1095 msg_ids = []
1100 msg_ids = []
1096 if isinstance(jobs, (basestring,AsyncResult)):
1101 if isinstance(jobs, (basestring,AsyncResult)):
1097 jobs = [jobs]
1102 jobs = [jobs]
1098 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1103 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1099 if bad_ids:
1104 if bad_ids:
1100 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1105 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1101 for j in jobs:
1106 for j in jobs:
1102 if isinstance(j, AsyncResult):
1107 if isinstance(j, AsyncResult):
1103 msg_ids.extend(j.msg_ids)
1108 msg_ids.extend(j.msg_ids)
1104 else:
1109 else:
1105 msg_ids.append(j)
1110 msg_ids.append(j)
1106 content = dict(msg_ids=msg_ids)
1111 content = dict(msg_ids=msg_ids)
1107 for t in targets:
1112 for t in targets:
1108 self.session.send(self._control_socket, 'abort_request',
1113 self.session.send(self._control_socket, 'abort_request',
1109 content=content, ident=t)
1114 content=content, ident=t)
1110 error = False
1115 error = False
1111 if block:
1116 if block:
1112 self._flush_ignored_control()
1117 self._flush_ignored_control()
1113 for i in range(len(targets)):
1118 for i in range(len(targets)):
1114 idents,msg = self.session.recv(self._control_socket,0)
1119 idents,msg = self.session.recv(self._control_socket,0)
1115 if self.debug:
1120 if self.debug:
1116 pprint(msg)
1121 pprint(msg)
1117 if msg['content']['status'] != 'ok':
1122 if msg['content']['status'] != 'ok':
1118 error = self._unwrap_exception(msg['content'])
1123 error = self._unwrap_exception(msg['content'])
1119 else:
1124 else:
1120 self._ignored_control_replies += len(targets)
1125 self._ignored_control_replies += len(targets)
1121 if error:
1126 if error:
1122 raise error
1127 raise error
1123
1128
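A sketch of `abort`; it only prevents jobs still waiting in engine queues, not ones already executing, and `slow_task` here is a hypothetical long-running function::

    from IPython.parallel import Client

    rc = Client()
    lview = rc.load_balanced_view()
    ars = [lview.apply_async(slow_task, i) for i in range(100)]   # slow_task: placeholder

    rc.abort(jobs=ars, block=True)   # drop anything from these jobs not yet started
    rc.abort()                       # or abort every outstanding job on all engines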
1124 @spin_first
1129 @spin_first
1125 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1130 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1126 """Terminates one or more engine processes, optionally including the hub.
1131 """Terminates one or more engine processes, optionally including the hub.
1127
1132
1128 Parameters
1133 Parameters
1129 ----------
1134 ----------
1130
1135
1131 targets: list of ints or 'all' [default: all]
1136 targets: list of ints or 'all' [default: all]
1132 Which engines to shut down.
1137 Which engines to shut down.
1133 hub: bool [default: False]
1138 hub: bool [default: False]
1134 Whether to include the Hub. hub=True implies targets='all'.
1139 Whether to include the Hub. hub=True implies targets='all'.
1135 block: bool [default: self.block]
1140 block: bool [default: self.block]
1136 Whether to wait for clean shutdown replies or not.
1141 Whether to wait for clean shutdown replies or not.
1137 restart: bool [default: False]
1142 restart: bool [default: False]
1138 NOT IMPLEMENTED
1143 NOT IMPLEMENTED
1139 whether to restart engines after shutting them down.
1144 whether to restart engines after shutting them down.
1140 """
1145 """
1141
1146
1142 if restart:
1147 if restart:
1143 raise NotImplementedError("Engine restart is not yet implemented")
1148 raise NotImplementedError("Engine restart is not yet implemented")
1144
1149
1145 block = self.block if block is None else block
1150 block = self.block if block is None else block
1146 if hub:
1151 if hub:
1147 targets = 'all'
1152 targets = 'all'
1148 targets = self._build_targets(targets)[0]
1153 targets = self._build_targets(targets)[0]
1149 for t in targets:
1154 for t in targets:
1150 self.session.send(self._control_socket, 'shutdown_request',
1155 self.session.send(self._control_socket, 'shutdown_request',
1151 content={'restart':restart},ident=t)
1156 content={'restart':restart},ident=t)
1152 error = False
1157 error = False
1153 if block or hub:
1158 if block or hub:
1154 self._flush_ignored_control()
1159 self._flush_ignored_control()
1155 for i in range(len(targets)):
1160 for i in range(len(targets)):
1156 idents,msg = self.session.recv(self._control_socket, 0)
1161 idents,msg = self.session.recv(self._control_socket, 0)
1157 if self.debug:
1162 if self.debug:
1158 pprint(msg)
1163 pprint(msg)
1159 if msg['content']['status'] != 'ok':
1164 if msg['content']['status'] != 'ok':
1160 error = self._unwrap_exception(msg['content'])
1165 error = self._unwrap_exception(msg['content'])
1161 else:
1166 else:
1162 self._ignored_control_replies += len(targets)
1167 self._ignored_control_replies += len(targets)
1163
1168
1164 if hub:
1169 if hub:
1165 time.sleep(0.25)
1170 time.sleep(0.25)
1166 self.session.send(self._query_socket, 'shutdown_request')
1171 self.session.send(self._query_socket, 'shutdown_request')
1167 idents,msg = self.session.recv(self._query_socket, 0)
1172 idents,msg = self.session.recv(self._query_socket, 0)
1168 if self.debug:
1173 if self.debug:
1169 pprint(msg)
1174 pprint(msg)
1170 if msg['content']['status'] != 'ok':
1175 if msg['content']['status'] != 'ok':
1171 error = self._unwrap_exception(msg['content'])
1176 error = self._unwrap_exception(msg['content'])
1172
1177
1173 if error:
1178 if error:
1174 raise error
1179 raise error
1175
1180
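A usage sketch for `shutdown` (destructive, so purely illustrative)::

    from IPython.parallel import Client

    rc = Client()
    rc.shutdown(targets=[0, 1], block=True)   # stop engines 0 and 1, wait for replies
    rc.shutdown(hub=True)                     # stop all remaining engines and the Hub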
1176 #--------------------------------------------------------------------------
1181 #--------------------------------------------------------------------------
1177 # Execution related methods
1182 # Execution related methods
1178 #--------------------------------------------------------------------------
1183 #--------------------------------------------------------------------------
1179
1184
1180 def _maybe_raise(self, result):
1185 def _maybe_raise(self, result):
1181 """wrapper for maybe raising an exception if apply failed."""
1186 """wrapper for maybe raising an exception if apply failed."""
1182 if isinstance(result, error.RemoteError):
1187 if isinstance(result, error.RemoteError):
1183 raise result
1188 raise result
1184
1189
1185 return result
1190 return result
1186
1191
1187 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1192 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1188 ident=None):
1193 ident=None):
1189 """construct and send an apply message via a socket.
1194 """construct and send an apply message via a socket.
1190
1195
1191 This is the principal method with which all engine execution is performed by views.
1196 This is the principal method with which all engine execution is performed by views.
1192 """
1197 """
1193
1198
1194 if self._closed:
1199 if self._closed:
1195 raise RuntimeError("Client cannot be used after its sockets have been closed")
1200 raise RuntimeError("Client cannot be used after its sockets have been closed")
1196
1201
1197 # defaults:
1202 # defaults:
1198 args = args if args is not None else []
1203 args = args if args is not None else []
1199 kwargs = kwargs if kwargs is not None else {}
1204 kwargs = kwargs if kwargs is not None else {}
1200 metadata = metadata if metadata is not None else {}
1205 metadata = metadata if metadata is not None else {}
1201
1206
1202 # validate arguments
1207 # validate arguments
1203 if not callable(f) and not isinstance(f, Reference):
1208 if not callable(f) and not isinstance(f, Reference):
1204 raise TypeError("f must be callable, not %s"%type(f))
1209 raise TypeError("f must be callable, not %s"%type(f))
1205 if not isinstance(args, (tuple, list)):
1210 if not isinstance(args, (tuple, list)):
1206 raise TypeError("args must be tuple or list, not %s"%type(args))
1211 raise TypeError("args must be tuple or list, not %s"%type(args))
1207 if not isinstance(kwargs, dict):
1212 if not isinstance(kwargs, dict):
1208 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1213 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1209 if not isinstance(metadata, dict):
1214 if not isinstance(metadata, dict):
1210 raise TypeError("metadata must be dict, not %s"%type(metadata))
1215 raise TypeError("metadata must be dict, not %s"%type(metadata))
1211
1216
1212 bufs = util.pack_apply_message(f, args, kwargs,
1217 bufs = serialize.pack_apply_message(f, args, kwargs,
1213 buffer_threshold=self.session.buffer_threshold,
1218 buffer_threshold=self.session.buffer_threshold,
1214 item_threshold=self.session.item_threshold,
1219 item_threshold=self.session.item_threshold,
1215 )
1220 )
1216
1221
1217 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1222 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1218 metadata=metadata, track=track)
1223 metadata=metadata, track=track)
1219
1224
1220 msg_id = msg['header']['msg_id']
1225 msg_id = msg['header']['msg_id']
1221 self.outstanding.add(msg_id)
1226 self.outstanding.add(msg_id)
1222 if ident:
1227 if ident:
1223 # possibly routed to a specific engine
1228 # possibly routed to a specific engine
1224 if isinstance(ident, list):
1229 if isinstance(ident, list):
1225 ident = ident[-1]
1230 ident = ident[-1]
1226 if ident in self._engines.values():
1231 if ident in self._engines.values():
1227 # save for later, in case of engine death
1232 # save for later, in case of engine death
1228 self._outstanding_dict[ident].add(msg_id)
1233 self._outstanding_dict[ident].add(msg_id)
1229 self.history.append(msg_id)
1234 self.history.append(msg_id)
1230 self.metadata[msg_id]['submitted'] = datetime.now()
1235 self.metadata[msg_id]['submitted'] = datetime.now()
1231
1236
1232 return msg
1237 return msg
1233
1238
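Views normally call `send_apply_request` on your behalf, but the plumbing can be exercised directly. A rough sketch of what a load-balanced apply boils down to; a real View adds target resolution and result tracking on top::

    from IPython.parallel import Client

    rc = Client()
    msg = rc.send_apply_request(rc._task_socket, pow, args=(2, 10), kwargs={})
    msg_id = msg['header']['msg_id']

    ar = rc.get_result(msg_id, block=True)   # wrap the msg_id in an AsyncResult
    print ar.get()                           # the result of pow(2, 10)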
1234 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1239 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1235 """construct and send an execute request via a socket.
1240 """construct and send an execute request via a socket.
1236
1241
1237 """
1242 """
1238
1243
1239 if self._closed:
1244 if self._closed:
1240 raise RuntimeError("Client cannot be used after its sockets have been closed")
1245 raise RuntimeError("Client cannot be used after its sockets have been closed")
1241
1246
1242 # defaults:
1247 # defaults:
1243 metadata = metadata if metadata is not None else {}
1248 metadata = metadata if metadata is not None else {}
1244
1249
1245 # validate arguments
1250 # validate arguments
1246 if not isinstance(code, basestring):
1251 if not isinstance(code, basestring):
1247 raise TypeError("code must be text, not %s" % type(code))
1252 raise TypeError("code must be text, not %s" % type(code))
1248 if not isinstance(metadata, dict):
1253 if not isinstance(metadata, dict):
1249 raise TypeError("metadata must be dict, not %s" % type(metadata))
1254 raise TypeError("metadata must be dict, not %s" % type(metadata))
1250
1255
1251 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1256 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1252
1257
1253
1258
1254 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1259 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1255 metadata=metadata)
1260 metadata=metadata)
1256
1261
1257 msg_id = msg['header']['msg_id']
1262 msg_id = msg['header']['msg_id']
1258 self.outstanding.add(msg_id)
1263 self.outstanding.add(msg_id)
1259 if ident:
1264 if ident:
1260 # possibly routed to a specific engine
1265 # possibly routed to a specific engine
1261 if isinstance(ident, list):
1266 if isinstance(ident, list):
1262 ident = ident[-1]
1267 ident = ident[-1]
1263 if ident in self._engines.values():
1268 if ident in self._engines.values():
1264 # save for later, in case of engine death
1269 # save for later, in case of engine death
1265 self._outstanding_dict[ident].add(msg_id)
1270 self._outstanding_dict[ident].add(msg_id)
1266 self.history.append(msg_id)
1271 self.history.append(msg_id)
1267 self.metadata[msg_id]['submitted'] = datetime.now()
1272 self.metadata[msg_id]['submitted'] = datetime.now()
1268
1273
1269 return msg
1274 return msg
1270
1275
1271 #--------------------------------------------------------------------------
1276 #--------------------------------------------------------------------------
1272 # construct a View object
1277 # construct a View object
1273 #--------------------------------------------------------------------------
1278 #--------------------------------------------------------------------------
1274
1279
1275 def load_balanced_view(self, targets=None):
1280 def load_balanced_view(self, targets=None):
1276 """construct a LoadBalancedView object.
1281 """construct a LoadBalancedView object.
1277
1282
1278 If no arguments are specified, create a LoadBalancedView
1283 If no arguments are specified, create a LoadBalancedView
1279 using all engines.
1284 using all engines.
1280
1285
1281 Parameters
1286 Parameters
1282 ----------
1287 ----------
1283
1288
1284 targets: list,slice,int,etc. [default: use all engines]
1289 targets: list,slice,int,etc. [default: use all engines]
1285 The subset of engines across which to load-balance
1290 The subset of engines across which to load-balance
1286 """
1291 """
1287 if targets == 'all':
1292 if targets == 'all':
1288 targets = None
1293 targets = None
1289 if targets is not None:
1294 if targets is not None:
1290 targets = self._build_targets(targets)[1]
1295 targets = self._build_targets(targets)[1]
1291 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1296 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1292
1297
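A usage sketch for `load_balanced_view` (assumes a running cluster)::

    from IPython.parallel import Client

    rc = Client()
    lview = rc.load_balanced_view()             # balance across all engines
    ar = lview.map_async(lambda x: x * x, range(32))
    print ar.get()                              # [0, 1, 4, 9, ...]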
1293 def direct_view(self, targets='all'):
1298 def direct_view(self, targets='all'):
1294 """construct a DirectView object.
1299 """construct a DirectView object.
1295
1300
1296 If no targets are specified, create a DirectView using all engines.
1301 If no targets are specified, create a DirectView using all engines.
1297
1302
1298 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1303 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1299 evaluate the target engines at each execution, whereas rc[:] will connect to
1304 evaluate the target engines at each execution, whereas rc[:] will connect to
1300 all *current* engines, and that list will not change.
1305 all *current* engines, and that list will not change.
1301
1306
1302 That is, 'all' will always use all engines, whereas rc[:] will not use
1307 That is, 'all' will always use all engines, whereas rc[:] will not use
1303 engines added after the DirectView is constructed.
1308 engines added after the DirectView is constructed.
1304
1309
1305 Parameters
1310 Parameters
1306 ----------
1311 ----------
1307
1312
1308 targets: list,slice,int,etc. [default: use all engines]
1313 targets: list,slice,int,etc. [default: use all engines]
1309 The engines to use for the View
1314 The engines to use for the View
1310 """
1315 """
1311 single = isinstance(targets, int)
1316 single = isinstance(targets, int)
1312 # allow 'all' to be lazily evaluated at each execution
1317 # allow 'all' to be lazily evaluated at each execution
1313 if targets != 'all':
1318 if targets != 'all':
1314 targets = self._build_targets(targets)[1]
1319 targets = self._build_targets(targets)[1]
1315 if single:
1320 if single:
1316 targets = targets[0]
1321 targets = targets[0]
1317 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1322 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1318
1323
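A sketch of the distinction drawn in the docstring between the lazily-evaluated 'all' view and the snapshot returned by `rc[:]`::

    from IPython.parallel import Client

    rc = Client()
    fixed = rc[:]                   # bound to the engines registered right now
    lazy = rc.direct_view('all')    # re-evaluates 'all' at each execution

    lazy['a'] = 1                   # push a=1 to every engine, including late arrivals
    print lazy['a']                 # pull it back, one value per engine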
1319 #--------------------------------------------------------------------------
1324 #--------------------------------------------------------------------------
1320 # Query methods
1325 # Query methods
1321 #--------------------------------------------------------------------------
1326 #--------------------------------------------------------------------------
1322
1327
1323 @spin_first
1328 @spin_first
1324 def get_result(self, indices_or_msg_ids=None, block=None):
1329 def get_result(self, indices_or_msg_ids=None, block=None):
1325 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1330 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1326
1331
1327 If the client already has the results, no request to the Hub will be made.
1332 If the client already has the results, no request to the Hub will be made.
1328
1333
1329 This is a convenient way to construct AsyncResult objects, which are wrappers
1334 This is a convenient way to construct AsyncResult objects, which are wrappers
1330 that include metadata about execution, and allow for awaiting results that
1335 that include metadata about execution, and allow for awaiting results that
1331 were not submitted by this Client.
1336 were not submitted by this Client.
1332
1337
1333 It can also be a convenient way to retrieve the metadata associated with
1338 It can also be a convenient way to retrieve the metadata associated with
1334 blocking execution, since it always retrieves the metadata record for the request.
1339 blocking execution, since it always retrieves the metadata record for the request.
1335
1340
1336 Examples
1341 Examples
1337 --------
1342 --------
1338 ::
1343 ::
1339
1344
1340 In [10]: r = client.get_result(msg_id)
1345 In [10]: r = client.get_result(msg_id)
1341
1346
1342 Parameters
1347 Parameters
1343 ----------
1348 ----------
1344
1349
1345 indices_or_msg_ids : integer history index, str msg_id, or list of either
1350 indices_or_msg_ids : integer history index, str msg_id, or list of either
1346 The history indices or msg_ids of the results to be retrieved
1351 The history indices or msg_ids of the results to be retrieved
1347
1352
1348 block : bool
1353 block : bool
1349 Whether to wait for the result to be done
1354 Whether to wait for the result to be done
1350
1355
1351 Returns
1356 Returns
1352 -------
1357 -------
1353
1358
1354 AsyncResult
1359 AsyncResult
1355 A single AsyncResult object will always be returned.
1360 A single AsyncResult object will always be returned.
1356
1361
1357 AsyncHubResult
1362 AsyncHubResult
1358 A subclass of AsyncResult that retrieves results from the Hub
1363 A subclass of AsyncResult that retrieves results from the Hub
1359
1364
1360 """
1365 """
1361 block = self.block if block is None else block
1366 block = self.block if block is None else block
1362 if indices_or_msg_ids is None:
1367 if indices_or_msg_ids is None:
1363 indices_or_msg_ids = -1
1368 indices_or_msg_ids = -1
1364
1369
1365 if not isinstance(indices_or_msg_ids, (list,tuple)):
1370 if not isinstance(indices_or_msg_ids, (list,tuple)):
1366 indices_or_msg_ids = [indices_or_msg_ids]
1371 indices_or_msg_ids = [indices_or_msg_ids]
1367
1372
1368 theids = []
1373 theids = []
1369 for id in indices_or_msg_ids:
1374 for id in indices_or_msg_ids:
1370 if isinstance(id, int):
1375 if isinstance(id, int):
1371 id = self.history[id]
1376 id = self.history[id]
1372 if not isinstance(id, basestring):
1377 if not isinstance(id, basestring):
1373 raise TypeError("indices must be str or int, not %r"%id)
1378 raise TypeError("indices must be str or int, not %r"%id)
1374 theids.append(id)
1379 theids.append(id)
1375
1380
1376 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1381 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1377 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1382 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1378
1383
1379 if remote_ids:
1384 if remote_ids:
1380 ar = AsyncHubResult(self, msg_ids=theids)
1385 ar = AsyncHubResult(self, msg_ids=theids)
1381 else:
1386 else:
1382 ar = AsyncResult(self, msg_ids=theids)
1387 ar = AsyncResult(self, msg_ids=theids)
1383
1388
1384 if block:
1389 if block:
1385 ar.wait()
1390 ar.wait()
1386
1391
1387 return ar
1392 return ar
1388
1393
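A usage sketch for `get_result`, retrieving results by msg_id after the fact, possibly from a different Client than the one that submitted them::

    import os
    from IPython.parallel import Client

    rc = Client()
    ar = rc[:].apply_async(os.getpid)
    msg_ids = ar.msg_ids

    # later, or from another client connected to the same cluster:
    ar2 = rc.get_result(msg_ids, block=True)
    print ar2.get()                              # one pid per engine
    print rc.metadata[msg_ids[0]]['completed']   # per-task timing metadata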
1389 @spin_first
1394 @spin_first
1390 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1395 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1391 """Resubmit one or more tasks.
1396 """Resubmit one or more tasks.
1392
1397
1393 in-flight tasks may not be resubmitted.
1398 in-flight tasks may not be resubmitted.
1394
1399
1395 Parameters
1400 Parameters
1396 ----------
1401 ----------
1397
1402
1398 indices_or_msg_ids : integer history index, str msg_id, or list of either
1403 indices_or_msg_ids : integer history index, str msg_id, or list of either
1399 The indices or msg_ids of indices to be retrieved
1404 The indices or msg_ids of indices to be retrieved
1400
1405
1401 block : bool
1406 block : bool
1402 Whether to wait for the result to be done
1407 Whether to wait for the result to be done
1403
1408
1404 Returns
1409 Returns
1405 -------
1410 -------
1406
1411
1407 AsyncHubResult
1412 AsyncHubResult
1408 A subclass of AsyncResult that retrieves results from the Hub
1413 A subclass of AsyncResult that retrieves results from the Hub
1409
1414
1410 """
1415 """
1411 block = self.block if block is None else block
1416 block = self.block if block is None else block
1412 if indices_or_msg_ids is None:
1417 if indices_or_msg_ids is None:
1413 indices_or_msg_ids = -1
1418 indices_or_msg_ids = -1
1414
1419
1415 if not isinstance(indices_or_msg_ids, (list,tuple)):
1420 if not isinstance(indices_or_msg_ids, (list,tuple)):
1416 indices_or_msg_ids = [indices_or_msg_ids]
1421 indices_or_msg_ids = [indices_or_msg_ids]
1417
1422
1418 theids = []
1423 theids = []
1419 for id in indices_or_msg_ids:
1424 for id in indices_or_msg_ids:
1420 if isinstance(id, int):
1425 if isinstance(id, int):
1421 id = self.history[id]
1426 id = self.history[id]
1422 if not isinstance(id, basestring):
1427 if not isinstance(id, basestring):
1423 raise TypeError("indices must be str or int, not %r"%id)
1428 raise TypeError("indices must be str or int, not %r"%id)
1424 theids.append(id)
1429 theids.append(id)
1425
1430
1426 content = dict(msg_ids = theids)
1431 content = dict(msg_ids = theids)
1427
1432
1428 self.session.send(self._query_socket, 'resubmit_request', content)
1433 self.session.send(self._query_socket, 'resubmit_request', content)
1429
1434
1430 zmq.select([self._query_socket], [], [])
1435 zmq.select([self._query_socket], [], [])
1431 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1436 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1432 if self.debug:
1437 if self.debug:
1433 pprint(msg)
1438 pprint(msg)
1434 content = msg['content']
1439 content = msg['content']
1435 if content['status'] != 'ok':
1440 if content['status'] != 'ok':
1436 raise self._unwrap_exception(content)
1441 raise self._unwrap_exception(content)
1437 mapping = content['resubmitted']
1442 mapping = content['resubmitted']
1438 new_ids = [ mapping[msg_id] for msg_id in theids ]
1443 new_ids = [ mapping[msg_id] for msg_id in theids ]
1439
1444
1440 ar = AsyncHubResult(self, msg_ids=new_ids)
1445 ar = AsyncHubResult(self, msg_ids=new_ids)
1441
1446
1442 if block:
1447 if block:
1443 ar.wait()
1448 ar.wait()
1444
1449
1445 return ar
1450 return ar
1446
1451
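A sketch of `resubmit` for re-running a finished or failed task with the same code and arguments; `flaky_task` is a hypothetical function name::

    from IPython.parallel import Client

    rc = Client()
    lview = rc.load_balanced_view()
    ar = lview.apply_async(flaky_task)   # flaky_task: placeholder
    ar.wait()

    if not ar.successful():
        ar2 = rc.resubmit(ar.msg_ids, block=True)   # schedules fresh msg_ids
        print ar2.get()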
1447 @spin_first
1452 @spin_first
1448 def result_status(self, msg_ids, status_only=True):
1453 def result_status(self, msg_ids, status_only=True):
1449 """Check on the status of the result(s) of the apply request with `msg_ids`.
1454 """Check on the status of the result(s) of the apply request with `msg_ids`.
1450
1455
1451 If status_only is False, then the actual results will be retrieved, else
1456 If status_only is False, then the actual results will be retrieved, else
1452 only the status of the results will be checked.
1457 only the status of the results will be checked.
1453
1458
1454 Parameters
1459 Parameters
1455 ----------
1460 ----------
1456
1461
1457 msg_ids : list of msg_ids
1462 msg_ids : list of msg_ids
1458 if int:
1463 if int:
1459 Passed as index to self.history for convenience.
1464 Passed as index to self.history for convenience.
1460 status_only : bool (default: True)
1465 status_only : bool (default: True)
1461 if False:
1466 if False:
1462 Retrieve the actual results of completed tasks.
1467 Retrieve the actual results of completed tasks.
1463
1468
1464 Returns
1469 Returns
1465 -------
1470 -------
1466
1471
1467 results : dict
1472 results : dict
1468 There will always be the keys 'pending' and 'completed', which will
1473 There will always be the keys 'pending' and 'completed', which will
1469 be lists of msg_ids that are incomplete or complete. If `status_only`
1474 be lists of msg_ids that are incomplete or complete. If `status_only`
1470 is False, then completed results will be keyed by their `msg_id`.
1475 is False, then completed results will be keyed by their `msg_id`.
1471 """
1476 """
1472 if not isinstance(msg_ids, (list,tuple)):
1477 if not isinstance(msg_ids, (list,tuple)):
1473 msg_ids = [msg_ids]
1478 msg_ids = [msg_ids]
1474
1479
1475 theids = []
1480 theids = []
1476 for msg_id in msg_ids:
1481 for msg_id in msg_ids:
1477 if isinstance(msg_id, int):
1482 if isinstance(msg_id, int):
1478 msg_id = self.history[msg_id]
1483 msg_id = self.history[msg_id]
1479 if not isinstance(msg_id, basestring):
1484 if not isinstance(msg_id, basestring):
1480 raise TypeError("msg_ids must be str, not %r"%msg_id)
1485 raise TypeError("msg_ids must be str, not %r"%msg_id)
1481 theids.append(msg_id)
1486 theids.append(msg_id)
1482
1487
1483 completed = []
1488 completed = []
1484 local_results = {}
1489 local_results = {}
1485
1490
1486 # comment this block out to temporarily disable local shortcut:
1491 # comment this block out to temporarily disable local shortcut:
1487 for msg_id in theids:
1492 for msg_id in theids:
1488 if msg_id in self.results:
1493 if msg_id in self.results:
1489 completed.append(msg_id)
1494 completed.append(msg_id)
1490 local_results[msg_id] = self.results[msg_id]
1495 local_results[msg_id] = self.results[msg_id]
1491 theids.remove(msg_id)
1496 theids.remove(msg_id)
1492
1497
1493 if theids: # some not locally cached
1498 if theids: # some not locally cached
1494 content = dict(msg_ids=theids, status_only=status_only)
1499 content = dict(msg_ids=theids, status_only=status_only)
1495 msg = self.session.send(self._query_socket, "result_request", content=content)
1500 msg = self.session.send(self._query_socket, "result_request", content=content)
1496 zmq.select([self._query_socket], [], [])
1501 zmq.select([self._query_socket], [], [])
1497 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1502 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1498 if self.debug:
1503 if self.debug:
1499 pprint(msg)
1504 pprint(msg)
1500 content = msg['content']
1505 content = msg['content']
1501 if content['status'] != 'ok':
1506 if content['status'] != 'ok':
1502 raise self._unwrap_exception(content)
1507 raise self._unwrap_exception(content)
1503 buffers = msg['buffers']
1508 buffers = msg['buffers']
1504 else:
1509 else:
1505 content = dict(completed=[],pending=[])
1510 content = dict(completed=[],pending=[])
1506
1511
1507 content['completed'].extend(completed)
1512 content['completed'].extend(completed)
1508
1513
1509 if status_only:
1514 if status_only:
1510 return content
1515 return content
1511
1516
1512 failures = []
1517 failures = []
1513 # load cached results into result:
1518 # load cached results into result:
1514 content.update(local_results)
1519 content.update(local_results)
1515
1520
1516 # update cache with results:
1521 # update cache with results:
1517 for msg_id in sorted(theids):
1522 for msg_id in sorted(theids):
1518 if msg_id in content['completed']:
1523 if msg_id in content['completed']:
1519 rec = content[msg_id]
1524 rec = content[msg_id]
1520 parent = rec['header']
1525 parent = rec['header']
1521 header = rec['result_header']
1526 header = rec['result_header']
1522 rcontent = rec['result_content']
1527 rcontent = rec['result_content']
1523 iodict = rec['io']
1528 iodict = rec['io']
1524 if isinstance(rcontent, str):
1529 if isinstance(rcontent, str):
1525 rcontent = self.session.unpack(rcontent)
1530 rcontent = self.session.unpack(rcontent)
1526
1531
1527 md = self.metadata[msg_id]
1532 md = self.metadata[msg_id]
1528 md_msg = dict(
1533 md_msg = dict(
1529 content=rcontent,
1534 content=rcontent,
1530 parent_header=parent,
1535 parent_header=parent,
1531 header=header,
1536 header=header,
1532 metadata=rec['result_metadata'],
1537 metadata=rec['result_metadata'],
1533 )
1538 )
1534 md.update(self._extract_metadata(md_msg))
1539 md.update(self._extract_metadata(md_msg))
1535 if rec.get('received'):
1540 if rec.get('received'):
1536 md['received'] = rec['received']
1541 md['received'] = rec['received']
1537 md.update(iodict)
1542 md.update(iodict)
1538
1543
1539 if rcontent['status'] == 'ok':
1544 if rcontent['status'] == 'ok':
1540 if header['msg_type'] == 'apply_reply':
1545 if header['msg_type'] == 'apply_reply':
1541 res,buffers = util.unserialize_object(buffers)
1546 res,buffers = serialize.unserialize_object(buffers)
1542 elif header['msg_type'] == 'execute_reply':
1547 elif header['msg_type'] == 'execute_reply':
1543 res = ExecuteReply(msg_id, rcontent, md)
1548 res = ExecuteReply(msg_id, rcontent, md)
1544 else:
1549 else:
1545 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1550 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1546 else:
1551 else:
1547 res = self._unwrap_exception(rcontent)
1552 res = self._unwrap_exception(rcontent)
1548 failures.append(res)
1553 failures.append(res)
1549
1554
1550 self.results[msg_id] = res
1555 self.results[msg_id] = res
1551 content[msg_id] = res
1556 content[msg_id] = res
1552
1557
1553 if len(theids) == 1 and failures:
1558 if len(theids) == 1 and failures:
1554 raise failures[0]
1559 raise failures[0]
1555
1560
1556 error.collect_exceptions(failures, "result_status")
1561 error.collect_exceptions(failures, "result_status")
1557 return content
1562 return content
1558
1563
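A usage sketch for `result_status`, checking on recent submissions without necessarily fetching their results::

    from IPython.parallel import Client

    rc = Client()
    status = rc.result_status(rc.history[-5:])   # the last five msg_ids
    print status['pending'], status['completed']

    full = rc.result_status(rc.history[-5:], status_only=False)
    # completed results are now additionally keyed by msg_id in `full`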
1559 @spin_first
1564 @spin_first
1560 def queue_status(self, targets='all', verbose=False):
1565 def queue_status(self, targets='all', verbose=False):
1561 """Fetch the status of engine queues.
1566 """Fetch the status of engine queues.
1562
1567
1563 Parameters
1568 Parameters
1564 ----------
1569 ----------
1565
1570
1566 targets : int/str/list of ints/strs
1571 targets : int/str/list of ints/strs
1567 the engines whose states are to be queried.
1572 the engines whose states are to be queried.
1568 default : all
1573 default : all
1569 verbose : bool
1574 verbose : bool
1570 Whether to return lengths only, or lists of ids for each element
1575 Whether to return lengths only, or lists of ids for each element
1571 """
1576 """
1572 if targets == 'all':
1577 if targets == 'all':
1573 # allow 'all' to be evaluated on the engine
1578 # allow 'all' to be evaluated on the engine
1574 engine_ids = None
1579 engine_ids = None
1575 else:
1580 else:
1576 engine_ids = self._build_targets(targets)[1]
1581 engine_ids = self._build_targets(targets)[1]
1577 content = dict(targets=engine_ids, verbose=verbose)
1582 content = dict(targets=engine_ids, verbose=verbose)
1578 self.session.send(self._query_socket, "queue_request", content=content)
1583 self.session.send(self._query_socket, "queue_request", content=content)
1579 idents,msg = self.session.recv(self._query_socket, 0)
1584 idents,msg = self.session.recv(self._query_socket, 0)
1580 if self.debug:
1585 if self.debug:
1581 pprint(msg)
1586 pprint(msg)
1582 content = msg['content']
1587 content = msg['content']
1583 status = content.pop('status')
1588 status = content.pop('status')
1584 if status != 'ok':
1589 if status != 'ok':
1585 raise self._unwrap_exception(content)
1590 raise self._unwrap_exception(content)
1586 content = rekey(content)
1591 content = rekey(content)
1587 if isinstance(targets, int):
1592 if isinstance(targets, int):
1588 return content[targets]
1593 return content[targets]
1589 else:
1594 else:
1590 return content
1595 return content
1591
1596
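A usage sketch for `queue_status`; treat the printed shape as illustrative, since the exact per-engine keys come from the Hub::

    from IPython.parallel import Client

    rc = Client()
    print rc.queue_status()                          # per-engine queue/completed counts
    print rc.queue_status(targets=0, verbose=True)   # msg_id lists for engine 0 only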
1592 @spin_first
1597 @spin_first
1593 def purge_results(self, jobs=[], targets=[]):
1598 def purge_results(self, jobs=[], targets=[]):
1594 """Tell the Hub to forget results.
1599 """Tell the Hub to forget results.
1595
1600
1596 Individual results can be purged by msg_id, or the entire
1601 Individual results can be purged by msg_id, or the entire
1597 history of specific targets can be purged.
1602 history of specific targets can be purged.
1598
1603
1599 Use `purge_results('all')` to scrub everything from the Hub's db.
1604 Use `purge_results('all')` to scrub everything from the Hub's db.
1600
1605
1601 Parameters
1606 Parameters
1602 ----------
1607 ----------
1603
1608
1604 jobs : str or list of str or AsyncResult objects
1609 jobs : str or list of str or AsyncResult objects
1605 the msg_ids whose results should be forgotten.
1610 the msg_ids whose results should be forgotten.
1606 targets : int/str/list of ints/strs
1611 targets : int/str/list of ints/strs
1607 The targets, by int_id, whose entire history is to be purged.
1612 The targets, by int_id, whose entire history is to be purged.
1608
1613
1609 default : None
1614 default : None
1610 """
1615 """
1611 if not targets and not jobs:
1616 if not targets and not jobs:
1612 raise ValueError("Must specify at least one of `targets` and `jobs`")
1617 raise ValueError("Must specify at least one of `targets` and `jobs`")
1613 if targets:
1618 if targets:
1614 targets = self._build_targets(targets)[1]
1619 targets = self._build_targets(targets)[1]
1615
1620
1616 # construct msg_ids from jobs
1621 # construct msg_ids from jobs
1617 if jobs == 'all':
1622 if jobs == 'all':
1618 msg_ids = jobs
1623 msg_ids = jobs
1619 else:
1624 else:
1620 msg_ids = []
1625 msg_ids = []
1621 if isinstance(jobs, (basestring,AsyncResult)):
1626 if isinstance(jobs, (basestring,AsyncResult)):
1622 jobs = [jobs]
1627 jobs = [jobs]
1623 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1628 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1624 if bad_ids:
1629 if bad_ids:
1625 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1630 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1626 for j in jobs:
1631 for j in jobs:
1627 if isinstance(j, AsyncResult):
1632 if isinstance(j, AsyncResult):
1628 msg_ids.extend(j.msg_ids)
1633 msg_ids.extend(j.msg_ids)
1629 else:
1634 else:
1630 msg_ids.append(j)
1635 msg_ids.append(j)
1631
1636
1632 content = dict(engine_ids=targets, msg_ids=msg_ids)
1637 content = dict(engine_ids=targets, msg_ids=msg_ids)
1633 self.session.send(self._query_socket, "purge_request", content=content)
1638 self.session.send(self._query_socket, "purge_request", content=content)
1634 idents, msg = self.session.recv(self._query_socket, 0)
1639 idents, msg = self.session.recv(self._query_socket, 0)
1635 if self.debug:
1640 if self.debug:
1636 pprint(msg)
1641 pprint(msg)
1637 content = msg['content']
1642 content = msg['content']
1638 if content['status'] != 'ok':
1643 if content['status'] != 'ok':
1639 raise self._unwrap_exception(content)
1644 raise self._unwrap_exception(content)
1640
1645
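A usage sketch for `purge_results`, from a single job up to the whole database::

    import os
    from IPython.parallel import Client

    rc = Client()
    ar = rc[:].apply_async(os.getpid)
    ar.get()

    rc.purge_results(jobs=ar)        # forget just this job's records on the Hub
    rc.purge_results(targets=[0])    # forget everything engine 0 has ever run
    rc.purge_results('all')          # scrub the Hub's entire result database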
1641 @spin_first
1646 @spin_first
1642 def hub_history(self):
1647 def hub_history(self):
1643 """Get the Hub's history
1648 """Get the Hub's history
1644
1649
1645 Just like the Client, the Hub has a history, which is a list of msg_ids.
1650 Just like the Client, the Hub has a history, which is a list of msg_ids.
1646 This will contain the history of all clients, and, depending on configuration,
1651 This will contain the history of all clients, and, depending on configuration,
1647 may contain history across multiple cluster sessions.
1652 may contain history across multiple cluster sessions.
1648
1653
1649 Any msg_id returned here is a valid argument to `get_result`.
1654 Any msg_id returned here is a valid argument to `get_result`.
1650
1655
1651 Returns
1656 Returns
1652 -------
1657 -------
1653
1658
1654 msg_ids : list of strs
1659 msg_ids : list of strs
1655 list of all msg_ids, ordered by task submission time.
1660 list of all msg_ids, ordered by task submission time.
1656 """
1661 """
1657
1662
1658 self.session.send(self._query_socket, "history_request", content={})
1663 self.session.send(self._query_socket, "history_request", content={})
1659 idents, msg = self.session.recv(self._query_socket, 0)
1664 idents, msg = self.session.recv(self._query_socket, 0)
1660
1665
1661 if self.debug:
1666 if self.debug:
1662 pprint(msg)
1667 pprint(msg)
1663 content = msg['content']
1668 content = msg['content']
1664 if content['status'] != 'ok':
1669 if content['status'] != 'ok':
1665 raise self._unwrap_exception(content)
1670 raise self._unwrap_exception(content)
1666 else:
1671 else:
1667 return content['history']
1672 return content['history']
1668
1673
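A sketch combining `hub_history` with `get_result`, as the docstring suggests::

    from IPython.parallel import Client

    rc = Client()
    all_msg_ids = rc.hub_history()   # every task the Hub still remembers
    if all_msg_ids:
        ar = rc.get_result(all_msg_ids[-1], block=True)   # even other clients' tasks
        print ar.get()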
1669 @spin_first
1674 @spin_first
1670 def db_query(self, query, keys=None):
1675 def db_query(self, query, keys=None):
1671 """Query the Hub's TaskRecord database
1676 """Query the Hub's TaskRecord database
1672
1677
1673 This will return a list of task record dicts that match `query`
1678 This will return a list of task record dicts that match `query`
1674
1679
1675 Parameters
1680 Parameters
1676 ----------
1681 ----------
1677
1682
1678 query : mongodb query dict
1683 query : mongodb query dict
1679 The search dict. See mongodb query docs for details.
1684 The search dict. See mongodb query docs for details.
1680 keys : list of strs [optional]
1685 keys : list of strs [optional]
1681 The subset of keys to be returned. The default is to fetch everything but buffers.
1686 The subset of keys to be returned. The default is to fetch everything but buffers.
1682 'msg_id' will *always* be included.
1687 'msg_id' will *always* be included.
1683 """
1688 """
1684 if isinstance(keys, basestring):
1689 if isinstance(keys, basestring):
1685 keys = [keys]
1690 keys = [keys]
1686 content = dict(query=query, keys=keys)
1691 content = dict(query=query, keys=keys)
1687 self.session.send(self._query_socket, "db_request", content=content)
1692 self.session.send(self._query_socket, "db_request", content=content)
1688 idents, msg = self.session.recv(self._query_socket, 0)
1693 idents, msg = self.session.recv(self._query_socket, 0)
1689 if self.debug:
1694 if self.debug:
1690 pprint(msg)
1695 pprint(msg)
1691 content = msg['content']
1696 content = msg['content']
1692 if content['status'] != 'ok':
1697 if content['status'] != 'ok':
1693 raise self._unwrap_exception(content)
1698 raise self._unwrap_exception(content)
1694
1699
1695 records = content['records']
1700 records = content['records']
1696
1701
1697 buffer_lens = content['buffer_lens']
1702 buffer_lens = content['buffer_lens']
1698 result_buffer_lens = content['result_buffer_lens']
1703 result_buffer_lens = content['result_buffer_lens']
1699 buffers = msg['buffers']
1704 buffers = msg['buffers']
1700 has_bufs = buffer_lens is not None
1705 has_bufs = buffer_lens is not None
1701 has_rbufs = result_buffer_lens is not None
1706 has_rbufs = result_buffer_lens is not None
1702 for i,rec in enumerate(records):
1707 for i,rec in enumerate(records):
1703 # relink buffers
1708 # relink buffers
1704 if has_bufs:
1709 if has_bufs:
1705 blen = buffer_lens[i]
1710 blen = buffer_lens[i]
1706 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1711 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1707 if has_rbufs:
1712 if has_rbufs:
1708 blen = result_buffer_lens[i]
1713 blen = result_buffer_lens[i]
1709 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1714 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1710
1715
1711 return records
1716 return records
1712
1717
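A usage sketch for `db_query` with a small mongodb-style query; the TaskRecord keys used here ('completed', 'engine_uuid') are the common ones, but the exact set depends on the Hub's database backend::

    from datetime import datetime, timedelta
    from IPython.parallel import Client

    rc = Client()
    yesterday = datetime.now() - timedelta(days=1)

    # tasks completed in the last 24 hours, fetching only a few fields
    recs = rc.db_query({'completed': {'$gte': yesterday}},
                       keys=['msg_id', 'engine_uuid', 'completed'])
    for rec in recs:
        print rec['msg_id'], rec['completed']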
1713 __all__ = [ 'Client' ]
1718 __all__ = [ 'Client' ]