##// END OF EJS Templates
Raise better IOError if a connection file is not found...
MinRK -
Show More
@@ -1,1870 +1,1875 b''
1 """A semi-synchronous Client for IPython parallel"""
1 """A semi-synchronous Client for IPython parallel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import print_function
6 from __future__ import print_function
7
7
8 import os
8 import os
9 import json
9 import json
10 import sys
10 import sys
11 from threading import Thread, Event
11 from threading import Thread, Event
12 import time
12 import time
13 import warnings
13 import warnings
14 from datetime import datetime
14 from datetime import datetime
15 from getpass import getpass
15 from getpass import getpass
16 from pprint import pprint
16 from pprint import pprint
17
17
18 pjoin = os.path.join
18 pjoin = os.path.join
19
19
20 import zmq
20 import zmq
21
21
22 from IPython.config.configurable import MultipleInstanceError
22 from IPython.config.configurable import MultipleInstanceError
23 from IPython.core.application import BaseIPythonApplication
23 from IPython.core.application import BaseIPythonApplication
24 from IPython.core.profiledir import ProfileDir, ProfileDirError
24 from IPython.core.profiledir import ProfileDir, ProfileDirError
25
25
26 from IPython.utils.capture import RichOutput
26 from IPython.utils.capture import RichOutput
27 from IPython.utils.coloransi import TermColors
27 from IPython.utils.coloransi import TermColors
28 from IPython.utils.jsonutil import rekey, extract_dates, parse_date
28 from IPython.utils.jsonutil import rekey, extract_dates, parse_date
29 from IPython.utils.localinterfaces import localhost, is_local_ip
29 from IPython.utils.localinterfaces import localhost, is_local_ip
30 from IPython.utils.path import get_ipython_dir
30 from IPython.utils.path import get_ipython_dir, compress_user
31 from IPython.utils.py3compat import cast_bytes, string_types, xrange, iteritems
31 from IPython.utils.py3compat import cast_bytes, string_types, xrange, iteritems
32 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
32 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
33 Dict, List, Bool, Set, Any)
33 Dict, List, Bool, Set, Any)
34 from IPython.external.decorator import decorator
34 from IPython.external.decorator import decorator
35
35
36 from IPython.parallel import Reference
36 from IPython.parallel import Reference
37 from IPython.parallel import error
37 from IPython.parallel import error
38 from IPython.parallel import util
38 from IPython.parallel import util
39
39
40 from IPython.kernel.zmq.session import Session, Message
40 from IPython.kernel.zmq.session import Session, Message
41 from IPython.kernel.zmq import serialize
41 from IPython.kernel.zmq import serialize
42
42
43 from .asyncresult import AsyncResult, AsyncHubResult
43 from .asyncresult import AsyncResult, AsyncHubResult
44 from .view import DirectView, LoadBalancedView
44 from .view import DirectView, LoadBalancedView
45
45
46 #--------------------------------------------------------------------------
46 #--------------------------------------------------------------------------
47 # Decorators for Client methods
47 # Decorators for Client methods
48 #--------------------------------------------------------------------------
48 #--------------------------------------------------------------------------
49
49
@decorator
def spin_first(f, self, *args, **kwargs):
    """Decorator: flush incoming messages via spin() before invoking *f*.

    Ensures the client's view of results/registrations is current when the
    wrapped method runs.
    """
    self.spin()
    result = f(self, *args, **kwargs)
    return result
55
55
56
56
57 #--------------------------------------------------------------------------
57 #--------------------------------------------------------------------------
58 # Classes
58 # Classes
59 #--------------------------------------------------------------------------
59 #--------------------------------------------------------------------------
60
60
61
61
class ExecuteReply(RichOutput):
    """Wrapper around the content and metadata of a finished execute request.

    Exposes the ``execute_result`` payload through the RichOutput display
    protocol, and forwards item/attribute access to the reply metadata.
    """

    def __init__(self, msg_id, content, metadata):
        self.msg_id = msg_id
        self._content = content
        self.execution_count = content['execution_count']
        self.metadata = metadata

    # RichOutput overrides

    @property
    def source(self):
        result = self.metadata['execute_result']
        if not result:
            return None
        return result.get('source', '')

    @property
    def data(self):
        result = self.metadata['execute_result']
        if not result:
            return None
        return result.get('data', {})

    @property
    def _metadata(self):
        result = self.metadata['execute_result']
        if not result:
            return None
        return result.get('metadata', {})

    def display(self):
        from IPython.display import publish_display_data
        publish_display_data(self.data, self.metadata)

    def _repr_mime_(self, mime):
        if mime not in self.data:
            return None
        content = self.data[mime]
        mime_md = self._metadata
        if mime in mime_md:
            return content, mime_md[mime]
        return content

    def __getitem__(self, key):
        return self.metadata[key]

    def __getattr__(self, key):
        # attribute access falls through to metadata keys
        try:
            return self.metadata[key]
        except KeyError:
            raise AttributeError(key)

    def __repr__(self):
        result = self.metadata['execute_result'] or {'data': {}}
        text = result['data'].get('text/plain', '')
        if len(text) > 32:
            # keep reprs short
            text = text[:29] + '...'
        return "<ExecuteReply[%i]: %s>" % (self.execution_count, text)

    def _repr_pretty_(self, p, cycle):
        result = self.metadata['execute_result'] or {'data': {}}
        text = result['data'].get('text/plain', '')

        if not text:
            return

        try:
            shell = get_ipython()
        except NameError:
            color_scheme = "NoColor"
        else:
            color_scheme = shell.colors

        if color_scheme == "NoColor":
            prompt_color = normal = ""
        else:
            prompt_color = TermColors.Red
            normal = TermColors.Normal

        if '\n' in text and not text.startswith('\n'):
            # start multiline reprs on their own line
            text = '\n' + text

        p.text(
            prompt_color + u'Out[%i:%i]: ' % (
                self.metadata['engine_id'], self.execution_count
            ) + normal + text
        )
148
148
149
149
class Metadata(dict):
    """A dict with attribute access and a fixed set of task-metadata keys.

    Attribute access is aliased to item access.  The key set is closed:
    assigning an unknown key raises (KeyError through item access,
    AttributeError through attribute access).
    """

    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # seed the full, fixed key set with neutral defaults
        self.update({
            'msg_id': None,
            'submitted': None,
            'started': None,
            'completed': None,
            'received': None,
            'engine_uuid': None,
            'engine_id': None,
            'follow': None,
            'after': None,
            'status': None,

            'execute_input': None,
            'execute_result': None,
            'error': None,
            'stdout': '',
            'stderr': '',
            'outputs': [],
            'data': {},
            'outputs_ready': False,
        })
        # then apply caller-supplied values on top
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        try:
            return self[key]
        except KeyError:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict key enforcement"""
        if key not in self:
            raise AttributeError(key)
        self[key] = value

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key not in self:
            raise KeyError(key)
        dict.__setitem__(self, key, value)
203
203
204
204
205 class Client(HasTraits):
205 class Client(HasTraits):
206 """A semi-synchronous client to the IPython ZMQ cluster
206 """A semi-synchronous client to the IPython ZMQ cluster
207
207
208 Parameters
208 Parameters
209 ----------
209 ----------
210
210
211 url_file : str/unicode; path to ipcontroller-client.json
211 url_file : str/unicode; path to ipcontroller-client.json
212 This JSON file should contain all the information needed to connect to a cluster,
212 This JSON file should contain all the information needed to connect to a cluster,
213 and is likely the only argument needed.
213 and is likely the only argument needed.
214 Connection information for the Hub's registration. If a json connector
214 Connection information for the Hub's registration. If a json connector
215 file is given, then likely no further configuration is necessary.
215 file is given, then likely no further configuration is necessary.
216 [Default: use profile]
216 [Default: use profile]
217 profile : bytes
217 profile : bytes
218 The name of the Cluster profile to be used to find connector information.
218 The name of the Cluster profile to be used to find connector information.
219 If run from an IPython application, the default profile will be the same
219 If run from an IPython application, the default profile will be the same
220 as the running application, otherwise it will be 'default'.
220 as the running application, otherwise it will be 'default'.
221 cluster_id : str
221 cluster_id : str
222 String id to added to runtime files, to prevent name collisions when using
222 String id to added to runtime files, to prevent name collisions when using
223 multiple clusters with a single profile simultaneously.
223 multiple clusters with a single profile simultaneously.
224 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
224 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
225 Since this is text inserted into filenames, typical recommendations apply:
225 Since this is text inserted into filenames, typical recommendations apply:
226 Simple character strings are ideal, and spaces are not recommended (but
226 Simple character strings are ideal, and spaces are not recommended (but
227 should generally work)
227 should generally work)
228 context : zmq.Context
228 context : zmq.Context
229 Pass an existing zmq.Context instance, otherwise the client will create its own.
229 Pass an existing zmq.Context instance, otherwise the client will create its own.
230 debug : bool
230 debug : bool
231 flag for lots of message printing for debug purposes
231 flag for lots of message printing for debug purposes
232 timeout : int/float
232 timeout : int/float
233 time (in seconds) to wait for connection replies from the Hub
233 time (in seconds) to wait for connection replies from the Hub
234 [Default: 10]
234 [Default: 10]
235
235
236 #-------------- session related args ----------------
236 #-------------- session related args ----------------
237
237
238 config : Config object
238 config : Config object
239 If specified, this will be relayed to the Session for configuration
239 If specified, this will be relayed to the Session for configuration
240 username : str
240 username : str
241 set username for the session object
241 set username for the session object
242
242
243 #-------------- ssh related args ----------------
243 #-------------- ssh related args ----------------
244 # These are args for configuring the ssh tunnel to be used
244 # These are args for configuring the ssh tunnel to be used
245 # credentials are used to forward connections over ssh to the Controller
245 # credentials are used to forward connections over ssh to the Controller
246 # Note that the ip given in `addr` needs to be relative to sshserver
246 # Note that the ip given in `addr` needs to be relative to sshserver
247 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
247 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
248 # and set sshserver as the same machine the Controller is on. However,
248 # and set sshserver as the same machine the Controller is on. However,
249 # the only requirement is that sshserver is able to see the Controller
249 # the only requirement is that sshserver is able to see the Controller
250 # (i.e. is within the same trusted network).
250 # (i.e. is within the same trusted network).
251
251
252 sshserver : str
252 sshserver : str
253 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
253 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
254 If keyfile or password is specified, and this is not, it will default to
254 If keyfile or password is specified, and this is not, it will default to
255 the ip given in addr.
255 the ip given in addr.
256 sshkey : str; path to ssh private key file
256 sshkey : str; path to ssh private key file
257 This specifies a key to be used in ssh login, default None.
257 This specifies a key to be used in ssh login, default None.
258 Regular default ssh keys will be used without specifying this argument.
258 Regular default ssh keys will be used without specifying this argument.
259 password : str
259 password : str
260 Your ssh password to sshserver. Note that if this is left None,
260 Your ssh password to sshserver. Note that if this is left None,
261 you will be prompted for it if passwordless key based login is unavailable.
261 you will be prompted for it if passwordless key based login is unavailable.
262 paramiko : bool
262 paramiko : bool
263 flag for whether to use paramiko instead of shell ssh for tunneling.
263 flag for whether to use paramiko instead of shell ssh for tunneling.
264 [default: True on win32, False else]
264 [default: True on win32, False else]
265
265
266
266
267 Attributes
267 Attributes
268 ----------
268 ----------
269
269
270 ids : list of int engine IDs
270 ids : list of int engine IDs
271 requesting the ids attribute always synchronizes
271 requesting the ids attribute always synchronizes
272 the registration state. To request ids without synchronization,
272 the registration state. To request ids without synchronization,
273 use semi-private _ids attributes.
273 use semi-private _ids attributes.
274
274
275 history : list of msg_ids
275 history : list of msg_ids
276 a list of msg_ids, keeping track of all the execution
276 a list of msg_ids, keeping track of all the execution
277 messages you have submitted in order.
277 messages you have submitted in order.
278
278
279 outstanding : set of msg_ids
279 outstanding : set of msg_ids
280 a set of msg_ids that have been submitted, but whose
280 a set of msg_ids that have been submitted, but whose
281 results have not yet been received.
281 results have not yet been received.
282
282
283 results : dict
283 results : dict
284 a dict of all our results, keyed by msg_id
284 a dict of all our results, keyed by msg_id
285
285
286 block : bool
286 block : bool
287 determines default behavior when block not specified
287 determines default behavior when block not specified
288 in execution methods
288 in execution methods
289
289
290 Methods
290 Methods
291 -------
291 -------
292
292
293 spin
293 spin
294 flushes incoming results and registration state changes
294 flushes incoming results and registration state changes
295 control methods spin, and requesting `ids` also ensures up to date
295 control methods spin, and requesting `ids` also ensures up to date
296
296
297 wait
297 wait
298 wait on one or more msg_ids
298 wait on one or more msg_ids
299
299
300 execution methods
300 execution methods
301 apply
301 apply
302 legacy: execute, run
302 legacy: execute, run
303
303
304 data movement
304 data movement
305 push, pull, scatter, gather
305 push, pull, scatter, gather
306
306
307 query methods
307 query methods
308 queue_status, get_result, purge, result_status
308 queue_status, get_result, purge, result_status
309
309
310 control methods
310 control methods
311 abort, shutdown
311 abort, shutdown
312
312
313 """
313 """
314
314
315
315
    # -- public client state (traitlets) ---------------------------------
    block = Bool(False)  # default blocking behavior for execution methods
    outstanding = Set()  # msg_ids submitted but whose results have not arrived
    results = Instance('collections.defaultdict', (dict,))  # msg_id -> result
    metadata = Instance('collections.defaultdict', (Metadata,))  # msg_id -> Metadata
    history = List()  # msg_ids of all submitted executions, in order
    debug = Bool(False)  # relayed to the Session for verbose message printing
    _spin_thread = Any()  # background Thread used by spin_thread()
    _stop_spinning = Any()  # threading.Event signalling the spin thread to stop

    profile=Unicode()
326 def _profile_default(self):
326 def _profile_default(self):
327 if BaseIPythonApplication.initialized():
327 if BaseIPythonApplication.initialized():
328 # an IPython app *might* be running, try to get its profile
328 # an IPython app *might* be running, try to get its profile
329 try:
329 try:
330 return BaseIPythonApplication.instance().profile
330 return BaseIPythonApplication.instance().profile
331 except (AttributeError, MultipleInstanceError):
331 except (AttributeError, MultipleInstanceError):
332 # could be a *different* subclass of config.Application,
332 # could be a *different* subclass of config.Application,
333 # which would raise one of these two errors.
333 # which would raise one of these two errors.
334 return u'default'
334 return u'default'
335 else:
335 else:
336 return u'default'
336 return u'default'
337
337
338
338
    # -- private connection / bookkeeping state (traitlets) --------------
    _outstanding_dict = Instance('collections.defaultdict', (set,))  # engine uuid -> outstanding msg_ids
    _ids = List()  # registered engine ids (unsynchronized view; see `ids`)
    _connected=Bool(False)
    _ssh=Bool(False)  # whether connections are tunneled over ssh
    _context = Instance('zmq.Context')
    _config = Dict()  # parsed contents of the connection file
    _engines=Instance(util.ReverseDict, (), {})  # bidirectional id <-> uuid map
    # _hub_socket=Instance('zmq.Socket')
    _query_socket=Instance('zmq.Socket')
    _control_socket=Instance('zmq.Socket')
    _iopub_socket=Instance('zmq.Socket')
    _notification_socket=Instance('zmq.Socket')
    _mux_socket=Instance('zmq.Socket')
    _task_socket=Instance('zmq.Socket')
    _task_scheme=Unicode()  # task scheduler scheme from the connection file
    _closed = False  # set by close(); guards double-teardown
    _ignored_control_replies=Integer(0)
    _ignored_hub_replies=Integer(0)
357
357
358 def __new__(self, *args, **kw):
358 def __new__(self, *args, **kw):
359 # don't raise on positional args
359 # don't raise on positional args
360 return HasTraits.__new__(self, **kw)
360 return HasTraits.__new__(self, **kw)
361
361
362 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
362 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
363 context=None, debug=False,
363 context=None, debug=False,
364 sshserver=None, sshkey=None, password=None, paramiko=None,
364 sshserver=None, sshkey=None, password=None, paramiko=None,
365 timeout=10, cluster_id=None, **extra_args
365 timeout=10, cluster_id=None, **extra_args
366 ):
366 ):
367 if profile:
367 if profile:
368 super(Client, self).__init__(debug=debug, profile=profile)
368 super(Client, self).__init__(debug=debug, profile=profile)
369 else:
369 else:
370 super(Client, self).__init__(debug=debug)
370 super(Client, self).__init__(debug=debug)
371 if context is None:
371 if context is None:
372 context = zmq.Context.instance()
372 context = zmq.Context.instance()
373 self._context = context
373 self._context = context
374 self._stop_spinning = Event()
374 self._stop_spinning = Event()
375
375
376 if 'url_or_file' in extra_args:
376 if 'url_or_file' in extra_args:
377 url_file = extra_args['url_or_file']
377 url_file = extra_args['url_or_file']
378 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
378 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
379
379
380 if url_file and util.is_url(url_file):
380 if url_file and util.is_url(url_file):
381 raise ValueError("single urls cannot be specified, url-files must be used.")
381 raise ValueError("single urls cannot be specified, url-files must be used.")
382
382
383 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
383 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
384
384
385 if self._cd is not None:
385 if self._cd is not None:
386 if url_file is None:
386 if url_file is None:
387 if not cluster_id:
387 if not cluster_id:
388 client_json = 'ipcontroller-client.json'
388 client_json = 'ipcontroller-client.json'
389 else:
389 else:
390 client_json = 'ipcontroller-%s-client.json' % cluster_id
390 client_json = 'ipcontroller-%s-client.json' % cluster_id
391 url_file = pjoin(self._cd.security_dir, client_json)
391 url_file = pjoin(self._cd.security_dir, client_json)
392 if url_file is None:
392 if url_file is None:
393 raise ValueError(
393 raise ValueError(
394 "I can't find enough information to connect to a hub!"
394 "I can't find enough information to connect to a hub!"
395 " Please specify at least one of url_file or profile."
395 " Please specify at least one of url_file or profile."
396 )
396 )
397
397
398 if not os.path.exists(url_file):
399 raise IOError("Connection file %r not found. Is a controller running?" % \
400 compress_user(url_file)
401 )
402
398 with open(url_file) as f:
403 with open(url_file) as f:
399 cfg = json.load(f)
404 cfg = json.load(f)
400
405
401 self._task_scheme = cfg['task_scheme']
406 self._task_scheme = cfg['task_scheme']
402
407
403 # sync defaults from args, json:
408 # sync defaults from args, json:
404 if sshserver:
409 if sshserver:
405 cfg['ssh'] = sshserver
410 cfg['ssh'] = sshserver
406
411
407 location = cfg.setdefault('location', None)
412 location = cfg.setdefault('location', None)
408
413
409 proto,addr = cfg['interface'].split('://')
414 proto,addr = cfg['interface'].split('://')
410 addr = util.disambiguate_ip_address(addr, location)
415 addr = util.disambiguate_ip_address(addr, location)
411 cfg['interface'] = "%s://%s" % (proto, addr)
416 cfg['interface'] = "%s://%s" % (proto, addr)
412
417
413 # turn interface,port into full urls:
418 # turn interface,port into full urls:
414 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
419 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
415 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
420 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
416
421
417 url = cfg['registration']
422 url = cfg['registration']
418
423
419 if location is not None and addr == localhost():
424 if location is not None and addr == localhost():
420 # location specified, and connection is expected to be local
425 # location specified, and connection is expected to be local
421 if not is_local_ip(location) and not sshserver:
426 if not is_local_ip(location) and not sshserver:
422 # load ssh from JSON *only* if the controller is not on
427 # load ssh from JSON *only* if the controller is not on
423 # this machine
428 # this machine
424 sshserver=cfg['ssh']
429 sshserver=cfg['ssh']
425 if not is_local_ip(location) and not sshserver:
430 if not is_local_ip(location) and not sshserver:
426 # warn if no ssh specified, but SSH is probably needed
431 # warn if no ssh specified, but SSH is probably needed
427 # This is only a warning, because the most likely cause
432 # This is only a warning, because the most likely cause
428 # is a local Controller on a laptop whose IP is dynamic
433 # is a local Controller on a laptop whose IP is dynamic
429 warnings.warn("""
434 warnings.warn("""
430 Controller appears to be listening on localhost, but not on this machine.
435 Controller appears to be listening on localhost, but not on this machine.
431 If this is true, you should specify Client(...,sshserver='you@%s')
436 If this is true, you should specify Client(...,sshserver='you@%s')
432 or instruct your controller to listen on an external IP."""%location,
437 or instruct your controller to listen on an external IP."""%location,
433 RuntimeWarning)
438 RuntimeWarning)
434 elif not sshserver:
439 elif not sshserver:
435 # otherwise sync with cfg
440 # otherwise sync with cfg
436 sshserver = cfg['ssh']
441 sshserver = cfg['ssh']
437
442
438 self._config = cfg
443 self._config = cfg
439
444
440 self._ssh = bool(sshserver or sshkey or password)
445 self._ssh = bool(sshserver or sshkey or password)
441 if self._ssh and sshserver is None:
446 if self._ssh and sshserver is None:
442 # default to ssh via localhost
447 # default to ssh via localhost
443 sshserver = addr
448 sshserver = addr
444 if self._ssh and password is None:
449 if self._ssh and password is None:
445 from zmq.ssh import tunnel
450 from zmq.ssh import tunnel
446 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
451 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
447 password=False
452 password=False
448 else:
453 else:
449 password = getpass("SSH Password for %s: "%sshserver)
454 password = getpass("SSH Password for %s: "%sshserver)
450 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
455 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
451
456
452 # configure and construct the session
457 # configure and construct the session
453 try:
458 try:
454 extra_args['packer'] = cfg['pack']
459 extra_args['packer'] = cfg['pack']
455 extra_args['unpacker'] = cfg['unpack']
460 extra_args['unpacker'] = cfg['unpack']
456 extra_args['key'] = cast_bytes(cfg['key'])
461 extra_args['key'] = cast_bytes(cfg['key'])
457 extra_args['signature_scheme'] = cfg['signature_scheme']
462 extra_args['signature_scheme'] = cfg['signature_scheme']
458 except KeyError as exc:
463 except KeyError as exc:
459 msg = '\n'.join([
464 msg = '\n'.join([
460 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
465 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
461 "If you are reusing connection files, remove them and start ipcontroller again."
466 "If you are reusing connection files, remove them and start ipcontroller again."
462 ])
467 ])
463 raise ValueError(msg.format(exc.message))
468 raise ValueError(msg.format(exc.message))
464
469
465 self.session = Session(**extra_args)
470 self.session = Session(**extra_args)
466
471
467 self._query_socket = self._context.socket(zmq.DEALER)
472 self._query_socket = self._context.socket(zmq.DEALER)
468
473
469 if self._ssh:
474 if self._ssh:
470 from zmq.ssh import tunnel
475 from zmq.ssh import tunnel
471 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
476 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
472 else:
477 else:
473 self._query_socket.connect(cfg['registration'])
478 self._query_socket.connect(cfg['registration'])
474
479
475 self.session.debug = self.debug
480 self.session.debug = self.debug
476
481
477 self._notification_handlers = {'registration_notification' : self._register_engine,
482 self._notification_handlers = {'registration_notification' : self._register_engine,
478 'unregistration_notification' : self._unregister_engine,
483 'unregistration_notification' : self._unregister_engine,
479 'shutdown_notification' : lambda msg: self.close(),
484 'shutdown_notification' : lambda msg: self.close(),
480 }
485 }
481 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
486 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
482 'apply_reply' : self._handle_apply_reply}
487 'apply_reply' : self._handle_apply_reply}
483
488
484 try:
489 try:
485 self._connect(sshserver, ssh_kwargs, timeout)
490 self._connect(sshserver, ssh_kwargs, timeout)
486 except:
491 except:
487 self.close(linger=0)
492 self.close(linger=0)
488 raise
493 raise
489
494
490 # last step: setup magics, if we are in IPython:
495 # last step: setup magics, if we are in IPython:
491
496
492 try:
497 try:
493 ip = get_ipython()
498 ip = get_ipython()
494 except NameError:
499 except NameError:
495 return
500 return
496 else:
501 else:
497 if 'px' not in ip.magics_manager.magics:
502 if 'px' not in ip.magics_manager.magics:
498 # in IPython but we are the first Client.
503 # in IPython but we are the first Client.
499 # activate a default view for parallel magics.
504 # activate a default view for parallel magics.
500 self.activate()
505 self.activate()
501
506
502 def __del__(self):
507 def __del__(self):
503 """cleanup sockets, but _not_ context."""
508 """cleanup sockets, but _not_ context."""
504 self.close()
509 self.close()
505
510
506 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
511 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
507 if ipython_dir is None:
512 if ipython_dir is None:
508 ipython_dir = get_ipython_dir()
513 ipython_dir = get_ipython_dir()
509 if profile_dir is not None:
514 if profile_dir is not None:
510 try:
515 try:
511 self._cd = ProfileDir.find_profile_dir(profile_dir)
516 self._cd = ProfileDir.find_profile_dir(profile_dir)
512 return
517 return
513 except ProfileDirError:
518 except ProfileDirError:
514 pass
519 pass
515 elif profile is not None:
520 elif profile is not None:
516 try:
521 try:
517 self._cd = ProfileDir.find_profile_dir_by_name(
522 self._cd = ProfileDir.find_profile_dir_by_name(
518 ipython_dir, profile)
523 ipython_dir, profile)
519 return
524 return
520 except ProfileDirError:
525 except ProfileDirError:
521 pass
526 pass
522 self._cd = None
527 self._cd = None
523
528
524 def _update_engines(self, engines):
529 def _update_engines(self, engines):
525 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
530 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
526 for k,v in iteritems(engines):
531 for k,v in iteritems(engines):
527 eid = int(k)
532 eid = int(k)
528 if eid not in self._engines:
533 if eid not in self._engines:
529 self._ids.append(eid)
534 self._ids.append(eid)
530 self._engines[eid] = v
535 self._engines[eid] = v
531 self._ids = sorted(self._ids)
536 self._ids = sorted(self._ids)
532 if sorted(self._engines.keys()) != list(range(len(self._engines))) and \
537 if sorted(self._engines.keys()) != list(range(len(self._engines))) and \
533 self._task_scheme == 'pure' and self._task_socket:
538 self._task_scheme == 'pure' and self._task_socket:
534 self._stop_scheduling_tasks()
539 self._stop_scheduling_tasks()
535
540
536 def _stop_scheduling_tasks(self):
541 def _stop_scheduling_tasks(self):
537 """Stop scheduling tasks because an engine has been unregistered
542 """Stop scheduling tasks because an engine has been unregistered
538 from a pure ZMQ scheduler.
543 from a pure ZMQ scheduler.
539 """
544 """
540 self._task_socket.close()
545 self._task_socket.close()
541 self._task_socket = None
546 self._task_socket = None
542 msg = "An engine has been unregistered, and we are using pure " +\
547 msg = "An engine has been unregistered, and we are using pure " +\
543 "ZMQ task scheduling. Task farming will be disabled."
548 "ZMQ task scheduling. Task farming will be disabled."
544 if self.outstanding:
549 if self.outstanding:
545 msg += " If you were running tasks when this happened, " +\
550 msg += " If you were running tasks when this happened, " +\
546 "some `outstanding` msg_ids may never resolve."
551 "some `outstanding` msg_ids may never resolve."
547 warnings.warn(msg, RuntimeWarning)
552 warnings.warn(msg, RuntimeWarning)
548
553
549 def _build_targets(self, targets):
554 def _build_targets(self, targets):
550 """Turn valid target IDs or 'all' into two lists:
555 """Turn valid target IDs or 'all' into two lists:
551 (int_ids, uuids).
556 (int_ids, uuids).
552 """
557 """
553 if not self._ids:
558 if not self._ids:
554 # flush notification socket if no engines yet, just in case
559 # flush notification socket if no engines yet, just in case
555 if not self.ids:
560 if not self.ids:
556 raise error.NoEnginesRegistered("Can't build targets without any engines")
561 raise error.NoEnginesRegistered("Can't build targets without any engines")
557
562
558 if targets is None:
563 if targets is None:
559 targets = self._ids
564 targets = self._ids
560 elif isinstance(targets, string_types):
565 elif isinstance(targets, string_types):
561 if targets.lower() == 'all':
566 if targets.lower() == 'all':
562 targets = self._ids
567 targets = self._ids
563 else:
568 else:
564 raise TypeError("%r not valid str target, must be 'all'"%(targets))
569 raise TypeError("%r not valid str target, must be 'all'"%(targets))
565 elif isinstance(targets, int):
570 elif isinstance(targets, int):
566 if targets < 0:
571 if targets < 0:
567 targets = self.ids[targets]
572 targets = self.ids[targets]
568 if targets not in self._ids:
573 if targets not in self._ids:
569 raise IndexError("No such engine: %i"%targets)
574 raise IndexError("No such engine: %i"%targets)
570 targets = [targets]
575 targets = [targets]
571
576
572 if isinstance(targets, slice):
577 if isinstance(targets, slice):
573 indices = list(range(len(self._ids))[targets])
578 indices = list(range(len(self._ids))[targets])
574 ids = self.ids
579 ids = self.ids
575 targets = [ ids[i] for i in indices ]
580 targets = [ ids[i] for i in indices ]
576
581
577 if not isinstance(targets, (tuple, list, xrange)):
582 if not isinstance(targets, (tuple, list, xrange)):
578 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
583 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
579
584
580 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
585 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
581
586
582 def _connect(self, sshserver, ssh_kwargs, timeout):
587 def _connect(self, sshserver, ssh_kwargs, timeout):
583 """setup all our socket connections to the cluster. This is called from
588 """setup all our socket connections to the cluster. This is called from
584 __init__."""
589 __init__."""
585
590
586 # Maybe allow reconnecting?
591 # Maybe allow reconnecting?
587 if self._connected:
592 if self._connected:
588 return
593 return
589 self._connected=True
594 self._connected=True
590
595
591 def connect_socket(s, url):
596 def connect_socket(s, url):
592 if self._ssh:
597 if self._ssh:
593 from zmq.ssh import tunnel
598 from zmq.ssh import tunnel
594 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
599 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
595 else:
600 else:
596 return s.connect(url)
601 return s.connect(url)
597
602
598 self.session.send(self._query_socket, 'connection_request')
603 self.session.send(self._query_socket, 'connection_request')
599 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
604 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
600 poller = zmq.Poller()
605 poller = zmq.Poller()
601 poller.register(self._query_socket, zmq.POLLIN)
606 poller.register(self._query_socket, zmq.POLLIN)
602 # poll expects milliseconds, timeout is seconds
607 # poll expects milliseconds, timeout is seconds
603 evts = poller.poll(timeout*1000)
608 evts = poller.poll(timeout*1000)
604 if not evts:
609 if not evts:
605 raise error.TimeoutError("Hub connection request timed out")
610 raise error.TimeoutError("Hub connection request timed out")
606 idents,msg = self.session.recv(self._query_socket,mode=0)
611 idents,msg = self.session.recv(self._query_socket,mode=0)
607 if self.debug:
612 if self.debug:
608 pprint(msg)
613 pprint(msg)
609 content = msg['content']
614 content = msg['content']
610 # self._config['registration'] = dict(content)
615 # self._config['registration'] = dict(content)
611 cfg = self._config
616 cfg = self._config
612 if content['status'] == 'ok':
617 if content['status'] == 'ok':
613 self._mux_socket = self._context.socket(zmq.DEALER)
618 self._mux_socket = self._context.socket(zmq.DEALER)
614 connect_socket(self._mux_socket, cfg['mux'])
619 connect_socket(self._mux_socket, cfg['mux'])
615
620
616 self._task_socket = self._context.socket(zmq.DEALER)
621 self._task_socket = self._context.socket(zmq.DEALER)
617 connect_socket(self._task_socket, cfg['task'])
622 connect_socket(self._task_socket, cfg['task'])
618
623
619 self._notification_socket = self._context.socket(zmq.SUB)
624 self._notification_socket = self._context.socket(zmq.SUB)
620 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
625 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
621 connect_socket(self._notification_socket, cfg['notification'])
626 connect_socket(self._notification_socket, cfg['notification'])
622
627
623 self._control_socket = self._context.socket(zmq.DEALER)
628 self._control_socket = self._context.socket(zmq.DEALER)
624 connect_socket(self._control_socket, cfg['control'])
629 connect_socket(self._control_socket, cfg['control'])
625
630
626 self._iopub_socket = self._context.socket(zmq.SUB)
631 self._iopub_socket = self._context.socket(zmq.SUB)
627 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
632 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
628 connect_socket(self._iopub_socket, cfg['iopub'])
633 connect_socket(self._iopub_socket, cfg['iopub'])
629
634
630 self._update_engines(dict(content['engines']))
635 self._update_engines(dict(content['engines']))
631 else:
636 else:
632 self._connected = False
637 self._connected = False
633 raise Exception("Failed to connect!")
638 raise Exception("Failed to connect!")
634
639
635 #--------------------------------------------------------------------------
640 #--------------------------------------------------------------------------
636 # handlers and callbacks for incoming messages
641 # handlers and callbacks for incoming messages
637 #--------------------------------------------------------------------------
642 #--------------------------------------------------------------------------
638
643
639 def _unwrap_exception(self, content):
644 def _unwrap_exception(self, content):
640 """unwrap exception, and remap engine_id to int."""
645 """unwrap exception, and remap engine_id to int."""
641 e = error.unwrap_exception(content)
646 e = error.unwrap_exception(content)
642 # print e.traceback
647 # print e.traceback
643 if e.engine_info:
648 if e.engine_info:
644 e_uuid = e.engine_info['engine_uuid']
649 e_uuid = e.engine_info['engine_uuid']
645 eid = self._engines[e_uuid]
650 eid = self._engines[e_uuid]
646 e.engine_info['engine_id'] = eid
651 e.engine_info['engine_id'] = eid
647 return e
652 return e
648
653
649 def _extract_metadata(self, msg):
654 def _extract_metadata(self, msg):
650 header = msg['header']
655 header = msg['header']
651 parent = msg['parent_header']
656 parent = msg['parent_header']
652 msg_meta = msg['metadata']
657 msg_meta = msg['metadata']
653 content = msg['content']
658 content = msg['content']
654 md = {'msg_id' : parent['msg_id'],
659 md = {'msg_id' : parent['msg_id'],
655 'received' : datetime.now(),
660 'received' : datetime.now(),
656 'engine_uuid' : msg_meta.get('engine', None),
661 'engine_uuid' : msg_meta.get('engine', None),
657 'follow' : msg_meta.get('follow', []),
662 'follow' : msg_meta.get('follow', []),
658 'after' : msg_meta.get('after', []),
663 'after' : msg_meta.get('after', []),
659 'status' : content['status'],
664 'status' : content['status'],
660 }
665 }
661
666
662 if md['engine_uuid'] is not None:
667 if md['engine_uuid'] is not None:
663 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
668 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
664
669
665 if 'date' in parent:
670 if 'date' in parent:
666 md['submitted'] = parent['date']
671 md['submitted'] = parent['date']
667 if 'started' in msg_meta:
672 if 'started' in msg_meta:
668 md['started'] = parse_date(msg_meta['started'])
673 md['started'] = parse_date(msg_meta['started'])
669 if 'date' in header:
674 if 'date' in header:
670 md['completed'] = header['date']
675 md['completed'] = header['date']
671 return md
676 return md
672
677
673 def _register_engine(self, msg):
678 def _register_engine(self, msg):
674 """Register a new engine, and update our connection info."""
679 """Register a new engine, and update our connection info."""
675 content = msg['content']
680 content = msg['content']
676 eid = content['id']
681 eid = content['id']
677 d = {eid : content['uuid']}
682 d = {eid : content['uuid']}
678 self._update_engines(d)
683 self._update_engines(d)
679
684
680 def _unregister_engine(self, msg):
685 def _unregister_engine(self, msg):
681 """Unregister an engine that has died."""
686 """Unregister an engine that has died."""
682 content = msg['content']
687 content = msg['content']
683 eid = int(content['id'])
688 eid = int(content['id'])
684 if eid in self._ids:
689 if eid in self._ids:
685 self._ids.remove(eid)
690 self._ids.remove(eid)
686 uuid = self._engines.pop(eid)
691 uuid = self._engines.pop(eid)
687
692
688 self._handle_stranded_msgs(eid, uuid)
693 self._handle_stranded_msgs(eid, uuid)
689
694
690 if self._task_socket and self._task_scheme == 'pure':
695 if self._task_socket and self._task_scheme == 'pure':
691 self._stop_scheduling_tasks()
696 self._stop_scheduling_tasks()
692
697
693 def _handle_stranded_msgs(self, eid, uuid):
698 def _handle_stranded_msgs(self, eid, uuid):
694 """Handle messages known to be on an engine when the engine unregisters.
699 """Handle messages known to be on an engine when the engine unregisters.
695
700
696 It is possible that this will fire prematurely - that is, an engine will
701 It is possible that this will fire prematurely - that is, an engine will
697 go down after completing a result, and the client will be notified
702 go down after completing a result, and the client will be notified
698 of the unregistration and later receive the successful result.
703 of the unregistration and later receive the successful result.
699 """
704 """
700
705
701 outstanding = self._outstanding_dict[uuid]
706 outstanding = self._outstanding_dict[uuid]
702
707
703 for msg_id in list(outstanding):
708 for msg_id in list(outstanding):
704 if msg_id in self.results:
709 if msg_id in self.results:
705 # we already
710 # we already
706 continue
711 continue
707 try:
712 try:
708 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
713 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
709 except:
714 except:
710 content = error.wrap_exception()
715 content = error.wrap_exception()
711 # build a fake message:
716 # build a fake message:
712 msg = self.session.msg('apply_reply', content=content)
717 msg = self.session.msg('apply_reply', content=content)
713 msg['parent_header']['msg_id'] = msg_id
718 msg['parent_header']['msg_id'] = msg_id
714 msg['metadata']['engine'] = uuid
719 msg['metadata']['engine'] = uuid
715 self._handle_apply_reply(msg)
720 self._handle_apply_reply(msg)
716
721
717 def _handle_execute_reply(self, msg):
722 def _handle_execute_reply(self, msg):
718 """Save the reply to an execute_request into our results.
723 """Save the reply to an execute_request into our results.
719
724
720 execute messages are never actually used. apply is used instead.
725 execute messages are never actually used. apply is used instead.
721 """
726 """
722
727
723 parent = msg['parent_header']
728 parent = msg['parent_header']
724 msg_id = parent['msg_id']
729 msg_id = parent['msg_id']
725 if msg_id not in self.outstanding:
730 if msg_id not in self.outstanding:
726 if msg_id in self.history:
731 if msg_id in self.history:
727 print("got stale result: %s"%msg_id)
732 print("got stale result: %s"%msg_id)
728 else:
733 else:
729 print("got unknown result: %s"%msg_id)
734 print("got unknown result: %s"%msg_id)
730 else:
735 else:
731 self.outstanding.remove(msg_id)
736 self.outstanding.remove(msg_id)
732
737
733 content = msg['content']
738 content = msg['content']
734 header = msg['header']
739 header = msg['header']
735
740
736 # construct metadata:
741 # construct metadata:
737 md = self.metadata[msg_id]
742 md = self.metadata[msg_id]
738 md.update(self._extract_metadata(msg))
743 md.update(self._extract_metadata(msg))
739 # is this redundant?
744 # is this redundant?
740 self.metadata[msg_id] = md
745 self.metadata[msg_id] = md
741
746
742 e_outstanding = self._outstanding_dict[md['engine_uuid']]
747 e_outstanding = self._outstanding_dict[md['engine_uuid']]
743 if msg_id in e_outstanding:
748 if msg_id in e_outstanding:
744 e_outstanding.remove(msg_id)
749 e_outstanding.remove(msg_id)
745
750
746 # construct result:
751 # construct result:
747 if content['status'] == 'ok':
752 if content['status'] == 'ok':
748 self.results[msg_id] = ExecuteReply(msg_id, content, md)
753 self.results[msg_id] = ExecuteReply(msg_id, content, md)
749 elif content['status'] == 'aborted':
754 elif content['status'] == 'aborted':
750 self.results[msg_id] = error.TaskAborted(msg_id)
755 self.results[msg_id] = error.TaskAborted(msg_id)
751 elif content['status'] == 'resubmitted':
756 elif content['status'] == 'resubmitted':
752 # TODO: handle resubmission
757 # TODO: handle resubmission
753 pass
758 pass
754 else:
759 else:
755 self.results[msg_id] = self._unwrap_exception(content)
760 self.results[msg_id] = self._unwrap_exception(content)
756
761
757 def _handle_apply_reply(self, msg):
762 def _handle_apply_reply(self, msg):
758 """Save the reply to an apply_request into our results."""
763 """Save the reply to an apply_request into our results."""
759 parent = msg['parent_header']
764 parent = msg['parent_header']
760 msg_id = parent['msg_id']
765 msg_id = parent['msg_id']
761 if msg_id not in self.outstanding:
766 if msg_id not in self.outstanding:
762 if msg_id in self.history:
767 if msg_id in self.history:
763 print("got stale result: %s"%msg_id)
768 print("got stale result: %s"%msg_id)
764 print(self.results[msg_id])
769 print(self.results[msg_id])
765 print(msg)
770 print(msg)
766 else:
771 else:
767 print("got unknown result: %s"%msg_id)
772 print("got unknown result: %s"%msg_id)
768 else:
773 else:
769 self.outstanding.remove(msg_id)
774 self.outstanding.remove(msg_id)
770 content = msg['content']
775 content = msg['content']
771 header = msg['header']
776 header = msg['header']
772
777
773 # construct metadata:
778 # construct metadata:
774 md = self.metadata[msg_id]
779 md = self.metadata[msg_id]
775 md.update(self._extract_metadata(msg))
780 md.update(self._extract_metadata(msg))
776 # is this redundant?
781 # is this redundant?
777 self.metadata[msg_id] = md
782 self.metadata[msg_id] = md
778
783
779 e_outstanding = self._outstanding_dict[md['engine_uuid']]
784 e_outstanding = self._outstanding_dict[md['engine_uuid']]
780 if msg_id in e_outstanding:
785 if msg_id in e_outstanding:
781 e_outstanding.remove(msg_id)
786 e_outstanding.remove(msg_id)
782
787
783 # construct result:
788 # construct result:
784 if content['status'] == 'ok':
789 if content['status'] == 'ok':
785 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
790 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
786 elif content['status'] == 'aborted':
791 elif content['status'] == 'aborted':
787 self.results[msg_id] = error.TaskAborted(msg_id)
792 self.results[msg_id] = error.TaskAborted(msg_id)
788 elif content['status'] == 'resubmitted':
793 elif content['status'] == 'resubmitted':
789 # TODO: handle resubmission
794 # TODO: handle resubmission
790 pass
795 pass
791 else:
796 else:
792 self.results[msg_id] = self._unwrap_exception(content)
797 self.results[msg_id] = self._unwrap_exception(content)
793
798
794 def _flush_notifications(self):
799 def _flush_notifications(self):
795 """Flush notifications of engine registrations waiting
800 """Flush notifications of engine registrations waiting
796 in ZMQ queue."""
801 in ZMQ queue."""
797 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
802 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
798 while msg is not None:
803 while msg is not None:
799 if self.debug:
804 if self.debug:
800 pprint(msg)
805 pprint(msg)
801 msg_type = msg['header']['msg_type']
806 msg_type = msg['header']['msg_type']
802 handler = self._notification_handlers.get(msg_type, None)
807 handler = self._notification_handlers.get(msg_type, None)
803 if handler is None:
808 if handler is None:
804 raise Exception("Unhandled message type: %s" % msg_type)
809 raise Exception("Unhandled message type: %s" % msg_type)
805 else:
810 else:
806 handler(msg)
811 handler(msg)
807 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
812 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
808
813
809 def _flush_results(self, sock):
814 def _flush_results(self, sock):
810 """Flush task or queue results waiting in ZMQ queue."""
815 """Flush task or queue results waiting in ZMQ queue."""
811 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
816 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
812 while msg is not None:
817 while msg is not None:
813 if self.debug:
818 if self.debug:
814 pprint(msg)
819 pprint(msg)
815 msg_type = msg['header']['msg_type']
820 msg_type = msg['header']['msg_type']
816 handler = self._queue_handlers.get(msg_type, None)
821 handler = self._queue_handlers.get(msg_type, None)
817 if handler is None:
822 if handler is None:
818 raise Exception("Unhandled message type: %s" % msg_type)
823 raise Exception("Unhandled message type: %s" % msg_type)
819 else:
824 else:
820 handler(msg)
825 handler(msg)
821 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
826 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
822
827
823 def _flush_control(self, sock):
828 def _flush_control(self, sock):
824 """Flush replies from the control channel waiting
829 """Flush replies from the control channel waiting
825 in the ZMQ queue.
830 in the ZMQ queue.
826
831
827 Currently: ignore them."""
832 Currently: ignore them."""
828 if self._ignored_control_replies <= 0:
833 if self._ignored_control_replies <= 0:
829 return
834 return
830 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
835 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
831 while msg is not None:
836 while msg is not None:
832 self._ignored_control_replies -= 1
837 self._ignored_control_replies -= 1
833 if self.debug:
838 if self.debug:
834 pprint(msg)
839 pprint(msg)
835 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
840 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
836
841
837 def _flush_ignored_control(self):
842 def _flush_ignored_control(self):
838 """flush ignored control replies"""
843 """flush ignored control replies"""
839 while self._ignored_control_replies > 0:
844 while self._ignored_control_replies > 0:
840 self.session.recv(self._control_socket)
845 self.session.recv(self._control_socket)
841 self._ignored_control_replies -= 1
846 self._ignored_control_replies -= 1
842
847
843 def _flush_ignored_hub_replies(self):
848 def _flush_ignored_hub_replies(self):
844 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
849 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
845 while msg is not None:
850 while msg is not None:
846 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
851 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
847
852
848 def _flush_iopub(self, sock):
853 def _flush_iopub(self, sock):
849 """Flush replies from the iopub channel waiting
854 """Flush replies from the iopub channel waiting
850 in the ZMQ queue.
855 in the ZMQ queue.
851 """
856 """
852 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
857 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
853 while msg is not None:
858 while msg is not None:
854 if self.debug:
859 if self.debug:
855 pprint(msg)
860 pprint(msg)
856 parent = msg['parent_header']
861 parent = msg['parent_header']
857 # ignore IOPub messages with no parent.
862 # ignore IOPub messages with no parent.
858 # Caused by print statements or warnings from before the first execution.
863 # Caused by print statements or warnings from before the first execution.
859 if not parent:
864 if not parent:
860 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
865 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
861 continue
866 continue
862 msg_id = parent['msg_id']
867 msg_id = parent['msg_id']
863 content = msg['content']
868 content = msg['content']
864 header = msg['header']
869 header = msg['header']
865 msg_type = msg['header']['msg_type']
870 msg_type = msg['header']['msg_type']
866
871
867 # init metadata:
872 # init metadata:
868 md = self.metadata[msg_id]
873 md = self.metadata[msg_id]
869
874
870 if msg_type == 'stream':
875 if msg_type == 'stream':
871 name = content['name']
876 name = content['name']
872 s = md[name] or ''
877 s = md[name] or ''
873 md[name] = s + content['data']
878 md[name] = s + content['data']
874 elif msg_type == 'error':
879 elif msg_type == 'error':
875 md.update({'error' : self._unwrap_exception(content)})
880 md.update({'error' : self._unwrap_exception(content)})
876 elif msg_type == 'execute_input':
881 elif msg_type == 'execute_input':
877 md.update({'execute_input' : content['code']})
882 md.update({'execute_input' : content['code']})
878 elif msg_type == 'display_data':
883 elif msg_type == 'display_data':
879 md['outputs'].append(content)
884 md['outputs'].append(content)
880 elif msg_type == 'execute_result':
885 elif msg_type == 'execute_result':
881 md['execute_result'] = content
886 md['execute_result'] = content
882 elif msg_type == 'data_message':
887 elif msg_type == 'data_message':
883 data, remainder = serialize.unserialize_object(msg['buffers'])
888 data, remainder = serialize.unserialize_object(msg['buffers'])
884 md['data'].update(data)
889 md['data'].update(data)
885 elif msg_type == 'status':
890 elif msg_type == 'status':
886 # idle message comes after all outputs
891 # idle message comes after all outputs
887 if content['execution_state'] == 'idle':
892 if content['execution_state'] == 'idle':
888 md['outputs_ready'] = True
893 md['outputs_ready'] = True
889 else:
894 else:
890 # unhandled msg_type (status, etc.)
895 # unhandled msg_type (status, etc.)
891 pass
896 pass
892
897
893 # reduntant?
898 # reduntant?
894 self.metadata[msg_id] = md
899 self.metadata[msg_id] = md
895
900
896 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
901 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
897
902
898 #--------------------------------------------------------------------------
903 #--------------------------------------------------------------------------
899 # len, getitem
904 # len, getitem
900 #--------------------------------------------------------------------------
905 #--------------------------------------------------------------------------
901
906
902 def __len__(self):
907 def __len__(self):
903 """len(client) returns # of engines."""
908 """len(client) returns # of engines."""
904 return len(self.ids)
909 return len(self.ids)
905
910
906 def __getitem__(self, key):
911 def __getitem__(self, key):
907 """index access returns DirectView multiplexer objects
912 """index access returns DirectView multiplexer objects
908
913
909 Must be int, slice, or list/tuple/xrange of ints"""
914 Must be int, slice, or list/tuple/xrange of ints"""
910 if not isinstance(key, (int, slice, tuple, list, xrange)):
915 if not isinstance(key, (int, slice, tuple, list, xrange)):
911 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
916 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
912 else:
917 else:
913 return self.direct_view(key)
918 return self.direct_view(key)
914
919
915 def __iter__(self):
920 def __iter__(self):
916 """Since we define getitem, Client is iterable
921 """Since we define getitem, Client is iterable
917
922
918 but unless we also define __iter__, it won't work correctly unless engine IDs
923 but unless we also define __iter__, it won't work correctly unless engine IDs
919 start at zero and are continuous.
924 start at zero and are continuous.
920 """
925 """
921 for eid in self.ids:
926 for eid in self.ids:
922 yield self.direct_view(eid)
927 yield self.direct_view(eid)
923
928
924 #--------------------------------------------------------------------------
929 #--------------------------------------------------------------------------
925 # Begin public methods
930 # Begin public methods
926 #--------------------------------------------------------------------------
931 #--------------------------------------------------------------------------
927
932
928 @property
933 @property
929 def ids(self):
934 def ids(self):
930 """Always up-to-date ids property."""
935 """Always up-to-date ids property."""
931 self._flush_notifications()
936 self._flush_notifications()
932 # always copy:
937 # always copy:
933 return list(self._ids)
938 return list(self._ids)
934
939
935 def activate(self, targets='all', suffix=''):
940 def activate(self, targets='all', suffix=''):
936 """Create a DirectView and register it with IPython magics
941 """Create a DirectView and register it with IPython magics
937
942
938 Defines the magics `%px, %autopx, %pxresult, %%px`
943 Defines the magics `%px, %autopx, %pxresult, %%px`
939
944
940 Parameters
945 Parameters
941 ----------
946 ----------
942
947
943 targets: int, list of ints, or 'all'
948 targets: int, list of ints, or 'all'
944 The engines on which the view's magics will run
949 The engines on which the view's magics will run
945 suffix: str [default: '']
950 suffix: str [default: '']
946 The suffix, if any, for the magics. This allows you to have
951 The suffix, if any, for the magics. This allows you to have
947 multiple views associated with parallel magics at the same time.
952 multiple views associated with parallel magics at the same time.
948
953
949 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
954 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
950 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
955 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
951 on engine 0.
956 on engine 0.
952 """
957 """
953 view = self.direct_view(targets)
958 view = self.direct_view(targets)
954 view.block = True
959 view.block = True
955 view.activate(suffix)
960 view.activate(suffix)
956 return view
961 return view
957
962
958 def close(self, linger=None):
963 def close(self, linger=None):
959 """Close my zmq Sockets
964 """Close my zmq Sockets
960
965
961 If `linger`, set the zmq LINGER socket option,
966 If `linger`, set the zmq LINGER socket option,
962 which allows discarding of messages.
967 which allows discarding of messages.
963 """
968 """
964 if self._closed:
969 if self._closed:
965 return
970 return
966 self.stop_spin_thread()
971 self.stop_spin_thread()
967 snames = [ trait for trait in self.trait_names() if trait.endswith("socket") ]
972 snames = [ trait for trait in self.trait_names() if trait.endswith("socket") ]
968 for name in snames:
973 for name in snames:
969 socket = getattr(self, name)
974 socket = getattr(self, name)
970 if socket is not None and not socket.closed:
975 if socket is not None and not socket.closed:
971 if linger is not None:
976 if linger is not None:
972 socket.close(linger=linger)
977 socket.close(linger=linger)
973 else:
978 else:
974 socket.close()
979 socket.close()
975 self._closed = True
980 self._closed = True
976
981
977 def _spin_every(self, interval=1):
982 def _spin_every(self, interval=1):
978 """target func for use in spin_thread"""
983 """target func for use in spin_thread"""
979 while True:
984 while True:
980 if self._stop_spinning.is_set():
985 if self._stop_spinning.is_set():
981 return
986 return
982 time.sleep(interval)
987 time.sleep(interval)
983 self.spin()
988 self.spin()
984
989
def spin_thread(self, interval=1):
    """call Client.spin() in a background thread on some regular interval

    This helps ensure that messages don't pile up too much in the zmq queue
    while you are working on other things, or just leaving an idle terminal.

    It also helps limit potential padding of the `received` timestamp
    on AsyncResult objects, used for timings.

    Parameters
    ----------

    interval : float, optional
        The interval on which to spin the client in the background thread
        (simply passed to time.sleep).

    Notes
    -----

    For precision timing, you may want to use this method to put a bound
    on the jitter (in seconds) in `received` timestamps used
    in AsyncResult.wall_time.

    """
    # replace any spinner that is already running
    if self._spin_thread is not None:
        self.stop_spin_thread()
    self._stop_spinning.clear()
    worker = Thread(target=self._spin_every, args=(interval,))
    worker.daemon = True
    worker.start()
    self._spin_thread = worker
1015
1020
def stop_spin_thread(self):
    """Signal and join the background spin thread, if one is running."""
    if self._spin_thread is None:
        return
    self._stop_spinning.set()
    self._spin_thread.join()
    self._spin_thread = None
1022
1027
def spin(self):
    """Flush any registration notifications and execution results
    waiting in the ZMQ queue.

    Each socket is flushed only if it has been connected (is truthy).
    """
    notification = self._notification_socket
    if notification:
        self._flush_notifications()
    iopub = self._iopub_socket
    if iopub:
        self._flush_iopub(iopub)
    # both the direct (mux) and load-balanced (task) channels
    # carry execution results
    for results_socket in (self._mux_socket, self._task_socket):
        if results_socket:
            self._flush_results(results_socket)
    control = self._control_socket
    if control:
        self._flush_control(control)
    if self._query_socket:
        self._flush_ignored_hub_replies()
1039
1044
def wait(self, jobs=None, timeout=-1):
    """waits on one or more `jobs`, for up to `timeout` seconds.

    Parameters
    ----------

    jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
        ints are indices to self.history
        strs are msg_ids
        default: wait on all outstanding messages
    timeout : float
        a time in seconds, after which to give up.
        default is -1, which means no timeout

    Returns
    -------

    True : when all msg_ids are done
    False : timeout reached, some msg_ids still outstanding
    """
    start = time.time()
    if jobs is None:
        pending = self.outstanding
    else:
        if isinstance(jobs, string_types + (int, AsyncResult)):
            jobs = [jobs]
        pending = set()
        for job in jobs:
            if isinstance(job, int):
                # history index -> msg_id
                job = self.history[job]
            elif isinstance(job, AsyncResult):
                pending.update(job.msg_ids)
                continue
            pending.add(job)
    if not pending.intersection(self.outstanding):
        # nothing to wait for
        return True
    self.spin()
    while pending.intersection(self.outstanding):
        if timeout >= 0 and (time.time() - start) > timeout:
            break
        time.sleep(1e-3)
        self.spin()
    return not pending.intersection(self.outstanding)
1084
1089
1085 #--------------------------------------------------------------------------
1090 #--------------------------------------------------------------------------
1086 # Control methods
1091 # Control methods
1087 #--------------------------------------------------------------------------
1092 #--------------------------------------------------------------------------
1088
1093
@spin_first
def clear(self, targets=None, block=None):
    """Clear the namespace in target(s)."""
    block = self.block if block is None else block
    targets = self._build_targets(targets)[0]
    # fire one clear_request at each engine
    for engine in targets:
        self.session.send(self._control_socket, 'clear_request', content={}, ident=engine)
    error = False
    if not block:
        # replies will be drained (and discarded) later
        self._ignored_control_replies += len(targets)
    else:
        self._flush_ignored_control()
        for _ in range(len(targets)):
            idents, msg = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                error = self._unwrap_exception(msg['content'])
    if error:
        raise error
1109
1114
1110
1115
@spin_first
def abort(self, jobs=None, targets=None, block=None):
    """Abort specific jobs from the execution queues of target(s).

    This is a mechanism to prevent jobs that have already been submitted
    from executing.

    Parameters
    ----------

    jobs : msg_id, list of msg_ids, or AsyncResult
        The jobs to be aborted

        If unspecified/None: abort all outstanding jobs.

    """
    block = self.block if block is None else block
    jobs = jobs if jobs is not None else list(self.outstanding)
    targets = self._build_targets(targets)[0]

    if isinstance(jobs, string_types + (AsyncResult,)):
        jobs = [jobs]
    bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
    if bad_ids:
        raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
    # flatten AsyncResults into their constituent msg_ids
    msg_ids = []
    for job in jobs:
        if isinstance(job, AsyncResult):
            msg_ids.extend(job.msg_ids)
        else:
            msg_ids.append(job)
    content = dict(msg_ids=msg_ids)
    for engine in targets:
        self.session.send(self._control_socket, 'abort_request',
                content=content, ident=engine)
    error = False
    if not block:
        self._ignored_control_replies += len(targets)
    else:
        self._flush_ignored_control()
        for _ in range(len(targets)):
            idents, msg = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                error = self._unwrap_exception(msg['content'])
    if error:
        raise error
1159
1164
@spin_first
def shutdown(self, targets='all', restart=False, hub=False, block=None):
    """Terminates one or more engine processes, optionally including the hub.

    Parameters
    ----------

    targets: list of ints or 'all' [default: all]
        Which engines to shutdown.
    hub: bool [default: False]
        Whether to include the Hub.  hub=True implies targets='all'.
    block: bool [default: self.block]
        Whether to wait for clean shutdown replies or not.
    restart: bool [default: False]
        NOT IMPLEMENTED
        whether to restart engines after shutting them down.
    """
    from IPython.parallel.error import NoEnginesRegistered
    if restart:
        raise NotImplementedError("Engine restart is not yet implemented")

    block = self.block if block is None else block
    if hub:
        # shutting down the hub implies shutting down every engine
        targets = 'all'
    try:
        targets = self._build_targets(targets)[0]
    except NoEnginesRegistered:
        targets = []
    for engine in targets:
        self.session.send(self._control_socket, 'shutdown_request',
                          content={'restart':restart}, ident=engine)
    error = False
    if block or hub:
        self._flush_ignored_control()
        for _ in range(len(targets)):
            idents, msg = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                error = self._unwrap_exception(msg['content'])
    else:
        self._ignored_control_replies += len(targets)

    if hub:
        # give the engines a moment to go down before asking the Hub itself
        time.sleep(0.25)
        self.session.send(self._query_socket, 'shutdown_request')
        idents, msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        if msg['content']['status'] != 'ok':
            error = self._unwrap_exception(msg['content'])

    if error:
        raise error
1214
1219
1215 #--------------------------------------------------------------------------
1220 #--------------------------------------------------------------------------
1216 # Execution related methods
1221 # Execution related methods
1217 #--------------------------------------------------------------------------
1222 #--------------------------------------------------------------------------
1218
1223
def _maybe_raise(self, result):
    """Pass `result` through, raising it instead if apply failed remotely."""
    failed = isinstance(result, error.RemoteError)
    if failed:
        raise result
    return result
1225
1230
def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
                        ident=None):
    """construct and send an apply message via a socket.

    This is the principal method with which all engine execution is performed by views.
    """

    if self._closed:
        raise RuntimeError("Client cannot be used after its sockets have been closed")

    # defaults:
    args = [] if args is None else args
    kwargs = {} if kwargs is None else kwargs
    metadata = {} if metadata is None else metadata

    # validate arguments
    if not callable(f) and not isinstance(f, Reference):
        raise TypeError("f must be callable, not %s"%type(f))
    if not isinstance(args, (tuple, list)):
        raise TypeError("args must be tuple or list, not %s"%type(args))
    if not isinstance(kwargs, dict):
        raise TypeError("kwargs must be dict, not %s"%type(kwargs))
    if not isinstance(metadata, dict):
        raise TypeError("metadata must be dict, not %s"%type(metadata))

    buffers = serialize.pack_apply_message(f, args, kwargs,
        buffer_threshold=self.session.buffer_threshold,
        item_threshold=self.session.item_threshold,
    )

    msg = self.session.send(socket, "apply_request", buffers=buffers, ident=ident,
                        metadata=metadata, track=track)

    # bookkeeping: record the submission for later result lookup
    msg_id = msg['header']['msg_id']
    self.outstanding.add(msg_id)
    if ident:
        # possibly routed to a specific engine
        if isinstance(ident, list):
            ident = ident[-1]
        if ident in self._engines.values():
            # save for later, in case of engine death
            self._outstanding_dict[ident].add(msg_id)
    self.history.append(msg_id)
    self.metadata[msg_id]['submitted'] = datetime.now()

    return msg
1272
1277
def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
    """construct and send an execute request via a socket.

    """

    if self._closed:
        raise RuntimeError("Client cannot be used after its sockets have been closed")

    # defaults:
    metadata = {} if metadata is None else metadata

    # validate arguments
    if not isinstance(code, string_types):
        raise TypeError("code must be text, not %s" % type(code))
    if not isinstance(metadata, dict):
        raise TypeError("metadata must be dict, not %s" % type(metadata))

    content = dict(code=code, silent=bool(silent), user_expressions={})

    msg = self.session.send(socket, "execute_request", content=content, ident=ident,
                        metadata=metadata)

    # bookkeeping: record the submission for later result lookup
    msg_id = msg['header']['msg_id']
    self.outstanding.add(msg_id)
    if ident:
        # possibly routed to a specific engine
        if isinstance(ident, list):
            ident = ident[-1]
        if ident in self._engines.values():
            # save for later, in case of engine death
            self._outstanding_dict[ident].add(msg_id)
    self.history.append(msg_id)
    self.metadata[msg_id]['submitted'] = datetime.now()

    return msg
1309
1314
1310 #--------------------------------------------------------------------------
1315 #--------------------------------------------------------------------------
1311 # construct a View object
1316 # construct a View object
1312 #--------------------------------------------------------------------------
1317 #--------------------------------------------------------------------------
1313
1318
def load_balanced_view(self, targets=None):
    """construct a LoadBalancedView object.

    If no arguments are specified, create a LoadBalancedView
    using all engines.

    Parameters
    ----------

    targets: list,slice,int,etc. [default: use all engines]
        The subset of engines across which to load-balance
    """
    # NOTE: docstring previously said "DirectView"; this method
    # constructs a LoadBalancedView on the task (load-balanced) channel.
    if targets == 'all':
        # None means "all engines" to the LoadBalancedView
        targets = None
    if targets is not None:
        targets = self._build_targets(targets)[1]
    return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1331
1336
def direct_view(self, targets='all'):
    """construct a DirectView object.

    If no targets are specified, create a DirectView using all engines.

    rc.direct_view('all') is distinguished from rc[:] in that 'all' will
    evaluate the target engines at each execution, whereas rc[:] will connect to
    all *current* engines, and that list will not change.

    That is, 'all' will always use all engines, whereas rc[:] will not use
    engines added after the DirectView is constructed.

    Parameters
    ----------

    targets: list,slice,int,etc. [default: use all engines]
        The engines to use for the View
    """
    want_single = isinstance(targets, int)
    # keep 'all' as-is so the view re-evaluates its engine list lazily
    if targets != 'all':
        targets = self._build_targets(targets)[1]
    if want_single:
        # an int target means a view on exactly one engine
        targets = targets[0]
    return DirectView(client=self, socket=self._mux_socket, targets=targets)
1357
1362
1358 #--------------------------------------------------------------------------
1363 #--------------------------------------------------------------------------
1359 # Query methods
1364 # Query methods
1360 #--------------------------------------------------------------------------
1365 #--------------------------------------------------------------------------
1361
1366
@spin_first
def get_result(self, indices_or_msg_ids=None, block=None, owner=True):
    """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.

    If the client already has the results, no request to the Hub will be made.

    This is a convenient way to construct AsyncResult objects, which are wrappers
    that include metadata about execution, and allow for awaiting results that
    were not submitted by this Client.

    It can also be a convenient way to retrieve the metadata associated with
    blocking execution, since it always retrieves

    Examples
    --------
    ::

        In [10]: r = client.apply()

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The indices or msg_ids of indices to be retrieved

    block : bool
        Whether to wait for the result to be done
    owner : bool [default: True]
        Whether this AsyncResult should own the result.
        If so, calling `ar.get()` will remove data from the
        client's result and metadata cache.
        There should only be one owner of any given msg_id.

    Returns
    -------

    AsyncResult
        A single AsyncResult object will always be returned.

    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub

    """
    block = self.block if block is None else block
    if indices_or_msg_ids is None:
        # default: the most recent submission
        indices_or_msg_ids = -1

    single_result = not isinstance(indices_or_msg_ids, (list, tuple))
    if single_result:
        indices_or_msg_ids = [indices_or_msg_ids]

    msg_ids = []
    for index_or_id in indices_or_msg_ids:
        if isinstance(index_or_id, int):
            # history index -> msg_id
            index_or_id = self.history[index_or_id]
        if not isinstance(index_or_id, string_types):
            raise TypeError("indices must be str or int, not %r"%index_or_id)
        msg_ids.append(index_or_id)

    local_ids = [msg_id for msg_id in msg_ids
                 if (msg_id in self.outstanding or msg_id in self.results)]
    remote_ids = [msg_id for msg_id in msg_ids if msg_id not in local_ids]

    # given a single msg_id initially, get_result should return the result
    # itself, not a length-one list
    if single_result:
        msg_ids = msg_ids[0]

    # anything the Hub must be asked about goes through AsyncHubResult
    result_class = AsyncHubResult if remote_ids else AsyncResult
    ar = result_class(self, msg_ids=msg_ids, owner=owner)

    if block:
        ar.wait()

    return ar
1439
1444
@spin_first
def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
    """Resubmit one or more tasks.

    in-flight tasks may not be resubmitted.

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The indices or msg_ids of indices to be retrieved

    block : bool
        Whether to wait for the result to be done

    Returns
    -------

    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub

    """
    block = self.block if block is None else block
    if indices_or_msg_ids is None:
        # default: the most recent submission
        indices_or_msg_ids = -1

    if not isinstance(indices_or_msg_ids, (list, tuple)):
        indices_or_msg_ids = [indices_or_msg_ids]

    msg_ids = []
    for index_or_id in indices_or_msg_ids:
        if isinstance(index_or_id, int):
            # history index -> msg_id
            index_or_id = self.history[index_or_id]
        if not isinstance(index_or_id, string_types):
            raise TypeError("indices must be str or int, not %r"%index_or_id)
        msg_ids.append(index_or_id)

    content = dict(msg_ids=msg_ids)

    self.session.send(self._query_socket, 'resubmit_request', content)

    # block until the Hub's reply arrives
    zmq.select([self._query_socket], [], [])
    idents, msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
    if self.debug:
        pprint(msg)
    reply = msg['content']
    if reply['status'] != 'ok':
        raise self._unwrap_exception(reply)
    # map each original msg_id to the msg_id of its resubmission
    mapping = reply['resubmitted']
    new_ids = [mapping[msg_id] for msg_id in msg_ids]

    ar = AsyncHubResult(self, msg_ids=new_ids)

    if block:
        ar.wait()

    return ar
1497
1502
1498 @spin_first
1503 @spin_first
1499 def result_status(self, msg_ids, status_only=True):
1504 def result_status(self, msg_ids, status_only=True):
1500 """Check on the status of the result(s) of the apply request with `msg_ids`.
1505 """Check on the status of the result(s) of the apply request with `msg_ids`.
1501
1506
1502 If status_only is False, then the actual results will be retrieved, else
1507 If status_only is False, then the actual results will be retrieved, else
1503 only the status of the results will be checked.
1508 only the status of the results will be checked.
1504
1509
1505 Parameters
1510 Parameters
1506 ----------
1511 ----------
1507
1512
1508 msg_ids : list of msg_ids
1513 msg_ids : list of msg_ids
1509 if int:
1514 if int:
1510 Passed as index to self.history for convenience.
1515 Passed as index to self.history for convenience.
1511 status_only : bool (default: True)
1516 status_only : bool (default: True)
1512 if False:
1517 if False:
1513 Retrieve the actual results of completed tasks.
1518 Retrieve the actual results of completed tasks.
1514
1519
1515 Returns
1520 Returns
1516 -------
1521 -------
1517
1522
1518 results : dict
1523 results : dict
1519 There will always be the keys 'pending' and 'completed', which will
1524 There will always be the keys 'pending' and 'completed', which will
1520 be lists of msg_ids that are incomplete or complete. If `status_only`
1525 be lists of msg_ids that are incomplete or complete. If `status_only`
1521 is False, then completed results will be keyed by their `msg_id`.
1526 is False, then completed results will be keyed by their `msg_id`.
1522 """
1527 """
1523 if not isinstance(msg_ids, (list,tuple)):
1528 if not isinstance(msg_ids, (list,tuple)):
1524 msg_ids = [msg_ids]
1529 msg_ids = [msg_ids]
1525
1530
1526 theids = []
1531 theids = []
1527 for msg_id in msg_ids:
1532 for msg_id in msg_ids:
1528 if isinstance(msg_id, int):
1533 if isinstance(msg_id, int):
1529 msg_id = self.history[msg_id]
1534 msg_id = self.history[msg_id]
1530 if not isinstance(msg_id, string_types):
1535 if not isinstance(msg_id, string_types):
1531 raise TypeError("msg_ids must be str, not %r"%msg_id)
1536 raise TypeError("msg_ids must be str, not %r"%msg_id)
1532 theids.append(msg_id)
1537 theids.append(msg_id)
1533
1538
1534 completed = []
1539 completed = []
1535 local_results = {}
1540 local_results = {}
1536
1541
1537 # comment this block out to temporarily disable local shortcut:
1542 # comment this block out to temporarily disable local shortcut:
1538 for msg_id in theids:
1543 for msg_id in theids:
1539 if msg_id in self.results:
1544 if msg_id in self.results:
1540 completed.append(msg_id)
1545 completed.append(msg_id)
1541 local_results[msg_id] = self.results[msg_id]
1546 local_results[msg_id] = self.results[msg_id]
1542 theids.remove(msg_id)
1547 theids.remove(msg_id)
1543
1548
1544 if theids: # some not locally cached
1549 if theids: # some not locally cached
1545 content = dict(msg_ids=theids, status_only=status_only)
1550 content = dict(msg_ids=theids, status_only=status_only)
1546 msg = self.session.send(self._query_socket, "result_request", content=content)
1551 msg = self.session.send(self._query_socket, "result_request", content=content)
1547 zmq.select([self._query_socket], [], [])
1552 zmq.select([self._query_socket], [], [])
1548 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1553 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1549 if self.debug:
1554 if self.debug:
1550 pprint(msg)
1555 pprint(msg)
1551 content = msg['content']
1556 content = msg['content']
1552 if content['status'] != 'ok':
1557 if content['status'] != 'ok':
1553 raise self._unwrap_exception(content)
1558 raise self._unwrap_exception(content)
1554 buffers = msg['buffers']
1559 buffers = msg['buffers']
1555 else:
1560 else:
1556 content = dict(completed=[],pending=[])
1561 content = dict(completed=[],pending=[])
1557
1562
1558 content['completed'].extend(completed)
1563 content['completed'].extend(completed)
1559
1564
1560 if status_only:
1565 if status_only:
1561 return content
1566 return content
1562
1567
1563 failures = []
1568 failures = []
1564 # load cached results into result:
1569 # load cached results into result:
1565 content.update(local_results)
1570 content.update(local_results)
1566
1571
1567 # update cache with results:
1572 # update cache with results:
1568 for msg_id in sorted(theids):
1573 for msg_id in sorted(theids):
1569 if msg_id in content['completed']:
1574 if msg_id in content['completed']:
1570 rec = content[msg_id]
1575 rec = content[msg_id]
1571 parent = extract_dates(rec['header'])
1576 parent = extract_dates(rec['header'])
1572 header = extract_dates(rec['result_header'])
1577 header = extract_dates(rec['result_header'])
1573 rcontent = rec['result_content']
1578 rcontent = rec['result_content']
1574 iodict = rec['io']
1579 iodict = rec['io']
1575 if isinstance(rcontent, str):
1580 if isinstance(rcontent, str):
1576 rcontent = self.session.unpack(rcontent)
1581 rcontent = self.session.unpack(rcontent)
1577
1582
1578 md = self.metadata[msg_id]
1583 md = self.metadata[msg_id]
1579 md_msg = dict(
1584 md_msg = dict(
1580 content=rcontent,
1585 content=rcontent,
1581 parent_header=parent,
1586 parent_header=parent,
1582 header=header,
1587 header=header,
1583 metadata=rec['result_metadata'],
1588 metadata=rec['result_metadata'],
1584 )
1589 )
1585 md.update(self._extract_metadata(md_msg))
1590 md.update(self._extract_metadata(md_msg))
1586 if rec.get('received'):
1591 if rec.get('received'):
1587 md['received'] = parse_date(rec['received'])
1592 md['received'] = parse_date(rec['received'])
1588 md.update(iodict)
1593 md.update(iodict)
1589
1594
1590 if rcontent['status'] == 'ok':
1595 if rcontent['status'] == 'ok':
1591 if header['msg_type'] == 'apply_reply':
1596 if header['msg_type'] == 'apply_reply':
1592 res,buffers = serialize.unserialize_object(buffers)
1597 res,buffers = serialize.unserialize_object(buffers)
1593 elif header['msg_type'] == 'execute_reply':
1598 elif header['msg_type'] == 'execute_reply':
1594 res = ExecuteReply(msg_id, rcontent, md)
1599 res = ExecuteReply(msg_id, rcontent, md)
1595 else:
1600 else:
1596 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1601 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1597 else:
1602 else:
1598 res = self._unwrap_exception(rcontent)
1603 res = self._unwrap_exception(rcontent)
1599 failures.append(res)
1604 failures.append(res)
1600
1605
1601 self.results[msg_id] = res
1606 self.results[msg_id] = res
1602 content[msg_id] = res
1607 content[msg_id] = res
1603
1608
1604 if len(theids) == 1 and failures:
1609 if len(theids) == 1 and failures:
1605 raise failures[0]
1610 raise failures[0]
1606
1611
1607 error.collect_exceptions(failures, "result_status")
1612 error.collect_exceptions(failures, "result_status")
1608 return content
1613 return content
1609
1614
1610 @spin_first
1615 @spin_first
1611 def queue_status(self, targets='all', verbose=False):
1616 def queue_status(self, targets='all', verbose=False):
1612 """Fetch the status of engine queues.
1617 """Fetch the status of engine queues.
1613
1618
1614 Parameters
1619 Parameters
1615 ----------
1620 ----------
1616
1621
1617 targets : int/str/list of ints/strs
1622 targets : int/str/list of ints/strs
1618 the engines whose states are to be queried.
1623 the engines whose states are to be queried.
1619 default : all
1624 default : all
1620 verbose : bool
1625 verbose : bool
1621 Whether to return lengths only, or lists of ids for each element
1626 Whether to return lengths only, or lists of ids for each element
1622 """
1627 """
1623 if targets == 'all':
1628 if targets == 'all':
1624 # allow 'all' to be evaluated on the engine
1629 # allow 'all' to be evaluated on the engine
1625 engine_ids = None
1630 engine_ids = None
1626 else:
1631 else:
1627 engine_ids = self._build_targets(targets)[1]
1632 engine_ids = self._build_targets(targets)[1]
1628 content = dict(targets=engine_ids, verbose=verbose)
1633 content = dict(targets=engine_ids, verbose=verbose)
1629 self.session.send(self._query_socket, "queue_request", content=content)
1634 self.session.send(self._query_socket, "queue_request", content=content)
1630 idents,msg = self.session.recv(self._query_socket, 0)
1635 idents,msg = self.session.recv(self._query_socket, 0)
1631 if self.debug:
1636 if self.debug:
1632 pprint(msg)
1637 pprint(msg)
1633 content = msg['content']
1638 content = msg['content']
1634 status = content.pop('status')
1639 status = content.pop('status')
1635 if status != 'ok':
1640 if status != 'ok':
1636 raise self._unwrap_exception(content)
1641 raise self._unwrap_exception(content)
1637 content = rekey(content)
1642 content = rekey(content)
1638 if isinstance(targets, int):
1643 if isinstance(targets, int):
1639 return content[targets]
1644 return content[targets]
1640 else:
1645 else:
1641 return content
1646 return content
1642
1647
1643 def _build_msgids_from_target(self, targets=None):
1648 def _build_msgids_from_target(self, targets=None):
1644 """Build a list of msg_ids from the list of engine targets"""
1649 """Build a list of msg_ids from the list of engine targets"""
1645 if not targets: # needed as _build_targets otherwise uses all engines
1650 if not targets: # needed as _build_targets otherwise uses all engines
1646 return []
1651 return []
1647 target_ids = self._build_targets(targets)[0]
1652 target_ids = self._build_targets(targets)[0]
1648 return [md_id for md_id in self.metadata if self.metadata[md_id]["engine_uuid"] in target_ids]
1653 return [md_id for md_id in self.metadata if self.metadata[md_id]["engine_uuid"] in target_ids]
1649
1654
1650 def _build_msgids_from_jobs(self, jobs=None):
1655 def _build_msgids_from_jobs(self, jobs=None):
1651 """Build a list of msg_ids from "jobs" """
1656 """Build a list of msg_ids from "jobs" """
1652 if not jobs:
1657 if not jobs:
1653 return []
1658 return []
1654 msg_ids = []
1659 msg_ids = []
1655 if isinstance(jobs, string_types + (AsyncResult,)):
1660 if isinstance(jobs, string_types + (AsyncResult,)):
1656 jobs = [jobs]
1661 jobs = [jobs]
1657 bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
1662 bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
1658 if bad_ids:
1663 if bad_ids:
1659 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1664 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1660 for j in jobs:
1665 for j in jobs:
1661 if isinstance(j, AsyncResult):
1666 if isinstance(j, AsyncResult):
1662 msg_ids.extend(j.msg_ids)
1667 msg_ids.extend(j.msg_ids)
1663 else:
1668 else:
1664 msg_ids.append(j)
1669 msg_ids.append(j)
1665 return msg_ids
1670 return msg_ids
1666
1671
1667 def purge_local_results(self, jobs=[], targets=[]):
1672 def purge_local_results(self, jobs=[], targets=[]):
1668 """Clears the client caches of results and their metadata.
1673 """Clears the client caches of results and their metadata.
1669
1674
1670 Individual results can be purged by msg_id, or the entire
1675 Individual results can be purged by msg_id, or the entire
1671 history of specific targets can be purged.
1676 history of specific targets can be purged.
1672
1677
1673 Use `purge_local_results('all')` to scrub everything from the Clients's
1678 Use `purge_local_results('all')` to scrub everything from the Clients's
1674 results and metadata caches.
1679 results and metadata caches.
1675
1680
1676 After this call all `AsyncResults` are invalid and should be discarded.
1681 After this call all `AsyncResults` are invalid and should be discarded.
1677
1682
1678 If you must "reget" the results, you can still do so by using
1683 If you must "reget" the results, you can still do so by using
1679 `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
1684 `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
1680 redownload the results from the hub if they are still available
1685 redownload the results from the hub if they are still available
1681 (i.e `client.purge_hub_results(...)` has not been called.
1686 (i.e `client.purge_hub_results(...)` has not been called.
1682
1687
1683 Parameters
1688 Parameters
1684 ----------
1689 ----------
1685
1690
1686 jobs : str or list of str or AsyncResult objects
1691 jobs : str or list of str or AsyncResult objects
1687 the msg_ids whose results should be purged.
1692 the msg_ids whose results should be purged.
1688 targets : int/list of ints
1693 targets : int/list of ints
1689 The engines, by integer ID, whose entire result histories are to be purged.
1694 The engines, by integer ID, whose entire result histories are to be purged.
1690
1695
1691 Raises
1696 Raises
1692 ------
1697 ------
1693
1698
1694 RuntimeError : if any of the tasks to be purged are still outstanding.
1699 RuntimeError : if any of the tasks to be purged are still outstanding.
1695
1700
1696 """
1701 """
1697 if not targets and not jobs:
1702 if not targets and not jobs:
1698 raise ValueError("Must specify at least one of `targets` and `jobs`")
1703 raise ValueError("Must specify at least one of `targets` and `jobs`")
1699
1704
1700 if jobs == 'all':
1705 if jobs == 'all':
1701 if self.outstanding:
1706 if self.outstanding:
1702 raise RuntimeError("Can't purge outstanding tasks: %s" % self.outstanding)
1707 raise RuntimeError("Can't purge outstanding tasks: %s" % self.outstanding)
1703 self.results.clear()
1708 self.results.clear()
1704 self.metadata.clear()
1709 self.metadata.clear()
1705 else:
1710 else:
1706 msg_ids = set()
1711 msg_ids = set()
1707 msg_ids.update(self._build_msgids_from_target(targets))
1712 msg_ids.update(self._build_msgids_from_target(targets))
1708 msg_ids.update(self._build_msgids_from_jobs(jobs))
1713 msg_ids.update(self._build_msgids_from_jobs(jobs))
1709 still_outstanding = self.outstanding.intersection(msg_ids)
1714 still_outstanding = self.outstanding.intersection(msg_ids)
1710 if still_outstanding:
1715 if still_outstanding:
1711 raise RuntimeError("Can't purge outstanding tasks: %s" % still_outstanding)
1716 raise RuntimeError("Can't purge outstanding tasks: %s" % still_outstanding)
1712 for mid in msg_ids:
1717 for mid in msg_ids:
1713 self.results.pop(mid, None)
1718 self.results.pop(mid, None)
1714 self.metadata.pop(mid, None)
1719 self.metadata.pop(mid, None)
1715
1720
1716
1721
1717 @spin_first
1722 @spin_first
1718 def purge_hub_results(self, jobs=[], targets=[]):
1723 def purge_hub_results(self, jobs=[], targets=[]):
1719 """Tell the Hub to forget results.
1724 """Tell the Hub to forget results.
1720
1725
1721 Individual results can be purged by msg_id, or the entire
1726 Individual results can be purged by msg_id, or the entire
1722 history of specific targets can be purged.
1727 history of specific targets can be purged.
1723
1728
1724 Use `purge_results('all')` to scrub everything from the Hub's db.
1729 Use `purge_results('all')` to scrub everything from the Hub's db.
1725
1730
1726 Parameters
1731 Parameters
1727 ----------
1732 ----------
1728
1733
1729 jobs : str or list of str or AsyncResult objects
1734 jobs : str or list of str or AsyncResult objects
1730 the msg_ids whose results should be forgotten.
1735 the msg_ids whose results should be forgotten.
1731 targets : int/str/list of ints/strs
1736 targets : int/str/list of ints/strs
1732 The targets, by int_id, whose entire history is to be purged.
1737 The targets, by int_id, whose entire history is to be purged.
1733
1738
1734 default : None
1739 default : None
1735 """
1740 """
1736 if not targets and not jobs:
1741 if not targets and not jobs:
1737 raise ValueError("Must specify at least one of `targets` and `jobs`")
1742 raise ValueError("Must specify at least one of `targets` and `jobs`")
1738 if targets:
1743 if targets:
1739 targets = self._build_targets(targets)[1]
1744 targets = self._build_targets(targets)[1]
1740
1745
1741 # construct msg_ids from jobs
1746 # construct msg_ids from jobs
1742 if jobs == 'all':
1747 if jobs == 'all':
1743 msg_ids = jobs
1748 msg_ids = jobs
1744 else:
1749 else:
1745 msg_ids = self._build_msgids_from_jobs(jobs)
1750 msg_ids = self._build_msgids_from_jobs(jobs)
1746
1751
1747 content = dict(engine_ids=targets, msg_ids=msg_ids)
1752 content = dict(engine_ids=targets, msg_ids=msg_ids)
1748 self.session.send(self._query_socket, "purge_request", content=content)
1753 self.session.send(self._query_socket, "purge_request", content=content)
1749 idents, msg = self.session.recv(self._query_socket, 0)
1754 idents, msg = self.session.recv(self._query_socket, 0)
1750 if self.debug:
1755 if self.debug:
1751 pprint(msg)
1756 pprint(msg)
1752 content = msg['content']
1757 content = msg['content']
1753 if content['status'] != 'ok':
1758 if content['status'] != 'ok':
1754 raise self._unwrap_exception(content)
1759 raise self._unwrap_exception(content)
1755
1760
1756 def purge_results(self, jobs=[], targets=[]):
1761 def purge_results(self, jobs=[], targets=[]):
1757 """Clears the cached results from both the hub and the local client
1762 """Clears the cached results from both the hub and the local client
1758
1763
1759 Individual results can be purged by msg_id, or the entire
1764 Individual results can be purged by msg_id, or the entire
1760 history of specific targets can be purged.
1765 history of specific targets can be purged.
1761
1766
1762 Use `purge_results('all')` to scrub every cached result from both the Hub's and
1767 Use `purge_results('all')` to scrub every cached result from both the Hub's and
1763 the Client's db.
1768 the Client's db.
1764
1769
1765 Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with
1770 Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with
1766 the same arguments.
1771 the same arguments.
1767
1772
1768 Parameters
1773 Parameters
1769 ----------
1774 ----------
1770
1775
1771 jobs : str or list of str or AsyncResult objects
1776 jobs : str or list of str or AsyncResult objects
1772 the msg_ids whose results should be forgotten.
1777 the msg_ids whose results should be forgotten.
1773 targets : int/str/list of ints/strs
1778 targets : int/str/list of ints/strs
1774 The targets, by int_id, whose entire history is to be purged.
1779 The targets, by int_id, whose entire history is to be purged.
1775
1780
1776 default : None
1781 default : None
1777 """
1782 """
1778 self.purge_local_results(jobs=jobs, targets=targets)
1783 self.purge_local_results(jobs=jobs, targets=targets)
1779 self.purge_hub_results(jobs=jobs, targets=targets)
1784 self.purge_hub_results(jobs=jobs, targets=targets)
1780
1785
1781 def purge_everything(self):
1786 def purge_everything(self):
1782 """Clears all content from previous Tasks from both the hub and the local client
1787 """Clears all content from previous Tasks from both the hub and the local client
1783
1788
1784 In addition to calling `purge_results("all")` it also deletes the history and
1789 In addition to calling `purge_results("all")` it also deletes the history and
1785 other bookkeeping lists.
1790 other bookkeeping lists.
1786 """
1791 """
1787 self.purge_results("all")
1792 self.purge_results("all")
1788 self.history = []
1793 self.history = []
1789 self.session.digest_history.clear()
1794 self.session.digest_history.clear()
1790
1795
1791 @spin_first
1796 @spin_first
1792 def hub_history(self):
1797 def hub_history(self):
1793 """Get the Hub's history
1798 """Get the Hub's history
1794
1799
1795 Just like the Client, the Hub has a history, which is a list of msg_ids.
1800 Just like the Client, the Hub has a history, which is a list of msg_ids.
1796 This will contain the history of all clients, and, depending on configuration,
1801 This will contain the history of all clients, and, depending on configuration,
1797 may contain history across multiple cluster sessions.
1802 may contain history across multiple cluster sessions.
1798
1803
1799 Any msg_id returned here is a valid argument to `get_result`.
1804 Any msg_id returned here is a valid argument to `get_result`.
1800
1805
1801 Returns
1806 Returns
1802 -------
1807 -------
1803
1808
1804 msg_ids : list of strs
1809 msg_ids : list of strs
1805 list of all msg_ids, ordered by task submission time.
1810 list of all msg_ids, ordered by task submission time.
1806 """
1811 """
1807
1812
1808 self.session.send(self._query_socket, "history_request", content={})
1813 self.session.send(self._query_socket, "history_request", content={})
1809 idents, msg = self.session.recv(self._query_socket, 0)
1814 idents, msg = self.session.recv(self._query_socket, 0)
1810
1815
1811 if self.debug:
1816 if self.debug:
1812 pprint(msg)
1817 pprint(msg)
1813 content = msg['content']
1818 content = msg['content']
1814 if content['status'] != 'ok':
1819 if content['status'] != 'ok':
1815 raise self._unwrap_exception(content)
1820 raise self._unwrap_exception(content)
1816 else:
1821 else:
1817 return content['history']
1822 return content['history']
1818
1823
1819 @spin_first
1824 @spin_first
1820 def db_query(self, query, keys=None):
1825 def db_query(self, query, keys=None):
1821 """Query the Hub's TaskRecord database
1826 """Query the Hub's TaskRecord database
1822
1827
1823 This will return a list of task record dicts that match `query`
1828 This will return a list of task record dicts that match `query`
1824
1829
1825 Parameters
1830 Parameters
1826 ----------
1831 ----------
1827
1832
1828 query : mongodb query dict
1833 query : mongodb query dict
1829 The search dict. See mongodb query docs for details.
1834 The search dict. See mongodb query docs for details.
1830 keys : list of strs [optional]
1835 keys : list of strs [optional]
1831 The subset of keys to be returned. The default is to fetch everything but buffers.
1836 The subset of keys to be returned. The default is to fetch everything but buffers.
1832 'msg_id' will *always* be included.
1837 'msg_id' will *always* be included.
1833 """
1838 """
1834 if isinstance(keys, string_types):
1839 if isinstance(keys, string_types):
1835 keys = [keys]
1840 keys = [keys]
1836 content = dict(query=query, keys=keys)
1841 content = dict(query=query, keys=keys)
1837 self.session.send(self._query_socket, "db_request", content=content)
1842 self.session.send(self._query_socket, "db_request", content=content)
1838 idents, msg = self.session.recv(self._query_socket, 0)
1843 idents, msg = self.session.recv(self._query_socket, 0)
1839 if self.debug:
1844 if self.debug:
1840 pprint(msg)
1845 pprint(msg)
1841 content = msg['content']
1846 content = msg['content']
1842 if content['status'] != 'ok':
1847 if content['status'] != 'ok':
1843 raise self._unwrap_exception(content)
1848 raise self._unwrap_exception(content)
1844
1849
1845 records = content['records']
1850 records = content['records']
1846
1851
1847 buffer_lens = content['buffer_lens']
1852 buffer_lens = content['buffer_lens']
1848 result_buffer_lens = content['result_buffer_lens']
1853 result_buffer_lens = content['result_buffer_lens']
1849 buffers = msg['buffers']
1854 buffers = msg['buffers']
1850 has_bufs = buffer_lens is not None
1855 has_bufs = buffer_lens is not None
1851 has_rbufs = result_buffer_lens is not None
1856 has_rbufs = result_buffer_lens is not None
1852 for i,rec in enumerate(records):
1857 for i,rec in enumerate(records):
1853 # unpack datetime objects
1858 # unpack datetime objects
1854 for hkey in ('header', 'result_header'):
1859 for hkey in ('header', 'result_header'):
1855 if hkey in rec:
1860 if hkey in rec:
1856 rec[hkey] = extract_dates(rec[hkey])
1861 rec[hkey] = extract_dates(rec[hkey])
1857 for dtkey in ('submitted', 'started', 'completed', 'received'):
1862 for dtkey in ('submitted', 'started', 'completed', 'received'):
1858 if dtkey in rec:
1863 if dtkey in rec:
1859 rec[dtkey] = parse_date(rec[dtkey])
1864 rec[dtkey] = parse_date(rec[dtkey])
1860 # relink buffers
1865 # relink buffers
1861 if has_bufs:
1866 if has_bufs:
1862 blen = buffer_lens[i]
1867 blen = buffer_lens[i]
1863 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1868 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1864 if has_rbufs:
1869 if has_rbufs:
1865 blen = result_buffer_lens[i]
1870 blen = result_buffer_lens[i]
1866 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1871 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1867
1872
1868 return records
1873 return records
1869
1874
1870 __all__ = [ 'Client' ]
1875 __all__ = [ 'Client' ]
General Comments 0
You need to be logged in to leave comments. Login now