##// END OF EJS Templates
even better IOError for no connection file
MinRK -
Show More
@@ -1,1875 +1,1889 b''
1 """A semi-synchronous Client for IPython parallel"""
1 """A semi-synchronous Client for IPython parallel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import print_function
6 from __future__ import print_function
7
7
8 import os
8 import os
9 import json
9 import json
10 import sys
10 import sys
11 from threading import Thread, Event
11 from threading import Thread, Event
12 import time
12 import time
13 import warnings
13 import warnings
14 from datetime import datetime
14 from datetime import datetime
15 from getpass import getpass
15 from getpass import getpass
16 from pprint import pprint
16 from pprint import pprint
17
17
18 pjoin = os.path.join
18 pjoin = os.path.join
19
19
20 import zmq
20 import zmq
21
21
22 from IPython.config.configurable import MultipleInstanceError
22 from IPython.config.configurable import MultipleInstanceError
23 from IPython.core.application import BaseIPythonApplication
23 from IPython.core.application import BaseIPythonApplication
24 from IPython.core.profiledir import ProfileDir, ProfileDirError
24 from IPython.core.profiledir import ProfileDir, ProfileDirError
25
25
26 from IPython.utils.capture import RichOutput
26 from IPython.utils.capture import RichOutput
27 from IPython.utils.coloransi import TermColors
27 from IPython.utils.coloransi import TermColors
28 from IPython.utils.jsonutil import rekey, extract_dates, parse_date
28 from IPython.utils.jsonutil import rekey, extract_dates, parse_date
29 from IPython.utils.localinterfaces import localhost, is_local_ip
29 from IPython.utils.localinterfaces import localhost, is_local_ip
30 from IPython.utils.path import get_ipython_dir, compress_user
30 from IPython.utils.path import get_ipython_dir, compress_user
31 from IPython.utils.py3compat import cast_bytes, string_types, xrange, iteritems
31 from IPython.utils.py3compat import cast_bytes, string_types, xrange, iteritems
32 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
32 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
33 Dict, List, Bool, Set, Any)
33 Dict, List, Bool, Set, Any)
34 from IPython.external.decorator import decorator
34 from IPython.external.decorator import decorator
35
35
36 from IPython.parallel import Reference
36 from IPython.parallel import Reference
37 from IPython.parallel import error
37 from IPython.parallel import error
38 from IPython.parallel import util
38 from IPython.parallel import util
39
39
40 from IPython.kernel.zmq.session import Session, Message
40 from IPython.kernel.zmq.session import Session, Message
41 from IPython.kernel.zmq import serialize
41 from IPython.kernel.zmq import serialize
42
42
43 from .asyncresult import AsyncResult, AsyncHubResult
43 from .asyncresult import AsyncResult, AsyncHubResult
44 from .view import DirectView, LoadBalancedView
44 from .view import DirectView, LoadBalancedView
45
45
46 #--------------------------------------------------------------------------
46 #--------------------------------------------------------------------------
47 # Decorators for Client methods
47 # Decorators for Client methods
48 #--------------------------------------------------------------------------
48 #--------------------------------------------------------------------------
49
49
50
50 @decorator
51 @decorator
51 def spin_first(f, self, *args, **kwargs):
52 def spin_first(f, self, *args, **kwargs):
52 """Call spin() to sync state prior to calling the method."""
53 """Call spin() to sync state prior to calling the method."""
53 self.spin()
54 self.spin()
54 return f(self, *args, **kwargs)
55 return f(self, *args, **kwargs)
55
56
56
57
57 #--------------------------------------------------------------------------
58 #--------------------------------------------------------------------------
58 # Classes
59 # Classes
59 #--------------------------------------------------------------------------
60 #--------------------------------------------------------------------------
60
61
62 _no_connection_file_msg = """
63 Failed to connect because no Controller could be found.
64 Please double-check your profile and ensure that a cluster is running.
65 """
61
66
62 class ExecuteReply(RichOutput):
67 class ExecuteReply(RichOutput):
63 """wrapper for finished Execute results"""
68 """wrapper for finished Execute results"""
64 def __init__(self, msg_id, content, metadata):
69 def __init__(self, msg_id, content, metadata):
65 self.msg_id = msg_id
70 self.msg_id = msg_id
66 self._content = content
71 self._content = content
67 self.execution_count = content['execution_count']
72 self.execution_count = content['execution_count']
68 self.metadata = metadata
73 self.metadata = metadata
69
74
70 # RichOutput overrides
75 # RichOutput overrides
71
76
72 @property
77 @property
73 def source(self):
78 def source(self):
74 execute_result = self.metadata['execute_result']
79 execute_result = self.metadata['execute_result']
75 if execute_result:
80 if execute_result:
76 return execute_result.get('source', '')
81 return execute_result.get('source', '')
77
82
78 @property
83 @property
79 def data(self):
84 def data(self):
80 execute_result = self.metadata['execute_result']
85 execute_result = self.metadata['execute_result']
81 if execute_result:
86 if execute_result:
82 return execute_result.get('data', {})
87 return execute_result.get('data', {})
83
88
84 @property
89 @property
85 def _metadata(self):
90 def _metadata(self):
86 execute_result = self.metadata['execute_result']
91 execute_result = self.metadata['execute_result']
87 if execute_result:
92 if execute_result:
88 return execute_result.get('metadata', {})
93 return execute_result.get('metadata', {})
89
94
90 def display(self):
95 def display(self):
91 from IPython.display import publish_display_data
96 from IPython.display import publish_display_data
92 publish_display_data(self.data, self.metadata)
97 publish_display_data(self.data, self.metadata)
93
98
94 def _repr_mime_(self, mime):
99 def _repr_mime_(self, mime):
95 if mime not in self.data:
100 if mime not in self.data:
96 return
101 return
97 data = self.data[mime]
102 data = self.data[mime]
98 if mime in self._metadata:
103 if mime in self._metadata:
99 return data, self._metadata[mime]
104 return data, self._metadata[mime]
100 else:
105 else:
101 return data
106 return data
102
107
103 def __getitem__(self, key):
108 def __getitem__(self, key):
104 return self.metadata[key]
109 return self.metadata[key]
105
110
106 def __getattr__(self, key):
111 def __getattr__(self, key):
107 if key not in self.metadata:
112 if key not in self.metadata:
108 raise AttributeError(key)
113 raise AttributeError(key)
109 return self.metadata[key]
114 return self.metadata[key]
110
115
111 def __repr__(self):
116 def __repr__(self):
112 execute_result = self.metadata['execute_result'] or {'data':{}}
117 execute_result = self.metadata['execute_result'] or {'data':{}}
113 text_out = execute_result['data'].get('text/plain', '')
118 text_out = execute_result['data'].get('text/plain', '')
114 if len(text_out) > 32:
119 if len(text_out) > 32:
115 text_out = text_out[:29] + '...'
120 text_out = text_out[:29] + '...'
116
121
117 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
122 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
118
123
119 def _repr_pretty_(self, p, cycle):
124 def _repr_pretty_(self, p, cycle):
120 execute_result = self.metadata['execute_result'] or {'data':{}}
125 execute_result = self.metadata['execute_result'] or {'data':{}}
121 text_out = execute_result['data'].get('text/plain', '')
126 text_out = execute_result['data'].get('text/plain', '')
122
127
123 if not text_out:
128 if not text_out:
124 return
129 return
125
130
126 try:
131 try:
127 ip = get_ipython()
132 ip = get_ipython()
128 except NameError:
133 except NameError:
129 colors = "NoColor"
134 colors = "NoColor"
130 else:
135 else:
131 colors = ip.colors
136 colors = ip.colors
132
137
133 if colors == "NoColor":
138 if colors == "NoColor":
134 out = normal = ""
139 out = normal = ""
135 else:
140 else:
136 out = TermColors.Red
141 out = TermColors.Red
137 normal = TermColors.Normal
142 normal = TermColors.Normal
138
143
139 if '\n' in text_out and not text_out.startswith('\n'):
144 if '\n' in text_out and not text_out.startswith('\n'):
140 # add newline for multiline reprs
145 # add newline for multiline reprs
141 text_out = '\n' + text_out
146 text_out = '\n' + text_out
142
147
143 p.text(
148 p.text(
144 out + u'Out[%i:%i]: ' % (
149 out + u'Out[%i:%i]: ' % (
145 self.metadata['engine_id'], self.execution_count
150 self.metadata['engine_id'], self.execution_count
146 ) + normal + text_out
151 ) + normal + text_out
147 )
152 )
148
153
149
154
150 class Metadata(dict):
155 class Metadata(dict):
151 """Subclass of dict for initializing metadata values.
156 """Subclass of dict for initializing metadata values.
152
157
153 Attribute access works on keys.
158 Attribute access works on keys.
154
159
155 These objects have a strict set of keys - errors will raise if you try
160 These objects have a strict set of keys - errors will raise if you try
156 to add new keys.
161 to add new keys.
157 """
162 """
158 def __init__(self, *args, **kwargs):
163 def __init__(self, *args, **kwargs):
159 dict.__init__(self)
164 dict.__init__(self)
160 md = {'msg_id' : None,
165 md = {'msg_id' : None,
161 'submitted' : None,
166 'submitted' : None,
162 'started' : None,
167 'started' : None,
163 'completed' : None,
168 'completed' : None,
164 'received' : None,
169 'received' : None,
165 'engine_uuid' : None,
170 'engine_uuid' : None,
166 'engine_id' : None,
171 'engine_id' : None,
167 'follow' : None,
172 'follow' : None,
168 'after' : None,
173 'after' : None,
169 'status' : None,
174 'status' : None,
170
175
171 'execute_input' : None,
176 'execute_input' : None,
172 'execute_result' : None,
177 'execute_result' : None,
173 'error' : None,
178 'error' : None,
174 'stdout' : '',
179 'stdout' : '',
175 'stderr' : '',
180 'stderr' : '',
176 'outputs' : [],
181 'outputs' : [],
177 'data': {},
182 'data': {},
178 'outputs_ready' : False,
183 'outputs_ready' : False,
179 }
184 }
180 self.update(md)
185 self.update(md)
181 self.update(dict(*args, **kwargs))
186 self.update(dict(*args, **kwargs))
182
187
183 def __getattr__(self, key):
188 def __getattr__(self, key):
184 """getattr aliased to getitem"""
189 """getattr aliased to getitem"""
185 if key in self:
190 if key in self:
186 return self[key]
191 return self[key]
187 else:
192 else:
188 raise AttributeError(key)
193 raise AttributeError(key)
189
194
190 def __setattr__(self, key, value):
195 def __setattr__(self, key, value):
191 """setattr aliased to setitem, with strict"""
196 """setattr aliased to setitem, with strict"""
192 if key in self:
197 if key in self:
193 self[key] = value
198 self[key] = value
194 else:
199 else:
195 raise AttributeError(key)
200 raise AttributeError(key)
196
201
197 def __setitem__(self, key, value):
202 def __setitem__(self, key, value):
198 """strict static key enforcement"""
203 """strict static key enforcement"""
199 if key in self:
204 if key in self:
200 dict.__setitem__(self, key, value)
205 dict.__setitem__(self, key, value)
201 else:
206 else:
202 raise KeyError(key)
207 raise KeyError(key)
203
208
204
209
205 class Client(HasTraits):
210 class Client(HasTraits):
206 """A semi-synchronous client to the IPython ZMQ cluster
211 """A semi-synchronous client to the IPython ZMQ cluster
207
212
208 Parameters
213 Parameters
209 ----------
214 ----------
210
215
211 url_file : str/unicode; path to ipcontroller-client.json
216 url_file : str/unicode; path to ipcontroller-client.json
212 This JSON file should contain all the information needed to connect to a cluster,
217 This JSON file should contain all the information needed to connect to a cluster,
213 and is likely the only argument needed.
218 and is likely the only argument needed.
214 Connection information for the Hub's registration. If a json connector
219 Connection information for the Hub's registration. If a json connector
215 file is given, then likely no further configuration is necessary.
220 file is given, then likely no further configuration is necessary.
216 [Default: use profile]
221 [Default: use profile]
217 profile : bytes
222 profile : bytes
218 The name of the Cluster profile to be used to find connector information.
223 The name of the Cluster profile to be used to find connector information.
219 If run from an IPython application, the default profile will be the same
224 If run from an IPython application, the default profile will be the same
220 as the running application, otherwise it will be 'default'.
225 as the running application, otherwise it will be 'default'.
221 cluster_id : str
226 cluster_id : str
222 String id to added to runtime files, to prevent name collisions when using
227 String id to added to runtime files, to prevent name collisions when using
223 multiple clusters with a single profile simultaneously.
228 multiple clusters with a single profile simultaneously.
224 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
229 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
225 Since this is text inserted into filenames, typical recommendations apply:
230 Since this is text inserted into filenames, typical recommendations apply:
226 Simple character strings are ideal, and spaces are not recommended (but
231 Simple character strings are ideal, and spaces are not recommended (but
227 should generally work)
232 should generally work)
228 context : zmq.Context
233 context : zmq.Context
229 Pass an existing zmq.Context instance, otherwise the client will create its own.
234 Pass an existing zmq.Context instance, otherwise the client will create its own.
230 debug : bool
235 debug : bool
231 flag for lots of message printing for debug purposes
236 flag for lots of message printing for debug purposes
232 timeout : int/float
237 timeout : int/float
233 time (in seconds) to wait for connection replies from the Hub
238 time (in seconds) to wait for connection replies from the Hub
234 [Default: 10]
239 [Default: 10]
235
240
236 #-------------- session related args ----------------
241 #-------------- session related args ----------------
237
242
238 config : Config object
243 config : Config object
239 If specified, this will be relayed to the Session for configuration
244 If specified, this will be relayed to the Session for configuration
240 username : str
245 username : str
241 set username for the session object
246 set username for the session object
242
247
243 #-------------- ssh related args ----------------
248 #-------------- ssh related args ----------------
244 # These are args for configuring the ssh tunnel to be used
249 # These are args for configuring the ssh tunnel to be used
245 # credentials are used to forward connections over ssh to the Controller
250 # credentials are used to forward connections over ssh to the Controller
246 # Note that the ip given in `addr` needs to be relative to sshserver
251 # Note that the ip given in `addr` needs to be relative to sshserver
247 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
252 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
248 # and set sshserver as the same machine the Controller is on. However,
253 # and set sshserver as the same machine the Controller is on. However,
249 # the only requirement is that sshserver is able to see the Controller
254 # the only requirement is that sshserver is able to see the Controller
250 # (i.e. is within the same trusted network).
255 # (i.e. is within the same trusted network).
251
256
252 sshserver : str
257 sshserver : str
253 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
258 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
254 If keyfile or password is specified, and this is not, it will default to
259 If keyfile or password is specified, and this is not, it will default to
255 the ip given in addr.
260 the ip given in addr.
256 sshkey : str; path to ssh private key file
261 sshkey : str; path to ssh private key file
257 This specifies a key to be used in ssh login, default None.
262 This specifies a key to be used in ssh login, default None.
258 Regular default ssh keys will be used without specifying this argument.
263 Regular default ssh keys will be used without specifying this argument.
259 password : str
264 password : str
260 Your ssh password to sshserver. Note that if this is left None,
265 Your ssh password to sshserver. Note that if this is left None,
261 you will be prompted for it if passwordless key based login is unavailable.
266 you will be prompted for it if passwordless key based login is unavailable.
262 paramiko : bool
267 paramiko : bool
263 flag for whether to use paramiko instead of shell ssh for tunneling.
268 flag for whether to use paramiko instead of shell ssh for tunneling.
264 [default: True on win32, False else]
269 [default: True on win32, False else]
265
270
266
271
267 Attributes
272 Attributes
268 ----------
273 ----------
269
274
270 ids : list of int engine IDs
275 ids : list of int engine IDs
271 requesting the ids attribute always synchronizes
276 requesting the ids attribute always synchronizes
272 the registration state. To request ids without synchronization,
277 the registration state. To request ids without synchronization,
273 use semi-private _ids attributes.
278 use semi-private _ids attributes.
274
279
275 history : list of msg_ids
280 history : list of msg_ids
276 a list of msg_ids, keeping track of all the execution
281 a list of msg_ids, keeping track of all the execution
277 messages you have submitted in order.
282 messages you have submitted in order.
278
283
279 outstanding : set of msg_ids
284 outstanding : set of msg_ids
280 a set of msg_ids that have been submitted, but whose
285 a set of msg_ids that have been submitted, but whose
281 results have not yet been received.
286 results have not yet been received.
282
287
283 results : dict
288 results : dict
284 a dict of all our results, keyed by msg_id
289 a dict of all our results, keyed by msg_id
285
290
286 block : bool
291 block : bool
287 determines default behavior when block not specified
292 determines default behavior when block not specified
288 in execution methods
293 in execution methods
289
294
290 Methods
295 Methods
291 -------
296 -------
292
297
293 spin
298 spin
294 flushes incoming results and registration state changes
299 flushes incoming results and registration state changes
295 control methods spin, and requesting `ids` also ensures up to date
300 control methods spin, and requesting `ids` also ensures up to date
296
301
297 wait
302 wait
298 wait on one or more msg_ids
303 wait on one or more msg_ids
299
304
300 execution methods
305 execution methods
301 apply
306 apply
302 legacy: execute, run
307 legacy: execute, run
303
308
304 data movement
309 data movement
305 push, pull, scatter, gather
310 push, pull, scatter, gather
306
311
307 query methods
312 query methods
308 queue_status, get_result, purge, result_status
313 queue_status, get_result, purge, result_status
309
314
310 control methods
315 control methods
311 abort, shutdown
316 abort, shutdown
312
317
313 """
318 """
314
319
315
320
316 block = Bool(False)
321 block = Bool(False)
317 outstanding = Set()
322 outstanding = Set()
318 results = Instance('collections.defaultdict', (dict,))
323 results = Instance('collections.defaultdict', (dict,))
319 metadata = Instance('collections.defaultdict', (Metadata,))
324 metadata = Instance('collections.defaultdict', (Metadata,))
320 history = List()
325 history = List()
321 debug = Bool(False)
326 debug = Bool(False)
322 _spin_thread = Any()
327 _spin_thread = Any()
323 _stop_spinning = Any()
328 _stop_spinning = Any()
324
329
325 profile=Unicode()
330 profile=Unicode()
326 def _profile_default(self):
331 def _profile_default(self):
327 if BaseIPythonApplication.initialized():
332 if BaseIPythonApplication.initialized():
328 # an IPython app *might* be running, try to get its profile
333 # an IPython app *might* be running, try to get its profile
329 try:
334 try:
330 return BaseIPythonApplication.instance().profile
335 return BaseIPythonApplication.instance().profile
331 except (AttributeError, MultipleInstanceError):
336 except (AttributeError, MultipleInstanceError):
332 # could be a *different* subclass of config.Application,
337 # could be a *different* subclass of config.Application,
333 # which would raise one of these two errors.
338 # which would raise one of these two errors.
334 return u'default'
339 return u'default'
335 else:
340 else:
336 return u'default'
341 return u'default'
337
342
338
343
339 _outstanding_dict = Instance('collections.defaultdict', (set,))
344 _outstanding_dict = Instance('collections.defaultdict', (set,))
340 _ids = List()
345 _ids = List()
341 _connected=Bool(False)
346 _connected=Bool(False)
342 _ssh=Bool(False)
347 _ssh=Bool(False)
343 _context = Instance('zmq.Context')
348 _context = Instance('zmq.Context')
344 _config = Dict()
349 _config = Dict()
345 _engines=Instance(util.ReverseDict, (), {})
350 _engines=Instance(util.ReverseDict, (), {})
346 # _hub_socket=Instance('zmq.Socket')
351 # _hub_socket=Instance('zmq.Socket')
347 _query_socket=Instance('zmq.Socket')
352 _query_socket=Instance('zmq.Socket')
348 _control_socket=Instance('zmq.Socket')
353 _control_socket=Instance('zmq.Socket')
349 _iopub_socket=Instance('zmq.Socket')
354 _iopub_socket=Instance('zmq.Socket')
350 _notification_socket=Instance('zmq.Socket')
355 _notification_socket=Instance('zmq.Socket')
351 _mux_socket=Instance('zmq.Socket')
356 _mux_socket=Instance('zmq.Socket')
352 _task_socket=Instance('zmq.Socket')
357 _task_socket=Instance('zmq.Socket')
353 _task_scheme=Unicode()
358 _task_scheme=Unicode()
354 _closed = False
359 _closed = False
355 _ignored_control_replies=Integer(0)
360 _ignored_control_replies=Integer(0)
356 _ignored_hub_replies=Integer(0)
361 _ignored_hub_replies=Integer(0)
357
362
358 def __new__(self, *args, **kw):
363 def __new__(self, *args, **kw):
359 # don't raise on positional args
364 # don't raise on positional args
360 return HasTraits.__new__(self, **kw)
365 return HasTraits.__new__(self, **kw)
361
366
362 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
367 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
363 context=None, debug=False,
368 context=None, debug=False,
364 sshserver=None, sshkey=None, password=None, paramiko=None,
369 sshserver=None, sshkey=None, password=None, paramiko=None,
365 timeout=10, cluster_id=None, **extra_args
370 timeout=10, cluster_id=None, **extra_args
366 ):
371 ):
367 if profile:
372 if profile:
368 super(Client, self).__init__(debug=debug, profile=profile)
373 super(Client, self).__init__(debug=debug, profile=profile)
369 else:
374 else:
370 super(Client, self).__init__(debug=debug)
375 super(Client, self).__init__(debug=debug)
371 if context is None:
376 if context is None:
372 context = zmq.Context.instance()
377 context = zmq.Context.instance()
373 self._context = context
378 self._context = context
374 self._stop_spinning = Event()
379 self._stop_spinning = Event()
375
380
376 if 'url_or_file' in extra_args:
381 if 'url_or_file' in extra_args:
377 url_file = extra_args['url_or_file']
382 url_file = extra_args['url_or_file']
378 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
383 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
379
384
380 if url_file and util.is_url(url_file):
385 if url_file and util.is_url(url_file):
381 raise ValueError("single urls cannot be specified, url-files must be used.")
386 raise ValueError("single urls cannot be specified, url-files must be used.")
382
387
383 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
388 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
384
389
390 no_file_msg = '\n'.join([
391 "You have attempted to connect to an IPython Cluster but no Controller could be found.",
392 "Please double-check your configuration and ensure that a cluster is running.",
393 ])
394
385 if self._cd is not None:
395 if self._cd is not None:
386 if url_file is None:
396 if url_file is None:
387 if not cluster_id:
397 if not cluster_id:
388 client_json = 'ipcontroller-client.json'
398 client_json = 'ipcontroller-client.json'
389 else:
399 else:
390 client_json = 'ipcontroller-%s-client.json' % cluster_id
400 client_json = 'ipcontroller-%s-client.json' % cluster_id
391 url_file = pjoin(self._cd.security_dir, client_json)
401 url_file = pjoin(self._cd.security_dir, client_json)
402 if not os.path.exists(url_file):
403 msg = '\n'.join([
404 "Connection file %r not found." % compress_user(url_file),
405 no_file_msg,
406 ])
407 raise IOError(msg)
392 if url_file is None:
408 if url_file is None:
393 raise ValueError(
409 raise IOError(no_file_msg)
394 "I can't find enough information to connect to a hub!"
395 " Please specify at least one of url_file or profile."
396 )
397
410
398 if not os.path.exists(url_file):
411 if not os.path.exists(url_file):
412 # Connection file explicitly specified, but not found
399 raise IOError("Connection file %r not found. Is a controller running?" % \
413 raise IOError("Connection file %r not found. Is a controller running?" % \
400 compress_user(url_file)
414 compress_user(url_file)
401 )
415 )
402
416
403 with open(url_file) as f:
417 with open(url_file) as f:
404 cfg = json.load(f)
418 cfg = json.load(f)
405
419
406 self._task_scheme = cfg['task_scheme']
420 self._task_scheme = cfg['task_scheme']
407
421
408 # sync defaults from args, json:
422 # sync defaults from args, json:
409 if sshserver:
423 if sshserver:
410 cfg['ssh'] = sshserver
424 cfg['ssh'] = sshserver
411
425
412 location = cfg.setdefault('location', None)
426 location = cfg.setdefault('location', None)
413
427
414 proto,addr = cfg['interface'].split('://')
428 proto,addr = cfg['interface'].split('://')
415 addr = util.disambiguate_ip_address(addr, location)
429 addr = util.disambiguate_ip_address(addr, location)
416 cfg['interface'] = "%s://%s" % (proto, addr)
430 cfg['interface'] = "%s://%s" % (proto, addr)
417
431
418 # turn interface,port into full urls:
432 # turn interface,port into full urls:
419 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
433 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
420 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
434 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
421
435
422 url = cfg['registration']
436 url = cfg['registration']
423
437
424 if location is not None and addr == localhost():
438 if location is not None and addr == localhost():
425 # location specified, and connection is expected to be local
439 # location specified, and connection is expected to be local
426 if not is_local_ip(location) and not sshserver:
440 if not is_local_ip(location) and not sshserver:
427 # load ssh from JSON *only* if the controller is not on
441 # load ssh from JSON *only* if the controller is not on
428 # this machine
442 # this machine
429 sshserver=cfg['ssh']
443 sshserver=cfg['ssh']
430 if not is_local_ip(location) and not sshserver:
444 if not is_local_ip(location) and not sshserver:
431 # warn if no ssh specified, but SSH is probably needed
445 # warn if no ssh specified, but SSH is probably needed
432 # This is only a warning, because the most likely cause
446 # This is only a warning, because the most likely cause
433 # is a local Controller on a laptop whose IP is dynamic
447 # is a local Controller on a laptop whose IP is dynamic
434 warnings.warn("""
448 warnings.warn("""
435 Controller appears to be listening on localhost, but not on this machine.
449 Controller appears to be listening on localhost, but not on this machine.
436 If this is true, you should specify Client(...,sshserver='you@%s')
450 If this is true, you should specify Client(...,sshserver='you@%s')
437 or instruct your controller to listen on an external IP."""%location,
451 or instruct your controller to listen on an external IP."""%location,
438 RuntimeWarning)
452 RuntimeWarning)
439 elif not sshserver:
453 elif not sshserver:
440 # otherwise sync with cfg
454 # otherwise sync with cfg
441 sshserver = cfg['ssh']
455 sshserver = cfg['ssh']
442
456
443 self._config = cfg
457 self._config = cfg
444
458
445 self._ssh = bool(sshserver or sshkey or password)
459 self._ssh = bool(sshserver or sshkey or password)
446 if self._ssh and sshserver is None:
460 if self._ssh and sshserver is None:
447 # default to ssh via localhost
461 # default to ssh via localhost
448 sshserver = addr
462 sshserver = addr
449 if self._ssh and password is None:
463 if self._ssh and password is None:
450 from zmq.ssh import tunnel
464 from zmq.ssh import tunnel
451 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
465 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
452 password=False
466 password=False
453 else:
467 else:
454 password = getpass("SSH Password for %s: "%sshserver)
468 password = getpass("SSH Password for %s: "%sshserver)
455 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
469 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
456
470
457 # configure and construct the session
471 # configure and construct the session
458 try:
472 try:
459 extra_args['packer'] = cfg['pack']
473 extra_args['packer'] = cfg['pack']
460 extra_args['unpacker'] = cfg['unpack']
474 extra_args['unpacker'] = cfg['unpack']
461 extra_args['key'] = cast_bytes(cfg['key'])
475 extra_args['key'] = cast_bytes(cfg['key'])
462 extra_args['signature_scheme'] = cfg['signature_scheme']
476 extra_args['signature_scheme'] = cfg['signature_scheme']
463 except KeyError as exc:
477 except KeyError as exc:
464 msg = '\n'.join([
478 msg = '\n'.join([
465 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
479 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
466 "If you are reusing connection files, remove them and start ipcontroller again."
480 "If you are reusing connection files, remove them and start ipcontroller again."
467 ])
481 ])
468 raise ValueError(msg.format(exc.message))
482 raise ValueError(msg.format(exc.message))
469
483
470 self.session = Session(**extra_args)
484 self.session = Session(**extra_args)
471
485
472 self._query_socket = self._context.socket(zmq.DEALER)
486 self._query_socket = self._context.socket(zmq.DEALER)
473
487
474 if self._ssh:
488 if self._ssh:
475 from zmq.ssh import tunnel
489 from zmq.ssh import tunnel
476 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
490 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
477 else:
491 else:
478 self._query_socket.connect(cfg['registration'])
492 self._query_socket.connect(cfg['registration'])
479
493
480 self.session.debug = self.debug
494 self.session.debug = self.debug
481
495
482 self._notification_handlers = {'registration_notification' : self._register_engine,
496 self._notification_handlers = {'registration_notification' : self._register_engine,
483 'unregistration_notification' : self._unregister_engine,
497 'unregistration_notification' : self._unregister_engine,
484 'shutdown_notification' : lambda msg: self.close(),
498 'shutdown_notification' : lambda msg: self.close(),
485 }
499 }
486 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
500 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
487 'apply_reply' : self._handle_apply_reply}
501 'apply_reply' : self._handle_apply_reply}
488
502
489 try:
503 try:
490 self._connect(sshserver, ssh_kwargs, timeout)
504 self._connect(sshserver, ssh_kwargs, timeout)
491 except:
505 except:
492 self.close(linger=0)
506 self.close(linger=0)
493 raise
507 raise
494
508
495 # last step: setup magics, if we are in IPython:
509 # last step: setup magics, if we are in IPython:
496
510
497 try:
511 try:
498 ip = get_ipython()
512 ip = get_ipython()
499 except NameError:
513 except NameError:
500 return
514 return
501 else:
515 else:
502 if 'px' not in ip.magics_manager.magics:
516 if 'px' not in ip.magics_manager.magics:
503 # in IPython but we are the first Client.
517 # in IPython but we are the first Client.
504 # activate a default view for parallel magics.
518 # activate a default view for parallel magics.
505 self.activate()
519 self.activate()
506
520
    def __del__(self):
        """cleanup sockets, but _not_ context.

        Called at garbage collection; delegates to :meth:`close`, which
        closes every ``*socket`` trait but deliberately leaves the shared
        zmq context alive (other Clients may still be using it).
        """
        # NOTE(review): if __init__ raised before self._closed was set,
        # close() may hit an AttributeError during interpreter teardown —
        # confirm against __init__'s attribute-initialization order.
        self.close()
510
524
511 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
525 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
512 if ipython_dir is None:
526 if ipython_dir is None:
513 ipython_dir = get_ipython_dir()
527 ipython_dir = get_ipython_dir()
514 if profile_dir is not None:
528 if profile_dir is not None:
515 try:
529 try:
516 self._cd = ProfileDir.find_profile_dir(profile_dir)
530 self._cd = ProfileDir.find_profile_dir(profile_dir)
517 return
531 return
518 except ProfileDirError:
532 except ProfileDirError:
519 pass
533 pass
520 elif profile is not None:
534 elif profile is not None:
521 try:
535 try:
522 self._cd = ProfileDir.find_profile_dir_by_name(
536 self._cd = ProfileDir.find_profile_dir_by_name(
523 ipython_dir, profile)
537 ipython_dir, profile)
524 return
538 return
525 except ProfileDirError:
539 except ProfileDirError:
526 pass
540 pass
527 self._cd = None
541 self._cd = None
528
542
529 def _update_engines(self, engines):
543 def _update_engines(self, engines):
530 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
544 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
531 for k,v in iteritems(engines):
545 for k,v in iteritems(engines):
532 eid = int(k)
546 eid = int(k)
533 if eid not in self._engines:
547 if eid not in self._engines:
534 self._ids.append(eid)
548 self._ids.append(eid)
535 self._engines[eid] = v
549 self._engines[eid] = v
536 self._ids = sorted(self._ids)
550 self._ids = sorted(self._ids)
537 if sorted(self._engines.keys()) != list(range(len(self._engines))) and \
551 if sorted(self._engines.keys()) != list(range(len(self._engines))) and \
538 self._task_scheme == 'pure' and self._task_socket:
552 self._task_scheme == 'pure' and self._task_socket:
539 self._stop_scheduling_tasks()
553 self._stop_scheduling_tasks()
540
554
541 def _stop_scheduling_tasks(self):
555 def _stop_scheduling_tasks(self):
542 """Stop scheduling tasks because an engine has been unregistered
556 """Stop scheduling tasks because an engine has been unregistered
543 from a pure ZMQ scheduler.
557 from a pure ZMQ scheduler.
544 """
558 """
545 self._task_socket.close()
559 self._task_socket.close()
546 self._task_socket = None
560 self._task_socket = None
547 msg = "An engine has been unregistered, and we are using pure " +\
561 msg = "An engine has been unregistered, and we are using pure " +\
548 "ZMQ task scheduling. Task farming will be disabled."
562 "ZMQ task scheduling. Task farming will be disabled."
549 if self.outstanding:
563 if self.outstanding:
550 msg += " If you were running tasks when this happened, " +\
564 msg += " If you were running tasks when this happened, " +\
551 "some `outstanding` msg_ids may never resolve."
565 "some `outstanding` msg_ids may never resolve."
552 warnings.warn(msg, RuntimeWarning)
566 warnings.warn(msg, RuntimeWarning)
553
567
554 def _build_targets(self, targets):
568 def _build_targets(self, targets):
555 """Turn valid target IDs or 'all' into two lists:
569 """Turn valid target IDs or 'all' into two lists:
556 (int_ids, uuids).
570 (int_ids, uuids).
557 """
571 """
558 if not self._ids:
572 if not self._ids:
559 # flush notification socket if no engines yet, just in case
573 # flush notification socket if no engines yet, just in case
560 if not self.ids:
574 if not self.ids:
561 raise error.NoEnginesRegistered("Can't build targets without any engines")
575 raise error.NoEnginesRegistered("Can't build targets without any engines")
562
576
563 if targets is None:
577 if targets is None:
564 targets = self._ids
578 targets = self._ids
565 elif isinstance(targets, string_types):
579 elif isinstance(targets, string_types):
566 if targets.lower() == 'all':
580 if targets.lower() == 'all':
567 targets = self._ids
581 targets = self._ids
568 else:
582 else:
569 raise TypeError("%r not valid str target, must be 'all'"%(targets))
583 raise TypeError("%r not valid str target, must be 'all'"%(targets))
570 elif isinstance(targets, int):
584 elif isinstance(targets, int):
571 if targets < 0:
585 if targets < 0:
572 targets = self.ids[targets]
586 targets = self.ids[targets]
573 if targets not in self._ids:
587 if targets not in self._ids:
574 raise IndexError("No such engine: %i"%targets)
588 raise IndexError("No such engine: %i"%targets)
575 targets = [targets]
589 targets = [targets]
576
590
577 if isinstance(targets, slice):
591 if isinstance(targets, slice):
578 indices = list(range(len(self._ids))[targets])
592 indices = list(range(len(self._ids))[targets])
579 ids = self.ids
593 ids = self.ids
580 targets = [ ids[i] for i in indices ]
594 targets = [ ids[i] for i in indices ]
581
595
582 if not isinstance(targets, (tuple, list, xrange)):
596 if not isinstance(targets, (tuple, list, xrange)):
583 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
597 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
584
598
585 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
599 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
586
600
    def _connect(self, sshserver, ssh_kwargs, timeout):
        """setup all our socket connections to the cluster. This is called from
        __init__.

        Sends a ``connection_request`` to the Hub over the query socket,
        waits up to ``timeout`` seconds for the reply, then connects the
        mux/task/notification/control/iopub sockets using the endpoint
        URLs from ``self._config``.  Raises ``error.TimeoutError`` on poll
        timeout, and a generic ``Exception`` if the Hub reply status is
        not 'ok'.
        """

        # Maybe allow reconnecting?
        if self._connected:
            return
        self._connected=True

        def connect_socket(s, url):
            # route through an SSH tunnel when one was requested
            if self._ssh:
                from zmq.ssh import tunnel
                return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
            else:
                return s.connect(url)

        self.session.send(self._query_socket, 'connection_request')
        # use Poller because zmq.select has wrong units in pyzmq 2.1.7
        poller = zmq.Poller()
        poller.register(self._query_socket, zmq.POLLIN)
        # poll expects milliseconds, timeout is seconds
        evts = poller.poll(timeout*1000)
        if not evts:
            raise error.TimeoutError("Hub connection request timed out")
        idents,msg = self.session.recv(self._query_socket,mode=0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        # self._config['registration'] = dict(content)
        cfg = self._config
        if content['status'] == 'ok':
            # connect each channel to the endpoint the Hub advertised
            self._mux_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._mux_socket, cfg['mux'])

            self._task_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._task_socket, cfg['task'])

            # SUB sockets subscribe to everything (empty prefix)
            self._notification_socket = self._context.socket(zmq.SUB)
            self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
            connect_socket(self._notification_socket, cfg['notification'])

            self._control_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._control_socket, cfg['control'])

            self._iopub_socket = self._context.socket(zmq.SUB)
            self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
            connect_socket(self._iopub_socket, cfg['iopub'])

            # seed our engine table from the Hub's registration snapshot
            self._update_engines(dict(content['engines']))
        else:
            self._connected = False
            raise Exception("Failed to connect!")
639
653
640 #--------------------------------------------------------------------------
654 #--------------------------------------------------------------------------
641 # handlers and callbacks for incoming messages
655 # handlers and callbacks for incoming messages
642 #--------------------------------------------------------------------------
656 #--------------------------------------------------------------------------
643
657
    def _unwrap_exception(self, content):
        """unwrap exception, and remap engine_id to int.

        Reconstructs a RemoteError from a reply's error ``content`` via
        ``error.unwrap_exception``, then annotates its ``engine_info``
        with the integer engine id.
        """
        e = error.unwrap_exception(content)
        # print e.traceback
        if e.engine_info:
            e_uuid = e.engine_info['engine_uuid']
            # NOTE(review): _update_engines populates self._engines keyed by
            # int id -> uuid; this lookup indexes by uuid — confirm the dict
            # is bidirectional (or remapped elsewhere) before relying on it.
            eid = self._engines[e_uuid]
            e.engine_info['engine_id'] = eid
        return e
653
667
654 def _extract_metadata(self, msg):
668 def _extract_metadata(self, msg):
655 header = msg['header']
669 header = msg['header']
656 parent = msg['parent_header']
670 parent = msg['parent_header']
657 msg_meta = msg['metadata']
671 msg_meta = msg['metadata']
658 content = msg['content']
672 content = msg['content']
659 md = {'msg_id' : parent['msg_id'],
673 md = {'msg_id' : parent['msg_id'],
660 'received' : datetime.now(),
674 'received' : datetime.now(),
661 'engine_uuid' : msg_meta.get('engine', None),
675 'engine_uuid' : msg_meta.get('engine', None),
662 'follow' : msg_meta.get('follow', []),
676 'follow' : msg_meta.get('follow', []),
663 'after' : msg_meta.get('after', []),
677 'after' : msg_meta.get('after', []),
664 'status' : content['status'],
678 'status' : content['status'],
665 }
679 }
666
680
667 if md['engine_uuid'] is not None:
681 if md['engine_uuid'] is not None:
668 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
682 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
669
683
670 if 'date' in parent:
684 if 'date' in parent:
671 md['submitted'] = parent['date']
685 md['submitted'] = parent['date']
672 if 'started' in msg_meta:
686 if 'started' in msg_meta:
673 md['started'] = parse_date(msg_meta['started'])
687 md['started'] = parse_date(msg_meta['started'])
674 if 'date' in header:
688 if 'date' in header:
675 md['completed'] = header['date']
689 md['completed'] = header['date']
676 return md
690 return md
677
691
678 def _register_engine(self, msg):
692 def _register_engine(self, msg):
679 """Register a new engine, and update our connection info."""
693 """Register a new engine, and update our connection info."""
680 content = msg['content']
694 content = msg['content']
681 eid = content['id']
695 eid = content['id']
682 d = {eid : content['uuid']}
696 d = {eid : content['uuid']}
683 self._update_engines(d)
697 self._update_engines(d)
684
698
685 def _unregister_engine(self, msg):
699 def _unregister_engine(self, msg):
686 """Unregister an engine that has died."""
700 """Unregister an engine that has died."""
687 content = msg['content']
701 content = msg['content']
688 eid = int(content['id'])
702 eid = int(content['id'])
689 if eid in self._ids:
703 if eid in self._ids:
690 self._ids.remove(eid)
704 self._ids.remove(eid)
691 uuid = self._engines.pop(eid)
705 uuid = self._engines.pop(eid)
692
706
693 self._handle_stranded_msgs(eid, uuid)
707 self._handle_stranded_msgs(eid, uuid)
694
708
695 if self._task_socket and self._task_scheme == 'pure':
709 if self._task_socket and self._task_scheme == 'pure':
696 self._stop_scheduling_tasks()
710 self._stop_scheduling_tasks()
697
711
    def _handle_stranded_msgs(self, eid, uuid):
        """Handle messages known to be on an engine when the engine unregisters.

        It is possible that this will fire prematurely - that is, an engine will
        go down after completing a result, and the client will be notified
        of the unregistration and later receive the successful result.
        """

        outstanding = self._outstanding_dict[uuid]

        for msg_id in list(outstanding):
            if msg_id in self.results:
                # we already have the result; nothing to synthesize
                continue
            try:
                # raise-then-catch so wrap_exception captures a real traceback
                raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
            except:
                content = error.wrap_exception()
            # build a fake message:
            msg = self.session.msg('apply_reply', content=content)
            msg['parent_header']['msg_id'] = msg_id
            msg['metadata']['engine'] = uuid
            # route it through the normal reply path so bookkeeping stays consistent
            self._handle_apply_reply(msg)
721
735
    def _handle_execute_reply(self, msg):
        """Save the reply to an execute_request into our results.

        execute messages are never actually used. apply is used instead.
        """

        parent = msg['parent_header']
        msg_id = parent['msg_id']
        if msg_id not in self.outstanding:
            if msg_id in self.history:
                print("got stale result: %s"%msg_id)
            else:
                print("got unknown result: %s"%msg_id)
        else:
            self.outstanding.remove(msg_id)

        # NOTE: stale/unknown replies still fall through and are recorded below
        content = msg['content']
        header = msg['header']

        # construct metadata:
        md = self.metadata[msg_id]
        md.update(self._extract_metadata(msg))
        # is this redundant?
        self.metadata[msg_id] = md

        # clear the per-engine outstanding entry too
        e_outstanding = self._outstanding_dict[md['engine_uuid']]
        if msg_id in e_outstanding:
            e_outstanding.remove(msg_id)

        # construct result:
        if content['status'] == 'ok':
            self.results[msg_id] = ExecuteReply(msg_id, content, md)
        elif content['status'] == 'aborted':
            self.results[msg_id] = error.TaskAborted(msg_id)
        elif content['status'] == 'resubmitted':
            # TODO: handle resubmission
            pass
        else:
            # remote error: store the unwrapped exception as the result
            self.results[msg_id] = self._unwrap_exception(content)
761
775
762 def _handle_apply_reply(self, msg):
776 def _handle_apply_reply(self, msg):
763 """Save the reply to an apply_request into our results."""
777 """Save the reply to an apply_request into our results."""
764 parent = msg['parent_header']
778 parent = msg['parent_header']
765 msg_id = parent['msg_id']
779 msg_id = parent['msg_id']
766 if msg_id not in self.outstanding:
780 if msg_id not in self.outstanding:
767 if msg_id in self.history:
781 if msg_id in self.history:
768 print("got stale result: %s"%msg_id)
782 print("got stale result: %s"%msg_id)
769 print(self.results[msg_id])
783 print(self.results[msg_id])
770 print(msg)
784 print(msg)
771 else:
785 else:
772 print("got unknown result: %s"%msg_id)
786 print("got unknown result: %s"%msg_id)
773 else:
787 else:
774 self.outstanding.remove(msg_id)
788 self.outstanding.remove(msg_id)
775 content = msg['content']
789 content = msg['content']
776 header = msg['header']
790 header = msg['header']
777
791
778 # construct metadata:
792 # construct metadata:
779 md = self.metadata[msg_id]
793 md = self.metadata[msg_id]
780 md.update(self._extract_metadata(msg))
794 md.update(self._extract_metadata(msg))
781 # is this redundant?
795 # is this redundant?
782 self.metadata[msg_id] = md
796 self.metadata[msg_id] = md
783
797
784 e_outstanding = self._outstanding_dict[md['engine_uuid']]
798 e_outstanding = self._outstanding_dict[md['engine_uuid']]
785 if msg_id in e_outstanding:
799 if msg_id in e_outstanding:
786 e_outstanding.remove(msg_id)
800 e_outstanding.remove(msg_id)
787
801
788 # construct result:
802 # construct result:
789 if content['status'] == 'ok':
803 if content['status'] == 'ok':
790 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
804 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
791 elif content['status'] == 'aborted':
805 elif content['status'] == 'aborted':
792 self.results[msg_id] = error.TaskAborted(msg_id)
806 self.results[msg_id] = error.TaskAborted(msg_id)
793 elif content['status'] == 'resubmitted':
807 elif content['status'] == 'resubmitted':
794 # TODO: handle resubmission
808 # TODO: handle resubmission
795 pass
809 pass
796 else:
810 else:
797 self.results[msg_id] = self._unwrap_exception(content)
811 self.results[msg_id] = self._unwrap_exception(content)
798
812
799 def _flush_notifications(self):
813 def _flush_notifications(self):
800 """Flush notifications of engine registrations waiting
814 """Flush notifications of engine registrations waiting
801 in ZMQ queue."""
815 in ZMQ queue."""
802 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
816 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
803 while msg is not None:
817 while msg is not None:
804 if self.debug:
818 if self.debug:
805 pprint(msg)
819 pprint(msg)
806 msg_type = msg['header']['msg_type']
820 msg_type = msg['header']['msg_type']
807 handler = self._notification_handlers.get(msg_type, None)
821 handler = self._notification_handlers.get(msg_type, None)
808 if handler is None:
822 if handler is None:
809 raise Exception("Unhandled message type: %s" % msg_type)
823 raise Exception("Unhandled message type: %s" % msg_type)
810 else:
824 else:
811 handler(msg)
825 handler(msg)
812 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
826 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
813
827
814 def _flush_results(self, sock):
828 def _flush_results(self, sock):
815 """Flush task or queue results waiting in ZMQ queue."""
829 """Flush task or queue results waiting in ZMQ queue."""
816 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
830 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
817 while msg is not None:
831 while msg is not None:
818 if self.debug:
832 if self.debug:
819 pprint(msg)
833 pprint(msg)
820 msg_type = msg['header']['msg_type']
834 msg_type = msg['header']['msg_type']
821 handler = self._queue_handlers.get(msg_type, None)
835 handler = self._queue_handlers.get(msg_type, None)
822 if handler is None:
836 if handler is None:
823 raise Exception("Unhandled message type: %s" % msg_type)
837 raise Exception("Unhandled message type: %s" % msg_type)
824 else:
838 else:
825 handler(msg)
839 handler(msg)
826 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
840 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
827
841
828 def _flush_control(self, sock):
842 def _flush_control(self, sock):
829 """Flush replies from the control channel waiting
843 """Flush replies from the control channel waiting
830 in the ZMQ queue.
844 in the ZMQ queue.
831
845
832 Currently: ignore them."""
846 Currently: ignore them."""
833 if self._ignored_control_replies <= 0:
847 if self._ignored_control_replies <= 0:
834 return
848 return
835 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
849 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
836 while msg is not None:
850 while msg is not None:
837 self._ignored_control_replies -= 1
851 self._ignored_control_replies -= 1
838 if self.debug:
852 if self.debug:
839 pprint(msg)
853 pprint(msg)
840 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
854 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
841
855
    def _flush_ignored_control(self):
        """flush ignored control replies

        Blocking counterpart of :meth:`_flush_control`: waits for every
        reply we still owe a discard to, decrementing the counter only
        after each successful recv (so an interrupted recv leaves the
        count accurate).
        """
        while self._ignored_control_replies > 0:
            self.session.recv(self._control_socket)
            self._ignored_control_replies -= 1
847
861
848 def _flush_ignored_hub_replies(self):
862 def _flush_ignored_hub_replies(self):
849 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
863 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
850 while msg is not None:
864 while msg is not None:
851 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
865 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
852
866
    def _flush_iopub(self, sock):
        """Flush replies from the iopub channel waiting
        in the ZMQ queue.

        Folds stream output, errors, inputs, display data, results, and
        data messages into ``self.metadata[msg_id]`` for the parent request.
        """
        idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        while msg is not None:
            if self.debug:
                pprint(msg)
            parent = msg['parent_header']
            # ignore IOPub messages with no parent.
            # Caused by print statements or warnings from before the first execution.
            if not parent:
                idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
                continue
            msg_id = parent['msg_id']
            content = msg['content']
            header = msg['header']
            msg_type = msg['header']['msg_type']

            # init metadata:
            md = self.metadata[msg_id]

            if msg_type == 'stream':
                # append to any stream text accumulated so far
                name = content['name']
                s = md[name] or ''
                md[name] = s + content['data']
            elif msg_type == 'error':
                md.update({'error' : self._unwrap_exception(content)})
            elif msg_type == 'execute_input':
                md.update({'execute_input' : content['code']})
            elif msg_type == 'display_data':
                md['outputs'].append(content)
            elif msg_type == 'execute_result':
                md['execute_result'] = content
            elif msg_type == 'data_message':
                data, remainder = serialize.unserialize_object(msg['buffers'])
                md['data'].update(data)
            elif msg_type == 'status':
                # idle message comes after all outputs
                if content['execution_state'] == 'idle':
                    md['outputs_ready'] = True
            else:
                # unhandled msg_type (status, etc.)
                pass

            # redundant?
            self.metadata[msg_id] = md

            idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
902
916
903 #--------------------------------------------------------------------------
917 #--------------------------------------------------------------------------
904 # len, getitem
918 # len, getitem
905 #--------------------------------------------------------------------------
919 #--------------------------------------------------------------------------
906
920
    def __len__(self):
        """len(client) returns # of engines.

        Uses the ``ids`` property, so pending registration notifications
        are flushed before counting.
        """
        return len(self.ids)
910
924
911 def __getitem__(self, key):
925 def __getitem__(self, key):
912 """index access returns DirectView multiplexer objects
926 """index access returns DirectView multiplexer objects
913
927
914 Must be int, slice, or list/tuple/xrange of ints"""
928 Must be int, slice, or list/tuple/xrange of ints"""
915 if not isinstance(key, (int, slice, tuple, list, xrange)):
929 if not isinstance(key, (int, slice, tuple, list, xrange)):
916 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
930 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
917 else:
931 else:
918 return self.direct_view(key)
932 return self.direct_view(key)
919
933
    def __iter__(self):
        """Since we define getitem, Client is iterable

        but unless we also define __iter__, it won't work correctly unless engine IDs
        start at zero and are continuous.
        """
        # kept as a generator function (not a returned genexp) so that
        # self.ids — which flushes notifications — is evaluated lazily,
        # on first iteration rather than at __iter__ call time
        for eid in self.ids:
            yield self.direct_view(eid)
928
942
929 #--------------------------------------------------------------------------
943 #--------------------------------------------------------------------------
930 # Begin public methods
944 # Begin public methods
931 #--------------------------------------------------------------------------
945 #--------------------------------------------------------------------------
932
946
    @property
    def ids(self):
        """Always up-to-date ids property.

        Flushes pending registration/unregistration notifications before
        returning, and returns a copy so callers can't mutate our state.
        """
        self._flush_notifications()
        # always copy:
        return list(self._ids)
939
953
940 def activate(self, targets='all', suffix=''):
954 def activate(self, targets='all', suffix=''):
941 """Create a DirectView and register it with IPython magics
955 """Create a DirectView and register it with IPython magics
942
956
943 Defines the magics `%px, %autopx, %pxresult, %%px`
957 Defines the magics `%px, %autopx, %pxresult, %%px`
944
958
945 Parameters
959 Parameters
946 ----------
960 ----------
947
961
948 targets: int, list of ints, or 'all'
962 targets: int, list of ints, or 'all'
949 The engines on which the view's magics will run
963 The engines on which the view's magics will run
950 suffix: str [default: '']
964 suffix: str [default: '']
951 The suffix, if any, for the magics. This allows you to have
965 The suffix, if any, for the magics. This allows you to have
952 multiple views associated with parallel magics at the same time.
966 multiple views associated with parallel magics at the same time.
953
967
954 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
968 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
955 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
969 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
956 on engine 0.
970 on engine 0.
957 """
971 """
958 view = self.direct_view(targets)
972 view = self.direct_view(targets)
959 view.block = True
973 view.block = True
960 view.activate(suffix)
974 view.activate(suffix)
961 return view
975 return view
962
976
963 def close(self, linger=None):
977 def close(self, linger=None):
964 """Close my zmq Sockets
978 """Close my zmq Sockets
965
979
966 If `linger`, set the zmq LINGER socket option,
980 If `linger`, set the zmq LINGER socket option,
967 which allows discarding of messages.
981 which allows discarding of messages.
968 """
982 """
969 if self._closed:
983 if self._closed:
970 return
984 return
971 self.stop_spin_thread()
985 self.stop_spin_thread()
972 snames = [ trait for trait in self.trait_names() if trait.endswith("socket") ]
986 snames = [ trait for trait in self.trait_names() if trait.endswith("socket") ]
973 for name in snames:
987 for name in snames:
974 socket = getattr(self, name)
988 socket = getattr(self, name)
975 if socket is not None and not socket.closed:
989 if socket is not None and not socket.closed:
976 if linger is not None:
990 if linger is not None:
977 socket.close(linger=linger)
991 socket.close(linger=linger)
978 else:
992 else:
979 socket.close()
993 socket.close()
980 self._closed = True
994 self._closed = True
981
995
982 def _spin_every(self, interval=1):
996 def _spin_every(self, interval=1):
983 """target func for use in spin_thread"""
997 """target func for use in spin_thread"""
984 while True:
998 while True:
985 if self._stop_spinning.is_set():
999 if self._stop_spinning.is_set():
986 return
1000 return
987 time.sleep(interval)
1001 time.sleep(interval)
988 self.spin()
1002 self.spin()
989
1003
990 def spin_thread(self, interval=1):
1004 def spin_thread(self, interval=1):
991 """call Client.spin() in a background thread on some regular interval
1005 """call Client.spin() in a background thread on some regular interval
992
1006
993 This helps ensure that messages don't pile up too much in the zmq queue
1007 This helps ensure that messages don't pile up too much in the zmq queue
994 while you are working on other things, or just leaving an idle terminal.
1008 while you are working on other things, or just leaving an idle terminal.
995
1009
996 It also helps limit potential padding of the `received` timestamp
1010 It also helps limit potential padding of the `received` timestamp
997 on AsyncResult objects, used for timings.
1011 on AsyncResult objects, used for timings.
998
1012
999 Parameters
1013 Parameters
1000 ----------
1014 ----------
1001
1015
1002 interval : float, optional
1016 interval : float, optional
1003 The interval on which to spin the client in the background thread
1017 The interval on which to spin the client in the background thread
1004 (simply passed to time.sleep).
1018 (simply passed to time.sleep).
1005
1019
1006 Notes
1020 Notes
1007 -----
1021 -----
1008
1022
1009 For precision timing, you may want to use this method to put a bound
1023 For precision timing, you may want to use this method to put a bound
1010 on the jitter (in seconds) in `received` timestamps used
1024 on the jitter (in seconds) in `received` timestamps used
1011 in AsyncResult.wall_time.
1025 in AsyncResult.wall_time.
1012
1026
1013 """
1027 """
1014 if self._spin_thread is not None:
1028 if self._spin_thread is not None:
1015 self.stop_spin_thread()
1029 self.stop_spin_thread()
1016 self._stop_spinning.clear()
1030 self._stop_spinning.clear()
1017 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
1031 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
1018 self._spin_thread.daemon = True
1032 self._spin_thread.daemon = True
1019 self._spin_thread.start()
1033 self._spin_thread.start()
1020
1034
1021 def stop_spin_thread(self):
1035 def stop_spin_thread(self):
1022 """stop background spin_thread, if any"""
1036 """stop background spin_thread, if any"""
1023 if self._spin_thread is not None:
1037 if self._spin_thread is not None:
1024 self._stop_spinning.set()
1038 self._stop_spinning.set()
1025 self._spin_thread.join()
1039 self._spin_thread.join()
1026 self._spin_thread = None
1040 self._spin_thread = None
1027
1041
    def spin(self):
        """Flush any registration notifications and execution results
        waiting in the ZMQ queue.
        """
        # Drain each channel in turn; a socket may be None if this client
        # was constructed without that connection, so guard every flush.
        if self._notification_socket:
            self._flush_notifications()
        if self._iopub_socket:
            self._flush_iopub(self._iopub_socket)
        if self._mux_socket:
            self._flush_results(self._mux_socket)
        if self._task_socket:
            self._flush_results(self._task_socket)
        if self._control_socket:
            self._flush_control(self._control_socket)
        if self._query_socket:
            self._flush_ignored_hub_replies()
1044
1058
1045 def wait(self, jobs=None, timeout=-1):
1059 def wait(self, jobs=None, timeout=-1):
1046 """waits on one or more `jobs`, for up to `timeout` seconds.
1060 """waits on one or more `jobs`, for up to `timeout` seconds.
1047
1061
1048 Parameters
1062 Parameters
1049 ----------
1063 ----------
1050
1064
1051 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1065 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1052 ints are indices to self.history
1066 ints are indices to self.history
1053 strs are msg_ids
1067 strs are msg_ids
1054 default: wait on all outstanding messages
1068 default: wait on all outstanding messages
1055 timeout : float
1069 timeout : float
1056 a time in seconds, after which to give up.
1070 a time in seconds, after which to give up.
1057 default is -1, which means no timeout
1071 default is -1, which means no timeout
1058
1072
1059 Returns
1073 Returns
1060 -------
1074 -------
1061
1075
1062 True : when all msg_ids are done
1076 True : when all msg_ids are done
1063 False : timeout reached, some msg_ids still outstanding
1077 False : timeout reached, some msg_ids still outstanding
1064 """
1078 """
1065 tic = time.time()
1079 tic = time.time()
1066 if jobs is None:
1080 if jobs is None:
1067 theids = self.outstanding
1081 theids = self.outstanding
1068 else:
1082 else:
1069 if isinstance(jobs, string_types + (int, AsyncResult)):
1083 if isinstance(jobs, string_types + (int, AsyncResult)):
1070 jobs = [jobs]
1084 jobs = [jobs]
1071 theids = set()
1085 theids = set()
1072 for job in jobs:
1086 for job in jobs:
1073 if isinstance(job, int):
1087 if isinstance(job, int):
1074 # index access
1088 # index access
1075 job = self.history[job]
1089 job = self.history[job]
1076 elif isinstance(job, AsyncResult):
1090 elif isinstance(job, AsyncResult):
1077 theids.update(job.msg_ids)
1091 theids.update(job.msg_ids)
1078 continue
1092 continue
1079 theids.add(job)
1093 theids.add(job)
1080 if not theids.intersection(self.outstanding):
1094 if not theids.intersection(self.outstanding):
1081 return True
1095 return True
1082 self.spin()
1096 self.spin()
1083 while theids.intersection(self.outstanding):
1097 while theids.intersection(self.outstanding):
1084 if timeout >= 0 and ( time.time()-tic ) > timeout:
1098 if timeout >= 0 and ( time.time()-tic ) > timeout:
1085 break
1099 break
1086 time.sleep(1e-3)
1100 time.sleep(1e-3)
1087 self.spin()
1101 self.spin()
1088 return len(theids.intersection(self.outstanding)) == 0
1102 return len(theids.intersection(self.outstanding)) == 0
1089
1103
1090 #--------------------------------------------------------------------------
1104 #--------------------------------------------------------------------------
1091 # Control methods
1105 # Control methods
1092 #--------------------------------------------------------------------------
1106 #--------------------------------------------------------------------------
1093
1107
1094 @spin_first
1108 @spin_first
1095 def clear(self, targets=None, block=None):
1109 def clear(self, targets=None, block=None):
1096 """Clear the namespace in target(s)."""
1110 """Clear the namespace in target(s)."""
1097 block = self.block if block is None else block
1111 block = self.block if block is None else block
1098 targets = self._build_targets(targets)[0]
1112 targets = self._build_targets(targets)[0]
1099 for t in targets:
1113 for t in targets:
1100 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1114 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1101 error = False
1115 error = False
1102 if block:
1116 if block:
1103 self._flush_ignored_control()
1117 self._flush_ignored_control()
1104 for i in range(len(targets)):
1118 for i in range(len(targets)):
1105 idents,msg = self.session.recv(self._control_socket,0)
1119 idents,msg = self.session.recv(self._control_socket,0)
1106 if self.debug:
1120 if self.debug:
1107 pprint(msg)
1121 pprint(msg)
1108 if msg['content']['status'] != 'ok':
1122 if msg['content']['status'] != 'ok':
1109 error = self._unwrap_exception(msg['content'])
1123 error = self._unwrap_exception(msg['content'])
1110 else:
1124 else:
1111 self._ignored_control_replies += len(targets)
1125 self._ignored_control_replies += len(targets)
1112 if error:
1126 if error:
1113 raise error
1127 raise error
1114
1128
1115
1129
1116 @spin_first
1130 @spin_first
1117 def abort(self, jobs=None, targets=None, block=None):
1131 def abort(self, jobs=None, targets=None, block=None):
1118 """Abort specific jobs from the execution queues of target(s).
1132 """Abort specific jobs from the execution queues of target(s).
1119
1133
1120 This is a mechanism to prevent jobs that have already been submitted
1134 This is a mechanism to prevent jobs that have already been submitted
1121 from executing.
1135 from executing.
1122
1136
1123 Parameters
1137 Parameters
1124 ----------
1138 ----------
1125
1139
1126 jobs : msg_id, list of msg_ids, or AsyncResult
1140 jobs : msg_id, list of msg_ids, or AsyncResult
1127 The jobs to be aborted
1141 The jobs to be aborted
1128
1142
1129 If unspecified/None: abort all outstanding jobs.
1143 If unspecified/None: abort all outstanding jobs.
1130
1144
1131 """
1145 """
1132 block = self.block if block is None else block
1146 block = self.block if block is None else block
1133 jobs = jobs if jobs is not None else list(self.outstanding)
1147 jobs = jobs if jobs is not None else list(self.outstanding)
1134 targets = self._build_targets(targets)[0]
1148 targets = self._build_targets(targets)[0]
1135
1149
1136 msg_ids = []
1150 msg_ids = []
1137 if isinstance(jobs, string_types + (AsyncResult,)):
1151 if isinstance(jobs, string_types + (AsyncResult,)):
1138 jobs = [jobs]
1152 jobs = [jobs]
1139 bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
1153 bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
1140 if bad_ids:
1154 if bad_ids:
1141 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1155 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1142 for j in jobs:
1156 for j in jobs:
1143 if isinstance(j, AsyncResult):
1157 if isinstance(j, AsyncResult):
1144 msg_ids.extend(j.msg_ids)
1158 msg_ids.extend(j.msg_ids)
1145 else:
1159 else:
1146 msg_ids.append(j)
1160 msg_ids.append(j)
1147 content = dict(msg_ids=msg_ids)
1161 content = dict(msg_ids=msg_ids)
1148 for t in targets:
1162 for t in targets:
1149 self.session.send(self._control_socket, 'abort_request',
1163 self.session.send(self._control_socket, 'abort_request',
1150 content=content, ident=t)
1164 content=content, ident=t)
1151 error = False
1165 error = False
1152 if block:
1166 if block:
1153 self._flush_ignored_control()
1167 self._flush_ignored_control()
1154 for i in range(len(targets)):
1168 for i in range(len(targets)):
1155 idents,msg = self.session.recv(self._control_socket,0)
1169 idents,msg = self.session.recv(self._control_socket,0)
1156 if self.debug:
1170 if self.debug:
1157 pprint(msg)
1171 pprint(msg)
1158 if msg['content']['status'] != 'ok':
1172 if msg['content']['status'] != 'ok':
1159 error = self._unwrap_exception(msg['content'])
1173 error = self._unwrap_exception(msg['content'])
1160 else:
1174 else:
1161 self._ignored_control_replies += len(targets)
1175 self._ignored_control_replies += len(targets)
1162 if error:
1176 if error:
1163 raise error
1177 raise error
1164
1178
    @spin_first
    def shutdown(self, targets='all', restart=False, hub=False, block=None):
        """Terminates one or more engine processes, optionally including the hub.

        Parameters
        ----------

        targets: list of ints or 'all' [default: all]
            Which engines to shutdown.
        hub: bool [default: False]
            Whether to include the Hub. hub=True implies targets='all'.
        block: bool [default: self.block]
            Whether to wait for clean shutdown replies or not.
        restart: bool [default: False]
            NOT IMPLEMENTED
            whether to restart engines after shutting them down.

        Raises
        ------
        NotImplementedError
            If `restart` is requested.
        """
        from IPython.parallel.error import NoEnginesRegistered
        if restart:
            raise NotImplementedError("Engine restart is not yet implemented")

        block = self.block if block is None else block
        # shutting down the hub implies shutting down every engine
        if hub:
            targets = 'all'
        try:
            targets = self._build_targets(targets)[0]
        except NoEnginesRegistered:
            # no engines registered; we may still shut down the hub below
            targets = []
        for t in targets:
            self.session.send(self._control_socket, 'shutdown_request',
                        content={'restart':restart},ident=t)
        error = False
        # hub shutdown collects engine replies even when not blocking,
        # so engines go down before the hub does
        if block or hub:
            self._flush_ignored_control()
            for i in range(len(targets)):
                idents,msg = self.session.recv(self._control_socket, 0)
                if self.debug:
                    pprint(msg)
                if msg['content']['status'] != 'ok':
                    error = self._unwrap_exception(msg['content'])
        else:
            self._ignored_control_replies += len(targets)

        if hub:
            # brief pause to let engine shutdown propagate before stopping the hub
            time.sleep(0.25)
            self.session.send(self._query_socket, 'shutdown_request')
            idents,msg = self.session.recv(self._query_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                error = self._unwrap_exception(msg['content'])

        if error:
            raise error
1219
1233
1220 #--------------------------------------------------------------------------
1234 #--------------------------------------------------------------------------
1221 # Execution related methods
1235 # Execution related methods
1222 #--------------------------------------------------------------------------
1236 #--------------------------------------------------------------------------
1223
1237
1224 def _maybe_raise(self, result):
1238 def _maybe_raise(self, result):
1225 """wrapper for maybe raising an exception if apply failed."""
1239 """wrapper for maybe raising an exception if apply failed."""
1226 if isinstance(result, error.RemoteError):
1240 if isinstance(result, error.RemoteError):
1227 raise result
1241 raise result
1228
1242
1229 return result
1243 return result
1230
1244
1231 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1245 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1232 ident=None):
1246 ident=None):
1233 """construct and send an apply message via a socket.
1247 """construct and send an apply message via a socket.
1234
1248
1235 This is the principal method with which all engine execution is performed by views.
1249 This is the principal method with which all engine execution is performed by views.
1236 """
1250 """
1237
1251
1238 if self._closed:
1252 if self._closed:
1239 raise RuntimeError("Client cannot be used after its sockets have been closed")
1253 raise RuntimeError("Client cannot be used after its sockets have been closed")
1240
1254
1241 # defaults:
1255 # defaults:
1242 args = args if args is not None else []
1256 args = args if args is not None else []
1243 kwargs = kwargs if kwargs is not None else {}
1257 kwargs = kwargs if kwargs is not None else {}
1244 metadata = metadata if metadata is not None else {}
1258 metadata = metadata if metadata is not None else {}
1245
1259
1246 # validate arguments
1260 # validate arguments
1247 if not callable(f) and not isinstance(f, Reference):
1261 if not callable(f) and not isinstance(f, Reference):
1248 raise TypeError("f must be callable, not %s"%type(f))
1262 raise TypeError("f must be callable, not %s"%type(f))
1249 if not isinstance(args, (tuple, list)):
1263 if not isinstance(args, (tuple, list)):
1250 raise TypeError("args must be tuple or list, not %s"%type(args))
1264 raise TypeError("args must be tuple or list, not %s"%type(args))
1251 if not isinstance(kwargs, dict):
1265 if not isinstance(kwargs, dict):
1252 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1266 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1253 if not isinstance(metadata, dict):
1267 if not isinstance(metadata, dict):
1254 raise TypeError("metadata must be dict, not %s"%type(metadata))
1268 raise TypeError("metadata must be dict, not %s"%type(metadata))
1255
1269
1256 bufs = serialize.pack_apply_message(f, args, kwargs,
1270 bufs = serialize.pack_apply_message(f, args, kwargs,
1257 buffer_threshold=self.session.buffer_threshold,
1271 buffer_threshold=self.session.buffer_threshold,
1258 item_threshold=self.session.item_threshold,
1272 item_threshold=self.session.item_threshold,
1259 )
1273 )
1260
1274
1261 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1275 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1262 metadata=metadata, track=track)
1276 metadata=metadata, track=track)
1263
1277
1264 msg_id = msg['header']['msg_id']
1278 msg_id = msg['header']['msg_id']
1265 self.outstanding.add(msg_id)
1279 self.outstanding.add(msg_id)
1266 if ident:
1280 if ident:
1267 # possibly routed to a specific engine
1281 # possibly routed to a specific engine
1268 if isinstance(ident, list):
1282 if isinstance(ident, list):
1269 ident = ident[-1]
1283 ident = ident[-1]
1270 if ident in self._engines.values():
1284 if ident in self._engines.values():
1271 # save for later, in case of engine death
1285 # save for later, in case of engine death
1272 self._outstanding_dict[ident].add(msg_id)
1286 self._outstanding_dict[ident].add(msg_id)
1273 self.history.append(msg_id)
1287 self.history.append(msg_id)
1274 self.metadata[msg_id]['submitted'] = datetime.now()
1288 self.metadata[msg_id]['submitted'] = datetime.now()
1275
1289
1276 return msg
1290 return msg
1277
1291
1278 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1292 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1279 """construct and send an execute request via a socket.
1293 """construct and send an execute request via a socket.
1280
1294
1281 """
1295 """
1282
1296
1283 if self._closed:
1297 if self._closed:
1284 raise RuntimeError("Client cannot be used after its sockets have been closed")
1298 raise RuntimeError("Client cannot be used after its sockets have been closed")
1285
1299
1286 # defaults:
1300 # defaults:
1287 metadata = metadata if metadata is not None else {}
1301 metadata = metadata if metadata is not None else {}
1288
1302
1289 # validate arguments
1303 # validate arguments
1290 if not isinstance(code, string_types):
1304 if not isinstance(code, string_types):
1291 raise TypeError("code must be text, not %s" % type(code))
1305 raise TypeError("code must be text, not %s" % type(code))
1292 if not isinstance(metadata, dict):
1306 if not isinstance(metadata, dict):
1293 raise TypeError("metadata must be dict, not %s" % type(metadata))
1307 raise TypeError("metadata must be dict, not %s" % type(metadata))
1294
1308
1295 content = dict(code=code, silent=bool(silent), user_expressions={})
1309 content = dict(code=code, silent=bool(silent), user_expressions={})
1296
1310
1297
1311
1298 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1312 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1299 metadata=metadata)
1313 metadata=metadata)
1300
1314
1301 msg_id = msg['header']['msg_id']
1315 msg_id = msg['header']['msg_id']
1302 self.outstanding.add(msg_id)
1316 self.outstanding.add(msg_id)
1303 if ident:
1317 if ident:
1304 # possibly routed to a specific engine
1318 # possibly routed to a specific engine
1305 if isinstance(ident, list):
1319 if isinstance(ident, list):
1306 ident = ident[-1]
1320 ident = ident[-1]
1307 if ident in self._engines.values():
1321 if ident in self._engines.values():
1308 # save for later, in case of engine death
1322 # save for later, in case of engine death
1309 self._outstanding_dict[ident].add(msg_id)
1323 self._outstanding_dict[ident].add(msg_id)
1310 self.history.append(msg_id)
1324 self.history.append(msg_id)
1311 self.metadata[msg_id]['submitted'] = datetime.now()
1325 self.metadata[msg_id]['submitted'] = datetime.now()
1312
1326
1313 return msg
1327 return msg
1314
1328
1315 #--------------------------------------------------------------------------
1329 #--------------------------------------------------------------------------
1316 # construct a View object
1330 # construct a View object
1317 #--------------------------------------------------------------------------
1331 #--------------------------------------------------------------------------
1318
1332
1319 def load_balanced_view(self, targets=None):
1333 def load_balanced_view(self, targets=None):
1320 """construct a DirectView object.
1334 """construct a DirectView object.
1321
1335
1322 If no arguments are specified, create a LoadBalancedView
1336 If no arguments are specified, create a LoadBalancedView
1323 using all engines.
1337 using all engines.
1324
1338
1325 Parameters
1339 Parameters
1326 ----------
1340 ----------
1327
1341
1328 targets: list,slice,int,etc. [default: use all engines]
1342 targets: list,slice,int,etc. [default: use all engines]
1329 The subset of engines across which to load-balance
1343 The subset of engines across which to load-balance
1330 """
1344 """
1331 if targets == 'all':
1345 if targets == 'all':
1332 targets = None
1346 targets = None
1333 if targets is not None:
1347 if targets is not None:
1334 targets = self._build_targets(targets)[1]
1348 targets = self._build_targets(targets)[1]
1335 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1349 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1336
1350
1337 def direct_view(self, targets='all'):
1351 def direct_view(self, targets='all'):
1338 """construct a DirectView object.
1352 """construct a DirectView object.
1339
1353
1340 If no targets are specified, create a DirectView using all engines.
1354 If no targets are specified, create a DirectView using all engines.
1341
1355
1342 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1356 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1343 evaluate the target engines at each execution, whereas rc[:] will connect to
1357 evaluate the target engines at each execution, whereas rc[:] will connect to
1344 all *current* engines, and that list will not change.
1358 all *current* engines, and that list will not change.
1345
1359
1346 That is, 'all' will always use all engines, whereas rc[:] will not use
1360 That is, 'all' will always use all engines, whereas rc[:] will not use
1347 engines added after the DirectView is constructed.
1361 engines added after the DirectView is constructed.
1348
1362
1349 Parameters
1363 Parameters
1350 ----------
1364 ----------
1351
1365
1352 targets: list,slice,int,etc. [default: use all engines]
1366 targets: list,slice,int,etc. [default: use all engines]
1353 The engines to use for the View
1367 The engines to use for the View
1354 """
1368 """
1355 single = isinstance(targets, int)
1369 single = isinstance(targets, int)
1356 # allow 'all' to be lazily evaluated at each execution
1370 # allow 'all' to be lazily evaluated at each execution
1357 if targets != 'all':
1371 if targets != 'all':
1358 targets = self._build_targets(targets)[1]
1372 targets = self._build_targets(targets)[1]
1359 if single:
1373 if single:
1360 targets = targets[0]
1374 targets = targets[0]
1361 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1375 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1362
1376
1363 #--------------------------------------------------------------------------
1377 #--------------------------------------------------------------------------
1364 # Query methods
1378 # Query methods
1365 #--------------------------------------------------------------------------
1379 #--------------------------------------------------------------------------
1366
1380
    @spin_first
    def get_result(self, indices_or_msg_ids=None, block=None, owner=True):
        """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.

        If the client already has the results, no request to the Hub will be made.

        This is a convenient way to construct AsyncResult objects, which are wrappers
        that include metadata about execution, and allow for awaiting results that
        were not submitted by this Client.

        It can also be a convenient way to retrieve the metadata associated with
        blocking execution, since it always retrieves

        Examples
        --------
        ::

            In [10]: r = client.apply()

        Parameters
        ----------

        indices_or_msg_ids : integer history index, str msg_id, or list of either
            The indices or msg_ids of indices to be retrieved

        block : bool
            Whether to wait for the result to be done
        owner : bool [default: True]
            Whether this AsyncResult should own the result.
            If so, calling `ar.get()` will remove data from the
            client's result and metadata cache.
            There should only be one owner of any given msg_id.

        Returns
        -------

        AsyncResult
            A single AsyncResult object will always be returned.

        AsyncHubResult
            A subclass of AsyncResult that retrieves results from the Hub

        """
        block = self.block if block is None else block
        if indices_or_msg_ids is None:
            # default: the most recent submission
            indices_or_msg_ids = -1

        single_result = False
        if not isinstance(indices_or_msg_ids, (list,tuple)):
            indices_or_msg_ids = [indices_or_msg_ids]
            single_result = True

        # normalize history indices to msg_ids
        theids = []
        for id in indices_or_msg_ids:
            if isinstance(id, int):
                id = self.history[id]
            if not isinstance(id, string_types):
                raise TypeError("indices must be str or int, not %r"%id)
            theids.append(id)

        # ids already known locally (outstanding or cached) need no Hub request
        local_ids = [msg_id for msg_id in theids if (msg_id in self.outstanding or msg_id in self.results)]
        remote_ids = [msg_id for msg_id in theids if msg_id not in local_ids]

        # given single msg_id initially, get_result should get the result itself,
        # not a length-one list
        if single_result:
            theids = theids[0]

        if remote_ids:
            # at least one result must be fetched from the Hub
            ar = AsyncHubResult(self, msg_ids=theids, owner=owner)
        else:
            ar = AsyncResult(self, msg_ids=theids, owner=owner)

        if block:
            ar.wait()

        return ar
1444
1458
1445 @spin_first
1459 @spin_first
1446 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1460 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1447 """Resubmit one or more tasks.
1461 """Resubmit one or more tasks.
1448
1462
1449 in-flight tasks may not be resubmitted.
1463 in-flight tasks may not be resubmitted.
1450
1464
1451 Parameters
1465 Parameters
1452 ----------
1466 ----------
1453
1467
1454 indices_or_msg_ids : integer history index, str msg_id, or list of either
1468 indices_or_msg_ids : integer history index, str msg_id, or list of either
1455 The indices or msg_ids of indices to be retrieved
1469 The indices or msg_ids of indices to be retrieved
1456
1470
1457 block : bool
1471 block : bool
1458 Whether to wait for the result to be done
1472 Whether to wait for the result to be done
1459
1473
1460 Returns
1474 Returns
1461 -------
1475 -------
1462
1476
1463 AsyncHubResult
1477 AsyncHubResult
1464 A subclass of AsyncResult that retrieves results from the Hub
1478 A subclass of AsyncResult that retrieves results from the Hub
1465
1479
1466 """
1480 """
1467 block = self.block if block is None else block
1481 block = self.block if block is None else block
1468 if indices_or_msg_ids is None:
1482 if indices_or_msg_ids is None:
1469 indices_or_msg_ids = -1
1483 indices_or_msg_ids = -1
1470
1484
1471 if not isinstance(indices_or_msg_ids, (list,tuple)):
1485 if not isinstance(indices_or_msg_ids, (list,tuple)):
1472 indices_or_msg_ids = [indices_or_msg_ids]
1486 indices_or_msg_ids = [indices_or_msg_ids]
1473
1487
1474 theids = []
1488 theids = []
1475 for id in indices_or_msg_ids:
1489 for id in indices_or_msg_ids:
1476 if isinstance(id, int):
1490 if isinstance(id, int):
1477 id = self.history[id]
1491 id = self.history[id]
1478 if not isinstance(id, string_types):
1492 if not isinstance(id, string_types):
1479 raise TypeError("indices must be str or int, not %r"%id)
1493 raise TypeError("indices must be str or int, not %r"%id)
1480 theids.append(id)
1494 theids.append(id)
1481
1495
1482 content = dict(msg_ids = theids)
1496 content = dict(msg_ids = theids)
1483
1497
1484 self.session.send(self._query_socket, 'resubmit_request', content)
1498 self.session.send(self._query_socket, 'resubmit_request', content)
1485
1499
1486 zmq.select([self._query_socket], [], [])
1500 zmq.select([self._query_socket], [], [])
1487 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1501 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1488 if self.debug:
1502 if self.debug:
1489 pprint(msg)
1503 pprint(msg)
1490 content = msg['content']
1504 content = msg['content']
1491 if content['status'] != 'ok':
1505 if content['status'] != 'ok':
1492 raise self._unwrap_exception(content)
1506 raise self._unwrap_exception(content)
1493 mapping = content['resubmitted']
1507 mapping = content['resubmitted']
1494 new_ids = [ mapping[msg_id] for msg_id in theids ]
1508 new_ids = [ mapping[msg_id] for msg_id in theids ]
1495
1509
1496 ar = AsyncHubResult(self, msg_ids=new_ids)
1510 ar = AsyncHubResult(self, msg_ids=new_ids)
1497
1511
1498 if block:
1512 if block:
1499 ar.wait()
1513 ar.wait()
1500
1514
1501 return ar
1515 return ar
1502
1516
1503 @spin_first
1517 @spin_first
1504 def result_status(self, msg_ids, status_only=True):
1518 def result_status(self, msg_ids, status_only=True):
1505 """Check on the status of the result(s) of the apply request with `msg_ids`.
1519 """Check on the status of the result(s) of the apply request with `msg_ids`.
1506
1520
1507 If status_only is False, then the actual results will be retrieved, else
1521 If status_only is False, then the actual results will be retrieved, else
1508 only the status of the results will be checked.
1522 only the status of the results will be checked.
1509
1523
1510 Parameters
1524 Parameters
1511 ----------
1525 ----------
1512
1526
1513 msg_ids : list of msg_ids
1527 msg_ids : list of msg_ids
1514 if int:
1528 if int:
1515 Passed as index to self.history for convenience.
1529 Passed as index to self.history for convenience.
1516 status_only : bool (default: True)
1530 status_only : bool (default: True)
1517 if False:
1531 if False:
1518 Retrieve the actual results of completed tasks.
1532 Retrieve the actual results of completed tasks.
1519
1533
1520 Returns
1534 Returns
1521 -------
1535 -------
1522
1536
1523 results : dict
1537 results : dict
1524 There will always be the keys 'pending' and 'completed', which will
1538 There will always be the keys 'pending' and 'completed', which will
1525 be lists of msg_ids that are incomplete or complete. If `status_only`
1539 be lists of msg_ids that are incomplete or complete. If `status_only`
1526 is False, then completed results will be keyed by their `msg_id`.
1540 is False, then completed results will be keyed by their `msg_id`.
1527 """
1541 """
1528 if not isinstance(msg_ids, (list,tuple)):
1542 if not isinstance(msg_ids, (list,tuple)):
1529 msg_ids = [msg_ids]
1543 msg_ids = [msg_ids]
1530
1544
1531 theids = []
1545 theids = []
1532 for msg_id in msg_ids:
1546 for msg_id in msg_ids:
1533 if isinstance(msg_id, int):
1547 if isinstance(msg_id, int):
1534 msg_id = self.history[msg_id]
1548 msg_id = self.history[msg_id]
1535 if not isinstance(msg_id, string_types):
1549 if not isinstance(msg_id, string_types):
1536 raise TypeError("msg_ids must be str, not %r"%msg_id)
1550 raise TypeError("msg_ids must be str, not %r"%msg_id)
1537 theids.append(msg_id)
1551 theids.append(msg_id)
1538
1552
1539 completed = []
1553 completed = []
1540 local_results = {}
1554 local_results = {}
1541
1555
1542 # comment this block out to temporarily disable local shortcut:
1556 # comment this block out to temporarily disable local shortcut:
1543 for msg_id in theids:
1557 for msg_id in theids:
1544 if msg_id in self.results:
1558 if msg_id in self.results:
1545 completed.append(msg_id)
1559 completed.append(msg_id)
1546 local_results[msg_id] = self.results[msg_id]
1560 local_results[msg_id] = self.results[msg_id]
1547 theids.remove(msg_id)
1561 theids.remove(msg_id)
1548
1562
1549 if theids: # some not locally cached
1563 if theids: # some not locally cached
1550 content = dict(msg_ids=theids, status_only=status_only)
1564 content = dict(msg_ids=theids, status_only=status_only)
1551 msg = self.session.send(self._query_socket, "result_request", content=content)
1565 msg = self.session.send(self._query_socket, "result_request", content=content)
1552 zmq.select([self._query_socket], [], [])
1566 zmq.select([self._query_socket], [], [])
1553 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1567 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1554 if self.debug:
1568 if self.debug:
1555 pprint(msg)
1569 pprint(msg)
1556 content = msg['content']
1570 content = msg['content']
1557 if content['status'] != 'ok':
1571 if content['status'] != 'ok':
1558 raise self._unwrap_exception(content)
1572 raise self._unwrap_exception(content)
1559 buffers = msg['buffers']
1573 buffers = msg['buffers']
1560 else:
1574 else:
1561 content = dict(completed=[],pending=[])
1575 content = dict(completed=[],pending=[])
1562
1576
1563 content['completed'].extend(completed)
1577 content['completed'].extend(completed)
1564
1578
1565 if status_only:
1579 if status_only:
1566 return content
1580 return content
1567
1581
1568 failures = []
1582 failures = []
1569 # load cached results into result:
1583 # load cached results into result:
1570 content.update(local_results)
1584 content.update(local_results)
1571
1585
1572 # update cache with results:
1586 # update cache with results:
1573 for msg_id in sorted(theids):
1587 for msg_id in sorted(theids):
1574 if msg_id in content['completed']:
1588 if msg_id in content['completed']:
1575 rec = content[msg_id]
1589 rec = content[msg_id]
1576 parent = extract_dates(rec['header'])
1590 parent = extract_dates(rec['header'])
1577 header = extract_dates(rec['result_header'])
1591 header = extract_dates(rec['result_header'])
1578 rcontent = rec['result_content']
1592 rcontent = rec['result_content']
1579 iodict = rec['io']
1593 iodict = rec['io']
1580 if isinstance(rcontent, str):
1594 if isinstance(rcontent, str):
1581 rcontent = self.session.unpack(rcontent)
1595 rcontent = self.session.unpack(rcontent)
1582
1596
1583 md = self.metadata[msg_id]
1597 md = self.metadata[msg_id]
1584 md_msg = dict(
1598 md_msg = dict(
1585 content=rcontent,
1599 content=rcontent,
1586 parent_header=parent,
1600 parent_header=parent,
1587 header=header,
1601 header=header,
1588 metadata=rec['result_metadata'],
1602 metadata=rec['result_metadata'],
1589 )
1603 )
1590 md.update(self._extract_metadata(md_msg))
1604 md.update(self._extract_metadata(md_msg))
1591 if rec.get('received'):
1605 if rec.get('received'):
1592 md['received'] = parse_date(rec['received'])
1606 md['received'] = parse_date(rec['received'])
1593 md.update(iodict)
1607 md.update(iodict)
1594
1608
1595 if rcontent['status'] == 'ok':
1609 if rcontent['status'] == 'ok':
1596 if header['msg_type'] == 'apply_reply':
1610 if header['msg_type'] == 'apply_reply':
1597 res,buffers = serialize.unserialize_object(buffers)
1611 res,buffers = serialize.unserialize_object(buffers)
1598 elif header['msg_type'] == 'execute_reply':
1612 elif header['msg_type'] == 'execute_reply':
1599 res = ExecuteReply(msg_id, rcontent, md)
1613 res = ExecuteReply(msg_id, rcontent, md)
1600 else:
1614 else:
1601 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1615 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1602 else:
1616 else:
1603 res = self._unwrap_exception(rcontent)
1617 res = self._unwrap_exception(rcontent)
1604 failures.append(res)
1618 failures.append(res)
1605
1619
1606 self.results[msg_id] = res
1620 self.results[msg_id] = res
1607 content[msg_id] = res
1621 content[msg_id] = res
1608
1622
1609 if len(theids) == 1 and failures:
1623 if len(theids) == 1 and failures:
1610 raise failures[0]
1624 raise failures[0]
1611
1625
1612 error.collect_exceptions(failures, "result_status")
1626 error.collect_exceptions(failures, "result_status")
1613 return content
1627 return content
1614
1628
1615 @spin_first
1629 @spin_first
1616 def queue_status(self, targets='all', verbose=False):
1630 def queue_status(self, targets='all', verbose=False):
1617 """Fetch the status of engine queues.
1631 """Fetch the status of engine queues.
1618
1632
1619 Parameters
1633 Parameters
1620 ----------
1634 ----------
1621
1635
1622 targets : int/str/list of ints/strs
1636 targets : int/str/list of ints/strs
1623 the engines whose states are to be queried.
1637 the engines whose states are to be queried.
1624 default : all
1638 default : all
1625 verbose : bool
1639 verbose : bool
1626 Whether to return lengths only, or lists of ids for each element
1640 Whether to return lengths only, or lists of ids for each element
1627 """
1641 """
1628 if targets == 'all':
1642 if targets == 'all':
1629 # allow 'all' to be evaluated on the engine
1643 # allow 'all' to be evaluated on the engine
1630 engine_ids = None
1644 engine_ids = None
1631 else:
1645 else:
1632 engine_ids = self._build_targets(targets)[1]
1646 engine_ids = self._build_targets(targets)[1]
1633 content = dict(targets=engine_ids, verbose=verbose)
1647 content = dict(targets=engine_ids, verbose=verbose)
1634 self.session.send(self._query_socket, "queue_request", content=content)
1648 self.session.send(self._query_socket, "queue_request", content=content)
1635 idents,msg = self.session.recv(self._query_socket, 0)
1649 idents,msg = self.session.recv(self._query_socket, 0)
1636 if self.debug:
1650 if self.debug:
1637 pprint(msg)
1651 pprint(msg)
1638 content = msg['content']
1652 content = msg['content']
1639 status = content.pop('status')
1653 status = content.pop('status')
1640 if status != 'ok':
1654 if status != 'ok':
1641 raise self._unwrap_exception(content)
1655 raise self._unwrap_exception(content)
1642 content = rekey(content)
1656 content = rekey(content)
1643 if isinstance(targets, int):
1657 if isinstance(targets, int):
1644 return content[targets]
1658 return content[targets]
1645 else:
1659 else:
1646 return content
1660 return content
1647
1661
1648 def _build_msgids_from_target(self, targets=None):
1662 def _build_msgids_from_target(self, targets=None):
1649 """Build a list of msg_ids from the list of engine targets"""
1663 """Build a list of msg_ids from the list of engine targets"""
1650 if not targets: # needed as _build_targets otherwise uses all engines
1664 if not targets: # needed as _build_targets otherwise uses all engines
1651 return []
1665 return []
1652 target_ids = self._build_targets(targets)[0]
1666 target_ids = self._build_targets(targets)[0]
1653 return [md_id for md_id in self.metadata if self.metadata[md_id]["engine_uuid"] in target_ids]
1667 return [md_id for md_id in self.metadata if self.metadata[md_id]["engine_uuid"] in target_ids]
1654
1668
1655 def _build_msgids_from_jobs(self, jobs=None):
1669 def _build_msgids_from_jobs(self, jobs=None):
1656 """Build a list of msg_ids from "jobs" """
1670 """Build a list of msg_ids from "jobs" """
1657 if not jobs:
1671 if not jobs:
1658 return []
1672 return []
1659 msg_ids = []
1673 msg_ids = []
1660 if isinstance(jobs, string_types + (AsyncResult,)):
1674 if isinstance(jobs, string_types + (AsyncResult,)):
1661 jobs = [jobs]
1675 jobs = [jobs]
1662 bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
1676 bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
1663 if bad_ids:
1677 if bad_ids:
1664 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1678 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1665 for j in jobs:
1679 for j in jobs:
1666 if isinstance(j, AsyncResult):
1680 if isinstance(j, AsyncResult):
1667 msg_ids.extend(j.msg_ids)
1681 msg_ids.extend(j.msg_ids)
1668 else:
1682 else:
1669 msg_ids.append(j)
1683 msg_ids.append(j)
1670 return msg_ids
1684 return msg_ids
1671
1685
1672 def purge_local_results(self, jobs=[], targets=[]):
1686 def purge_local_results(self, jobs=[], targets=[]):
1673 """Clears the client caches of results and their metadata.
1687 """Clears the client caches of results and their metadata.
1674
1688
1675 Individual results can be purged by msg_id, or the entire
1689 Individual results can be purged by msg_id, or the entire
1676 history of specific targets can be purged.
1690 history of specific targets can be purged.
1677
1691
1678 Use `purge_local_results('all')` to scrub everything from the Clients's
1692 Use `purge_local_results('all')` to scrub everything from the Clients's
1679 results and metadata caches.
1693 results and metadata caches.
1680
1694
1681 After this call all `AsyncResults` are invalid and should be discarded.
1695 After this call all `AsyncResults` are invalid and should be discarded.
1682
1696
1683 If you must "reget" the results, you can still do so by using
1697 If you must "reget" the results, you can still do so by using
1684 `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
1698 `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
1685 redownload the results from the hub if they are still available
1699 redownload the results from the hub if they are still available
1686 (i.e `client.purge_hub_results(...)` has not been called.
1700 (i.e `client.purge_hub_results(...)` has not been called.
1687
1701
1688 Parameters
1702 Parameters
1689 ----------
1703 ----------
1690
1704
1691 jobs : str or list of str or AsyncResult objects
1705 jobs : str or list of str or AsyncResult objects
1692 the msg_ids whose results should be purged.
1706 the msg_ids whose results should be purged.
1693 targets : int/list of ints
1707 targets : int/list of ints
1694 The engines, by integer ID, whose entire result histories are to be purged.
1708 The engines, by integer ID, whose entire result histories are to be purged.
1695
1709
1696 Raises
1710 Raises
1697 ------
1711 ------
1698
1712
1699 RuntimeError : if any of the tasks to be purged are still outstanding.
1713 RuntimeError : if any of the tasks to be purged are still outstanding.
1700
1714
1701 """
1715 """
1702 if not targets and not jobs:
1716 if not targets and not jobs:
1703 raise ValueError("Must specify at least one of `targets` and `jobs`")
1717 raise ValueError("Must specify at least one of `targets` and `jobs`")
1704
1718
1705 if jobs == 'all':
1719 if jobs == 'all':
1706 if self.outstanding:
1720 if self.outstanding:
1707 raise RuntimeError("Can't purge outstanding tasks: %s" % self.outstanding)
1721 raise RuntimeError("Can't purge outstanding tasks: %s" % self.outstanding)
1708 self.results.clear()
1722 self.results.clear()
1709 self.metadata.clear()
1723 self.metadata.clear()
1710 else:
1724 else:
1711 msg_ids = set()
1725 msg_ids = set()
1712 msg_ids.update(self._build_msgids_from_target(targets))
1726 msg_ids.update(self._build_msgids_from_target(targets))
1713 msg_ids.update(self._build_msgids_from_jobs(jobs))
1727 msg_ids.update(self._build_msgids_from_jobs(jobs))
1714 still_outstanding = self.outstanding.intersection(msg_ids)
1728 still_outstanding = self.outstanding.intersection(msg_ids)
1715 if still_outstanding:
1729 if still_outstanding:
1716 raise RuntimeError("Can't purge outstanding tasks: %s" % still_outstanding)
1730 raise RuntimeError("Can't purge outstanding tasks: %s" % still_outstanding)
1717 for mid in msg_ids:
1731 for mid in msg_ids:
1718 self.results.pop(mid, None)
1732 self.results.pop(mid, None)
1719 self.metadata.pop(mid, None)
1733 self.metadata.pop(mid, None)
1720
1734
1721
1735
1722 @spin_first
1736 @spin_first
1723 def purge_hub_results(self, jobs=[], targets=[]):
1737 def purge_hub_results(self, jobs=[], targets=[]):
1724 """Tell the Hub to forget results.
1738 """Tell the Hub to forget results.
1725
1739
1726 Individual results can be purged by msg_id, or the entire
1740 Individual results can be purged by msg_id, or the entire
1727 history of specific targets can be purged.
1741 history of specific targets can be purged.
1728
1742
1729 Use `purge_results('all')` to scrub everything from the Hub's db.
1743 Use `purge_results('all')` to scrub everything from the Hub's db.
1730
1744
1731 Parameters
1745 Parameters
1732 ----------
1746 ----------
1733
1747
1734 jobs : str or list of str or AsyncResult objects
1748 jobs : str or list of str or AsyncResult objects
1735 the msg_ids whose results should be forgotten.
1749 the msg_ids whose results should be forgotten.
1736 targets : int/str/list of ints/strs
1750 targets : int/str/list of ints/strs
1737 The targets, by int_id, whose entire history is to be purged.
1751 The targets, by int_id, whose entire history is to be purged.
1738
1752
1739 default : None
1753 default : None
1740 """
1754 """
1741 if not targets and not jobs:
1755 if not targets and not jobs:
1742 raise ValueError("Must specify at least one of `targets` and `jobs`")
1756 raise ValueError("Must specify at least one of `targets` and `jobs`")
1743 if targets:
1757 if targets:
1744 targets = self._build_targets(targets)[1]
1758 targets = self._build_targets(targets)[1]
1745
1759
1746 # construct msg_ids from jobs
1760 # construct msg_ids from jobs
1747 if jobs == 'all':
1761 if jobs == 'all':
1748 msg_ids = jobs
1762 msg_ids = jobs
1749 else:
1763 else:
1750 msg_ids = self._build_msgids_from_jobs(jobs)
1764 msg_ids = self._build_msgids_from_jobs(jobs)
1751
1765
1752 content = dict(engine_ids=targets, msg_ids=msg_ids)
1766 content = dict(engine_ids=targets, msg_ids=msg_ids)
1753 self.session.send(self._query_socket, "purge_request", content=content)
1767 self.session.send(self._query_socket, "purge_request", content=content)
1754 idents, msg = self.session.recv(self._query_socket, 0)
1768 idents, msg = self.session.recv(self._query_socket, 0)
1755 if self.debug:
1769 if self.debug:
1756 pprint(msg)
1770 pprint(msg)
1757 content = msg['content']
1771 content = msg['content']
1758 if content['status'] != 'ok':
1772 if content['status'] != 'ok':
1759 raise self._unwrap_exception(content)
1773 raise self._unwrap_exception(content)
1760
1774
1761 def purge_results(self, jobs=[], targets=[]):
1775 def purge_results(self, jobs=[], targets=[]):
1762 """Clears the cached results from both the hub and the local client
1776 """Clears the cached results from both the hub and the local client
1763
1777
1764 Individual results can be purged by msg_id, or the entire
1778 Individual results can be purged by msg_id, or the entire
1765 history of specific targets can be purged.
1779 history of specific targets can be purged.
1766
1780
1767 Use `purge_results('all')` to scrub every cached result from both the Hub's and
1781 Use `purge_results('all')` to scrub every cached result from both the Hub's and
1768 the Client's db.
1782 the Client's db.
1769
1783
1770 Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with
1784 Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with
1771 the same arguments.
1785 the same arguments.
1772
1786
1773 Parameters
1787 Parameters
1774 ----------
1788 ----------
1775
1789
1776 jobs : str or list of str or AsyncResult objects
1790 jobs : str or list of str or AsyncResult objects
1777 the msg_ids whose results should be forgotten.
1791 the msg_ids whose results should be forgotten.
1778 targets : int/str/list of ints/strs
1792 targets : int/str/list of ints/strs
1779 The targets, by int_id, whose entire history is to be purged.
1793 The targets, by int_id, whose entire history is to be purged.
1780
1794
1781 default : None
1795 default : None
1782 """
1796 """
1783 self.purge_local_results(jobs=jobs, targets=targets)
1797 self.purge_local_results(jobs=jobs, targets=targets)
1784 self.purge_hub_results(jobs=jobs, targets=targets)
1798 self.purge_hub_results(jobs=jobs, targets=targets)
1785
1799
1786 def purge_everything(self):
1800 def purge_everything(self):
1787 """Clears all content from previous Tasks from both the hub and the local client
1801 """Clears all content from previous Tasks from both the hub and the local client
1788
1802
1789 In addition to calling `purge_results("all")` it also deletes the history and
1803 In addition to calling `purge_results("all")` it also deletes the history and
1790 other bookkeeping lists.
1804 other bookkeeping lists.
1791 """
1805 """
1792 self.purge_results("all")
1806 self.purge_results("all")
1793 self.history = []
1807 self.history = []
1794 self.session.digest_history.clear()
1808 self.session.digest_history.clear()
1795
1809
1796 @spin_first
1810 @spin_first
1797 def hub_history(self):
1811 def hub_history(self):
1798 """Get the Hub's history
1812 """Get the Hub's history
1799
1813
1800 Just like the Client, the Hub has a history, which is a list of msg_ids.
1814 Just like the Client, the Hub has a history, which is a list of msg_ids.
1801 This will contain the history of all clients, and, depending on configuration,
1815 This will contain the history of all clients, and, depending on configuration,
1802 may contain history across multiple cluster sessions.
1816 may contain history across multiple cluster sessions.
1803
1817
1804 Any msg_id returned here is a valid argument to `get_result`.
1818 Any msg_id returned here is a valid argument to `get_result`.
1805
1819
1806 Returns
1820 Returns
1807 -------
1821 -------
1808
1822
1809 msg_ids : list of strs
1823 msg_ids : list of strs
1810 list of all msg_ids, ordered by task submission time.
1824 list of all msg_ids, ordered by task submission time.
1811 """
1825 """
1812
1826
1813 self.session.send(self._query_socket, "history_request", content={})
1827 self.session.send(self._query_socket, "history_request", content={})
1814 idents, msg = self.session.recv(self._query_socket, 0)
1828 idents, msg = self.session.recv(self._query_socket, 0)
1815
1829
1816 if self.debug:
1830 if self.debug:
1817 pprint(msg)
1831 pprint(msg)
1818 content = msg['content']
1832 content = msg['content']
1819 if content['status'] != 'ok':
1833 if content['status'] != 'ok':
1820 raise self._unwrap_exception(content)
1834 raise self._unwrap_exception(content)
1821 else:
1835 else:
1822 return content['history']
1836 return content['history']
1823
1837
    @spin_first
    def db_query(self, query, keys=None):
        """Query the Hub's TaskRecord database

        This will return a list of task record dicts that match `query`

        Parameters
        ----------

        query : mongodb query dict
            The search dict. See mongodb query docs for details.
        keys : list of strs [optional]
            The subset of keys to be returned.  The default is to fetch everything but buffers.
            'msg_id' will *always* be included.

        Returns
        -------

        records : list of dicts
            Matching task records, with datetime fields parsed and any
            requested buffers re-attached per record.
        """
        # allow a single key name as shorthand for a one-element list
        if isinstance(keys, string_types):
            keys = [keys]
        content = dict(query=query, keys=keys)
        self.session.send(self._query_socket, "db_request", content=content)
        idents, msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)

        records = content['records']

        # per-record buffer counts; None means buffers were not requested
        buffer_lens = content['buffer_lens']
        result_buffer_lens = content['result_buffer_lens']
        # all records' buffers arrive concatenated in one flat list,
        # consumed sequentially below
        buffers = msg['buffers']
        has_bufs = buffer_lens is not None
        has_rbufs = result_buffer_lens is not None
        for i,rec in enumerate(records):
            # unpack datetime objects
            for hkey in ('header', 'result_header'):
                if hkey in rec:
                    rec[hkey] = extract_dates(rec[hkey])
            for dtkey in ('submitted', 'started', 'completed', 'received'):
                if dtkey in rec:
                    rec[dtkey] = parse_date(rec[dtkey])
            # relink buffers
            # NOTE: order matters here -- each record takes its buffers
            # (then its result_buffers) off the front of the flat list,
            # so the slices must be consumed in record order.
            if has_bufs:
                blen = buffer_lens[i]
                rec['buffers'], buffers = buffers[:blen],buffers[blen:]
            if has_rbufs:
                blen = result_buffer_lens[i]
                rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]

        return records
1874
1888
# Public API of this module: only the Client class is exported.
__all__ = [ 'Client' ]
General Comments 0
You need to be logged in to leave comments. Login now