##// END OF EJS Templates
Merge pull request #8187 from minrk/parallel_allow_none...
Thomas Kluyver -
r20993:05d48c8d merge
parent child Browse files
Show More
@@ -1,1893 +1,1892 b''
1 """A semi-synchronous Client for IPython parallel"""
1 """A semi-synchronous Client for IPython parallel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import print_function
6 from __future__ import print_function
7
7
8 import os
8 import os
9 import json
9 import json
10 import sys
10 import sys
11 from threading import Thread, Event
11 from threading import Thread, Event
12 import time
12 import time
13 import warnings
13 import warnings
14 from datetime import datetime
14 from datetime import datetime
15 from getpass import getpass
15 from getpass import getpass
16 from pprint import pprint
16 from pprint import pprint
17
17
18 pjoin = os.path.join
18 pjoin = os.path.join
19
19
20 import zmq
20 import zmq
21
21
22 from IPython.config.configurable import MultipleInstanceError
22 from IPython.config.configurable import MultipleInstanceError
23 from IPython.core.application import BaseIPythonApplication
23 from IPython.core.application import BaseIPythonApplication
24 from IPython.core.profiledir import ProfileDir, ProfileDirError
24 from IPython.core.profiledir import ProfileDir, ProfileDirError
25
25
26 from IPython.utils.capture import RichOutput
26 from IPython.utils.capture import RichOutput
27 from IPython.utils.coloransi import TermColors
27 from IPython.utils.coloransi import TermColors
28 from IPython.utils.jsonutil import rekey, extract_dates, parse_date
28 from IPython.utils.jsonutil import rekey, extract_dates, parse_date
29 from IPython.utils.localinterfaces import localhost, is_local_ip
29 from IPython.utils.localinterfaces import localhost, is_local_ip
30 from IPython.utils.path import get_ipython_dir, compress_user
30 from IPython.utils.path import get_ipython_dir, compress_user
31 from IPython.utils.py3compat import cast_bytes, string_types, xrange, iteritems
31 from IPython.utils.py3compat import cast_bytes, string_types, xrange, iteritems
32 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
32 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
33 Dict, List, Bool, Set, Any)
33 Dict, List, Bool, Set, Any)
34 from decorator import decorator
34 from decorator import decorator
35
35
36 from ipython_parallel import Reference
36 from ipython_parallel import Reference
37 from ipython_parallel import error
37 from ipython_parallel import error
38 from ipython_parallel import util
38 from ipython_parallel import util
39
39
40 from IPython.kernel.zmq.session import Session, Message
40 from IPython.kernel.zmq.session import Session, Message
41 from IPython.kernel.zmq import serialize
41 from IPython.kernel.zmq import serialize
42
42
43 from .asyncresult import AsyncResult, AsyncHubResult
43 from .asyncresult import AsyncResult, AsyncHubResult
44 from .view import DirectView, LoadBalancedView
44 from .view import DirectView, LoadBalancedView
45
45
46 #--------------------------------------------------------------------------
46 #--------------------------------------------------------------------------
47 # Decorators for Client methods
47 # Decorators for Client methods
48 #--------------------------------------------------------------------------
48 #--------------------------------------------------------------------------
49
49
50
50
@decorator
def spin_first(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method.

    Decorator for Client methods that must observe the latest incoming
    results/registration state before they run.
    """
    self.spin()
    return f(self, *args, **kwargs)
56
56
57
57
58 #--------------------------------------------------------------------------
58 #--------------------------------------------------------------------------
59 # Classes
59 # Classes
60 #--------------------------------------------------------------------------
60 #--------------------------------------------------------------------------
61
61
# Error text used when no controller connection file can be located
# for the active profile.
_no_connection_file_msg = """
Failed to connect because no Controller could be found.
Please double-check your profile and ensure that a cluster is running.
"""
66
66
class ExecuteReply(RichOutput):
    """wrapper for finished Execute results"""
    def __init__(self, msg_id, content, metadata):
        # msg_id: id of the execute request this reply answers
        # content: reply message content; must contain 'execution_count'
        # metadata: Metadata dict for the request (engine_id, execute_result, ...)
        self.msg_id = msg_id
        self._content = content
        self.execution_count = content['execution_count']
        self.metadata = metadata

    # RichOutput overrides

    @property
    def source(self):
        """source of the execute_result, or None if none was published"""
        execute_result = self.metadata['execute_result']
        if execute_result:
            return execute_result.get('source', '')

    @property
    def data(self):
        """mime-bundle dict of the execute_result, or None if none was published"""
        execute_result = self.metadata['execute_result']
        if execute_result:
            return execute_result.get('data', {})

    @property
    def _metadata(self):
        """display metadata of the execute_result, or None if none was published"""
        execute_result = self.metadata['execute_result']
        if execute_result:
            return execute_result.get('metadata', {})

    def display(self):
        """republish this result's display data in the local frontend"""
        from IPython.display import publish_display_data
        publish_display_data(self.data, self.metadata)

    def _repr_mime_(self, mime):
        # Return (data, metadata) for a single mime-type if display
        # metadata exists for it, otherwise just the data; None if the
        # mime-type is absent.  Used by RichOutput's _repr_*_ hooks.
        if mime not in self.data:
            return
        data = self.data[mime]
        if mime in self._metadata:
            return data, self._metadata[mime]
        else:
            return data

    def __getitem__(self, key):
        # item access falls through to the metadata dict
        return self.metadata[key]

    def __getattr__(self, key):
        # attribute access aliased to metadata keys
        if key not in self.metadata:
            raise AttributeError(key)
        return self.metadata[key]

    def __repr__(self):
        execute_result = self.metadata['execute_result'] or {'data':{}}
        text_out = execute_result['data'].get('text/plain', '')
        # truncate long reprs so the summary stays on one line
        if len(text_out) > 32:
            text_out = text_out[:29] + '...'

        return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)

    def _repr_pretty_(self, p, cycle):
        """IPython pretty-print hook: render like an Out[engine:count] prompt."""
        execute_result = self.metadata['execute_result'] or {'data':{}}
        text_out = execute_result['data'].get('text/plain', '')

        if not text_out:
            return

        # colorize the prompt like the interactive shell, unless colors are off
        try:
            ip = get_ipython()
        except NameError:
            colors = "NoColor"
        else:
            colors = ip.colors

        if colors == "NoColor":
            out = normal = ""
        else:
            out = TermColors.Red
            normal = TermColors.Normal

        if '\n' in text_out and not text_out.startswith('\n'):
            # add newline for multiline reprs
            text_out = '\n' + text_out

        p.text(
            out + u'Out[%i:%i]: ' % (
                self.metadata['engine_id'], self.execution_count
            ) + normal + text_out
        )
153
153
154
154
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        defaults = {
            'msg_id': None,
            'submitted': None,
            'started': None,
            'completed': None,
            'received': None,
            'engine_uuid': None,
            'engine_id': None,
            'follow': None,
            'after': None,
            'status': None,

            'execute_input': None,
            'execute_result': None,
            'error': None,
            'stdout': '',
            'stderr': '',
            'outputs': [],
            'data': {},
            'outputs_ready': False,
        }
        # seed the fixed key set, then overlay any caller-supplied values
        self.update(defaults)
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        try:
            return self[key]
        except KeyError:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key not in self:
            raise AttributeError(key)
        self[key] = value

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key not in self:
            raise KeyError(key)
        dict.__setitem__(self, key, value)
208
208
209
209
210 class Client(HasTraits):
210 class Client(HasTraits):
211 """A semi-synchronous client to the IPython ZMQ cluster
211 """A semi-synchronous client to the IPython ZMQ cluster
212
212
213 Parameters
213 Parameters
214 ----------
214 ----------
215
215
216 url_file : str/unicode; path to ipcontroller-client.json
216 url_file : str/unicode; path to ipcontroller-client.json
217 This JSON file should contain all the information needed to connect to a cluster,
217 This JSON file should contain all the information needed to connect to a cluster,
218 and is likely the only argument needed.
218 and is likely the only argument needed.
219 Connection information for the Hub's registration. If a json connector
219 Connection information for the Hub's registration. If a json connector
220 file is given, then likely no further configuration is necessary.
220 file is given, then likely no further configuration is necessary.
221 [Default: use profile]
221 [Default: use profile]
222 profile : bytes
222 profile : bytes
223 The name of the Cluster profile to be used to find connector information.
223 The name of the Cluster profile to be used to find connector information.
224 If run from an IPython application, the default profile will be the same
224 If run from an IPython application, the default profile will be the same
225 as the running application, otherwise it will be 'default'.
225 as the running application, otherwise it will be 'default'.
226 cluster_id : str
226 cluster_id : str
227 String id to be added to runtime files, to prevent name collisions when using
227 String id to be added to runtime files, to prevent name collisions when using
228 multiple clusters with a single profile simultaneously.
228 multiple clusters with a single profile simultaneously.
229 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
229 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
230 Since this is text inserted into filenames, typical recommendations apply:
230 Since this is text inserted into filenames, typical recommendations apply:
231 Simple character strings are ideal, and spaces are not recommended (but
231 Simple character strings are ideal, and spaces are not recommended (but
232 should generally work)
232 should generally work)
233 context : zmq.Context
233 context : zmq.Context
234 Pass an existing zmq.Context instance, otherwise the client will create its own.
234 Pass an existing zmq.Context instance, otherwise the client will create its own.
235 debug : bool
235 debug : bool
236 flag for lots of message printing for debug purposes
236 flag for lots of message printing for debug purposes
237 timeout : int/float
237 timeout : int/float
238 time (in seconds) to wait for connection replies from the Hub
238 time (in seconds) to wait for connection replies from the Hub
239 [Default: 10]
239 [Default: 10]
240
240
241 #-------------- session related args ----------------
241 #-------------- session related args ----------------
242
242
243 config : Config object
243 config : Config object
244 If specified, this will be relayed to the Session for configuration
244 If specified, this will be relayed to the Session for configuration
245 username : str
245 username : str
246 set username for the session object
246 set username for the session object
247
247
248 #-------------- ssh related args ----------------
248 #-------------- ssh related args ----------------
249 # These are args for configuring the ssh tunnel to be used
249 # These are args for configuring the ssh tunnel to be used
250 # credentials are used to forward connections over ssh to the Controller
250 # credentials are used to forward connections over ssh to the Controller
251 # Note that the ip given in `addr` needs to be relative to sshserver
251 # Note that the ip given in `addr` needs to be relative to sshserver
252 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
252 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
253 # and set sshserver as the same machine the Controller is on. However,
253 # and set sshserver as the same machine the Controller is on. However,
254 # the only requirement is that sshserver is able to see the Controller
254 # the only requirement is that sshserver is able to see the Controller
255 # (i.e. is within the same trusted network).
255 # (i.e. is within the same trusted network).
256
256
257 sshserver : str
257 sshserver : str
258 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
258 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
259 If keyfile or password is specified, and this is not, it will default to
259 If keyfile or password is specified, and this is not, it will default to
260 the ip given in addr.
260 the ip given in addr.
261 sshkey : str; path to ssh private key file
261 sshkey : str; path to ssh private key file
262 This specifies a key to be used in ssh login, default None.
262 This specifies a key to be used in ssh login, default None.
263 Regular default ssh keys will be used without specifying this argument.
263 Regular default ssh keys will be used without specifying this argument.
264 password : str
264 password : str
265 Your ssh password to sshserver. Note that if this is left None,
265 Your ssh password to sshserver. Note that if this is left None,
266 you will be prompted for it if passwordless key based login is unavailable.
266 you will be prompted for it if passwordless key based login is unavailable.
267 paramiko : bool
267 paramiko : bool
268 flag for whether to use paramiko instead of shell ssh for tunneling.
268 flag for whether to use paramiko instead of shell ssh for tunneling.
269 [default: True on win32, False else]
269 [default: True on win32, False else]
270
270
271
271
272 Attributes
272 Attributes
273 ----------
273 ----------
274
274
275 ids : list of int engine IDs
275 ids : list of int engine IDs
276 requesting the ids attribute always synchronizes
276 requesting the ids attribute always synchronizes
277 the registration state. To request ids without synchronization,
277 the registration state. To request ids without synchronization,
278 use semi-private _ids attributes.
278 use semi-private _ids attributes.
279
279
280 history : list of msg_ids
280 history : list of msg_ids
281 a list of msg_ids, keeping track of all the execution
281 a list of msg_ids, keeping track of all the execution
282 messages you have submitted in order.
282 messages you have submitted in order.
283
283
284 outstanding : set of msg_ids
284 outstanding : set of msg_ids
285 a set of msg_ids that have been submitted, but whose
285 a set of msg_ids that have been submitted, but whose
286 results have not yet been received.
286 results have not yet been received.
287
287
288 results : dict
288 results : dict
289 a dict of all our results, keyed by msg_id
289 a dict of all our results, keyed by msg_id
290
290
291 block : bool
291 block : bool
292 determines default behavior when block not specified
292 determines default behavior when block not specified
293 in execution methods
293 in execution methods
294
294
295 Methods
295 Methods
296 -------
296 -------
297
297
298 spin
298 spin
299 flushes incoming results and registration state changes
299 flushes incoming results and registration state changes
300 control methods spin, and requesting `ids` also ensures up to date
300 control methods spin, and requesting `ids` also ensures up to date
301
301
302 wait
302 wait
303 wait on one or more msg_ids
303 wait on one or more msg_ids
304
304
305 execution methods
305 execution methods
306 apply
306 apply
307 legacy: execute, run
307 legacy: execute, run
308
308
309 data movement
309 data movement
310 push, pull, scatter, gather
310 push, pull, scatter, gather
311
311
312 query methods
312 query methods
313 queue_status, get_result, purge, result_status
313 queue_status, get_result, purge, result_status
314
314
315 control methods
315 control methods
316 abort, shutdown
316 abort, shutdown
317
317
318 """
318 """
319
319
320
320
    # default blocking behavior when block is not specified in execution methods
    block = Bool(False)
    # msg_ids submitted but whose results have not yet been received
    outstanding = Set()
    # all results so far, keyed by msg_id
    results = Instance('collections.defaultdict', (dict,))
    # per-msg_id Metadata, keyed by msg_id
    metadata = Instance('collections.defaultdict', (Metadata,))
    # msg_ids of all execution messages submitted, in order
    history = List()
    debug = Bool(False)
    _spin_thread = Any()
    # threading.Event, created in __init__ — NOTE(review): presumably
    # signals _spin_thread to stop; confirm against spin-thread code.
    _stop_spinning = Any()
329
329
330 profile=Unicode()
330 profile=Unicode()
331 def _profile_default(self):
331 def _profile_default(self):
332 if BaseIPythonApplication.initialized():
332 if BaseIPythonApplication.initialized():
333 # an IPython app *might* be running, try to get its profile
333 # an IPython app *might* be running, try to get its profile
334 try:
334 try:
335 return BaseIPythonApplication.instance().profile
335 return BaseIPythonApplication.instance().profile
336 except (AttributeError, MultipleInstanceError):
336 except (AttributeError, MultipleInstanceError):
337 # could be a *different* subclass of config.Application,
337 # could be a *different* subclass of config.Application,
338 # which would raise one of these two errors.
338 # which would raise one of these two errors.
339 return u'default'
339 return u'default'
340 else:
340 else:
341 return u'default'
341 return u'default'
342
342
343
343
    # engine_uuid -> set of outstanding msg_ids on that engine
    _outstanding_dict = Instance('collections.defaultdict', (set,))
    # registered engine ids (semi-private; `ids` property synchronizes first)
    _ids = List()
    _connected=Bool(False)
    _ssh=Bool(False)
    # sockets are None until _connect(); allow_none permits that state
    _context = Instance('zmq.Context', allow_none=True)
    _config = Dict()
    _engines=Instance(util.ReverseDict, (), {})
    _query_socket=Instance('zmq.Socket', allow_none=True)
    _control_socket=Instance('zmq.Socket', allow_none=True)
    _iopub_socket=Instance('zmq.Socket', allow_none=True)
    _notification_socket=Instance('zmq.Socket', allow_none=True)
    _mux_socket=Instance('zmq.Socket', allow_none=True)
    _task_socket=Instance('zmq.Socket', allow_none=True)
    # task scheduler scheme, read from the connection file
    _task_scheme=Unicode()
    _closed = False
    _ignored_control_replies=Integer(0)
    _ignored_hub_replies=Integer(0)
362
361
    def __new__(self, *args, **kw):
        # don't raise on positional args
        # (positional args are consumed by __init__; only keyword args
        # are forwarded to HasTraits.__new__)
        return HasTraits.__new__(self, **kw)
366
365
367 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
366 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
368 context=None, debug=False,
367 context=None, debug=False,
369 sshserver=None, sshkey=None, password=None, paramiko=None,
368 sshserver=None, sshkey=None, password=None, paramiko=None,
370 timeout=10, cluster_id=None, **extra_args
369 timeout=10, cluster_id=None, **extra_args
371 ):
370 ):
372 if profile:
371 if profile:
373 super(Client, self).__init__(debug=debug, profile=profile)
372 super(Client, self).__init__(debug=debug, profile=profile)
374 else:
373 else:
375 super(Client, self).__init__(debug=debug)
374 super(Client, self).__init__(debug=debug)
376 if context is None:
375 if context is None:
377 context = zmq.Context.instance()
376 context = zmq.Context.instance()
378 self._context = context
377 self._context = context
379 self._stop_spinning = Event()
378 self._stop_spinning = Event()
380
379
381 if 'url_or_file' in extra_args:
380 if 'url_or_file' in extra_args:
382 url_file = extra_args['url_or_file']
381 url_file = extra_args['url_or_file']
383 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
382 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
384
383
385 if url_file and util.is_url(url_file):
384 if url_file and util.is_url(url_file):
386 raise ValueError("single urls cannot be specified, url-files must be used.")
385 raise ValueError("single urls cannot be specified, url-files must be used.")
387
386
388 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
387 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
389
388
390 no_file_msg = '\n'.join([
389 no_file_msg = '\n'.join([
391 "You have attempted to connect to an IPython Cluster but no Controller could be found.",
390 "You have attempted to connect to an IPython Cluster but no Controller could be found.",
392 "Please double-check your configuration and ensure that a cluster is running.",
391 "Please double-check your configuration and ensure that a cluster is running.",
393 ])
392 ])
394
393
395 if self._cd is not None:
394 if self._cd is not None:
396 if url_file is None:
395 if url_file is None:
397 if not cluster_id:
396 if not cluster_id:
398 client_json = 'ipcontroller-client.json'
397 client_json = 'ipcontroller-client.json'
399 else:
398 else:
400 client_json = 'ipcontroller-%s-client.json' % cluster_id
399 client_json = 'ipcontroller-%s-client.json' % cluster_id
401 url_file = pjoin(self._cd.security_dir, client_json)
400 url_file = pjoin(self._cd.security_dir, client_json)
402 if not os.path.exists(url_file):
401 if not os.path.exists(url_file):
403 msg = '\n'.join([
402 msg = '\n'.join([
404 "Connection file %r not found." % compress_user(url_file),
403 "Connection file %r not found." % compress_user(url_file),
405 no_file_msg,
404 no_file_msg,
406 ])
405 ])
407 raise IOError(msg)
406 raise IOError(msg)
408 if url_file is None:
407 if url_file is None:
409 raise IOError(no_file_msg)
408 raise IOError(no_file_msg)
410
409
411 if not os.path.exists(url_file):
410 if not os.path.exists(url_file):
412 # Connection file explicitly specified, but not found
411 # Connection file explicitly specified, but not found
413 raise IOError("Connection file %r not found. Is a controller running?" % \
412 raise IOError("Connection file %r not found. Is a controller running?" % \
414 compress_user(url_file)
413 compress_user(url_file)
415 )
414 )
416
415
417 with open(url_file) as f:
416 with open(url_file) as f:
418 cfg = json.load(f)
417 cfg = json.load(f)
419
418
420 self._task_scheme = cfg['task_scheme']
419 self._task_scheme = cfg['task_scheme']
421
420
422 # sync defaults from args, json:
421 # sync defaults from args, json:
423 if sshserver:
422 if sshserver:
424 cfg['ssh'] = sshserver
423 cfg['ssh'] = sshserver
425
424
426 location = cfg.setdefault('location', None)
425 location = cfg.setdefault('location', None)
427
426
428 proto,addr = cfg['interface'].split('://')
427 proto,addr = cfg['interface'].split('://')
429 addr = util.disambiguate_ip_address(addr, location)
428 addr = util.disambiguate_ip_address(addr, location)
430 cfg['interface'] = "%s://%s" % (proto, addr)
429 cfg['interface'] = "%s://%s" % (proto, addr)
431
430
432 # turn interface,port into full urls:
431 # turn interface,port into full urls:
433 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
432 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
434 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
433 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
435
434
436 url = cfg['registration']
435 url = cfg['registration']
437
436
438 if location is not None and addr == localhost():
437 if location is not None and addr == localhost():
439 # location specified, and connection is expected to be local
438 # location specified, and connection is expected to be local
440 if not is_local_ip(location) and not sshserver:
439 if not is_local_ip(location) and not sshserver:
441 # load ssh from JSON *only* if the controller is not on
440 # load ssh from JSON *only* if the controller is not on
442 # this machine
441 # this machine
443 sshserver=cfg['ssh']
442 sshserver=cfg['ssh']
444 if not is_local_ip(location) and not sshserver:
443 if not is_local_ip(location) and not sshserver:
445 # warn if no ssh specified, but SSH is probably needed
444 # warn if no ssh specified, but SSH is probably needed
446 # This is only a warning, because the most likely cause
445 # This is only a warning, because the most likely cause
447 # is a local Controller on a laptop whose IP is dynamic
446 # is a local Controller on a laptop whose IP is dynamic
448 warnings.warn("""
447 warnings.warn("""
449 Controller appears to be listening on localhost, but not on this machine.
448 Controller appears to be listening on localhost, but not on this machine.
450 If this is true, you should specify Client(...,sshserver='you@%s')
449 If this is true, you should specify Client(...,sshserver='you@%s')
451 or instruct your controller to listen on an external IP."""%location,
450 or instruct your controller to listen on an external IP."""%location,
452 RuntimeWarning)
451 RuntimeWarning)
453 elif not sshserver:
452 elif not sshserver:
454 # otherwise sync with cfg
453 # otherwise sync with cfg
455 sshserver = cfg['ssh']
454 sshserver = cfg['ssh']
456
455
457 self._config = cfg
456 self._config = cfg
458
457
459 self._ssh = bool(sshserver or sshkey or password)
458 self._ssh = bool(sshserver or sshkey or password)
460 if self._ssh and sshserver is None:
459 if self._ssh and sshserver is None:
461 # default to ssh via localhost
460 # default to ssh via localhost
462 sshserver = addr
461 sshserver = addr
463 if self._ssh and password is None:
462 if self._ssh and password is None:
464 from zmq.ssh import tunnel
463 from zmq.ssh import tunnel
465 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
464 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
466 password=False
465 password=False
467 else:
466 else:
468 password = getpass("SSH Password for %s: "%sshserver)
467 password = getpass("SSH Password for %s: "%sshserver)
469 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
468 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
470
469
471 # configure and construct the session
470 # configure and construct the session
472 try:
471 try:
473 extra_args['packer'] = cfg['pack']
472 extra_args['packer'] = cfg['pack']
474 extra_args['unpacker'] = cfg['unpack']
473 extra_args['unpacker'] = cfg['unpack']
475 extra_args['key'] = cast_bytes(cfg['key'])
474 extra_args['key'] = cast_bytes(cfg['key'])
476 extra_args['signature_scheme'] = cfg['signature_scheme']
475 extra_args['signature_scheme'] = cfg['signature_scheme']
477 except KeyError as exc:
476 except KeyError as exc:
478 msg = '\n'.join([
477 msg = '\n'.join([
479 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
478 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
480 "If you are reusing connection files, remove them and start ipcontroller again."
479 "If you are reusing connection files, remove them and start ipcontroller again."
481 ])
480 ])
482 raise ValueError(msg.format(exc.message))
481 raise ValueError(msg.format(exc.message))
483
482
484 self.session = Session(**extra_args)
483 self.session = Session(**extra_args)
485
484
486 self._query_socket = self._context.socket(zmq.DEALER)
485 self._query_socket = self._context.socket(zmq.DEALER)
487
486
488 if self._ssh:
487 if self._ssh:
489 from zmq.ssh import tunnel
488 from zmq.ssh import tunnel
490 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
489 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
491 else:
490 else:
492 self._query_socket.connect(cfg['registration'])
491 self._query_socket.connect(cfg['registration'])
493
492
494 self.session.debug = self.debug
493 self.session.debug = self.debug
495
494
496 self._notification_handlers = {'registration_notification' : self._register_engine,
495 self._notification_handlers = {'registration_notification' : self._register_engine,
497 'unregistration_notification' : self._unregister_engine,
496 'unregistration_notification' : self._unregister_engine,
498 'shutdown_notification' : lambda msg: self.close(),
497 'shutdown_notification' : lambda msg: self.close(),
499 }
498 }
500 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
499 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
501 'apply_reply' : self._handle_apply_reply}
500 'apply_reply' : self._handle_apply_reply}
502
501
503 try:
502 try:
504 self._connect(sshserver, ssh_kwargs, timeout)
503 self._connect(sshserver, ssh_kwargs, timeout)
505 except:
504 except:
506 self.close(linger=0)
505 self.close(linger=0)
507 raise
506 raise
508
507
509 # last step: setup magics, if we are in IPython:
508 # last step: setup magics, if we are in IPython:
510
509
511 try:
510 try:
512 ip = get_ipython()
511 ip = get_ipython()
513 except NameError:
512 except NameError:
514 return
513 return
515 else:
514 else:
516 if 'px' not in ip.magics_manager.magics:
515 if 'px' not in ip.magics_manager.magics:
517 # in IPython but we are the first Client.
516 # in IPython but we are the first Client.
518 # activate a default view for parallel magics.
517 # activate a default view for parallel magics.
519 self.activate()
518 self.activate()
520
519
    def __del__(self):
        """cleanup sockets, but _not_ context."""
        # close() is idempotent (it returns early when self._closed is set),
        # so an explicit close() followed by garbage collection is safe.
        self.close()
524
523
525 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
524 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
526 if ipython_dir is None:
525 if ipython_dir is None:
527 ipython_dir = get_ipython_dir()
526 ipython_dir = get_ipython_dir()
528 if profile_dir is not None:
527 if profile_dir is not None:
529 try:
528 try:
530 self._cd = ProfileDir.find_profile_dir(profile_dir)
529 self._cd = ProfileDir.find_profile_dir(profile_dir)
531 return
530 return
532 except ProfileDirError:
531 except ProfileDirError:
533 pass
532 pass
534 elif profile is not None:
533 elif profile is not None:
535 try:
534 try:
536 self._cd = ProfileDir.find_profile_dir_by_name(
535 self._cd = ProfileDir.find_profile_dir_by_name(
537 ipython_dir, profile)
536 ipython_dir, profile)
538 return
537 return
539 except ProfileDirError:
538 except ProfileDirError:
540 pass
539 pass
541 self._cd = None
540 self._cd = None
542
541
    def _update_engines(self, engines):
        """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
        for k,v in iteritems(engines):
            # keys are cast to int here; presumably they can arrive as
            # strings (e.g. round-tripped through JSON) — TODO confirm
            eid = int(k)
            if eid not in self._engines:
                self._ids.append(eid)
            self._engines[eid] = v
        # keep the id list sorted so slicing/iteration is deterministic
        self._ids = sorted(self._ids)
        # The pure-ZMQ task scheme cannot tolerate gaps in the id sequence:
        # if the registered ids are not exactly 0..N-1, stop task scheduling.
        if sorted(self._engines.keys()) != list(range(len(self._engines))) and \
                        self._task_scheme == 'pure' and self._task_socket:
            self._stop_scheduling_tasks()
554
553
    def _stop_scheduling_tasks(self):
        """Stop scheduling tasks because an engine has been unregistered
        from a pure ZMQ scheduler.
        """
        # closing and clearing the task socket is what disables task farming;
        # other code paths test `self._task_socket` for truthiness.
        self._task_socket.close()
        self._task_socket = None
        msg = "An engine has been unregistered, and we are using pure " +\
              "ZMQ task scheduling. Task farming will be disabled."
        if self.outstanding:
            msg += " If you were running tasks when this happened, " +\
                   "some `outstanding` msg_ids may never resolve."
        warnings.warn(msg, RuntimeWarning)
567
566
    def _build_targets(self, targets):
        """Turn valid target IDs or 'all' into two lists:
        (int_ids, uuids).

        `targets` may be None (all engines), 'all', a single int
        (negative ints index from the end), a slice, or a collection
        of ints.  Raises NoEnginesRegistered, TypeError, or IndexError
        for invalid input.
        """
        if not self._ids:
            # flush notification socket if no engines yet, just in case
            # (the `ids` property triggers _flush_notifications)
            if not self.ids:
                raise error.NoEnginesRegistered("Can't build targets without any engines")

        if targets is None:
            targets = self._ids
        elif isinstance(targets, string_types):
            if targets.lower() == 'all':
                targets = self._ids
            else:
                raise TypeError("%r not valid str target, must be 'all'"%(targets))
        elif isinstance(targets, int):
            if targets < 0:
                # negative index counts from the end of the current id list
                targets = self.ids[targets]
            if targets not in self._ids:
                raise IndexError("No such engine: %i"%targets)
            targets = [targets]

        if isinstance(targets, slice):
            # slice the *positions* of the sorted id list, then map to ids
            indices = list(range(len(self._ids))[targets])
            ids = self.ids
            targets = [ ids[i] for i in indices ]

        if not isinstance(targets, (tuple, list, xrange)):
            raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))

        # first element: engine uuids as bytes (zmq identities);
        # second: the integer ids themselves
        return [cast_bytes(self._engines[t]) for t in targets], list(targets)
600
599
    def _connect(self, sshserver, ssh_kwargs, timeout):
        """setup all our socket connections to the cluster. This is called from
        __init__.

        Sends a connection_request over the query socket, waits up to
        `timeout` seconds for the Hub's reply, then connects the mux, task,
        notification, control, and iopub sockets from the returned config.
        Raises TimeoutError on no reply, generic Exception on a bad status.
        """

        # Maybe allow reconnecting?
        if self._connected:
            return
        self._connected=True

        def connect_socket(s, url):
            # tunnel through SSH when requested, else connect directly
            if self._ssh:
                from zmq.ssh import tunnel
                return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
            else:
                return s.connect(url)

        self.session.send(self._query_socket, 'connection_request')
        # use Poller because zmq.select has wrong units in pyzmq 2.1.7
        poller = zmq.Poller()
        poller.register(self._query_socket, zmq.POLLIN)
        # poll expects milliseconds, timeout is seconds
        evts = poller.poll(timeout*1000)
        if not evts:
            raise error.TimeoutError("Hub connection request timed out")
        idents,msg = self.session.recv(self._query_socket,mode=0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        # self._config['registration'] = dict(content)
        cfg = self._config
        if content['status'] == 'ok':
            self._mux_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._mux_socket, cfg['mux'])

            self._task_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._task_socket, cfg['task'])

            # subscribe to everything on the notification channel
            self._notification_socket = self._context.socket(zmq.SUB)
            self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
            connect_socket(self._notification_socket, cfg['notification'])

            self._control_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._control_socket, cfg['control'])

            # subscribe to everything on the iopub channel as well;
            # filtering to our own session happens in _flush_iopub
            self._iopub_socket = self._context.socket(zmq.SUB)
            self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
            connect_socket(self._iopub_socket, cfg['iopub'])

            # seed our engine table from the Hub's current registry
            self._update_engines(dict(content['engines']))
        else:
            self._connected = False
            raise Exception("Failed to connect!")
653
652
654 #--------------------------------------------------------------------------
653 #--------------------------------------------------------------------------
655 # handlers and callbacks for incoming messages
654 # handlers and callbacks for incoming messages
656 #--------------------------------------------------------------------------
655 #--------------------------------------------------------------------------
657
656
    def _unwrap_exception(self, content):
        """unwrap exception, and remap engine_id to int."""
        e = error.unwrap_exception(content)
        # print e.traceback
        if e.engine_info:
            e_uuid = e.engine_info['engine_uuid']
            # NOTE(review): `_update_engines` populates self._engines as
            # {int_id: uuid}, but this indexes it *by uuid* — looks inverted;
            # confirm whether engine_info actually carries the int id here.
            eid = self._engines[e_uuid]
            e.engine_info['engine_id'] = eid
        return e
667
666
    def _extract_metadata(self, msg):
        """Build a metadata dict for a reply message.

        Pulls ids, timing, and status out of the message's header,
        parent_header, metadata, and content sections.
        """
        header = msg['header']
        parent = msg['parent_header']
        msg_meta = msg['metadata']
        content = msg['content']
        md = {'msg_id' : parent['msg_id'],
              'received' : datetime.now(),
              'engine_uuid' : msg_meta.get('engine', None),
              'follow' : msg_meta.get('follow', []),
              'after' : msg_meta.get('after', []),
              'status' : content['status'],
            }

        if md['engine_uuid'] is not None:
            # map the engine uuid back to the int id we know it by
            md['engine_id'] = self._engines.get(md['engine_uuid'], None)

        # timing fields are optional depending on the message
        if 'date' in parent:
            md['submitted'] = parent['date']
        if 'started' in msg_meta:
            md['started'] = parse_date(msg_meta['started'])
        if 'date' in header:
            md['completed'] = header['date']
        return md
691
690
692 def _register_engine(self, msg):
691 def _register_engine(self, msg):
693 """Register a new engine, and update our connection info."""
692 """Register a new engine, and update our connection info."""
694 content = msg['content']
693 content = msg['content']
695 eid = content['id']
694 eid = content['id']
696 d = {eid : content['uuid']}
695 d = {eid : content['uuid']}
697 self._update_engines(d)
696 self._update_engines(d)
698
697
    def _unregister_engine(self, msg):
        """Unregister an engine that has died."""
        content = msg['content']
        eid = int(content['id'])
        if eid in self._ids:
            self._ids.remove(eid)
        # pop the uuid so the engine is fully forgotten
        uuid = self._engines.pop(eid)

        # any messages still outstanding on that engine will never return;
        # synthesize EngineError results for them
        self._handle_stranded_msgs(eid, uuid)

        # pure ZMQ scheduling can't survive a gap in engine ids
        if self._task_socket and self._task_scheme == 'pure':
            self._stop_scheduling_tasks()
711
710
712 def _handle_stranded_msgs(self, eid, uuid):
711 def _handle_stranded_msgs(self, eid, uuid):
713 """Handle messages known to be on an engine when the engine unregisters.
712 """Handle messages known to be on an engine when the engine unregisters.
714
713
715 It is possible that this will fire prematurely - that is, an engine will
714 It is possible that this will fire prematurely - that is, an engine will
716 go down after completing a result, and the client will be notified
715 go down after completing a result, and the client will be notified
717 of the unregistration and later receive the successful result.
716 of the unregistration and later receive the successful result.
718 """
717 """
719
718
720 outstanding = self._outstanding_dict[uuid]
719 outstanding = self._outstanding_dict[uuid]
721
720
722 for msg_id in list(outstanding):
721 for msg_id in list(outstanding):
723 if msg_id in self.results:
722 if msg_id in self.results:
724 # we already
723 # we already
725 continue
724 continue
726 try:
725 try:
727 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
726 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
728 except:
727 except:
729 content = error.wrap_exception()
728 content = error.wrap_exception()
730 # build a fake message:
729 # build a fake message:
731 msg = self.session.msg('apply_reply', content=content)
730 msg = self.session.msg('apply_reply', content=content)
732 msg['parent_header']['msg_id'] = msg_id
731 msg['parent_header']['msg_id'] = msg_id
733 msg['metadata']['engine'] = uuid
732 msg['metadata']['engine'] = uuid
734 self._handle_apply_reply(msg)
733 self._handle_apply_reply(msg)
735
734
    def _handle_execute_reply(self, msg):
        """Save the reply to an execute_request into our results.

        execute messages are never actually used. apply is used instead.
        """

        parent = msg['parent_header']
        msg_id = parent['msg_id']
        if msg_id not in self.outstanding:
            # a reply we weren't waiting for: either a re-delivery of an old
            # result or something we never sent
            if msg_id in self.history:
                print("got stale result: %s"%msg_id)
            else:
                print("got unknown result: %s"%msg_id)
        else:
            self.outstanding.remove(msg_id)

        content = msg['content']
        header = msg['header']

        # construct metadata:
        md = self.metadata[msg_id]
        md.update(self._extract_metadata(msg))
        # is this redundant?
        self.metadata[msg_id] = md

        # also clear this msg_id from the per-engine outstanding set
        e_outstanding = self._outstanding_dict[md['engine_uuid']]
        if msg_id in e_outstanding:
            e_outstanding.remove(msg_id)

        # construct result:
        if content['status'] == 'ok':
            self.results[msg_id] = ExecuteReply(msg_id, content, md)
        elif content['status'] == 'aborted':
            self.results[msg_id] = error.TaskAborted(msg_id)
        elif content['status'] == 'resubmitted':
            # TODO: handle resubmission
            pass
        else:
            # error status: store the remote exception as the result
            self.results[msg_id] = self._unwrap_exception(content)
775
774
    def _handle_apply_reply(self, msg):
        """Save the reply to an apply_request into our results."""
        parent = msg['parent_header']
        msg_id = parent['msg_id']
        if msg_id not in self.outstanding:
            # a reply we weren't waiting for
            if msg_id in self.history:
                print("got stale result: %s"%msg_id)
                print(self.results[msg_id])
                print(msg)
            else:
                print("got unknown result: %s"%msg_id)
        else:
            self.outstanding.remove(msg_id)
        content = msg['content']
        header = msg['header']

        # construct metadata:
        md = self.metadata[msg_id]
        md.update(self._extract_metadata(msg))
        # is this redundant?
        self.metadata[msg_id] = md

        # also clear this msg_id from the per-engine outstanding set
        e_outstanding = self._outstanding_dict[md['engine_uuid']]
        if msg_id in e_outstanding:
            e_outstanding.remove(msg_id)

        # construct result:
        if content['status'] == 'ok':
            # the actual return value rides in the message buffers
            self.results[msg_id] = serialize.deserialize_object(msg['buffers'])[0]
        elif content['status'] == 'aborted':
            self.results[msg_id] = error.TaskAborted(msg_id)
        elif content['status'] == 'resubmitted':
            # TODO: handle resubmission
            pass
        else:
            # error status: store the remote exception as the result
            self.results[msg_id] = self._unwrap_exception(content)
812
811
    def _flush_notifications(self):
        """Flush notifications of engine registrations waiting
        in ZMQ queue."""
        # non-blocking recv: msg is None once the queue is drained,
        # which terminates the loop
        idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
        while msg is not None:
            if self.debug:
                pprint(msg)
            msg_type = msg['header']['msg_type']
            # dispatch to the handler registered for this notification type
            handler = self._notification_handlers.get(msg_type, None)
            if handler is None:
                raise Exception("Unhandled message type: %s" % msg_type)
            else:
                handler(msg)
            idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
827
826
    def _flush_results(self, sock):
        """Flush task or queue results waiting in ZMQ queue."""
        # non-blocking recv: msg is None once the queue is drained
        idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        while msg is not None:
            if self.debug:
                pprint(msg)
            msg_type = msg['header']['msg_type']
            # dispatch execute_reply/apply_reply to their handlers
            handler = self._queue_handlers.get(msg_type, None)
            if handler is None:
                raise Exception("Unhandled message type: %s" % msg_type)
            else:
                handler(msg)
            idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
841
840
    def _flush_control(self, sock):
        """Flush replies from the control channel waiting
        in the ZMQ queue.

        Currently: ignore them."""
        # nothing to discard
        if self._ignored_control_replies <= 0:
            return
        idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        while msg is not None:
            # each received reply decrements the ignore counter
            self._ignored_control_replies -= 1
            if self.debug:
                pprint(msg)
            idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
855
854
    def _flush_ignored_control(self):
        """flush ignored control replies"""
        # blocking variant: wait for every reply we chose to ignore
        while self._ignored_control_replies > 0:
            self.session.recv(self._control_socket)
            self._ignored_control_replies -= 1
861
860
    def _flush_ignored_hub_replies(self):
        """Drain and discard any pending replies on the query (hub) socket."""
        ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
        while msg is not None:
            ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
866
865
    def _flush_iopub(self, sock):
        """Flush replies from the iopub channel waiting
        in the ZMQ queue.

        Folds streamed output, errors, inputs, display data, and status
        updates into self.metadata keyed by the originating msg_id.
        """
        idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        while msg is not None:
            if self.debug:
                pprint(msg)
            parent = msg['parent_header']
            if not parent or parent['session'] != self.session.session:
                # ignore IOPub messages not from here
                idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
                continue
            msg_id = parent['msg_id']
            content = msg['content']
            header = msg['header']
            msg_type = msg['header']['msg_type']

            if msg_type == 'status' and msg_id not in self.metadata:
                # ignore status messages if they aren't mine
                idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
                continue

            # init metadata:
            md = self.metadata[msg_id]

            if msg_type == 'stream':
                # accumulate stdout/stderr text under the stream's name
                name = content['name']
                s = md[name] or ''
                md[name] = s + content['text']
            elif msg_type == 'error':
                md.update({'error' : self._unwrap_exception(content)})
            elif msg_type == 'execute_input':
                md.update({'execute_input' : content['code']})
            elif msg_type == 'display_data':
                md['outputs'].append(content)
            elif msg_type == 'execute_result':
                md['execute_result'] = content
            elif msg_type == 'data_message':
                # binary payload published via the data API
                data, remainder = serialize.deserialize_object(msg['buffers'])
                md['data'].update(data)
            elif msg_type == 'status':
                # idle message comes after all outputs
                if content['execution_state'] == 'idle':
                    md['outputs_ready'] = True
            else:
                # unhandled msg_type (status, etc.)
                pass

            # reduntant?
            self.metadata[msg_id] = md

            idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
920
919
921 #--------------------------------------------------------------------------
920 #--------------------------------------------------------------------------
922 # len, getitem
921 # len, getitem
923 #--------------------------------------------------------------------------
922 #--------------------------------------------------------------------------
924
923
    def __len__(self):
        """len(client) returns # of engines."""
        # self.ids flushes pending notifications, so this count is current
        return len(self.ids)
928
927
929 def __getitem__(self, key):
928 def __getitem__(self, key):
930 """index access returns DirectView multiplexer objects
929 """index access returns DirectView multiplexer objects
931
930
932 Must be int, slice, or list/tuple/xrange of ints"""
931 Must be int, slice, or list/tuple/xrange of ints"""
933 if not isinstance(key, (int, slice, tuple, list, xrange)):
932 if not isinstance(key, (int, slice, tuple, list, xrange)):
934 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
933 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
935 else:
934 else:
936 return self.direct_view(key)
935 return self.direct_view(key)
937
936
    def __iter__(self):
        """Since we define getitem, Client is iterable

        but unless we also define __iter__, it won't work correctly unless engine IDs
        start at zero and are continuous.
        """
        # yield a single-engine DirectView per currently-registered id
        for eid in self.ids:
            yield self.direct_view(eid)
946
945
947 #--------------------------------------------------------------------------
946 #--------------------------------------------------------------------------
948 # Begin public methods
947 # Begin public methods
949 #--------------------------------------------------------------------------
948 #--------------------------------------------------------------------------
950
949
    @property
    def ids(self):
        """Always up-to-date ids property."""
        # process any queued (un)registration notifications first
        self._flush_notifications()
        # always copy:
        return list(self._ids)
957
956
    def activate(self, targets='all', suffix=''):
        """Create a DirectView and register it with IPython magics

        Defines the magics `%px, %autopx, %pxresult, %%px`

        Parameters
        ----------

        targets: int, list of ints, or 'all'
            The engines on which the view's magics will run
        suffix: str [default: '']
            The suffix, if any, for the magics.  This allows you to have
            multiple views associated with parallel magics at the same time.

            e.g. ``rc.activate(targets=0, suffix='0')`` will give you
            the magics ``%px0``, ``%pxresult0``, etc. for running magics just
            on engine 0.

        Returns
        -------
        The blocking DirectView whose magics were registered.
        """
        view = self.direct_view(targets)
        # magics are synchronous, so the view must block
        view.block = True
        view.activate(suffix)
        return view
980
979
def close(self, linger=None):
    """Close my zmq Sockets

    If `linger`, set the zmq LINGER socket option,
    which allows discarding of messages.
    """
    # idempotent: closing twice is a no-op
    if self._closed:
        return
    self.stop_spin_thread()
    # every trait whose name ends in "socket" holds one of our zmq sockets
    socket_names = [name for name in self.trait_names() if name.endswith("socket")]
    for socket_name in socket_names:
        sock = getattr(self, socket_name)
        if sock is None or sock.closed:
            continue
        if linger is None:
            sock.close()
        else:
            sock.close(linger=linger)
    self._closed = True
999
998
1000 def _spin_every(self, interval=1):
999 def _spin_every(self, interval=1):
1001 """target func for use in spin_thread"""
1000 """target func for use in spin_thread"""
1002 while True:
1001 while True:
1003 if self._stop_spinning.is_set():
1002 if self._stop_spinning.is_set():
1004 return
1003 return
1005 time.sleep(interval)
1004 time.sleep(interval)
1006 self.spin()
1005 self.spin()
1007
1006
def spin_thread(self, interval=1):
    """call Client.spin() in a background thread on some regular interval

    Keeps the zmq queues drained while you work on other things or leave
    an idle terminal, which also bounds the padding/jitter in the
    `received` timestamps that AsyncResult uses for timings
    (e.g. AsyncResult.wall_time).

    Parameters
    ----------
    interval : float, optional
        Seconds to sleep between spins in the background thread
        (passed directly to time.sleep).
    """
    # replace any existing background spinner before starting a new one
    if self._spin_thread is not None:
        self.stop_spin_thread()
    self._stop_spinning.clear()
    worker = Thread(target=self._spin_every, args=(interval,))
    worker.daemon = True
    self._spin_thread = worker
    worker.start()
1038
1037
def stop_spin_thread(self):
    """stop background spin_thread, if any"""
    if self._spin_thread is None:
        return
    # signal the worker loop to exit, then wait for it to finish
    self._stop_spinning.set()
    self._spin_thread.join()
    self._spin_thread = None
1045
1044
def spin(self):
    """Drain pending registration notifications and execution results
    from the ZMQ queues.
    """
    # Each socket is only flushed if it has been set up (non-None).
    # Flush order matches the connection order: notifications first,
    # then iopub, the two results channels, control, and hub replies.
    if self._notification_socket:
        self._flush_notifications()
    if self._iopub_socket:
        self._flush_iopub(self._iopub_socket)
    for results_socket in (self._mux_socket, self._task_socket):
        if results_socket:
            self._flush_results(results_socket)
    if self._control_socket:
        self._flush_control(self._control_socket)
    if self._query_socket:
        self._flush_ignored_hub_replies()
1062
1061
def wait(self, jobs=None, timeout=-1):
    """Block until the given jobs finish, or until `timeout` expires.

    Parameters
    ----------
    jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
        ints are indices into self.history; strs are msg_ids.
        If None (default), wait on all outstanding messages.
    timeout : float
        Seconds to wait before giving up.
        The default of -1 means no timeout.

    Returns
    -------
    True : all requested msg_ids are done
    False : timeout reached with some msg_ids still outstanding
    """
    start = time.time()
    if jobs is None:
        theids = self.outstanding
    else:
        # normalize a single job into a one-element list
        if isinstance(jobs, string_types + (int, AsyncResult)):
            jobs = [jobs]
        theids = set()
        for job in jobs:
            if isinstance(job, AsyncResult):
                theids.update(job.msg_ids)
                continue
            if isinstance(job, int):
                # history index -> msg_id
                job = self.history[job]
            theids.add(job)
    # fast path: nothing requested is still outstanding
    if not theids.intersection(self.outstanding):
        return True
    self.spin()
    while theids.intersection(self.outstanding):
        elapsed = time.time() - start
        if 0 <= timeout < elapsed:
            break
        time.sleep(1e-3)
        self.spin()
    return not theids.intersection(self.outstanding)
1107
1106
1108 #--------------------------------------------------------------------------
1107 #--------------------------------------------------------------------------
1109 # Control methods
1108 # Control methods
1110 #--------------------------------------------------------------------------
1109 #--------------------------------------------------------------------------
1111
1110
@spin_first
def clear(self, targets=None, block=None):
    """Clear the namespace on the target engine(s)."""
    block = self.block if block is None else block
    targets = self._build_targets(targets)[0]
    for engine_ident in targets:
        self.session.send(self._control_socket, 'clear_request', content={}, ident=engine_ident)
    remote_err = False
    if not block:
        # non-blocking: remember how many replies to discard later
        self._ignored_control_replies += len(targets)
    else:
        self._flush_ignored_control()
        # collect one reply per engine, keeping the last error seen
        for _ in range(len(targets)):
            idents, msg = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                remote_err = self._unwrap_exception(msg['content'])
    if remote_err:
        raise remote_err
1132
1131
1133
1132
@spin_first
def abort(self, jobs=None, targets=None, block=None):
    """Abort specific jobs from the execution queues of target(s).

    This is a mechanism to prevent jobs that have already been submitted
    from executing.

    Parameters
    ----------
    jobs : msg_id, list of msg_ids, or AsyncResult
        The jobs to be aborted.

        If unspecified/None: abort all outstanding jobs.
    """
    block = self.block if block is None else block
    if jobs is None:
        jobs = list(self.outstanding)
    targets = self._build_targets(targets)[0]

    # normalize a single job into a list, then reject anything that is
    # neither a msg_id string nor an AsyncResult
    if isinstance(jobs, string_types + (AsyncResult,)):
        jobs = [jobs]
    bad_ids = [item for item in jobs if not isinstance(item, string_types + (AsyncResult,))]
    if bad_ids:
        raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
    msg_ids = []
    for job in jobs:
        if isinstance(job, AsyncResult):
            msg_ids.extend(job.msg_ids)
        else:
            msg_ids.append(job)
    content = dict(msg_ids=msg_ids)
    for engine_ident in targets:
        self.session.send(self._control_socket, 'abort_request',
                content=content, ident=engine_ident)
    remote_err = False
    if not block:
        # non-blocking: remember how many replies to discard later
        self._ignored_control_replies += len(targets)
    else:
        self._flush_ignored_control()
        # one reply per engine; keep the last error seen
        for _ in range(len(targets)):
            idents, msg = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                remote_err = self._unwrap_exception(msg['content'])
    if remote_err:
        raise remote_err
1182
1181
@spin_first
def shutdown(self, targets='all', restart=False, hub=False, block=None):
    """Terminates one or more engine processes, optionally including the hub.

    Parameters
    ----------
    targets: list of ints or 'all' [default: all]
        Which engines to shutdown.
    hub: bool [default: False]
        Whether to include the Hub. hub=True implies targets='all'.
    block: bool [default: self.block]
        Whether to wait for clean shutdown replies or not.
    restart: bool [default: False]
        NOT IMPLEMENTED
        whether to restart engines after shutting them down.
    """
    from ipython_parallel.error import NoEnginesRegistered
    if restart:
        raise NotImplementedError("Engine restart is not yet implemented")

    block = self.block if block is None else block
    if hub:
        # shutting down the hub implies shutting down every engine
        targets = 'all'
    try:
        targets = self._build_targets(targets)[0]
    except NoEnginesRegistered:
        # no engines left to notify; may still need to stop the hub below
        targets = []
    for engine_ident in targets:
        self.session.send(self._control_socket, 'shutdown_request',
                content={'restart':restart}, ident=engine_ident)
    remote_err = False
    # hub shutdown always waits for engine replies, even if block=False
    if block or hub:
        self._flush_ignored_control()
        for _ in range(len(targets)):
            idents, msg = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                remote_err = self._unwrap_exception(msg['content'])
    else:
        self._ignored_control_replies += len(targets)

    if hub:
        # brief pause so engines can process their shutdown first
        time.sleep(0.25)
        self.session.send(self._query_socket, 'shutdown_request')
        idents, msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        if msg['content']['status'] != 'ok':
            remote_err = self._unwrap_exception(msg['content'])

    if remote_err:
        raise remote_err
1237
1236
1238 #--------------------------------------------------------------------------
1237 #--------------------------------------------------------------------------
1239 # Execution related methods
1238 # Execution related methods
1240 #--------------------------------------------------------------------------
1239 #--------------------------------------------------------------------------
1241
1240
def _maybe_raise(self, result):
    """wrapper for maybe raising an exception if apply failed."""
    # ordinary results pass straight through; a RemoteError result
    # means the remote call failed, so surface it locally
    if not isinstance(result, error.RemoteError):
        return result
    raise result
1248
1247
def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
                        ident=None):
    """Build an apply_request message and send it via `socket`.

    This is the principal method with which all engine execution is
    performed by views.  Returns the sent message dict.
    """

    if self._closed:
        raise RuntimeError("Client cannot be used after its sockets have been closed")

    # fill in defaults (never use mutable defaults in the signature)
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    if metadata is None:
        metadata = {}

    # validate arguments up front so errors surface locally
    if not (callable(f) or isinstance(f, Reference)):
        raise TypeError("f must be callable, not %s"%type(f))
    if not isinstance(args, (tuple, list)):
        raise TypeError("args must be tuple or list, not %s"%type(args))
    if not isinstance(kwargs, dict):
        raise TypeError("kwargs must be dict, not %s"%type(kwargs))
    if not isinstance(metadata, dict):
        raise TypeError("metadata must be dict, not %s"%type(metadata))

    # serialize the call into zmq message buffers
    bufs = serialize.pack_apply_message(f, args, kwargs,
                buffer_threshold=self.session.buffer_threshold,
                item_threshold=self.session.item_threshold,
    )

    msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
                        metadata=metadata, track=track)

    # bookkeeping: track the request until its reply arrives
    msg_id = msg['header']['msg_id']
    self.outstanding.add(msg_id)
    if ident:
        # possibly routed to a specific engine
        if isinstance(ident, list):
            ident = ident[-1]
        if ident in self._engines.values():
            # remember per-engine, in case that engine dies
            self._outstanding_dict[ident].add(msg_id)
    self.history.append(msg_id)
    self.metadata[msg_id]['submitted'] = datetime.now()

    return msg
1295
1294
def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
    """Build an execute_request message and send it via `socket`.

    Returns the sent message dict.
    """

    if self._closed:
        raise RuntimeError("Client cannot be used after its sockets have been closed")

    # fill in defaults
    if metadata is None:
        metadata = {}

    # validate arguments up front so errors surface locally
    if not isinstance(code, string_types):
        raise TypeError("code must be text, not %s" % type(code))
    if not isinstance(metadata, dict):
        raise TypeError("metadata must be dict, not %s" % type(metadata))

    content = dict(code=code, silent=bool(silent), user_expressions={})

    msg = self.session.send(socket, "execute_request", content=content, ident=ident,
                        metadata=metadata)

    # bookkeeping: track the request until its reply arrives
    msg_id = msg['header']['msg_id']
    self.outstanding.add(msg_id)
    if ident:
        # possibly routed to a specific engine
        if isinstance(ident, list):
            ident = ident[-1]
        if ident in self._engines.values():
            # remember per-engine, in case that engine dies
            self._outstanding_dict[ident].add(msg_id)
    self.history.append(msg_id)
    self.metadata[msg_id]['submitted'] = datetime.now()

    return msg
1332
1331
1333 #--------------------------------------------------------------------------
1332 #--------------------------------------------------------------------------
1334 # construct a View object
1333 # construct a View object
1335 #--------------------------------------------------------------------------
1334 #--------------------------------------------------------------------------
1336
1335
def load_balanced_view(self, targets=None):
    """construct a LoadBalancedView object.

    If no arguments are specified, create a LoadBalancedView
    using all engines.

    Parameters
    ----------

    targets: list,slice,int,etc. [default: use all engines]
        The subset of engines across which to load-balance
    """
    # 'all' is normalized to None: the scheduler then load-balances
    # across whatever engines exist at execution time
    if targets == 'all':
        targets = None
    if targets is not None:
        targets = self._build_targets(targets)[1]
    return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1354
1353
def direct_view(self, targets='all'):
    """construct a DirectView object.

    If no targets are specified, create a DirectView using all engines.

    rc.direct_view('all') is distinguished from rc[:] in that 'all' will
    evaluate the target engines at each execution, whereas rc[:] will connect to
    all *current* engines, and that list will not change.

    That is, 'all' will always use all engines, whereas rc[:] will not use
    engines added after the DirectView is constructed.

    Parameters
    ----------
    targets: list,slice,int,etc. [default: use all engines]
        The engines to use for the View
    """
    want_single = isinstance(targets, int)
    # only 'all' stays lazy (re-evaluated at each execution);
    # everything else is resolved to concrete engine ids now
    if targets != 'all':
        targets = self._build_targets(targets)[1]
    if want_single:
        # an int target means a view on exactly one engine
        targets = targets[0]
    return DirectView(client=self, socket=self._mux_socket, targets=targets)
1380
1379
1381 #--------------------------------------------------------------------------
1380 #--------------------------------------------------------------------------
1382 # Query methods
1381 # Query methods
1383 #--------------------------------------------------------------------------
1382 #--------------------------------------------------------------------------
1384
1383
@spin_first
def get_result(self, indices_or_msg_ids=None, block=None, owner=True):
    """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.

    If the client already has the results, no request to the Hub will be made.

    This is a convenient way to construct AsyncResult objects, which are wrappers
    that include metadata about execution, and allow for awaiting results that
    were not submitted by this Client.

    It can also be a convenient way to retrieve the metadata associated with
    blocking execution, since the returned AsyncResult always carries the
    execution metadata for each request.

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The indices or msg_ids of indices to be retrieved.
        Default (None): the most recent submission (-1).

    block : bool
        Whether to wait for the result to be done
    owner : bool [default: True]
        Whether this AsyncResult should own the result.
        If so, calling `ar.get()` will remove data from the
        client's result and metadata cache.
        There should only be one owner of any given msg_id.

    Returns
    -------

    AsyncResult
        A single AsyncResult object will always be returned.

    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub

    """
    block = self.block if block is None else block
    if indices_or_msg_ids is None:
        # default to the most recent submission
        indices_or_msg_ids = -1

    single_result = False
    if not isinstance(indices_or_msg_ids, (list,tuple)):
        indices_or_msg_ids = [indices_or_msg_ids]
        single_result = True

    theids = []
    for id in indices_or_msg_ids:
        if isinstance(id, int):
            # resolve history index to its msg_id
            id = self.history[id]
        if not isinstance(id, string_types):
            raise TypeError("indices must be str or int, not %r"%id)
        theids.append(id)

    # split ids by whether this client already knows about them;
    # anything unknown locally must be fetched from the Hub
    local_ids = [msg_id for msg_id in theids if (msg_id in self.outstanding or msg_id in self.results)]
    remote_ids = [msg_id for msg_id in theids if msg_id not in local_ids]

    # given a single msg_id initially, get_result should get the result itself,
    # not a length-one list
    if single_result:
        theids = theids[0]

    if remote_ids:
        ar = AsyncHubResult(self, msg_ids=theids, owner=owner)
    else:
        ar = AsyncResult(self, msg_ids=theids, owner=owner)

    if block:
        ar.wait()

    return ar
1462
1461
@spin_first
def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
    """Resubmit one or more tasks.

    in-flight tasks may not be resubmitted.

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The indices or msg_ids of indices to be retrieved

    metadata : dict, optional
        Accepted for interface compatibility; not used by this method body.

    block : bool
        Whether to wait for the result to be done

    Returns
    -------

    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub

    """
    block = self.block if block is None else block
    if indices_or_msg_ids is None:
        # default to the most recent submission
        indices_or_msg_ids = -1

    if not isinstance(indices_or_msg_ids, (list, tuple)):
        indices_or_msg_ids = [indices_or_msg_ids]

    # normalize integer history indices into msg_id strings
    msg_ids = []
    for index_or_id in indices_or_msg_ids:
        if isinstance(index_or_id, int):
            index_or_id = self.history[index_or_id]
        if not isinstance(index_or_id, string_types):
            raise TypeError("indices must be str or int, not %r" % index_or_id)
        msg_ids.append(index_or_id)

    self.session.send(self._query_socket, 'resubmit_request',
                      dict(msg_ids=msg_ids))

    # wait until the Hub replies, then read that reply without blocking
    zmq.select([self._query_socket], [], [])
    idents, msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
    if self.debug:
        pprint(msg)
    reply = msg['content']
    if reply['status'] != 'ok':
        raise self._unwrap_exception(reply)
    # map each original msg_id to the id of its resubmitted task
    resubmitted = reply['resubmitted']
    new_ids = [resubmitted[msg_id] for msg_id in msg_ids]

    ar = AsyncHubResult(self, msg_ids=new_ids)

    if block:
        ar.wait()

    return ar
1520
1519
@spin_first
def result_status(self, msg_ids, status_only=True):
    """Check on the status of the result(s) of the apply request with `msg_ids`.

    If status_only is False, then the actual results will be retrieved, else
    only the status of the results will be checked.

    Parameters
    ----------

    msg_ids : list of msg_ids
        if int:
            Passed as index to self.history for convenience.
    status_only : bool (default: True)
        if False:
            Retrieve the actual results of completed tasks.

    Returns
    -------

    results : dict
        There will always be the keys 'pending' and 'completed', which will
        be lists of msg_ids that are incomplete or complete. If `status_only`
        is False, then completed results will be keyed by their `msg_id`.

    Raises
    ------

    TypeError : if any id is neither an int (history index) nor a str msg_id.
    """
    if not isinstance(msg_ids, (list, tuple)):
        msg_ids = [msg_ids]

    # normalize integer history indices into msg_id strings
    theids = []
    for msg_id in msg_ids:
        if isinstance(msg_id, int):
            msg_id = self.history[msg_id]
        if not isinstance(msg_id, string_types):
            raise TypeError("msg_ids must be str, not %r"%msg_id)
        theids.append(msg_id)

    completed = []
    local_results = {}

    # Check the local cache first; only ids *not* found locally are
    # requested from the Hub.
    # BUGFIX: the previous code did `theids.remove(msg_id)` inside
    # `for msg_id in theids`, mutating the list mid-iteration and thereby
    # skipping the element that follows each locally-cached id.  Build a
    # fresh list of uncached ids instead.
    uncached = []
    for msg_id in theids:
        if msg_id in self.results:
            completed.append(msg_id)
            local_results[msg_id] = self.results[msg_id]
        else:
            uncached.append(msg_id)
    theids = uncached

    if theids: # some not locally cached
        content = dict(msg_ids=theids, status_only=status_only)
        msg = self.session.send(self._query_socket, "result_request", content=content)
        # wait until the Hub replies, then read that reply without blocking
        zmq.select([self._query_socket], [], [])
        idents, msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)
        buffers = msg['buffers']
    else:
        content = dict(completed=[], pending=[])

    # fold locally-cached completions into the Hub's answer
    content['completed'].extend(completed)

    if status_only:
        return content

    failures = []
    # load cached results into result:
    content.update(local_results)

    # update cache with results:
    for msg_id in sorted(theids):
        if msg_id in content['completed']:
            rec = content[msg_id]
            parent = extract_dates(rec['header'])
            header = extract_dates(rec['result_header'])
            rcontent = rec['result_content']
            iodict = rec['io']
            if isinstance(rcontent, str):
                rcontent = self.session.unpack(rcontent)

            md = self.metadata[msg_id]
            md_msg = dict(
                content=rcontent,
                parent_header=parent,
                header=header,
                metadata=rec['result_metadata'],
            )
            md.update(self._extract_metadata(md_msg))
            if rec.get('received'):
                md['received'] = parse_date(rec['received'])
            md.update(iodict)

            if rcontent['status'] == 'ok':
                if header['msg_type'] == 'apply_reply':
                    # buffers are consumed in task order, one slice per reply
                    res, buffers = serialize.deserialize_object(buffers)
                elif header['msg_type'] == 'execute_reply':
                    res = ExecuteReply(msg_id, rcontent, md)
                else:
                    raise KeyError("unhandled msg type: %r" % header['msg_type'])
            else:
                res = self._unwrap_exception(rcontent)
                failures.append(res)

            self.results[msg_id] = res
            content[msg_id] = res

    # a single requested task that failed raises directly
    if len(theids) == 1 and failures:
        raise failures[0]

    error.collect_exceptions(failures, "result_status")
    return content
1632
1631
@spin_first
def queue_status(self, targets='all', verbose=False):
    """Fetch the status of engine queues.

    Parameters
    ----------

    targets : int/str/list of ints/strs
        the engines whose states are to be queried.
        default : all
    verbose : bool
        Whether to return lengths only, or lists of ids for each element
    """
    # 'all' is forwarded untranslated so it is evaluated on the Hub side
    if targets == 'all':
        engine_ids = None
    else:
        engine_ids = self._build_targets(targets)[1]
    self.session.send(self._query_socket, "queue_request",
                      content=dict(targets=engine_ids, verbose=verbose))
    idents, msg = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(msg)
    reply = msg['content']
    if reply.pop('status') != 'ok':
        raise self._unwrap_exception(reply)
    reply = rekey(reply)
    # a single integer target gets its own status dict, not the full mapping
    return reply[targets] if isinstance(targets, int) else reply
1665
1664
1666 def _build_msgids_from_target(self, targets=None):
1665 def _build_msgids_from_target(self, targets=None):
1667 """Build a list of msg_ids from the list of engine targets"""
1666 """Build a list of msg_ids from the list of engine targets"""
1668 if not targets: # needed as _build_targets otherwise uses all engines
1667 if not targets: # needed as _build_targets otherwise uses all engines
1669 return []
1668 return []
1670 target_ids = self._build_targets(targets)[0]
1669 target_ids = self._build_targets(targets)[0]
1671 return [md_id for md_id in self.metadata if self.metadata[md_id]["engine_uuid"] in target_ids]
1670 return [md_id for md_id in self.metadata if self.metadata[md_id]["engine_uuid"] in target_ids]
1672
1671
1673 def _build_msgids_from_jobs(self, jobs=None):
1672 def _build_msgids_from_jobs(self, jobs=None):
1674 """Build a list of msg_ids from "jobs" """
1673 """Build a list of msg_ids from "jobs" """
1675 if not jobs:
1674 if not jobs:
1676 return []
1675 return []
1677 msg_ids = []
1676 msg_ids = []
1678 if isinstance(jobs, string_types + (AsyncResult,)):
1677 if isinstance(jobs, string_types + (AsyncResult,)):
1679 jobs = [jobs]
1678 jobs = [jobs]
1680 bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
1679 bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
1681 if bad_ids:
1680 if bad_ids:
1682 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1681 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1683 for j in jobs:
1682 for j in jobs:
1684 if isinstance(j, AsyncResult):
1683 if isinstance(j, AsyncResult):
1685 msg_ids.extend(j.msg_ids)
1684 msg_ids.extend(j.msg_ids)
1686 else:
1685 else:
1687 msg_ids.append(j)
1686 msg_ids.append(j)
1688 return msg_ids
1687 return msg_ids
1689
1688
def purge_local_results(self, jobs=None, targets=None):
    """Clears the client caches of results and their metadata.

    Individual results can be purged by msg_id, or the entire
    history of specific targets can be purged.

    Use `purge_local_results('all')` to scrub everything from the Clients's
    results and metadata caches.

    After this call all `AsyncResults` are invalid and should be discarded.

    If you must "reget" the results, you can still do so by using
    `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
    redownload the results from the hub if they are still available
    (i.e `client.purge_hub_results(...)` has not been called.

    Parameters
    ----------

    jobs : str or list of str or AsyncResult objects
        the msg_ids whose results should be purged.
    targets : int/list of ints
        The engines, by integer ID, whose entire result histories are to be purged.

    Raises
    ------

    RuntimeError : if any of the tasks to be purged are still outstanding.

    """
    # NOTE: defaults changed from mutable `[]` literals to None (the
    # standard fix for Python's shared-mutable-default pitfall).  Behavior
    # is unchanged: both arguments are only tested for truthiness here, and
    # the _build_msgids_* helpers already treat None like an empty list.
    if not targets and not jobs:
        raise ValueError("Must specify at least one of `targets` and `jobs`")

    if jobs == 'all':
        # a full purge is only safe when nothing is still in flight
        if self.outstanding:
            raise RuntimeError("Can't purge outstanding tasks: %s" % self.outstanding)
        self.results.clear()
        self.metadata.clear()
        return

    # resolve both specifiers to a single set of msg_ids
    msg_ids = set()
    msg_ids.update(self._build_msgids_from_target(targets))
    msg_ids.update(self._build_msgids_from_jobs(jobs))
    still_outstanding = self.outstanding.intersection(msg_ids)
    if still_outstanding:
        raise RuntimeError("Can't purge outstanding tasks: %s" % still_outstanding)
    for mid in msg_ids:
        self.results.pop(mid, None)
        self.metadata.pop(mid, None)
1738
1737
1739
1738
@spin_first
def purge_hub_results(self, jobs=[], targets=[]):
    """Tell the Hub to forget results.

    Individual results can be purged by msg_id, or the entire
    history of specific targets can be purged.

    Use `purge_results('all')` to scrub everything from the Hub's db.

    Parameters
    ----------

    jobs : str or list of str or AsyncResult objects
        the msg_ids whose results should be forgotten.
    targets : int/str/list of ints/strs
        The targets, by int_id, whose entire history is to be purged.

        default : None
    """
    if not targets and not jobs:
        raise ValueError("Must specify at least one of `targets` and `jobs`")
    # resolve engine specifiers to uuids; an empty list is sent as-is
    engine_ids = self._build_targets(targets)[1] if targets else targets

    # 'all' is passed straight through; anything else becomes msg_ids
    msg_ids = jobs if jobs == 'all' else self._build_msgids_from_jobs(jobs)

    self.session.send(self._query_socket, "purge_request",
                      content=dict(engine_ids=engine_ids, msg_ids=msg_ids))
    idents, msg = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(msg)
    reply = msg['content']
    if reply['status'] != 'ok':
        raise self._unwrap_exception(reply)
1778
1777
def purge_results(self, jobs=[], targets=[]):
    """Clears the cached results from both the hub and the local client

    Individual results can be purged by msg_id, or the entire
    history of specific targets can be purged.

    Use `purge_results('all')` to scrub every cached result from both the Hub's and
    the Client's db.

    Equivalent to calling both `purge_hub_results()` and `purge_client_results()` with
    the same arguments.

    Parameters
    ----------

    jobs : str or list of str or AsyncResult objects
        the msg_ids whose results should be forgotten.
    targets : int/str/list of ints/strs
        The targets, by int_id, whose entire history is to be purged.

        default : None
    """
    # purge the local cache first, then the Hub's database
    for purge in (self.purge_local_results, self.purge_hub_results):
        purge(jobs=jobs, targets=targets)
1803
1802
def purge_everything(self):
    """Clears all content from previous Tasks from both the hub and the local client

    In addition to calling `purge_results("all")` it also deletes the history and
    other bookkeeping lists.
    """
    self.purge_results("all")
    # forget the local submission history and the session's digest cache
    self.history = []
    self.session.digest_history.clear()
1813
1812
@spin_first
def hub_history(self):
    """Get the Hub's history

    Just like the Client, the Hub has a history, which is a list of msg_ids.
    This will contain the history of all clients, and, depending on configuration,
    may contain history across multiple cluster sessions.

    Any msg_id returned here is a valid argument to `get_result`.

    Returns
    -------

    msg_ids : list of strs
        list of all msg_ids, ordered by task submission time.
    """
    self.session.send(self._query_socket, "history_request", content={})
    idents, msg = self.session.recv(self._query_socket, 0)

    if self.debug:
        pprint(msg)
    reply = msg['content']
    if reply['status'] == 'ok':
        return reply['history']
    raise self._unwrap_exception(reply)
1841
1840
@spin_first
def db_query(self, query, keys=None):
    """Query the Hub's TaskRecord database

    This will return a list of task record dicts that match `query`

    Parameters
    ----------

    query : mongodb query dict
        The search dict. See mongodb query docs for details.
    keys : list of strs [optional]
        The subset of keys to be returned. The default is to fetch everything but buffers.
        'msg_id' will *always* be included.
    """
    # allow a single key to be passed as a bare string
    if isinstance(keys, string_types):
        keys = [keys]
    self.session.send(self._query_socket, "db_request",
                      content=dict(query=query, keys=keys))
    idents, msg = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(msg)
    reply = msg['content']
    if reply['status'] != 'ok':
        raise self._unwrap_exception(reply)

    records = reply['records']

    buffer_lens = reply['buffer_lens']
    result_buffer_lens = reply['result_buffer_lens']
    buffers = msg['buffers']
    has_bufs = buffer_lens is not None
    has_rbufs = result_buffer_lens is not None
    for i, rec in enumerate(records):
        # unpack datetime objects
        for hkey in ('header', 'result_header'):
            if hkey in rec:
                rec[hkey] = extract_dates(rec[hkey])
        for dtkey in ('submitted', 'started', 'completed', 'received'):
            if dtkey in rec:
                rec[dtkey] = parse_date(rec[dtkey])
        # re-attach each record's slice of the shared buffer list, consuming
        # `buffers` front-to-front in record order
        if has_bufs:
            count = buffer_lens[i]
            rec['buffers'] = buffers[:count]
            buffers = buffers[count:]
        if has_rbufs:
            count = result_buffer_lens[i]
            rec['result_buffers'] = buffers[:count]
            buffers = buffers[count:]

    return records
1892
1891
# only the Client class is part of this module's public API
__all__ = ["Client"]
@@ -1,1125 +1,1125 b''
1 """Views of remote engines."""
1 """Views of remote engines."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import print_function
6 from __future__ import print_function
7
7
8 import imp
8 import imp
9 import sys
9 import sys
10 import warnings
10 import warnings
11 from contextlib import contextmanager
11 from contextlib import contextmanager
12 from types import ModuleType
12 from types import ModuleType
13
13
14 import zmq
14 import zmq
15
15
16 from IPython.testing.skipdoctest import skip_doctest
16 from IPython.testing.skipdoctest import skip_doctest
17 from IPython.utils import pickleutil
17 from IPython.utils import pickleutil
18 from IPython.utils.traitlets import (
18 from IPython.utils.traitlets import (
19 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
19 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
20 )
20 )
21 from decorator import decorator
21 from decorator import decorator
22
22
23 from ipython_parallel import util
23 from ipython_parallel import util
24 from ipython_parallel.controller.dependency import Dependency, dependent
24 from ipython_parallel.controller.dependency import Dependency, dependent
25 from IPython.utils.py3compat import string_types, iteritems, PY3
25 from IPython.utils.py3compat import string_types, iteritems, PY3
26
26
27 from . import map as Map
27 from . import map as Map
28 from .asyncresult import AsyncResult, AsyncMapResult
28 from .asyncresult import AsyncResult, AsyncMapResult
29 from .remotefunction import ParallelFunction, parallel, remote, getname
29 from .remotefunction import ParallelFunction, parallel, remote, getname
30
30
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32 # Decorators
32 # Decorators
33 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
34
34
@decorator
def save_ids(f, self, *args, **kwargs):
    """Keep our history and outstanding attributes up to date after a method call.

    Records the msg_ids that ``f`` caused the client to submit (measured as
    the growth of ``self.client.history`` across the call) into this view's
    own ``history`` and ``outstanding`` collections, even if ``f`` raises.
    """
    n_previous = len(self.client.history)
    try:
        ret = f(self, *args, **kwargs)
    finally:
        nmsgs = len(self.client.history) - n_previous
        # Guard against nmsgs == 0: ``history[-0:]`` is the *entire* history,
        # which would wrongly mark every past message as outstanding here.
        msg_ids = self.client.history[-nmsgs:] if nmsgs else []
        self.history.extend(msg_ids)
        self.outstanding.update(msg_ids)
    return ret
47
47
@decorator
def sync_results(f, self, *args, **kwargs):
    """Sync relevant results from self.client to our results attribute.

    Re-entrant calls (detected via ``self._in_sync_results``) run ``f``
    directly, so nested decorated methods only trigger one sync pass.
    """
    if self._in_sync_results:
        # Already inside a syncing call higher up the stack; just delegate.
        return f(self, *args, **kwargs)
    self._in_sync_results = True
    try:
        result = f(self, *args, **kwargs)
    finally:
        # Always clear the guard and perform the sync, even on error.
        self._in_sync_results = False
        self._sync_results()
    return result
60
60
@decorator
def spin_after(f, self, *args, **kwargs):
    """Call ``self.spin()`` after the wrapped method returns."""
    result = f(self, *args, **kwargs)
    self.spin()
    return result
67
67
68 #-----------------------------------------------------------------------------
68 #-----------------------------------------------------------------------------
69 # Classes
69 # Classes
70 #-----------------------------------------------------------------------------
70 #-----------------------------------------------------------------------------
71
71
@skip_doctest
class View(HasTraits):
    """Base View class for more convenient apply(f,*args,**kwargs) syntax via attributes.

    Don't use this class, use subclasses.

    Methods
    -------

    spin
        flushes incoming results and registration state changes
        control methods spin, and requesting `ids` also ensures up to date

    wait
        wait on one or more msg_ids

    execution methods
        apply
        legacy: execute, run

    data movement
        push, pull, scatter, gather

    query methods
        get_result, queue_status, purge_results, result_status

    control methods
        abort, shutdown

    """
    # flags
    block=Bool(False)   # whether calls wait for results by default
    track=Bool(True)    # whether sends create MessageTrackers by default
    targets = Any()     # engine-target specification for this view

    history=List()      # msg_ids submitted through this view
    outstanding = Set() # msg_ids submitted but not yet completed
    results = Dict()    # msg_id -> result cache, shared with self.client
    client = Instance('ipython_parallel.Client', allow_none=True)

    _socket = Instance('zmq.Socket', allow_none=True)
    _flag_names = List(['targets', 'block', 'track'])
    # re-entrancy guard used by the @sync_results decorator
    _in_sync_results = Bool(False)
    _targets = Any()
    _idents = Any()

    def __init__(self, client=None, socket=None, **flags):
        """Create a View bound to `client` and a zmq `socket`.

        Extra keyword `flags` (block, track, targets) are applied via
        :meth:`set_flags`.
        """
        super(View, self).__init__(client=client, _socket=socket)
        # share the client's result cache and inherit its default block flag
        self.results = client.results
        self.block = client.block

        self.set_flags(**flags)

        assert not self.__class__ is View, "Don't use base View objects, use subclasses"

    def __repr__(self):
        # truncate long target lists so the repr stays readable
        strtargets = str(self.targets)
        if len(strtargets) > 16:
            strtargets = strtargets[:12]+'...]'
        return "<%s %s>"%(self.__class__.__name__, strtargets)

    def __len__(self):
        # number of engines this view addresses; falls back to the whole client
        if isinstance(self.targets, list):
            return len(self.targets)
        elif isinstance(self.targets, int):
            return 1
        else:
            return len(self.client)

    def set_flags(self, **kwargs):
        """set my attribute flags by keyword.

        Views determine behavior with a few attributes (`block`, `track`, etc.).
        These attributes can be set all at once by name with this method.

        Parameters
        ----------

        block : bool
            whether to wait for results
        track : bool
            whether to create a MessageTracker to allow the user to
            safely edit after arrays and buffers during non-copying
            sends.

        Raises
        ------
        KeyError
            if a keyword is not one of the names in ``self._flag_names``.
        """
        for name, value in iteritems(kwargs):
            if name not in self._flag_names:
                raise KeyError("Invalid name: %r"%name)
            else:
                setattr(self, name, value)

    @contextmanager
    def temp_flags(self, **kwargs):
        """temporarily set flags, for use in `with` statements.

        See set_flags for permanent setting of flags

        Examples
        --------

        >>> view.track=False
        ...
        >>> with view.temp_flags(track=True):
        ...    ar = view.apply(dostuff, my_big_array)
        ...    ar.tracker.wait() # wait for send to finish
        >>> view.track
        False

        """
        # preflight: save flags, and set temporaries
        saved_flags = {}
        for f in self._flag_names:
            saved_flags[f] = getattr(self, f)
        self.set_flags(**kwargs)
        # yield to the with-statement block
        try:
            yield
        finally:
            # postflight: restore saved flags
            self.set_flags(**saved_flags)


    #----------------------------------------------------------------
    # apply
    #----------------------------------------------------------------

    def _sync_results(self):
        """to be called by @sync_results decorator

        after submitting any tasks.
        """
        # anything we consider outstanding that the client no longer does
        # has completed; drop it from our outstanding set
        delta = self.outstanding.difference(self.client.outstanding)
        completed = self.outstanding.intersection(delta)
        self.outstanding = self.outstanding.difference(completed)

    @sync_results
    @save_ids
    def _really_apply(self, f, args, kwargs, block=None, **options):
        """wrapper for client.send_apply_request"""
        raise NotImplementedError("Implement in subclasses")

    def apply(self, f, *args, **kwargs):
        """calls ``f(*args, **kwargs)`` on remote engines, returning the result.

        This method sets all apply flags via this View's attributes.

        Returns :class:`~IPython.parallel.client.asyncresult.AsyncResult`
        instance if ``self.block`` is False, otherwise the return value of
        ``f(*args, **kwargs)``.
        """
        return self._really_apply(f, args, kwargs)

    def apply_async(self, f, *args, **kwargs):
        """calls ``f(*args, **kwargs)`` on remote engines in a nonblocking manner.

        Returns :class:`~IPython.parallel.client.asyncresult.AsyncResult` instance.
        """
        return self._really_apply(f, args, kwargs, block=False)

    @spin_after
    def apply_sync(self, f, *args, **kwargs):
        """calls ``f(*args, **kwargs)`` on remote engines in a blocking manner,
        returning the result.
        """
        return self._really_apply(f, args, kwargs, block=True)

    #----------------------------------------------------------------
    # wrappers for client and control methods
    #----------------------------------------------------------------
    @sync_results
    def spin(self):
        """spin the client, and sync"""
        self.client.spin()

    @sync_results
    def wait(self, jobs=None, timeout=-1):
        """waits on one or more `jobs`, for up to `timeout` seconds.

        Parameters
        ----------

        jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
            ints are indices to self.history
            strs are msg_ids
            default: wait on all outstanding messages
        timeout : float
            a time in seconds, after which to give up.
            default is -1, which means no timeout

        Returns
        -------

        True : when all msg_ids are done
        False : timeout reached, some msg_ids still outstanding
        """
        if jobs is None:
            # default: wait on everything this view has ever submitted
            jobs = self.history
        return self.client.wait(jobs, timeout)

    def abort(self, jobs=None, targets=None, block=None):
        """Abort jobs on my engines.

        Parameters
        ----------

        jobs : None, str, list of strs, optional
            if None: abort all jobs.
            else: abort specific msg_id(s).
        """
        # fall back to this view's own flags / outstanding set
        block = block if block is not None else self.block
        targets = targets if targets is not None else self.targets
        jobs = jobs if jobs is not None else list(self.outstanding)

        return self.client.abort(jobs=jobs, targets=targets, block=block)

    def queue_status(self, targets=None, verbose=False):
        """Fetch the Queue status of my engines"""
        targets = targets if targets is not None else self.targets
        return self.client.queue_status(targets=targets, verbose=verbose)

    def purge_results(self, jobs=[], targets=[]):
        """Instruct the controller to forget specific results."""
        # NOTE(review): mutable default args are safe here only because they
        # are never mutated; 'all'/None expands to this view's targets
        if targets is None or targets == 'all':
            targets = self.targets
        return self.client.purge_results(jobs=jobs, targets=targets)

    def shutdown(self, targets=None, restart=False, hub=False, block=None):
        """Terminates one or more engine processes, optionally including the hub.
        """
        block = self.block if block is None else block
        if targets is None or targets == 'all':
            targets = self.targets
        return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)

    @spin_after
    def get_result(self, indices_or_msg_ids=None, block=None, owner=True):
        """return one or more results, specified by history index or msg_id.

        See :meth:`IPython.parallel.client.client.Client.get_result` for details.
        """

        if indices_or_msg_ids is None:
            # default: most recent submission
            indices_or_msg_ids = -1
        # translate history indices (ints) into msg_ids before delegating
        if isinstance(indices_or_msg_ids, int):
            indices_or_msg_ids = self.history[indices_or_msg_ids]
        elif isinstance(indices_or_msg_ids, (list,tuple,set)):
            indices_or_msg_ids = list(indices_or_msg_ids)
            for i,index in enumerate(indices_or_msg_ids):
                if isinstance(index, int):
                    indices_or_msg_ids[i] = self.history[index]
        return self.client.get_result(indices_or_msg_ids, block=block, owner=owner)

    #-------------------------------------------------------------------
    # Map
    #-------------------------------------------------------------------

    @sync_results
    def map(self, f, *sequences, **kwargs):
        """override in subclasses"""
        raise NotImplementedError

    def map_async(self, f, *sequences, **kwargs):
        """Parallel version of builtin :func:`python:map`, using this view's engines.

        This is equivalent to ``map(...block=False)``.

        See `self.map` for details.
        """
        if 'block' in kwargs:
            raise TypeError("map_async doesn't take a `block` keyword argument.")
        kwargs['block'] = False
        return self.map(f,*sequences,**kwargs)

    def map_sync(self, f, *sequences, **kwargs):
        """Parallel version of builtin :func:`python:map`, using this view's engines.

        This is equivalent to ``map(...block=True)``.

        See `self.map` for details.
        """
        if 'block' in kwargs:
            raise TypeError("map_sync doesn't take a `block` keyword argument.")
        kwargs['block'] = True
        return self.map(f,*sequences,**kwargs)

    def imap(self, f, *sequences, **kwargs):
        """Parallel version of :func:`itertools.imap`.

        See `self.map` for details.

        """

        return iter(self.map_async(f,*sequences, **kwargs))

    #-------------------------------------------------------------------
    # Decorators
    #-------------------------------------------------------------------

    def remote(self, block=None, **flags):
        """Decorator for making a RemoteFunction"""
        block = self.block if block is None else block
        return remote(self, block=block, **flags)

    def parallel(self, dist='b', block=None, **flags):
        """Decorator for making a ParallelFunction"""
        block = self.block if block is None else block
        return parallel(self, dist=dist, block=block, **flags)
379
379
380 @skip_doctest
380 @skip_doctest
381 class DirectView(View):
381 class DirectView(View):
382 """Direct Multiplexer View of one or more engines.
382 """Direct Multiplexer View of one or more engines.
383
383
384 These are created via indexed access to a client:
384 These are created via indexed access to a client:
385
385
386 >>> dv_1 = client[1]
386 >>> dv_1 = client[1]
387 >>> dv_all = client[:]
387 >>> dv_all = client[:]
388 >>> dv_even = client[::2]
388 >>> dv_even = client[::2]
389 >>> dv_some = client[1:3]
389 >>> dv_some = client[1:3]
390
390
391 This object provides dictionary access to engine namespaces:
391 This object provides dictionary access to engine namespaces:
392
392
393 # push a=5:
393 # push a=5:
394 >>> dv['a'] = 5
394 >>> dv['a'] = 5
395 # pull 'foo':
395 # pull 'foo':
396 >>> dv['foo']
396 >>> dv['foo']
397
397
398 """
398 """
399
399
400 def __init__(self, client=None, socket=None, targets=None):
400 def __init__(self, client=None, socket=None, targets=None):
401 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
401 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
402
402
403 @property
403 @property
404 def importer(self):
404 def importer(self):
405 """sync_imports(local=True) as a property.
405 """sync_imports(local=True) as a property.
406
406
407 See sync_imports for details.
407 See sync_imports for details.
408
408
409 """
409 """
410 return self.sync_imports(True)
410 return self.sync_imports(True)
411
411
412 @contextmanager
412 @contextmanager
413 def sync_imports(self, local=True, quiet=False):
413 def sync_imports(self, local=True, quiet=False):
414 """Context Manager for performing simultaneous local and remote imports.
414 """Context Manager for performing simultaneous local and remote imports.
415
415
416 'import x as y' will *not* work. The 'as y' part will simply be ignored.
416 'import x as y' will *not* work. The 'as y' part will simply be ignored.
417
417
418 If `local=True`, then the package will also be imported locally.
418 If `local=True`, then the package will also be imported locally.
419
419
420 If `quiet=True`, no output will be produced when attempting remote
420 If `quiet=True`, no output will be produced when attempting remote
421 imports.
421 imports.
422
422
423 Note that remote-only (`local=False`) imports have not been implemented.
423 Note that remote-only (`local=False`) imports have not been implemented.
424
424
425 >>> with view.sync_imports():
425 >>> with view.sync_imports():
426 ... from numpy import recarray
426 ... from numpy import recarray
427 importing recarray from numpy on engine(s)
427 importing recarray from numpy on engine(s)
428
428
429 """
429 """
430 from IPython.utils.py3compat import builtin_mod
430 from IPython.utils.py3compat import builtin_mod
431 local_import = builtin_mod.__import__
431 local_import = builtin_mod.__import__
432 modules = set()
432 modules = set()
433 results = []
433 results = []
434 @util.interactive
434 @util.interactive
435 def remote_import(name, fromlist, level):
435 def remote_import(name, fromlist, level):
436 """the function to be passed to apply, that actually performs the import
436 """the function to be passed to apply, that actually performs the import
437 on the engine, and loads up the user namespace.
437 on the engine, and loads up the user namespace.
438 """
438 """
439 import sys
439 import sys
440 user_ns = globals()
440 user_ns = globals()
441 mod = __import__(name, fromlist=fromlist, level=level)
441 mod = __import__(name, fromlist=fromlist, level=level)
442 if fromlist:
442 if fromlist:
443 for key in fromlist:
443 for key in fromlist:
444 user_ns[key] = getattr(mod, key)
444 user_ns[key] = getattr(mod, key)
445 else:
445 else:
446 user_ns[name] = sys.modules[name]
446 user_ns[name] = sys.modules[name]
447
447
448 def view_import(name, globals={}, locals={}, fromlist=[], level=0):
448 def view_import(name, globals={}, locals={}, fromlist=[], level=0):
449 """the drop-in replacement for __import__, that optionally imports
449 """the drop-in replacement for __import__, that optionally imports
450 locally as well.
450 locally as well.
451 """
451 """
452 # don't override nested imports
452 # don't override nested imports
453 save_import = builtin_mod.__import__
453 save_import = builtin_mod.__import__
454 builtin_mod.__import__ = local_import
454 builtin_mod.__import__ = local_import
455
455
456 if imp.lock_held():
456 if imp.lock_held():
457 # this is a side-effect import, don't do it remotely, or even
457 # this is a side-effect import, don't do it remotely, or even
458 # ignore the local effects
458 # ignore the local effects
459 return local_import(name, globals, locals, fromlist, level)
459 return local_import(name, globals, locals, fromlist, level)
460
460
461 imp.acquire_lock()
461 imp.acquire_lock()
462 if local:
462 if local:
463 mod = local_import(name, globals, locals, fromlist, level)
463 mod = local_import(name, globals, locals, fromlist, level)
464 else:
464 else:
465 raise NotImplementedError("remote-only imports not yet implemented")
465 raise NotImplementedError("remote-only imports not yet implemented")
466 imp.release_lock()
466 imp.release_lock()
467
467
468 key = name+':'+','.join(fromlist or [])
468 key = name+':'+','.join(fromlist or [])
469 if level <= 0 and key not in modules:
469 if level <= 0 and key not in modules:
470 modules.add(key)
470 modules.add(key)
471 if not quiet:
471 if not quiet:
472 if fromlist:
472 if fromlist:
473 print("importing %s from %s on engine(s)"%(','.join(fromlist), name))
473 print("importing %s from %s on engine(s)"%(','.join(fromlist), name))
474 else:
474 else:
475 print("importing %s on engine(s)"%name)
475 print("importing %s on engine(s)"%name)
476 results.append(self.apply_async(remote_import, name, fromlist, level))
476 results.append(self.apply_async(remote_import, name, fromlist, level))
477 # restore override
477 # restore override
478 builtin_mod.__import__ = save_import
478 builtin_mod.__import__ = save_import
479
479
480 return mod
480 return mod
481
481
482 # override __import__
482 # override __import__
483 builtin_mod.__import__ = view_import
483 builtin_mod.__import__ = view_import
484 try:
484 try:
485 # enter the block
485 # enter the block
486 yield
486 yield
487 except ImportError:
487 except ImportError:
488 if local:
488 if local:
489 raise
489 raise
490 else:
490 else:
491 # ignore import errors if not doing local imports
491 # ignore import errors if not doing local imports
492 pass
492 pass
493 finally:
493 finally:
494 # always restore __import__
494 # always restore __import__
495 builtin_mod.__import__ = local_import
495 builtin_mod.__import__ = local_import
496
496
497 for r in results:
497 for r in results:
498 # raise possible remote ImportErrors here
498 # raise possible remote ImportErrors here
499 r.get()
499 r.get()
500
500
501 def use_dill(self):
501 def use_dill(self):
502 """Expand serialization support with dill
502 """Expand serialization support with dill
503
503
504 adds support for closures, etc.
504 adds support for closures, etc.
505
505
506 This calls IPython.utils.pickleutil.use_dill() here and on each engine.
506 This calls IPython.utils.pickleutil.use_dill() here and on each engine.
507 """
507 """
508 pickleutil.use_dill()
508 pickleutil.use_dill()
509 return self.apply(pickleutil.use_dill)
509 return self.apply(pickleutil.use_dill)
510
510
511 def use_cloudpickle(self):
511 def use_cloudpickle(self):
512 """Expand serialization support with cloudpickle.
512 """Expand serialization support with cloudpickle.
513 """
513 """
514 pickleutil.use_cloudpickle()
514 pickleutil.use_cloudpickle()
515 return self.apply(pickleutil.use_cloudpickle)
515 return self.apply(pickleutil.use_cloudpickle)
516
516
517
517
518 @sync_results
518 @sync_results
519 @save_ids
519 @save_ids
520 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
520 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
521 """calls f(*args, **kwargs) on remote engines, returning the result.
521 """calls f(*args, **kwargs) on remote engines, returning the result.
522
522
523 This method sets all of `apply`'s flags via this View's attributes.
523 This method sets all of `apply`'s flags via this View's attributes.
524
524
525 Parameters
525 Parameters
526 ----------
526 ----------
527
527
528 f : callable
528 f : callable
529
529
530 args : list [default: empty]
530 args : list [default: empty]
531
531
532 kwargs : dict [default: empty]
532 kwargs : dict [default: empty]
533
533
534 targets : target list [default: self.targets]
534 targets : target list [default: self.targets]
535 where to run
535 where to run
536 block : bool [default: self.block]
536 block : bool [default: self.block]
537 whether to block
537 whether to block
538 track : bool [default: self.track]
538 track : bool [default: self.track]
539 whether to ask zmq to track the message, for safe non-copying sends
539 whether to ask zmq to track the message, for safe non-copying sends
540
540
541 Returns
541 Returns
542 -------
542 -------
543
543
544 if self.block is False:
544 if self.block is False:
545 returns AsyncResult
545 returns AsyncResult
546 else:
546 else:
547 returns actual result of f(*args, **kwargs) on the engine(s)
547 returns actual result of f(*args, **kwargs) on the engine(s)
548 This will be a list of self.targets is also a list (even length 1), or
548 This will be a list of self.targets is also a list (even length 1), or
549 the single result if self.targets is an integer engine id
549 the single result if self.targets is an integer engine id
550 """
550 """
551 args = [] if args is None else args
551 args = [] if args is None else args
552 kwargs = {} if kwargs is None else kwargs
552 kwargs = {} if kwargs is None else kwargs
553 block = self.block if block is None else block
553 block = self.block if block is None else block
554 track = self.track if track is None else track
554 track = self.track if track is None else track
555 targets = self.targets if targets is None else targets
555 targets = self.targets if targets is None else targets
556
556
557 _idents, _targets = self.client._build_targets(targets)
557 _idents, _targets = self.client._build_targets(targets)
558 msg_ids = []
558 msg_ids = []
559 trackers = []
559 trackers = []
560 for ident in _idents:
560 for ident in _idents:
561 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
561 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
562 ident=ident)
562 ident=ident)
563 if track:
563 if track:
564 trackers.append(msg['tracker'])
564 trackers.append(msg['tracker'])
565 msg_ids.append(msg['header']['msg_id'])
565 msg_ids.append(msg['header']['msg_id'])
566 if isinstance(targets, int):
566 if isinstance(targets, int):
567 msg_ids = msg_ids[0]
567 msg_ids = msg_ids[0]
568 tracker = None if track is False else zmq.MessageTracker(*trackers)
568 tracker = None if track is False else zmq.MessageTracker(*trackers)
569 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets,
569 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets,
570 tracker=tracker, owner=True,
570 tracker=tracker, owner=True,
571 )
571 )
572 if block:
572 if block:
573 try:
573 try:
574 return ar.get()
574 return ar.get()
575 except KeyboardInterrupt:
575 except KeyboardInterrupt:
576 pass
576 pass
577 return ar
577 return ar
578
578
579
579
580 @sync_results
580 @sync_results
581 def map(self, f, *sequences, **kwargs):
581 def map(self, f, *sequences, **kwargs):
582 """``view.map(f, *sequences, block=self.block)`` => list|AsyncMapResult
582 """``view.map(f, *sequences, block=self.block)`` => list|AsyncMapResult
583
583
584 Parallel version of builtin `map`, using this View's `targets`.
584 Parallel version of builtin `map`, using this View's `targets`.
585
585
586 There will be one task per target, so work will be chunked
586 There will be one task per target, so work will be chunked
587 if the sequences are longer than `targets`.
587 if the sequences are longer than `targets`.
588
588
589 Results can be iterated as they are ready, but will become available in chunks.
589 Results can be iterated as they are ready, but will become available in chunks.
590
590
591 Parameters
591 Parameters
592 ----------
592 ----------
593
593
594 f : callable
594 f : callable
595 function to be mapped
595 function to be mapped
596 *sequences: one or more sequences of matching length
596 *sequences: one or more sequences of matching length
597 the sequences to be distributed and passed to `f`
597 the sequences to be distributed and passed to `f`
598 block : bool
598 block : bool
599 whether to wait for the result or not [default self.block]
599 whether to wait for the result or not [default self.block]
600
600
601 Returns
601 Returns
602 -------
602 -------
603
603
604
604
605 If block=False
605 If block=False
606 An :class:`~ipython_parallel.client.asyncresult.AsyncMapResult` instance.
606 An :class:`~ipython_parallel.client.asyncresult.AsyncMapResult` instance.
607 An object like AsyncResult, but which reassembles the sequence of results
607 An object like AsyncResult, but which reassembles the sequence of results
608 into a single list. AsyncMapResults can be iterated through before all
608 into a single list. AsyncMapResults can be iterated through before all
609 results are complete.
609 results are complete.
610 else
610 else
611 A list, the result of ``map(f,*sequences)``
611 A list, the result of ``map(f,*sequences)``
612 """
612 """
613
613
614 block = kwargs.pop('block', self.block)
614 block = kwargs.pop('block', self.block)
615 for k in kwargs.keys():
615 for k in kwargs.keys():
616 if k not in ['block', 'track']:
616 if k not in ['block', 'track']:
617 raise TypeError("invalid keyword arg, %r"%k)
617 raise TypeError("invalid keyword arg, %r"%k)
618
618
619 assert len(sequences) > 0, "must have some sequences to map onto!"
619 assert len(sequences) > 0, "must have some sequences to map onto!"
620 pf = ParallelFunction(self, f, block=block, **kwargs)
620 pf = ParallelFunction(self, f, block=block, **kwargs)
621 return pf.map(*sequences)
621 return pf.map(*sequences)
622
622
623 @sync_results
623 @sync_results
624 @save_ids
624 @save_ids
625 def execute(self, code, silent=True, targets=None, block=None):
625 def execute(self, code, silent=True, targets=None, block=None):
626 """Executes `code` on `targets` in blocking or nonblocking manner.
626 """Executes `code` on `targets` in blocking or nonblocking manner.
627
627
628 ``execute`` is always `bound` (affects engine namespace)
628 ``execute`` is always `bound` (affects engine namespace)
629
629
630 Parameters
630 Parameters
631 ----------
631 ----------
632
632
633 code : str
633 code : str
634 the code string to be executed
634 the code string to be executed
635 block : bool
635 block : bool
636 whether or not to wait until done to return
636 whether or not to wait until done to return
637 default: self.block
637 default: self.block
638 """
638 """
639 block = self.block if block is None else block
639 block = self.block if block is None else block
640 targets = self.targets if targets is None else targets
640 targets = self.targets if targets is None else targets
641
641
642 _idents, _targets = self.client._build_targets(targets)
642 _idents, _targets = self.client._build_targets(targets)
643 msg_ids = []
643 msg_ids = []
644 trackers = []
644 trackers = []
645 for ident in _idents:
645 for ident in _idents:
646 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
646 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
647 msg_ids.append(msg['header']['msg_id'])
647 msg_ids.append(msg['header']['msg_id'])
648 if isinstance(targets, int):
648 if isinstance(targets, int):
649 msg_ids = msg_ids[0]
649 msg_ids = msg_ids[0]
650 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets, owner=True)
650 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets, owner=True)
651 if block:
651 if block:
652 try:
652 try:
653 ar.get()
653 ar.get()
654 except KeyboardInterrupt:
654 except KeyboardInterrupt:
655 pass
655 pass
656 return ar
656 return ar
657
657
658 def run(self, filename, targets=None, block=None):
658 def run(self, filename, targets=None, block=None):
659 """Execute contents of `filename` on my engine(s).
659 """Execute contents of `filename` on my engine(s).
660
660
661 This simply reads the contents of the file and calls `execute`.
661 This simply reads the contents of the file and calls `execute`.
662
662
663 Parameters
663 Parameters
664 ----------
664 ----------
665
665
666 filename : str
666 filename : str
667 The path to the file
667 The path to the file
668 targets : int/str/list of ints/strs
668 targets : int/str/list of ints/strs
669 the engines on which to execute
669 the engines on which to execute
670 default : all
670 default : all
671 block : bool
671 block : bool
672 whether or not to wait until done
672 whether or not to wait until done
673 default: self.block
673 default: self.block
674
674
675 """
675 """
676 with open(filename, 'r') as f:
676 with open(filename, 'r') as f:
677 # add newline in case of trailing indented whitespace
677 # add newline in case of trailing indented whitespace
678 # which will cause SyntaxError
678 # which will cause SyntaxError
679 code = f.read()+'\n'
679 code = f.read()+'\n'
680 return self.execute(code, block=block, targets=targets)
680 return self.execute(code, block=block, targets=targets)
681
681
682 def update(self, ns):
682 def update(self, ns):
683 """update remote namespace with dict `ns`
683 """update remote namespace with dict `ns`
684
684
685 See `push` for details.
685 See `push` for details.
686 """
686 """
687 return self.push(ns, block=self.block, track=self.track)
687 return self.push(ns, block=self.block, track=self.track)
688
688
689 def push(self, ns, targets=None, block=None, track=None):
689 def push(self, ns, targets=None, block=None, track=None):
690 """update remote namespace with dict `ns`
690 """update remote namespace with dict `ns`
691
691
692 Parameters
692 Parameters
693 ----------
693 ----------
694
694
695 ns : dict
695 ns : dict
696 dict of keys with which to update engine namespace(s)
696 dict of keys with which to update engine namespace(s)
697 block : bool [default : self.block]
697 block : bool [default : self.block]
698 whether to wait to be notified of engine receipt
698 whether to wait to be notified of engine receipt
699
699
700 """
700 """
701
701
702 block = block if block is not None else self.block
702 block = block if block is not None else self.block
703 track = track if track is not None else self.track
703 track = track if track is not None else self.track
704 targets = targets if targets is not None else self.targets
704 targets = targets if targets is not None else self.targets
705 # applier = self.apply_sync if block else self.apply_async
705 # applier = self.apply_sync if block else self.apply_async
706 if not isinstance(ns, dict):
706 if not isinstance(ns, dict):
707 raise TypeError("Must be a dict, not %s"%type(ns))
707 raise TypeError("Must be a dict, not %s"%type(ns))
708 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
708 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
709
709
710 def get(self, key_s):
710 def get(self, key_s):
711 """get object(s) by `key_s` from remote namespace
711 """get object(s) by `key_s` from remote namespace
712
712
713 see `pull` for details.
713 see `pull` for details.
714 """
714 """
715 # block = block if block is not None else self.block
715 # block = block if block is not None else self.block
716 return self.pull(key_s, block=True)
716 return self.pull(key_s, block=True)
717
717
718 def pull(self, names, targets=None, block=None):
718 def pull(self, names, targets=None, block=None):
719 """get object(s) by `name` from remote namespace
719 """get object(s) by `name` from remote namespace
720
720
721 will return one object if it is a key.
721 will return one object if it is a key.
722 can also take a list of keys, in which case it will return a list of objects.
722 can also take a list of keys, in which case it will return a list of objects.
723 """
723 """
724 block = block if block is not None else self.block
724 block = block if block is not None else self.block
725 targets = targets if targets is not None else self.targets
725 targets = targets if targets is not None else self.targets
726 applier = self.apply_sync if block else self.apply_async
726 applier = self.apply_sync if block else self.apply_async
727 if isinstance(names, string_types):
727 if isinstance(names, string_types):
728 pass
728 pass
729 elif isinstance(names, (list,tuple,set)):
729 elif isinstance(names, (list,tuple,set)):
730 for key in names:
730 for key in names:
731 if not isinstance(key, string_types):
731 if not isinstance(key, string_types):
732 raise TypeError("keys must be str, not type %r"%type(key))
732 raise TypeError("keys must be str, not type %r"%type(key))
733 else:
733 else:
734 raise TypeError("names must be strs, not %r"%names)
734 raise TypeError("names must be strs, not %r"%names)
735 return self._really_apply(util._pull, (names,), block=block, targets=targets)
735 return self._really_apply(util._pull, (names,), block=block, targets=targets)
736
736
737 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
737 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
738 """
738 """
739 Partition a Python sequence and send the partitions to a set of engines.
739 Partition a Python sequence and send the partitions to a set of engines.
740 """
740 """
741 block = block if block is not None else self.block
741 block = block if block is not None else self.block
742 track = track if track is not None else self.track
742 track = track if track is not None else self.track
743 targets = targets if targets is not None else self.targets
743 targets = targets if targets is not None else self.targets
744
744
745 # construct integer ID list:
745 # construct integer ID list:
746 targets = self.client._build_targets(targets)[1]
746 targets = self.client._build_targets(targets)[1]
747
747
748 mapObject = Map.dists[dist]()
748 mapObject = Map.dists[dist]()
749 nparts = len(targets)
749 nparts = len(targets)
750 msg_ids = []
750 msg_ids = []
751 trackers = []
751 trackers = []
752 for index, engineid in enumerate(targets):
752 for index, engineid in enumerate(targets):
753 partition = mapObject.getPartition(seq, index, nparts)
753 partition = mapObject.getPartition(seq, index, nparts)
754 if flatten and len(partition) == 1:
754 if flatten and len(partition) == 1:
755 ns = {key: partition[0]}
755 ns = {key: partition[0]}
756 else:
756 else:
757 ns = {key: partition}
757 ns = {key: partition}
758 r = self.push(ns, block=False, track=track, targets=engineid)
758 r = self.push(ns, block=False, track=track, targets=engineid)
759 msg_ids.extend(r.msg_ids)
759 msg_ids.extend(r.msg_ids)
760 if track:
760 if track:
761 trackers.append(r._tracker)
761 trackers.append(r._tracker)
762
762
763 if track:
763 if track:
764 tracker = zmq.MessageTracker(*trackers)
764 tracker = zmq.MessageTracker(*trackers)
765 else:
765 else:
766 tracker = None
766 tracker = None
767
767
768 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets,
768 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets,
769 tracker=tracker, owner=True,
769 tracker=tracker, owner=True,
770 )
770 )
771 if block:
771 if block:
772 r.wait()
772 r.wait()
773 else:
773 else:
774 return r
774 return r
775
775
776 @sync_results
776 @sync_results
777 @save_ids
777 @save_ids
778 def gather(self, key, dist='b', targets=None, block=None):
778 def gather(self, key, dist='b', targets=None, block=None):
779 """
779 """
780 Gather a partitioned sequence on a set of engines as a single local seq.
780 Gather a partitioned sequence on a set of engines as a single local seq.
781 """
781 """
782 block = block if block is not None else self.block
782 block = block if block is not None else self.block
783 targets = targets if targets is not None else self.targets
783 targets = targets if targets is not None else self.targets
784 mapObject = Map.dists[dist]()
784 mapObject = Map.dists[dist]()
785 msg_ids = []
785 msg_ids = []
786
786
787 # construct integer ID list:
787 # construct integer ID list:
788 targets = self.client._build_targets(targets)[1]
788 targets = self.client._build_targets(targets)[1]
789
789
790 for index, engineid in enumerate(targets):
790 for index, engineid in enumerate(targets):
791 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
791 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
792
792
793 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
793 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
794
794
795 if block:
795 if block:
796 try:
796 try:
797 return r.get()
797 return r.get()
798 except KeyboardInterrupt:
798 except KeyboardInterrupt:
799 pass
799 pass
800 return r
800 return r
801
801
802 def __getitem__(self, key):
802 def __getitem__(self, key):
803 return self.get(key)
803 return self.get(key)
804
804
805 def __setitem__(self,key, value):
805 def __setitem__(self,key, value):
806 self.update({key:value})
806 self.update({key:value})
807
807
808 def clear(self, targets=None, block=None):
808 def clear(self, targets=None, block=None):
809 """Clear the remote namespaces on my engines."""
809 """Clear the remote namespaces on my engines."""
810 block = block if block is not None else self.block
810 block = block if block is not None else self.block
811 targets = targets if targets is not None else self.targets
811 targets = targets if targets is not None else self.targets
812 return self.client.clear(targets=targets, block=block)
812 return self.client.clear(targets=targets, block=block)
813
813
814 #----------------------------------------
814 #----------------------------------------
815 # activate for %px, %autopx, etc. magics
815 # activate for %px, %autopx, etc. magics
816 #----------------------------------------
816 #----------------------------------------
817
817
818 def activate(self, suffix=''):
818 def activate(self, suffix=''):
819 """Activate IPython magics associated with this View
819 """Activate IPython magics associated with this View
820
820
821 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
821 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
822
822
823 Parameters
823 Parameters
824 ----------
824 ----------
825
825
826 suffix: str [default: '']
826 suffix: str [default: '']
827 The suffix, if any, for the magics. This allows you to have
827 The suffix, if any, for the magics. This allows you to have
828 multiple views associated with parallel magics at the same time.
828 multiple views associated with parallel magics at the same time.
829
829
830 e.g. ``rc[::2].activate(suffix='_even')`` will give you
830 e.g. ``rc[::2].activate(suffix='_even')`` will give you
831 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
831 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
832 on the even engines.
832 on the even engines.
833 """
833 """
834
834
835 from IPython.parallel.client.magics import ParallelMagics
835 from IPython.parallel.client.magics import ParallelMagics
836
836
837 try:
837 try:
838 # This is injected into __builtins__.
838 # This is injected into __builtins__.
839 ip = get_ipython()
839 ip = get_ipython()
840 except NameError:
840 except NameError:
841 print("The IPython parallel magics (%px, etc.) only work within IPython.")
841 print("The IPython parallel magics (%px, etc.) only work within IPython.")
842 return
842 return
843
843
844 M = ParallelMagics(ip, self, suffix)
844 M = ParallelMagics(ip, self, suffix)
845 ip.magics_manager.register(M)
845 ip.magics_manager.register(M)
846
846
847
847
848 @skip_doctest
848 @skip_doctest
849 class LoadBalancedView(View):
849 class LoadBalancedView(View):
850 """An load-balancing View that only executes via the Task scheduler.
850 """An load-balancing View that only executes via the Task scheduler.
851
851
852 Load-balanced views can be created with the client's `view` method:
852 Load-balanced views can be created with the client's `view` method:
853
853
854 >>> v = client.load_balanced_view()
854 >>> v = client.load_balanced_view()
855
855
856 or targets can be specified, to restrict the potential destinations:
856 or targets can be specified, to restrict the potential destinations:
857
857
858 >>> v = client.load_balanced_view([1,3])
858 >>> v = client.load_balanced_view([1,3])
859
859
860 which would restrict loadbalancing to between engines 1 and 3.
860 which would restrict loadbalancing to between engines 1 and 3.
861
861
862 """
862 """
863
863
864 follow=Any()
864 follow=Any()
865 after=Any()
865 after=Any()
866 timeout=CFloat()
866 timeout=CFloat()
867 retries = Integer(0)
867 retries = Integer(0)
868
868
869 _task_scheme = Any()
869 _task_scheme = Any()
870 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
870 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
871
871
872 def __init__(self, client=None, socket=None, **flags):
872 def __init__(self, client=None, socket=None, **flags):
873 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
873 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
874 self._task_scheme=client._task_scheme
874 self._task_scheme=client._task_scheme
875
875
876 def _validate_dependency(self, dep):
876 def _validate_dependency(self, dep):
877 """validate a dependency.
877 """validate a dependency.
878
878
879 For use in `set_flags`.
879 For use in `set_flags`.
880 """
880 """
881 if dep is None or isinstance(dep, string_types + (AsyncResult, Dependency)):
881 if dep is None or isinstance(dep, string_types + (AsyncResult, Dependency)):
882 return True
882 return True
883 elif isinstance(dep, (list,set, tuple)):
883 elif isinstance(dep, (list,set, tuple)):
884 for d in dep:
884 for d in dep:
885 if not isinstance(d, string_types + (AsyncResult,)):
885 if not isinstance(d, string_types + (AsyncResult,)):
886 return False
886 return False
887 elif isinstance(dep, dict):
887 elif isinstance(dep, dict):
888 if set(dep.keys()) != set(Dependency().as_dict().keys()):
888 if set(dep.keys()) != set(Dependency().as_dict().keys()):
889 return False
889 return False
890 if not isinstance(dep['msg_ids'], list):
890 if not isinstance(dep['msg_ids'], list):
891 return False
891 return False
892 for d in dep['msg_ids']:
892 for d in dep['msg_ids']:
893 if not isinstance(d, string_types):
893 if not isinstance(d, string_types):
894 return False
894 return False
895 else:
895 else:
896 return False
896 return False
897
897
898 return True
898 return True
899
899
900 def _render_dependency(self, dep):
900 def _render_dependency(self, dep):
901 """helper for building jsonable dependencies from various input forms."""
901 """helper for building jsonable dependencies from various input forms."""
902 if isinstance(dep, Dependency):
902 if isinstance(dep, Dependency):
903 return dep.as_dict()
903 return dep.as_dict()
904 elif isinstance(dep, AsyncResult):
904 elif isinstance(dep, AsyncResult):
905 return dep.msg_ids
905 return dep.msg_ids
906 elif dep is None:
906 elif dep is None:
907 return []
907 return []
908 else:
908 else:
909 # pass to Dependency constructor
909 # pass to Dependency constructor
910 return list(Dependency(dep))
910 return list(Dependency(dep))
911
911
912 def set_flags(self, **kwargs):
912 def set_flags(self, **kwargs):
913 """set my attribute flags by keyword.
913 """set my attribute flags by keyword.
914
914
915 A View is a wrapper for the Client's apply method, but with attributes
915 A View is a wrapper for the Client's apply method, but with attributes
916 that specify keyword arguments, those attributes can be set by keyword
916 that specify keyword arguments, those attributes can be set by keyword
917 argument with this method.
917 argument with this method.
918
918
919 Parameters
919 Parameters
920 ----------
920 ----------
921
921
922 block : bool
922 block : bool
923 whether to wait for results
923 whether to wait for results
924 track : bool
924 track : bool
925 whether to create a MessageTracker to allow the user to
925 whether to create a MessageTracker to allow the user to
926 safely edit after arrays and buffers during non-copying
926 safely edit after arrays and buffers during non-copying
927 sends.
927 sends.
928
928
929 after : Dependency or collection of msg_ids
929 after : Dependency or collection of msg_ids
930 Only for load-balanced execution (targets=None)
930 Only for load-balanced execution (targets=None)
931 Specify a list of msg_ids as a time-based dependency.
931 Specify a list of msg_ids as a time-based dependency.
932 This job will only be run *after* the dependencies
932 This job will only be run *after* the dependencies
933 have been met.
933 have been met.
934
934
935 follow : Dependency or collection of msg_ids
935 follow : Dependency or collection of msg_ids
936 Only for load-balanced execution (targets=None)
936 Only for load-balanced execution (targets=None)
937 Specify a list of msg_ids as a location-based dependency.
937 Specify a list of msg_ids as a location-based dependency.
938 This job will only be run on an engine where this dependency
938 This job will only be run on an engine where this dependency
939 is met.
939 is met.
940
940
941 timeout : float/int or None
941 timeout : float/int or None
942 Only for load-balanced execution (targets=None)
942 Only for load-balanced execution (targets=None)
943 Specify an amount of time (in seconds) for the scheduler to
943 Specify an amount of time (in seconds) for the scheduler to
944 wait for dependencies to be met before failing with a
944 wait for dependencies to be met before failing with a
945 DependencyTimeout.
945 DependencyTimeout.
946
946
947 retries : int
947 retries : int
948 Number of times a task will be retried on failure.
948 Number of times a task will be retried on failure.
949 """
949 """
950
950
951 super(LoadBalancedView, self).set_flags(**kwargs)
951 super(LoadBalancedView, self).set_flags(**kwargs)
952 for name in ('follow', 'after'):
952 for name in ('follow', 'after'):
953 if name in kwargs:
953 if name in kwargs:
954 value = kwargs[name]
954 value = kwargs[name]
955 if self._validate_dependency(value):
955 if self._validate_dependency(value):
956 setattr(self, name, value)
956 setattr(self, name, value)
957 else:
957 else:
958 raise ValueError("Invalid dependency: %r"%value)
958 raise ValueError("Invalid dependency: %r"%value)
959 if 'timeout' in kwargs:
959 if 'timeout' in kwargs:
960 t = kwargs['timeout']
960 t = kwargs['timeout']
961 if not isinstance(t, (int, float, type(None))):
961 if not isinstance(t, (int, float, type(None))):
962 if (not PY3) and (not isinstance(t, long)):
962 if (not PY3) and (not isinstance(t, long)):
963 raise TypeError("Invalid type for timeout: %r"%type(t))
963 raise TypeError("Invalid type for timeout: %r"%type(t))
964 if t is not None:
964 if t is not None:
965 if t < 0:
965 if t < 0:
966 raise ValueError("Invalid timeout: %s"%t)
966 raise ValueError("Invalid timeout: %s"%t)
967 self.timeout = t
967 self.timeout = t
968
968
969 @sync_results
969 @sync_results
970 @save_ids
970 @save_ids
971 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
971 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
972 after=None, follow=None, timeout=None,
972 after=None, follow=None, timeout=None,
973 targets=None, retries=None):
973 targets=None, retries=None):
974 """calls f(*args, **kwargs) on a remote engine, returning the result.
974 """calls f(*args, **kwargs) on a remote engine, returning the result.
975
975
976 This method temporarily sets all of `apply`'s flags for a single call.
976 This method temporarily sets all of `apply`'s flags for a single call.
977
977
978 Parameters
978 Parameters
979 ----------
979 ----------
980
980
981 f : callable
981 f : callable
982
982
983 args : list [default: empty]
983 args : list [default: empty]
984
984
985 kwargs : dict [default: empty]
985 kwargs : dict [default: empty]
986
986
987 block : bool [default: self.block]
987 block : bool [default: self.block]
988 whether to block
988 whether to block
989 track : bool [default: self.track]
989 track : bool [default: self.track]
990 whether to ask zmq to track the message, for safe non-copying sends
990 whether to ask zmq to track the message, for safe non-copying sends
991
991
992 !!!!!! TODO: THE REST HERE !!!!
992 !!!!!! TODO: THE REST HERE !!!!
993
993
994 Returns
994 Returns
995 -------
995 -------
996
996
997 if self.block is False:
997 if self.block is False:
998 returns AsyncResult
998 returns AsyncResult
999 else:
999 else:
1000 returns actual result of f(*args, **kwargs) on the engine(s)
1000 returns actual result of f(*args, **kwargs) on the engine(s)
1001 This will be a list of self.targets is also a list (even length 1), or
1001 This will be a list of self.targets is also a list (even length 1), or
1002 the single result if self.targets is an integer engine id
1002 the single result if self.targets is an integer engine id
1003 """
1003 """
1004
1004
1005 # validate whether we can run
1005 # validate whether we can run
1006 if self._socket.closed:
1006 if self._socket.closed:
1007 msg = "Task farming is disabled"
1007 msg = "Task farming is disabled"
1008 if self._task_scheme == 'pure':
1008 if self._task_scheme == 'pure':
1009 msg += " because the pure ZMQ scheduler cannot handle"
1009 msg += " because the pure ZMQ scheduler cannot handle"
1010 msg += " disappearing engines."
1010 msg += " disappearing engines."
1011 raise RuntimeError(msg)
1011 raise RuntimeError(msg)
1012
1012
1013 if self._task_scheme == 'pure':
1013 if self._task_scheme == 'pure':
1014 # pure zmq scheme doesn't support extra features
1014 # pure zmq scheme doesn't support extra features
1015 msg = "Pure ZMQ scheduler doesn't support the following flags:"
1015 msg = "Pure ZMQ scheduler doesn't support the following flags:"
1016 "follow, after, retries, targets, timeout"
1016 "follow, after, retries, targets, timeout"
1017 if (follow or after or retries or targets or timeout):
1017 if (follow or after or retries or targets or timeout):
1018 # hard fail on Scheduler flags
1018 # hard fail on Scheduler flags
1019 raise RuntimeError(msg)
1019 raise RuntimeError(msg)
1020 if isinstance(f, dependent):
1020 if isinstance(f, dependent):
1021 # soft warn on functional dependencies
1021 # soft warn on functional dependencies
1022 warnings.warn(msg, RuntimeWarning)
1022 warnings.warn(msg, RuntimeWarning)
1023
1023
1024 # build args
1024 # build args
1025 args = [] if args is None else args
1025 args = [] if args is None else args
1026 kwargs = {} if kwargs is None else kwargs
1026 kwargs = {} if kwargs is None else kwargs
1027 block = self.block if block is None else block
1027 block = self.block if block is None else block
1028 track = self.track if track is None else track
1028 track = self.track if track is None else track
1029 after = self.after if after is None else after
1029 after = self.after if after is None else after
1030 retries = self.retries if retries is None else retries
1030 retries = self.retries if retries is None else retries
1031 follow = self.follow if follow is None else follow
1031 follow = self.follow if follow is None else follow
1032 timeout = self.timeout if timeout is None else timeout
1032 timeout = self.timeout if timeout is None else timeout
1033 targets = self.targets if targets is None else targets
1033 targets = self.targets if targets is None else targets
1034
1034
1035 if not isinstance(retries, int):
1035 if not isinstance(retries, int):
1036 raise TypeError('retries must be int, not %r'%type(retries))
1036 raise TypeError('retries must be int, not %r'%type(retries))
1037
1037
1038 if targets is None:
1038 if targets is None:
1039 idents = []
1039 idents = []
1040 else:
1040 else:
1041 idents = self.client._build_targets(targets)[0]
1041 idents = self.client._build_targets(targets)[0]
1042 # ensure *not* bytes
1042 # ensure *not* bytes
1043 idents = [ ident.decode() for ident in idents ]
1043 idents = [ ident.decode() for ident in idents ]
1044
1044
1045 after = self._render_dependency(after)
1045 after = self._render_dependency(after)
1046 follow = self._render_dependency(follow)
1046 follow = self._render_dependency(follow)
1047 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1047 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1048
1048
1049 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1049 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1050 metadata=metadata)
1050 metadata=metadata)
1051 tracker = None if track is False else msg['tracker']
1051 tracker = None if track is False else msg['tracker']
1052
1052
1053 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f),
1053 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f),
1054 targets=None, tracker=tracker, owner=True,
1054 targets=None, tracker=tracker, owner=True,
1055 )
1055 )
1056 if block:
1056 if block:
1057 try:
1057 try:
1058 return ar.get()
1058 return ar.get()
1059 except KeyboardInterrupt:
1059 except KeyboardInterrupt:
1060 pass
1060 pass
1061 return ar
1061 return ar
1062
1062
1063 @sync_results
1063 @sync_results
1064 @save_ids
1064 @save_ids
1065 def map(self, f, *sequences, **kwargs):
1065 def map(self, f, *sequences, **kwargs):
1066 """``view.map(f, *sequences, block=self.block, chunksize=1, ordered=True)`` => list|AsyncMapResult
1066 """``view.map(f, *sequences, block=self.block, chunksize=1, ordered=True)`` => list|AsyncMapResult
1067
1067
1068 Parallel version of builtin `map`, load-balanced by this View.
1068 Parallel version of builtin `map`, load-balanced by this View.
1069
1069
1070 `block`, and `chunksize` can be specified by keyword only.
1070 `block`, and `chunksize` can be specified by keyword only.
1071
1071
1072 Each `chunksize` elements will be a separate task, and will be
1072 Each `chunksize` elements will be a separate task, and will be
1073 load-balanced. This lets individual elements be available for iteration
1073 load-balanced. This lets individual elements be available for iteration
1074 as soon as they arrive.
1074 as soon as they arrive.
1075
1075
1076 Parameters
1076 Parameters
1077 ----------
1077 ----------
1078
1078
1079 f : callable
1079 f : callable
1080 function to be mapped
1080 function to be mapped
1081 *sequences: one or more sequences of matching length
1081 *sequences: one or more sequences of matching length
1082 the sequences to be distributed and passed to `f`
1082 the sequences to be distributed and passed to `f`
1083 block : bool [default self.block]
1083 block : bool [default self.block]
1084 whether to wait for the result or not
1084 whether to wait for the result or not
1085 track : bool
1085 track : bool
1086 whether to create a MessageTracker to allow the user to
1086 whether to create a MessageTracker to allow the user to
1087 safely edit after arrays and buffers during non-copying
1087 safely edit after arrays and buffers during non-copying
1088 sends.
1088 sends.
1089 chunksize : int [default 1]
1089 chunksize : int [default 1]
1090 how many elements should be in each task.
1090 how many elements should be in each task.
1091 ordered : bool [default True]
1091 ordered : bool [default True]
1092 Whether the results should be gathered as they arrive, or enforce
1092 Whether the results should be gathered as they arrive, or enforce
1093 the order of submission.
1093 the order of submission.
1094
1094
1095 Only applies when iterating through AsyncMapResult as results arrive.
1095 Only applies when iterating through AsyncMapResult as results arrive.
1096 Has no effect when block=True.
1096 Has no effect when block=True.
1097
1097
1098 Returns
1098 Returns
1099 -------
1099 -------
1100
1100
1101 if block=False
1101 if block=False
1102 An :class:`~ipython_parallel.client.asyncresult.AsyncMapResult` instance.
1102 An :class:`~ipython_parallel.client.asyncresult.AsyncMapResult` instance.
1103 An object like AsyncResult, but which reassembles the sequence of results
1103 An object like AsyncResult, but which reassembles the sequence of results
1104 into a single list. AsyncMapResults can be iterated through before all
1104 into a single list. AsyncMapResults can be iterated through before all
1105 results are complete.
1105 results are complete.
1106 else
1106 else
1107 A list, the result of ``map(f,*sequences)``
1107 A list, the result of ``map(f,*sequences)``
1108 """
1108 """
1109
1109
1110 # default
1110 # default
1111 block = kwargs.get('block', self.block)
1111 block = kwargs.get('block', self.block)
1112 chunksize = kwargs.get('chunksize', 1)
1112 chunksize = kwargs.get('chunksize', 1)
1113 ordered = kwargs.get('ordered', True)
1113 ordered = kwargs.get('ordered', True)
1114
1114
1115 keyset = set(kwargs.keys())
1115 keyset = set(kwargs.keys())
1116 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1116 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1117 if extra_keys:
1117 if extra_keys:
1118 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1118 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1119
1119
1120 assert len(sequences) > 0, "must have some sequences to map onto!"
1120 assert len(sequences) > 0, "must have some sequences to map onto!"
1121
1121
1122 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1122 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1123 return pf.map(*sequences)
1123 return pf.map(*sequences)
1124
1124
# Public API of this module: the two View flavors exported for client use.
__all__ = ['LoadBalancedView', 'DirectView']
General Comments 0
You need to be logged in to leave comments. Login now