##// END OF EJS Templates
update parallel magics...
MinRK -
Show More
@@ -1,1659 +1,1694 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
35
36
36 from IPython.utils.coloransi import TermColors
37 from IPython.utils.coloransi import TermColors
37 from IPython.utils.jsonutil import rekey
38 from IPython.utils.jsonutil import rekey
38 from IPython.utils.localinterfaces import LOCAL_IPS
39 from IPython.utils.localinterfaces import LOCAL_IPS
39 from IPython.utils.path import get_ipython_dir
40 from IPython.utils.path import get_ipython_dir
40 from IPython.utils.py3compat import cast_bytes
41 from IPython.utils.py3compat import cast_bytes
41 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
42 Dict, List, Bool, Set, Any)
43 Dict, List, Bool, Set, Any)
43 from IPython.external.decorator import decorator
44 from IPython.external.decorator import decorator
44 from IPython.external.ssh import tunnel
45 from IPython.external.ssh import tunnel
45
46
46 from IPython.parallel import Reference
47 from IPython.parallel import Reference
47 from IPython.parallel import error
48 from IPython.parallel import error
48 from IPython.parallel import util
49 from IPython.parallel import util
49
50
50 from IPython.zmq.session import Session, Message
51 from IPython.zmq.session import Session, Message
51
52
52 from .asyncresult import AsyncResult, AsyncHubResult
53 from .asyncresult import AsyncResult, AsyncHubResult
53 from IPython.core.profiledir import ProfileDir, ProfileDirError
54 from .view import DirectView, LoadBalancedView
54 from .view import DirectView, LoadBalancedView
55
55
56 if sys.version_info[0] >= 3:
56 if sys.version_info[0] >= 3:
57 # xrange is used in a couple 'isinstance' tests in py2
57 # xrange is used in a couple 'isinstance' tests in py2
58 # should be just 'range' in 3k
58 # should be just 'range' in 3k
59 xrange = range
59 xrange = range
60
60
61 #--------------------------------------------------------------------------
61 #--------------------------------------------------------------------------
62 # Decorators for Client methods
62 # Decorators for Client methods
63 #--------------------------------------------------------------------------
63 #--------------------------------------------------------------------------
64
64
65 @decorator
65 @decorator
66 def spin_first(f, self, *args, **kwargs):
66 def spin_first(f, self, *args, **kwargs):
67 """Call spin() to sync state prior to calling the method."""
67 """Call spin() to sync state prior to calling the method."""
68 self.spin()
68 self.spin()
69 return f(self, *args, **kwargs)
69 return f(self, *args, **kwargs)
70
70
71
71
72 #--------------------------------------------------------------------------
72 #--------------------------------------------------------------------------
73 # Classes
73 # Classes
74 #--------------------------------------------------------------------------
74 #--------------------------------------------------------------------------
75
75
76
76
77 class ExecuteReply(object):
77 class ExecuteReply(object):
78 """wrapper for finished Execute results"""
78 """wrapper for finished Execute results"""
79 def __init__(self, msg_id, content, metadata):
79 def __init__(self, msg_id, content, metadata):
80 self.msg_id = msg_id
80 self.msg_id = msg_id
81 self._content = content
81 self._content = content
82 self.execution_count = content['execution_count']
82 self.execution_count = content['execution_count']
83 self.metadata = metadata
83 self.metadata = metadata
84
84
85 def __getitem__(self, key):
85 def __getitem__(self, key):
86 return self.metadata[key]
86 return self.metadata[key]
87
87
88 def __getattr__(self, key):
88 def __getattr__(self, key):
89 if key not in self.metadata:
89 if key not in self.metadata:
90 raise AttributeError(key)
90 raise AttributeError(key)
91 return self.metadata[key]
91 return self.metadata[key]
92
92
93 def __repr__(self):
93 def __repr__(self):
94 pyout = self.metadata['pyout'] or {'data':{}}
94 pyout = self.metadata['pyout'] or {'data':{}}
95 text_out = pyout['data'].get('text/plain', '')
95 text_out = pyout['data'].get('text/plain', '')
96 if len(text_out) > 32:
96 if len(text_out) > 32:
97 text_out = text_out[:29] + '...'
97 text_out = text_out[:29] + '...'
98
98
99 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
99 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
100
100
101 def _repr_pretty_(self, p, cycle):
101 def _repr_pretty_(self, p, cycle):
102 pyout = self.metadata['pyout'] or {'data':{}}
102 pyout = self.metadata['pyout'] or {'data':{}}
103 text_out = pyout['data'].get('text/plain', '')
103 text_out = pyout['data'].get('text/plain', '')
104
104
105 if not text_out:
105 if not text_out:
106 return
106 return
107
107
108 try:
108 try:
109 ip = get_ipython()
109 ip = get_ipython()
110 except NameError:
110 except NameError:
111 colors = "NoColor"
111 colors = "NoColor"
112 else:
112 else:
113 colors = ip.colors
113 colors = ip.colors
114
114
115 if colors == "NoColor":
115 if colors == "NoColor":
116 out = normal = ""
116 out = normal = ""
117 else:
117 else:
118 out = TermColors.Red
118 out = TermColors.Red
119 normal = TermColors.Normal
119 normal = TermColors.Normal
120
120
121 if '\n' in text_out and not text_out.startswith('\n'):
121 if '\n' in text_out and not text_out.startswith('\n'):
122 # add newline for multiline reprs
122 # add newline for multiline reprs
123 text_out = '\n' + text_out
123 text_out = '\n' + text_out
124
124
125 p.text(
125 p.text(
126 out + u'Out[%i:%i]: ' % (
126 out + u'Out[%i:%i]: ' % (
127 self.metadata['engine_id'], self.execution_count
127 self.metadata['engine_id'], self.execution_count
128 ) + normal + text_out
128 ) + normal + text_out
129 )
129 )
130
130
131 def _repr_html_(self):
131 def _repr_html_(self):
132 pyout = self.metadata['pyout'] or {'data':{}}
132 pyout = self.metadata['pyout'] or {'data':{}}
133 return pyout['data'].get("text/html")
133 return pyout['data'].get("text/html")
134
134
135 def _repr_latex_(self):
135 def _repr_latex_(self):
136 pyout = self.metadata['pyout'] or {'data':{}}
136 pyout = self.metadata['pyout'] or {'data':{}}
137 return pyout['data'].get("text/latex")
137 return pyout['data'].get("text/latex")
138
138
139 def _repr_json_(self):
139 def _repr_json_(self):
140 pyout = self.metadata['pyout'] or {'data':{}}
140 pyout = self.metadata['pyout'] or {'data':{}}
141 return pyout['data'].get("application/json")
141 return pyout['data'].get("application/json")
142
142
143 def _repr_javascript_(self):
143 def _repr_javascript_(self):
144 pyout = self.metadata['pyout'] or {'data':{}}
144 pyout = self.metadata['pyout'] or {'data':{}}
145 return pyout['data'].get("application/javascript")
145 return pyout['data'].get("application/javascript")
146
146
147 def _repr_png_(self):
147 def _repr_png_(self):
148 pyout = self.metadata['pyout'] or {'data':{}}
148 pyout = self.metadata['pyout'] or {'data':{}}
149 return pyout['data'].get("image/png")
149 return pyout['data'].get("image/png")
150
150
151 def _repr_jpeg_(self):
151 def _repr_jpeg_(self):
152 pyout = self.metadata['pyout'] or {'data':{}}
152 pyout = self.metadata['pyout'] or {'data':{}}
153 return pyout['data'].get("image/jpeg")
153 return pyout['data'].get("image/jpeg")
154
154
155 def _repr_svg_(self):
155 def _repr_svg_(self):
156 pyout = self.metadata['pyout'] or {'data':{}}
156 pyout = self.metadata['pyout'] or {'data':{}}
157 return pyout['data'].get("image/svg+xml")
157 return pyout['data'].get("image/svg+xml")
158
158
159
159
160 class Metadata(dict):
160 class Metadata(dict):
161 """Subclass of dict for initializing metadata values.
161 """Subclass of dict for initializing metadata values.
162
162
163 Attribute access works on keys.
163 Attribute access works on keys.
164
164
165 These objects have a strict set of keys - errors will raise if you try
165 These objects have a strict set of keys - errors will raise if you try
166 to add new keys.
166 to add new keys.
167 """
167 """
168 def __init__(self, *args, **kwargs):
168 def __init__(self, *args, **kwargs):
169 dict.__init__(self)
169 dict.__init__(self)
170 md = {'msg_id' : None,
170 md = {'msg_id' : None,
171 'submitted' : None,
171 'submitted' : None,
172 'started' : None,
172 'started' : None,
173 'completed' : None,
173 'completed' : None,
174 'received' : None,
174 'received' : None,
175 'engine_uuid' : None,
175 'engine_uuid' : None,
176 'engine_id' : None,
176 'engine_id' : None,
177 'follow' : None,
177 'follow' : None,
178 'after' : None,
178 'after' : None,
179 'status' : None,
179 'status' : None,
180
180
181 'pyin' : None,
181 'pyin' : None,
182 'pyout' : None,
182 'pyout' : None,
183 'pyerr' : None,
183 'pyerr' : None,
184 'stdout' : '',
184 'stdout' : '',
185 'stderr' : '',
185 'stderr' : '',
186 'outputs' : [],
186 'outputs' : [],
187 }
187 }
188 self.update(md)
188 self.update(md)
189 self.update(dict(*args, **kwargs))
189 self.update(dict(*args, **kwargs))
190
190
191 def __getattr__(self, key):
191 def __getattr__(self, key):
192 """getattr aliased to getitem"""
192 """getattr aliased to getitem"""
193 if key in self.iterkeys():
193 if key in self.iterkeys():
194 return self[key]
194 return self[key]
195 else:
195 else:
196 raise AttributeError(key)
196 raise AttributeError(key)
197
197
198 def __setattr__(self, key, value):
198 def __setattr__(self, key, value):
199 """setattr aliased to setitem, with strict"""
199 """setattr aliased to setitem, with strict"""
200 if key in self.iterkeys():
200 if key in self.iterkeys():
201 self[key] = value
201 self[key] = value
202 else:
202 else:
203 raise AttributeError(key)
203 raise AttributeError(key)
204
204
205 def __setitem__(self, key, value):
205 def __setitem__(self, key, value):
206 """strict static key enforcement"""
206 """strict static key enforcement"""
207 if key in self.iterkeys():
207 if key in self.iterkeys():
208 dict.__setitem__(self, key, value)
208 dict.__setitem__(self, key, value)
209 else:
209 else:
210 raise KeyError(key)
210 raise KeyError(key)
211
211
212
212
213 class Client(HasTraits):
213 class Client(HasTraits):
214 """A semi-synchronous client to the IPython ZMQ cluster
214 """A semi-synchronous client to the IPython ZMQ cluster
215
215
216 Parameters
216 Parameters
217 ----------
217 ----------
218
218
219 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
219 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
220 Connection information for the Hub's registration. If a json connector
220 Connection information for the Hub's registration. If a json connector
221 file is given, then likely no further configuration is necessary.
221 file is given, then likely no further configuration is necessary.
222 [Default: use profile]
222 [Default: use profile]
223 profile : bytes
223 profile : bytes
224 The name of the Cluster profile to be used to find connector information.
224 The name of the Cluster profile to be used to find connector information.
225 If run from an IPython application, the default profile will be the same
225 If run from an IPython application, the default profile will be the same
226 as the running application, otherwise it will be 'default'.
226 as the running application, otherwise it will be 'default'.
227 context : zmq.Context
227 context : zmq.Context
228 Pass an existing zmq.Context instance, otherwise the client will create its own.
228 Pass an existing zmq.Context instance, otherwise the client will create its own.
229 debug : bool
229 debug : bool
230 flag for lots of message printing for debug purposes
230 flag for lots of message printing for debug purposes
231 timeout : int/float
231 timeout : int/float
232 time (in seconds) to wait for connection replies from the Hub
232 time (in seconds) to wait for connection replies from the Hub
233 [Default: 10]
233 [Default: 10]
234
234
235 #-------------- session related args ----------------
235 #-------------- session related args ----------------
236
236
237 config : Config object
237 config : Config object
238 If specified, this will be relayed to the Session for configuration
238 If specified, this will be relayed to the Session for configuration
239 username : str
239 username : str
240 set username for the session object
240 set username for the session object
241 packer : str (import_string) or callable
241 packer : str (import_string) or callable
242 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
242 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
243 function to serialize messages. Must support same input as
243 function to serialize messages. Must support same input as
244 JSON, and output must be bytes.
244 JSON, and output must be bytes.
245 You can pass a callable directly as `pack`
245 You can pass a callable directly as `pack`
246 unpacker : str (import_string) or callable
246 unpacker : str (import_string) or callable
247 The inverse of packer. Only necessary if packer is specified as *not* one
247 The inverse of packer. Only necessary if packer is specified as *not* one
248 of 'json' or 'pickle'.
248 of 'json' or 'pickle'.
249
249
250 #-------------- ssh related args ----------------
250 #-------------- ssh related args ----------------
251 # These are args for configuring the ssh tunnel to be used
251 # These are args for configuring the ssh tunnel to be used
252 # credentials are used to forward connections over ssh to the Controller
252 # credentials are used to forward connections over ssh to the Controller
253 # Note that the ip given in `addr` needs to be relative to sshserver
253 # Note that the ip given in `addr` needs to be relative to sshserver
254 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
254 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
255 # and set sshserver as the same machine the Controller is on. However,
255 # and set sshserver as the same machine the Controller is on. However,
256 # the only requirement is that sshserver is able to see the Controller
256 # the only requirement is that sshserver is able to see the Controller
257 # (i.e. is within the same trusted network).
257 # (i.e. is within the same trusted network).
258
258
259 sshserver : str
259 sshserver : str
260 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
260 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
261 If keyfile or password is specified, and this is not, it will default to
261 If keyfile or password is specified, and this is not, it will default to
262 the ip given in addr.
262 the ip given in addr.
263 sshkey : str; path to ssh private key file
263 sshkey : str; path to ssh private key file
264 This specifies a key to be used in ssh login, default None.
264 This specifies a key to be used in ssh login, default None.
265 Regular default ssh keys will be used without specifying this argument.
265 Regular default ssh keys will be used without specifying this argument.
266 password : str
266 password : str
267 Your ssh password to sshserver. Note that if this is left None,
267 Your ssh password to sshserver. Note that if this is left None,
268 you will be prompted for it if passwordless key based login is unavailable.
268 you will be prompted for it if passwordless key based login is unavailable.
269 paramiko : bool
269 paramiko : bool
270 flag for whether to use paramiko instead of shell ssh for tunneling.
270 flag for whether to use paramiko instead of shell ssh for tunneling.
271 [default: True on win32, False else]
271 [default: True on win32, False else]
272
272
273 ------- exec authentication args -------
273 ------- exec authentication args -------
274 If even localhost is untrusted, you can have some protection against
274 If even localhost is untrusted, you can have some protection against
275 unauthorized execution by signing messages with HMAC digests.
275 unauthorized execution by signing messages with HMAC digests.
276 Messages are still sent as cleartext, so if someone can snoop your
276 Messages are still sent as cleartext, so if someone can snoop your
277 loopback traffic this will not protect your privacy, but will prevent
277 loopback traffic this will not protect your privacy, but will prevent
278 unauthorized execution.
278 unauthorized execution.
279
279
280 exec_key : str
280 exec_key : str
281 an authentication key or file containing a key
281 an authentication key or file containing a key
282 default: None
282 default: None
283
283
284
284
285 Attributes
285 Attributes
286 ----------
286 ----------
287
287
288 ids : list of int engine IDs
288 ids : list of int engine IDs
289 requesting the ids attribute always synchronizes
289 requesting the ids attribute always synchronizes
290 the registration state. To request ids without synchronization,
290 the registration state. To request ids without synchronization,
291 use semi-private _ids attributes.
291 use semi-private _ids attributes.
292
292
293 history : list of msg_ids
293 history : list of msg_ids
294 a list of msg_ids, keeping track of all the execution
294 a list of msg_ids, keeping track of all the execution
295 messages you have submitted in order.
295 messages you have submitted in order.
296
296
297 outstanding : set of msg_ids
297 outstanding : set of msg_ids
298 a set of msg_ids that have been submitted, but whose
298 a set of msg_ids that have been submitted, but whose
299 results have not yet been received.
299 results have not yet been received.
300
300
301 results : dict
301 results : dict
302 a dict of all our results, keyed by msg_id
302 a dict of all our results, keyed by msg_id
303
303
304 block : bool
304 block : bool
305 determines default behavior when block not specified
305 determines default behavior when block not specified
306 in execution methods
306 in execution methods
307
307
308 Methods
308 Methods
309 -------
309 -------
310
310
311 spin
311 spin
312 flushes incoming results and registration state changes
312 flushes incoming results and registration state changes
313 control methods spin, and requesting `ids` also ensures up to date
313 control methods spin, and requesting `ids` also ensures up to date
314
314
315 wait
315 wait
316 wait on one or more msg_ids
316 wait on one or more msg_ids
317
317
318 execution methods
318 execution methods
319 apply
319 apply
320 legacy: execute, run
320 legacy: execute, run
321
321
322 data movement
322 data movement
323 push, pull, scatter, gather
323 push, pull, scatter, gather
324
324
325 query methods
325 query methods
326 queue_status, get_result, purge, result_status
326 queue_status, get_result, purge, result_status
327
327
328 control methods
328 control methods
329 abort, shutdown
329 abort, shutdown
330
330
331 """
331 """
332
332
333
333
334 block = Bool(False)
334 block = Bool(False)
335 outstanding = Set()
335 outstanding = Set()
336 results = Instance('collections.defaultdict', (dict,))
336 results = Instance('collections.defaultdict', (dict,))
337 metadata = Instance('collections.defaultdict', (Metadata,))
337 metadata = Instance('collections.defaultdict', (Metadata,))
338 history = List()
338 history = List()
339 debug = Bool(False)
339 debug = Bool(False)
340 _spin_thread = Any()
340 _spin_thread = Any()
341 _stop_spinning = Any()
341 _stop_spinning = Any()
342
342
343 profile=Unicode()
343 profile=Unicode()
344 def _profile_default(self):
344 def _profile_default(self):
345 if BaseIPythonApplication.initialized():
345 if BaseIPythonApplication.initialized():
346 # an IPython app *might* be running, try to get its profile
346 # an IPython app *might* be running, try to get its profile
347 try:
347 try:
348 return BaseIPythonApplication.instance().profile
348 return BaseIPythonApplication.instance().profile
349 except (AttributeError, MultipleInstanceError):
349 except (AttributeError, MultipleInstanceError):
350 # could be a *different* subclass of config.Application,
350 # could be a *different* subclass of config.Application,
351 # which would raise one of these two errors.
351 # which would raise one of these two errors.
352 return u'default'
352 return u'default'
353 else:
353 else:
354 return u'default'
354 return u'default'
355
355
356
356
357 _outstanding_dict = Instance('collections.defaultdict', (set,))
357 _outstanding_dict = Instance('collections.defaultdict', (set,))
358 _ids = List()
358 _ids = List()
359 _connected=Bool(False)
359 _connected=Bool(False)
360 _ssh=Bool(False)
360 _ssh=Bool(False)
361 _context = Instance('zmq.Context')
361 _context = Instance('zmq.Context')
362 _config = Dict()
362 _config = Dict()
363 _engines=Instance(util.ReverseDict, (), {})
363 _engines=Instance(util.ReverseDict, (), {})
364 # _hub_socket=Instance('zmq.Socket')
364 # _hub_socket=Instance('zmq.Socket')
365 _query_socket=Instance('zmq.Socket')
365 _query_socket=Instance('zmq.Socket')
366 _control_socket=Instance('zmq.Socket')
366 _control_socket=Instance('zmq.Socket')
367 _iopub_socket=Instance('zmq.Socket')
367 _iopub_socket=Instance('zmq.Socket')
368 _notification_socket=Instance('zmq.Socket')
368 _notification_socket=Instance('zmq.Socket')
369 _mux_socket=Instance('zmq.Socket')
369 _mux_socket=Instance('zmq.Socket')
370 _task_socket=Instance('zmq.Socket')
370 _task_socket=Instance('zmq.Socket')
371 _task_scheme=Unicode()
371 _task_scheme=Unicode()
372 _closed = False
372 _closed = False
373 _ignored_control_replies=Integer(0)
373 _ignored_control_replies=Integer(0)
374 _ignored_hub_replies=Integer(0)
374 _ignored_hub_replies=Integer(0)
375
375
376 def __new__(self, *args, **kw):
376 def __new__(self, *args, **kw):
377 # don't raise on positional args
377 # don't raise on positional args
378 return HasTraits.__new__(self, **kw)
378 return HasTraits.__new__(self, **kw)
379
379
380 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
380 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
381 context=None, debug=False, exec_key=None,
381 context=None, debug=False, exec_key=None,
382 sshserver=None, sshkey=None, password=None, paramiko=None,
382 sshserver=None, sshkey=None, password=None, paramiko=None,
383 timeout=10, **extra_args
383 timeout=10, **extra_args
384 ):
384 ):
385 if profile:
385 if profile:
386 super(Client, self).__init__(debug=debug, profile=profile)
386 super(Client, self).__init__(debug=debug, profile=profile)
387 else:
387 else:
388 super(Client, self).__init__(debug=debug)
388 super(Client, self).__init__(debug=debug)
389 if context is None:
389 if context is None:
390 context = zmq.Context.instance()
390 context = zmq.Context.instance()
391 self._context = context
391 self._context = context
392 self._stop_spinning = Event()
392 self._stop_spinning = Event()
393
393
394 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
394 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
395 if self._cd is not None:
395 if self._cd is not None:
396 if url_or_file is None:
396 if url_or_file is None:
397 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
397 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
398 if url_or_file is None:
398 if url_or_file is None:
399 raise ValueError(
399 raise ValueError(
400 "I can't find enough information to connect to a hub!"
400 "I can't find enough information to connect to a hub!"
401 " Please specify at least one of url_or_file or profile."
401 " Please specify at least one of url_or_file or profile."
402 )
402 )
403
403
404 if not util.is_url(url_or_file):
404 if not util.is_url(url_or_file):
405 # it's not a url, try for a file
405 # it's not a url, try for a file
406 if not os.path.exists(url_or_file):
406 if not os.path.exists(url_or_file):
407 if self._cd:
407 if self._cd:
408 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
408 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
409 if not os.path.exists(url_or_file):
409 if not os.path.exists(url_or_file):
410 raise IOError("Connection file not found: %r" % url_or_file)
410 raise IOError("Connection file not found: %r" % url_or_file)
411 with open(url_or_file) as f:
411 with open(url_or_file) as f:
412 cfg = json.loads(f.read())
412 cfg = json.loads(f.read())
413 else:
413 else:
414 cfg = {'url':url_or_file}
414 cfg = {'url':url_or_file}
415
415
416 # sync defaults from args, json:
416 # sync defaults from args, json:
417 if sshserver:
417 if sshserver:
418 cfg['ssh'] = sshserver
418 cfg['ssh'] = sshserver
419 if exec_key:
419 if exec_key:
420 cfg['exec_key'] = exec_key
420 cfg['exec_key'] = exec_key
421 exec_key = cfg['exec_key']
421 exec_key = cfg['exec_key']
422 location = cfg.setdefault('location', None)
422 location = cfg.setdefault('location', None)
423 cfg['url'] = util.disambiguate_url(cfg['url'], location)
423 cfg['url'] = util.disambiguate_url(cfg['url'], location)
424 url = cfg['url']
424 url = cfg['url']
425 proto,addr,port = util.split_url(url)
425 proto,addr,port = util.split_url(url)
426 if location is not None and addr == '127.0.0.1':
426 if location is not None and addr == '127.0.0.1':
427 # location specified, and connection is expected to be local
427 # location specified, and connection is expected to be local
428 if location not in LOCAL_IPS and not sshserver:
428 if location not in LOCAL_IPS and not sshserver:
429 # load ssh from JSON *only* if the controller is not on
429 # load ssh from JSON *only* if the controller is not on
430 # this machine
430 # this machine
431 sshserver=cfg['ssh']
431 sshserver=cfg['ssh']
432 if location not in LOCAL_IPS and not sshserver:
432 if location not in LOCAL_IPS and not sshserver:
433 # warn if no ssh specified, but SSH is probably needed
433 # warn if no ssh specified, but SSH is probably needed
434 # This is only a warning, because the most likely cause
434 # This is only a warning, because the most likely cause
435 # is a local Controller on a laptop whose IP is dynamic
435 # is a local Controller on a laptop whose IP is dynamic
436 warnings.warn("""
436 warnings.warn("""
437 Controller appears to be listening on localhost, but not on this machine.
437 Controller appears to be listening on localhost, but not on this machine.
438 If this is true, you should specify Client(...,sshserver='you@%s')
438 If this is true, you should specify Client(...,sshserver='you@%s')
439 or instruct your controller to listen on an external IP."""%location,
439 or instruct your controller to listen on an external IP."""%location,
440 RuntimeWarning)
440 RuntimeWarning)
441 elif not sshserver:
441 elif not sshserver:
442 # otherwise sync with cfg
442 # otherwise sync with cfg
443 sshserver = cfg['ssh']
443 sshserver = cfg['ssh']
444
444
445 self._config = cfg
445 self._config = cfg
446
446
447 self._ssh = bool(sshserver or sshkey or password)
447 self._ssh = bool(sshserver or sshkey or password)
448 if self._ssh and sshserver is None:
448 if self._ssh and sshserver is None:
449 # default to ssh via localhost
449 # default to ssh via localhost
450 sshserver = url.split('://')[1].split(':')[0]
450 sshserver = url.split('://')[1].split(':')[0]
451 if self._ssh and password is None:
451 if self._ssh and password is None:
452 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
452 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
453 password=False
453 password=False
454 else:
454 else:
455 password = getpass("SSH Password for %s: "%sshserver)
455 password = getpass("SSH Password for %s: "%sshserver)
456 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
456 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
457
457
458 # configure and construct the session
458 # configure and construct the session
459 if exec_key is not None:
459 if exec_key is not None:
460 if os.path.isfile(exec_key):
460 if os.path.isfile(exec_key):
461 extra_args['keyfile'] = exec_key
461 extra_args['keyfile'] = exec_key
462 else:
462 else:
463 exec_key = cast_bytes(exec_key)
463 exec_key = cast_bytes(exec_key)
464 extra_args['key'] = exec_key
464 extra_args['key'] = exec_key
465 self.session = Session(**extra_args)
465 self.session = Session(**extra_args)
466
466
467 self._query_socket = self._context.socket(zmq.DEALER)
467 self._query_socket = self._context.socket(zmq.DEALER)
468 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
468 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
469 if self._ssh:
469 if self._ssh:
470 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
470 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
471 else:
471 else:
472 self._query_socket.connect(url)
472 self._query_socket.connect(url)
473
473
474 self.session.debug = self.debug
474 self.session.debug = self.debug
475
475
476 self._notification_handlers = {'registration_notification' : self._register_engine,
476 self._notification_handlers = {'registration_notification' : self._register_engine,
477 'unregistration_notification' : self._unregister_engine,
477 'unregistration_notification' : self._unregister_engine,
478 'shutdown_notification' : lambda msg: self.close(),
478 'shutdown_notification' : lambda msg: self.close(),
479 }
479 }
480 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
480 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
481 'apply_reply' : self._handle_apply_reply}
481 'apply_reply' : self._handle_apply_reply}
482 self._connect(sshserver, ssh_kwargs, timeout)
482 self._connect(sshserver, ssh_kwargs, timeout)
483
484 # last step: setup magics, if we are in IPython:
485
486 try:
487 ip = get_ipython()
488 except NameError:
489 return
490 else:
491 if 'px' not in ip.magics_manager.magics:
492 # in IPython but we are the first Client.
493 # activate a default view for parallel magics.
494 self.activate()
483
495
484 def __del__(self):
496 def __del__(self):
485 """cleanup sockets, but _not_ context."""
497 """cleanup sockets, but _not_ context."""
486 self.close()
498 self.close()
487
499
488 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
500 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
489 if ipython_dir is None:
501 if ipython_dir is None:
490 ipython_dir = get_ipython_dir()
502 ipython_dir = get_ipython_dir()
491 if profile_dir is not None:
503 if profile_dir is not None:
492 try:
504 try:
493 self._cd = ProfileDir.find_profile_dir(profile_dir)
505 self._cd = ProfileDir.find_profile_dir(profile_dir)
494 return
506 return
495 except ProfileDirError:
507 except ProfileDirError:
496 pass
508 pass
497 elif profile is not None:
509 elif profile is not None:
498 try:
510 try:
499 self._cd = ProfileDir.find_profile_dir_by_name(
511 self._cd = ProfileDir.find_profile_dir_by_name(
500 ipython_dir, profile)
512 ipython_dir, profile)
501 return
513 return
502 except ProfileDirError:
514 except ProfileDirError:
503 pass
515 pass
504 self._cd = None
516 self._cd = None
505
517
506 def _update_engines(self, engines):
518 def _update_engines(self, engines):
507 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
519 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
508 for k,v in engines.iteritems():
520 for k,v in engines.iteritems():
509 eid = int(k)
521 eid = int(k)
510 self._engines[eid] = v
522 self._engines[eid] = v
511 self._ids.append(eid)
523 self._ids.append(eid)
512 self._ids = sorted(self._ids)
524 self._ids = sorted(self._ids)
513 if sorted(self._engines.keys()) != range(len(self._engines)) and \
525 if sorted(self._engines.keys()) != range(len(self._engines)) and \
514 self._task_scheme == 'pure' and self._task_socket:
526 self._task_scheme == 'pure' and self._task_socket:
515 self._stop_scheduling_tasks()
527 self._stop_scheduling_tasks()
516
528
517 def _stop_scheduling_tasks(self):
529 def _stop_scheduling_tasks(self):
518 """Stop scheduling tasks because an engine has been unregistered
530 """Stop scheduling tasks because an engine has been unregistered
519 from a pure ZMQ scheduler.
531 from a pure ZMQ scheduler.
520 """
532 """
521 self._task_socket.close()
533 self._task_socket.close()
522 self._task_socket = None
534 self._task_socket = None
523 msg = "An engine has been unregistered, and we are using pure " +\
535 msg = "An engine has been unregistered, and we are using pure " +\
524 "ZMQ task scheduling. Task farming will be disabled."
536 "ZMQ task scheduling. Task farming will be disabled."
525 if self.outstanding:
537 if self.outstanding:
526 msg += " If you were running tasks when this happened, " +\
538 msg += " If you were running tasks when this happened, " +\
527 "some `outstanding` msg_ids may never resolve."
539 "some `outstanding` msg_ids may never resolve."
528 warnings.warn(msg, RuntimeWarning)
540 warnings.warn(msg, RuntimeWarning)
529
541
530 def _build_targets(self, targets):
542 def _build_targets(self, targets):
531 """Turn valid target IDs or 'all' into two lists:
543 """Turn valid target IDs or 'all' into two lists:
532 (int_ids, uuids).
544 (int_ids, uuids).
533 """
545 """
534 if not self._ids:
546 if not self._ids:
535 # flush notification socket if no engines yet, just in case
547 # flush notification socket if no engines yet, just in case
536 if not self.ids:
548 if not self.ids:
537 raise error.NoEnginesRegistered("Can't build targets without any engines")
549 raise error.NoEnginesRegistered("Can't build targets without any engines")
538
550
539 if targets is None:
551 if targets is None:
540 targets = self._ids
552 targets = self._ids
541 elif isinstance(targets, basestring):
553 elif isinstance(targets, basestring):
542 if targets.lower() == 'all':
554 if targets.lower() == 'all':
543 targets = self._ids
555 targets = self._ids
544 else:
556 else:
545 raise TypeError("%r not valid str target, must be 'all'"%(targets))
557 raise TypeError("%r not valid str target, must be 'all'"%(targets))
546 elif isinstance(targets, int):
558 elif isinstance(targets, int):
547 if targets < 0:
559 if targets < 0:
548 targets = self.ids[targets]
560 targets = self.ids[targets]
549 if targets not in self._ids:
561 if targets not in self._ids:
550 raise IndexError("No such engine: %i"%targets)
562 raise IndexError("No such engine: %i"%targets)
551 targets = [targets]
563 targets = [targets]
552
564
553 if isinstance(targets, slice):
565 if isinstance(targets, slice):
554 indices = range(len(self._ids))[targets]
566 indices = range(len(self._ids))[targets]
555 ids = self.ids
567 ids = self.ids
556 targets = [ ids[i] for i in indices ]
568 targets = [ ids[i] for i in indices ]
557
569
558 if not isinstance(targets, (tuple, list, xrange)):
570 if not isinstance(targets, (tuple, list, xrange)):
559 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
571 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
560
572
561 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
573 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
562
574
563 def _connect(self, sshserver, ssh_kwargs, timeout):
575 def _connect(self, sshserver, ssh_kwargs, timeout):
564 """setup all our socket connections to the cluster. This is called from
576 """setup all our socket connections to the cluster. This is called from
565 __init__."""
577 __init__."""
566
578
567 # Maybe allow reconnecting?
579 # Maybe allow reconnecting?
568 if self._connected:
580 if self._connected:
569 return
581 return
570 self._connected=True
582 self._connected=True
571
583
572 def connect_socket(s, url):
584 def connect_socket(s, url):
573 url = util.disambiguate_url(url, self._config['location'])
585 url = util.disambiguate_url(url, self._config['location'])
574 if self._ssh:
586 if self._ssh:
575 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
587 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
576 else:
588 else:
577 return s.connect(url)
589 return s.connect(url)
578
590
579 self.session.send(self._query_socket, 'connection_request')
591 self.session.send(self._query_socket, 'connection_request')
580 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
592 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
581 poller = zmq.Poller()
593 poller = zmq.Poller()
582 poller.register(self._query_socket, zmq.POLLIN)
594 poller.register(self._query_socket, zmq.POLLIN)
583 # poll expects milliseconds, timeout is seconds
595 # poll expects milliseconds, timeout is seconds
584 evts = poller.poll(timeout*1000)
596 evts = poller.poll(timeout*1000)
585 if not evts:
597 if not evts:
586 raise error.TimeoutError("Hub connection request timed out")
598 raise error.TimeoutError("Hub connection request timed out")
587 idents,msg = self.session.recv(self._query_socket,mode=0)
599 idents,msg = self.session.recv(self._query_socket,mode=0)
588 if self.debug:
600 if self.debug:
589 pprint(msg)
601 pprint(msg)
590 msg = Message(msg)
602 msg = Message(msg)
591 content = msg.content
603 content = msg.content
592 self._config['registration'] = dict(content)
604 self._config['registration'] = dict(content)
593 if content.status == 'ok':
605 if content.status == 'ok':
594 ident = self.session.bsession
606 ident = self.session.bsession
595 if content.mux:
607 if content.mux:
596 self._mux_socket = self._context.socket(zmq.DEALER)
608 self._mux_socket = self._context.socket(zmq.DEALER)
597 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
609 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
598 connect_socket(self._mux_socket, content.mux)
610 connect_socket(self._mux_socket, content.mux)
599 if content.task:
611 if content.task:
600 self._task_scheme, task_addr = content.task
612 self._task_scheme, task_addr = content.task
601 self._task_socket = self._context.socket(zmq.DEALER)
613 self._task_socket = self._context.socket(zmq.DEALER)
602 self._task_socket.setsockopt(zmq.IDENTITY, ident)
614 self._task_socket.setsockopt(zmq.IDENTITY, ident)
603 connect_socket(self._task_socket, task_addr)
615 connect_socket(self._task_socket, task_addr)
604 if content.notification:
616 if content.notification:
605 self._notification_socket = self._context.socket(zmq.SUB)
617 self._notification_socket = self._context.socket(zmq.SUB)
606 connect_socket(self._notification_socket, content.notification)
618 connect_socket(self._notification_socket, content.notification)
607 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
619 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
608 # if content.query:
620 # if content.query:
609 # self._query_socket = self._context.socket(zmq.DEALER)
621 # self._query_socket = self._context.socket(zmq.DEALER)
610 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
622 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
611 # connect_socket(self._query_socket, content.query)
623 # connect_socket(self._query_socket, content.query)
612 if content.control:
624 if content.control:
613 self._control_socket = self._context.socket(zmq.DEALER)
625 self._control_socket = self._context.socket(zmq.DEALER)
614 self._control_socket.setsockopt(zmq.IDENTITY, ident)
626 self._control_socket.setsockopt(zmq.IDENTITY, ident)
615 connect_socket(self._control_socket, content.control)
627 connect_socket(self._control_socket, content.control)
616 if content.iopub:
628 if content.iopub:
617 self._iopub_socket = self._context.socket(zmq.SUB)
629 self._iopub_socket = self._context.socket(zmq.SUB)
618 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
630 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
619 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
631 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
620 connect_socket(self._iopub_socket, content.iopub)
632 connect_socket(self._iopub_socket, content.iopub)
621 self._update_engines(dict(content.engines))
633 self._update_engines(dict(content.engines))
622 else:
634 else:
623 self._connected = False
635 self._connected = False
624 raise Exception("Failed to connect!")
636 raise Exception("Failed to connect!")
625
637
626 #--------------------------------------------------------------------------
638 #--------------------------------------------------------------------------
627 # handlers and callbacks for incoming messages
639 # handlers and callbacks for incoming messages
628 #--------------------------------------------------------------------------
640 #--------------------------------------------------------------------------
629
641
630 def _unwrap_exception(self, content):
642 def _unwrap_exception(self, content):
631 """unwrap exception, and remap engine_id to int."""
643 """unwrap exception, and remap engine_id to int."""
632 e = error.unwrap_exception(content)
644 e = error.unwrap_exception(content)
633 # print e.traceback
645 # print e.traceback
634 if e.engine_info:
646 if e.engine_info:
635 e_uuid = e.engine_info['engine_uuid']
647 e_uuid = e.engine_info['engine_uuid']
636 eid = self._engines[e_uuid]
648 eid = self._engines[e_uuid]
637 e.engine_info['engine_id'] = eid
649 e.engine_info['engine_id'] = eid
638 return e
650 return e
639
651
640 def _extract_metadata(self, header, parent, content):
652 def _extract_metadata(self, header, parent, content):
641 md = {'msg_id' : parent['msg_id'],
653 md = {'msg_id' : parent['msg_id'],
642 'received' : datetime.now(),
654 'received' : datetime.now(),
643 'engine_uuid' : header.get('engine', None),
655 'engine_uuid' : header.get('engine', None),
644 'follow' : parent.get('follow', []),
656 'follow' : parent.get('follow', []),
645 'after' : parent.get('after', []),
657 'after' : parent.get('after', []),
646 'status' : content['status'],
658 'status' : content['status'],
647 }
659 }
648
660
649 if md['engine_uuid'] is not None:
661 if md['engine_uuid'] is not None:
650 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
662 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
651
663
652 if 'date' in parent:
664 if 'date' in parent:
653 md['submitted'] = parent['date']
665 md['submitted'] = parent['date']
654 if 'started' in header:
666 if 'started' in header:
655 md['started'] = header['started']
667 md['started'] = header['started']
656 if 'date' in header:
668 if 'date' in header:
657 md['completed'] = header['date']
669 md['completed'] = header['date']
658 return md
670 return md
659
671
660 def _register_engine(self, msg):
672 def _register_engine(self, msg):
661 """Register a new engine, and update our connection info."""
673 """Register a new engine, and update our connection info."""
662 content = msg['content']
674 content = msg['content']
663 eid = content['id']
675 eid = content['id']
664 d = {eid : content['queue']}
676 d = {eid : content['queue']}
665 self._update_engines(d)
677 self._update_engines(d)
666
678
667 def _unregister_engine(self, msg):
679 def _unregister_engine(self, msg):
668 """Unregister an engine that has died."""
680 """Unregister an engine that has died."""
669 content = msg['content']
681 content = msg['content']
670 eid = int(content['id'])
682 eid = int(content['id'])
671 if eid in self._ids:
683 if eid in self._ids:
672 self._ids.remove(eid)
684 self._ids.remove(eid)
673 uuid = self._engines.pop(eid)
685 uuid = self._engines.pop(eid)
674
686
675 self._handle_stranded_msgs(eid, uuid)
687 self._handle_stranded_msgs(eid, uuid)
676
688
677 if self._task_socket and self._task_scheme == 'pure':
689 if self._task_socket and self._task_scheme == 'pure':
678 self._stop_scheduling_tasks()
690 self._stop_scheduling_tasks()
679
691
680 def _handle_stranded_msgs(self, eid, uuid):
692 def _handle_stranded_msgs(self, eid, uuid):
681 """Handle messages known to be on an engine when the engine unregisters.
693 """Handle messages known to be on an engine when the engine unregisters.
682
694
683 It is possible that this will fire prematurely - that is, an engine will
695 It is possible that this will fire prematurely - that is, an engine will
684 go down after completing a result, and the client will be notified
696 go down after completing a result, and the client will be notified
685 of the unregistration and later receive the successful result.
697 of the unregistration and later receive the successful result.
686 """
698 """
687
699
688 outstanding = self._outstanding_dict[uuid]
700 outstanding = self._outstanding_dict[uuid]
689
701
690 for msg_id in list(outstanding):
702 for msg_id in list(outstanding):
691 if msg_id in self.results:
703 if msg_id in self.results:
692 # we already
704 # we already
693 continue
705 continue
694 try:
706 try:
695 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
707 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
696 except:
708 except:
697 content = error.wrap_exception()
709 content = error.wrap_exception()
698 # build a fake message:
710 # build a fake message:
699 parent = {}
711 parent = {}
700 header = {}
712 header = {}
701 parent['msg_id'] = msg_id
713 parent['msg_id'] = msg_id
702 header['engine'] = uuid
714 header['engine'] = uuid
703 header['date'] = datetime.now()
715 header['date'] = datetime.now()
704 msg = dict(parent_header=parent, header=header, content=content)
716 msg = dict(parent_header=parent, header=header, content=content)
705 self._handle_apply_reply(msg)
717 self._handle_apply_reply(msg)
706
718
707 def _handle_execute_reply(self, msg):
719 def _handle_execute_reply(self, msg):
708 """Save the reply to an execute_request into our results.
720 """Save the reply to an execute_request into our results.
709
721
710 execute messages are never actually used. apply is used instead.
722 execute messages are never actually used. apply is used instead.
711 """
723 """
712
724
713 parent = msg['parent_header']
725 parent = msg['parent_header']
714 msg_id = parent['msg_id']
726 msg_id = parent['msg_id']
715 if msg_id not in self.outstanding:
727 if msg_id not in self.outstanding:
716 if msg_id in self.history:
728 if msg_id in self.history:
717 print ("got stale result: %s"%msg_id)
729 print ("got stale result: %s"%msg_id)
718 else:
730 else:
719 print ("got unknown result: %s"%msg_id)
731 print ("got unknown result: %s"%msg_id)
720 else:
732 else:
721 self.outstanding.remove(msg_id)
733 self.outstanding.remove(msg_id)
722
734
723 content = msg['content']
735 content = msg['content']
724 header = msg['header']
736 header = msg['header']
725
737
726 # construct metadata:
738 # construct metadata:
727 md = self.metadata[msg_id]
739 md = self.metadata[msg_id]
728 md.update(self._extract_metadata(header, parent, content))
740 md.update(self._extract_metadata(header, parent, content))
729 # is this redundant?
741 # is this redundant?
730 self.metadata[msg_id] = md
742 self.metadata[msg_id] = md
731
743
732 e_outstanding = self._outstanding_dict[md['engine_uuid']]
744 e_outstanding = self._outstanding_dict[md['engine_uuid']]
733 if msg_id in e_outstanding:
745 if msg_id in e_outstanding:
734 e_outstanding.remove(msg_id)
746 e_outstanding.remove(msg_id)
735
747
736 # construct result:
748 # construct result:
737 if content['status'] == 'ok':
749 if content['status'] == 'ok':
738 self.results[msg_id] = ExecuteReply(msg_id, content, md)
750 self.results[msg_id] = ExecuteReply(msg_id, content, md)
739 elif content['status'] == 'aborted':
751 elif content['status'] == 'aborted':
740 self.results[msg_id] = error.TaskAborted(msg_id)
752 self.results[msg_id] = error.TaskAborted(msg_id)
741 elif content['status'] == 'resubmitted':
753 elif content['status'] == 'resubmitted':
742 # TODO: handle resubmission
754 # TODO: handle resubmission
743 pass
755 pass
744 else:
756 else:
745 self.results[msg_id] = self._unwrap_exception(content)
757 self.results[msg_id] = self._unwrap_exception(content)
746
758
747 def _handle_apply_reply(self, msg):
759 def _handle_apply_reply(self, msg):
748 """Save the reply to an apply_request into our results."""
760 """Save the reply to an apply_request into our results."""
749 parent = msg['parent_header']
761 parent = msg['parent_header']
750 msg_id = parent['msg_id']
762 msg_id = parent['msg_id']
751 if msg_id not in self.outstanding:
763 if msg_id not in self.outstanding:
752 if msg_id in self.history:
764 if msg_id in self.history:
753 print ("got stale result: %s"%msg_id)
765 print ("got stale result: %s"%msg_id)
754 print self.results[msg_id]
766 print self.results[msg_id]
755 print msg
767 print msg
756 else:
768 else:
757 print ("got unknown result: %s"%msg_id)
769 print ("got unknown result: %s"%msg_id)
758 else:
770 else:
759 self.outstanding.remove(msg_id)
771 self.outstanding.remove(msg_id)
760 content = msg['content']
772 content = msg['content']
761 header = msg['header']
773 header = msg['header']
762
774
763 # construct metadata:
775 # construct metadata:
764 md = self.metadata[msg_id]
776 md = self.metadata[msg_id]
765 md.update(self._extract_metadata(header, parent, content))
777 md.update(self._extract_metadata(header, parent, content))
766 # is this redundant?
778 # is this redundant?
767 self.metadata[msg_id] = md
779 self.metadata[msg_id] = md
768
780
769 e_outstanding = self._outstanding_dict[md['engine_uuid']]
781 e_outstanding = self._outstanding_dict[md['engine_uuid']]
770 if msg_id in e_outstanding:
782 if msg_id in e_outstanding:
771 e_outstanding.remove(msg_id)
783 e_outstanding.remove(msg_id)
772
784
773 # construct result:
785 # construct result:
774 if content['status'] == 'ok':
786 if content['status'] == 'ok':
775 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
787 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
776 elif content['status'] == 'aborted':
788 elif content['status'] == 'aborted':
777 self.results[msg_id] = error.TaskAborted(msg_id)
789 self.results[msg_id] = error.TaskAborted(msg_id)
778 elif content['status'] == 'resubmitted':
790 elif content['status'] == 'resubmitted':
779 # TODO: handle resubmission
791 # TODO: handle resubmission
780 pass
792 pass
781 else:
793 else:
782 self.results[msg_id] = self._unwrap_exception(content)
794 self.results[msg_id] = self._unwrap_exception(content)
783
795
784 def _flush_notifications(self):
796 def _flush_notifications(self):
785 """Flush notifications of engine registrations waiting
797 """Flush notifications of engine registrations waiting
786 in ZMQ queue."""
798 in ZMQ queue."""
787 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
799 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
788 while msg is not None:
800 while msg is not None:
789 if self.debug:
801 if self.debug:
790 pprint(msg)
802 pprint(msg)
791 msg_type = msg['header']['msg_type']
803 msg_type = msg['header']['msg_type']
792 handler = self._notification_handlers.get(msg_type, None)
804 handler = self._notification_handlers.get(msg_type, None)
793 if handler is None:
805 if handler is None:
794 raise Exception("Unhandled message type: %s"%msg.msg_type)
806 raise Exception("Unhandled message type: %s"%msg.msg_type)
795 else:
807 else:
796 handler(msg)
808 handler(msg)
797 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
809 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
798
810
799 def _flush_results(self, sock):
811 def _flush_results(self, sock):
800 """Flush task or queue results waiting in ZMQ queue."""
812 """Flush task or queue results waiting in ZMQ queue."""
801 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
813 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
802 while msg is not None:
814 while msg is not None:
803 if self.debug:
815 if self.debug:
804 pprint(msg)
816 pprint(msg)
805 msg_type = msg['header']['msg_type']
817 msg_type = msg['header']['msg_type']
806 handler = self._queue_handlers.get(msg_type, None)
818 handler = self._queue_handlers.get(msg_type, None)
807 if handler is None:
819 if handler is None:
808 raise Exception("Unhandled message type: %s"%msg.msg_type)
820 raise Exception("Unhandled message type: %s"%msg.msg_type)
809 else:
821 else:
810 handler(msg)
822 handler(msg)
811 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
823 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
812
824
813 def _flush_control(self, sock):
825 def _flush_control(self, sock):
814 """Flush replies from the control channel waiting
826 """Flush replies from the control channel waiting
815 in the ZMQ queue.
827 in the ZMQ queue.
816
828
817 Currently: ignore them."""
829 Currently: ignore them."""
818 if self._ignored_control_replies <= 0:
830 if self._ignored_control_replies <= 0:
819 return
831 return
820 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
832 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
821 while msg is not None:
833 while msg is not None:
822 self._ignored_control_replies -= 1
834 self._ignored_control_replies -= 1
823 if self.debug:
835 if self.debug:
824 pprint(msg)
836 pprint(msg)
825 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
837 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
826
838
827 def _flush_ignored_control(self):
839 def _flush_ignored_control(self):
828 """flush ignored control replies"""
840 """flush ignored control replies"""
829 while self._ignored_control_replies > 0:
841 while self._ignored_control_replies > 0:
830 self.session.recv(self._control_socket)
842 self.session.recv(self._control_socket)
831 self._ignored_control_replies -= 1
843 self._ignored_control_replies -= 1
832
844
833 def _flush_ignored_hub_replies(self):
845 def _flush_ignored_hub_replies(self):
834 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
846 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
835 while msg is not None:
847 while msg is not None:
836 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
848 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
837
849
838 def _flush_iopub(self, sock):
850 def _flush_iopub(self, sock):
839 """Flush replies from the iopub channel waiting
851 """Flush replies from the iopub channel waiting
840 in the ZMQ queue.
852 in the ZMQ queue.
841 """
853 """
842 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
854 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
843 while msg is not None:
855 while msg is not None:
844 if self.debug:
856 if self.debug:
845 pprint(msg)
857 pprint(msg)
846 parent = msg['parent_header']
858 parent = msg['parent_header']
847 # ignore IOPub messages with no parent.
859 # ignore IOPub messages with no parent.
848 # Caused by print statements or warnings from before the first execution.
860 # Caused by print statements or warnings from before the first execution.
849 if not parent:
861 if not parent:
850 continue
862 continue
851 msg_id = parent['msg_id']
863 msg_id = parent['msg_id']
852 content = msg['content']
864 content = msg['content']
853 header = msg['header']
865 header = msg['header']
854 msg_type = msg['header']['msg_type']
866 msg_type = msg['header']['msg_type']
855
867
856 # init metadata:
868 # init metadata:
857 md = self.metadata[msg_id]
869 md = self.metadata[msg_id]
858
870
859 if msg_type == 'stream':
871 if msg_type == 'stream':
860 name = content['name']
872 name = content['name']
861 s = md[name] or ''
873 s = md[name] or ''
862 md[name] = s + content['data']
874 md[name] = s + content['data']
863 elif msg_type == 'pyerr':
875 elif msg_type == 'pyerr':
864 md.update({'pyerr' : self._unwrap_exception(content)})
876 md.update({'pyerr' : self._unwrap_exception(content)})
865 elif msg_type == 'pyin':
877 elif msg_type == 'pyin':
866 md.update({'pyin' : content['code']})
878 md.update({'pyin' : content['code']})
867 elif msg_type == 'display_data':
879 elif msg_type == 'display_data':
868 md['outputs'].append(content)
880 md['outputs'].append(content)
869 elif msg_type == 'pyout':
881 elif msg_type == 'pyout':
870 md['pyout'] = content
882 md['pyout'] = content
871 else:
883 else:
872 # unhandled msg_type (status, etc.)
884 # unhandled msg_type (status, etc.)
873 pass
885 pass
874
886
875 # reduntant?
887 # reduntant?
876 self.metadata[msg_id] = md
888 self.metadata[msg_id] = md
877
889
878 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
890 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
879
891
880 #--------------------------------------------------------------------------
892 #--------------------------------------------------------------------------
881 # len, getitem
893 # len, getitem
882 #--------------------------------------------------------------------------
894 #--------------------------------------------------------------------------
883
895
884 def __len__(self):
896 def __len__(self):
885 """len(client) returns # of engines."""
897 """len(client) returns # of engines."""
886 return len(self.ids)
898 return len(self.ids)
887
899
888 def __getitem__(self, key):
900 def __getitem__(self, key):
889 """index access returns DirectView multiplexer objects
901 """index access returns DirectView multiplexer objects
890
902
891 Must be int, slice, or list/tuple/xrange of ints"""
903 Must be int, slice, or list/tuple/xrange of ints"""
892 if not isinstance(key, (int, slice, tuple, list, xrange)):
904 if not isinstance(key, (int, slice, tuple, list, xrange)):
893 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
905 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
894 else:
906 else:
895 return self.direct_view(key)
907 return self.direct_view(key)
896
908
897 #--------------------------------------------------------------------------
909 #--------------------------------------------------------------------------
898 # Begin public methods
910 # Begin public methods
899 #--------------------------------------------------------------------------
911 #--------------------------------------------------------------------------
900
912
901 @property
913 @property
902 def ids(self):
914 def ids(self):
903 """Always up-to-date ids property."""
915 """Always up-to-date ids property."""
904 self._flush_notifications()
916 self._flush_notifications()
905 # always copy:
917 # always copy:
906 return list(self._ids)
918 return list(self._ids)
907
919
920 def activate(self, targets='all', suffix=''):
921 """Create a DirectView and register it with IPython magics
922
923 Defines the magics `%px, %autopx, %pxresult, %%px`
924
925 Parameters
926 ----------
927
928 targets: int, list of ints, or 'all'
929 The engines on which the view's magics will run
930 suffix: str [default: '']
931 The suffix, if any, for the magics. This allows you to have
932 multiple views associated with parallel magics at the same time.
933
934 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
935 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
936 on engine 0.
937 """
938 view = self.direct_view(targets)
939 view.block = True
940 view.activate(suffix)
941 return view
942
908 def close(self):
943 def close(self):
909 if self._closed:
944 if self._closed:
910 return
945 return
911 self.stop_spin_thread()
946 self.stop_spin_thread()
912 snames = filter(lambda n: n.endswith('socket'), dir(self))
947 snames = filter(lambda n: n.endswith('socket'), dir(self))
913 for socket in map(lambda name: getattr(self, name), snames):
948 for socket in map(lambda name: getattr(self, name), snames):
914 if isinstance(socket, zmq.Socket) and not socket.closed:
949 if isinstance(socket, zmq.Socket) and not socket.closed:
915 socket.close()
950 socket.close()
916 self._closed = True
951 self._closed = True
917
952
918 def _spin_every(self, interval=1):
953 def _spin_every(self, interval=1):
919 """target func for use in spin_thread"""
954 """target func for use in spin_thread"""
920 while True:
955 while True:
921 if self._stop_spinning.is_set():
956 if self._stop_spinning.is_set():
922 return
957 return
923 time.sleep(interval)
958 time.sleep(interval)
924 self.spin()
959 self.spin()
925
960
926 def spin_thread(self, interval=1):
961 def spin_thread(self, interval=1):
927 """call Client.spin() in a background thread on some regular interval
962 """call Client.spin() in a background thread on some regular interval
928
963
929 This helps ensure that messages don't pile up too much in the zmq queue
964 This helps ensure that messages don't pile up too much in the zmq queue
930 while you are working on other things, or just leaving an idle terminal.
965 while you are working on other things, or just leaving an idle terminal.
931
966
932 It also helps limit potential padding of the `received` timestamp
967 It also helps limit potential padding of the `received` timestamp
933 on AsyncResult objects, used for timings.
968 on AsyncResult objects, used for timings.
934
969
935 Parameters
970 Parameters
936 ----------
971 ----------
937
972
938 interval : float, optional
973 interval : float, optional
939 The interval on which to spin the client in the background thread
974 The interval on which to spin the client in the background thread
940 (simply passed to time.sleep).
975 (simply passed to time.sleep).
941
976
942 Notes
977 Notes
943 -----
978 -----
944
979
945 For precision timing, you may want to use this method to put a bound
980 For precision timing, you may want to use this method to put a bound
946 on the jitter (in seconds) in `received` timestamps used
981 on the jitter (in seconds) in `received` timestamps used
947 in AsyncResult.wall_time.
982 in AsyncResult.wall_time.
948
983
949 """
984 """
950 if self._spin_thread is not None:
985 if self._spin_thread is not None:
951 self.stop_spin_thread()
986 self.stop_spin_thread()
952 self._stop_spinning.clear()
987 self._stop_spinning.clear()
953 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
988 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
954 self._spin_thread.daemon = True
989 self._spin_thread.daemon = True
955 self._spin_thread.start()
990 self._spin_thread.start()
956
991
957 def stop_spin_thread(self):
992 def stop_spin_thread(self):
958 """stop background spin_thread, if any"""
993 """stop background spin_thread, if any"""
959 if self._spin_thread is not None:
994 if self._spin_thread is not None:
960 self._stop_spinning.set()
995 self._stop_spinning.set()
961 self._spin_thread.join()
996 self._spin_thread.join()
962 self._spin_thread = None
997 self._spin_thread = None
963
998
964 def spin(self):
999 def spin(self):
965 """Flush any registration notifications and execution results
1000 """Flush any registration notifications and execution results
966 waiting in the ZMQ queue.
1001 waiting in the ZMQ queue.
967 """
1002 """
968 if self._notification_socket:
1003 if self._notification_socket:
969 self._flush_notifications()
1004 self._flush_notifications()
970 if self._iopub_socket:
1005 if self._iopub_socket:
971 self._flush_iopub(self._iopub_socket)
1006 self._flush_iopub(self._iopub_socket)
972 if self._mux_socket:
1007 if self._mux_socket:
973 self._flush_results(self._mux_socket)
1008 self._flush_results(self._mux_socket)
974 if self._task_socket:
1009 if self._task_socket:
975 self._flush_results(self._task_socket)
1010 self._flush_results(self._task_socket)
976 if self._control_socket:
1011 if self._control_socket:
977 self._flush_control(self._control_socket)
1012 self._flush_control(self._control_socket)
978 if self._query_socket:
1013 if self._query_socket:
979 self._flush_ignored_hub_replies()
1014 self._flush_ignored_hub_replies()
980
1015
981 def wait(self, jobs=None, timeout=-1):
1016 def wait(self, jobs=None, timeout=-1):
982 """waits on one or more `jobs`, for up to `timeout` seconds.
1017 """waits on one or more `jobs`, for up to `timeout` seconds.
983
1018
984 Parameters
1019 Parameters
985 ----------
1020 ----------
986
1021
987 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1022 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
988 ints are indices to self.history
1023 ints are indices to self.history
989 strs are msg_ids
1024 strs are msg_ids
990 default: wait on all outstanding messages
1025 default: wait on all outstanding messages
991 timeout : float
1026 timeout : float
992 a time in seconds, after which to give up.
1027 a time in seconds, after which to give up.
993 default is -1, which means no timeout
1028 default is -1, which means no timeout
994
1029
995 Returns
1030 Returns
996 -------
1031 -------
997
1032
998 True : when all msg_ids are done
1033 True : when all msg_ids are done
999 False : timeout reached, some msg_ids still outstanding
1034 False : timeout reached, some msg_ids still outstanding
1000 """
1035 """
1001 tic = time.time()
1036 tic = time.time()
1002 if jobs is None:
1037 if jobs is None:
1003 theids = self.outstanding
1038 theids = self.outstanding
1004 else:
1039 else:
1005 if isinstance(jobs, (int, basestring, AsyncResult)):
1040 if isinstance(jobs, (int, basestring, AsyncResult)):
1006 jobs = [jobs]
1041 jobs = [jobs]
1007 theids = set()
1042 theids = set()
1008 for job in jobs:
1043 for job in jobs:
1009 if isinstance(job, int):
1044 if isinstance(job, int):
1010 # index access
1045 # index access
1011 job = self.history[job]
1046 job = self.history[job]
1012 elif isinstance(job, AsyncResult):
1047 elif isinstance(job, AsyncResult):
1013 map(theids.add, job.msg_ids)
1048 map(theids.add, job.msg_ids)
1014 continue
1049 continue
1015 theids.add(job)
1050 theids.add(job)
1016 if not theids.intersection(self.outstanding):
1051 if not theids.intersection(self.outstanding):
1017 return True
1052 return True
1018 self.spin()
1053 self.spin()
1019 while theids.intersection(self.outstanding):
1054 while theids.intersection(self.outstanding):
1020 if timeout >= 0 and ( time.time()-tic ) > timeout:
1055 if timeout >= 0 and ( time.time()-tic ) > timeout:
1021 break
1056 break
1022 time.sleep(1e-3)
1057 time.sleep(1e-3)
1023 self.spin()
1058 self.spin()
1024 return len(theids.intersection(self.outstanding)) == 0
1059 return len(theids.intersection(self.outstanding)) == 0
1025
1060
1026 #--------------------------------------------------------------------------
1061 #--------------------------------------------------------------------------
1027 # Control methods
1062 # Control methods
1028 #--------------------------------------------------------------------------
1063 #--------------------------------------------------------------------------
1029
1064
1030 @spin_first
1065 @spin_first
1031 def clear(self, targets=None, block=None):
1066 def clear(self, targets=None, block=None):
1032 """Clear the namespace in target(s)."""
1067 """Clear the namespace in target(s)."""
1033 block = self.block if block is None else block
1068 block = self.block if block is None else block
1034 targets = self._build_targets(targets)[0]
1069 targets = self._build_targets(targets)[0]
1035 for t in targets:
1070 for t in targets:
1036 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1071 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1037 error = False
1072 error = False
1038 if block:
1073 if block:
1039 self._flush_ignored_control()
1074 self._flush_ignored_control()
1040 for i in range(len(targets)):
1075 for i in range(len(targets)):
1041 idents,msg = self.session.recv(self._control_socket,0)
1076 idents,msg = self.session.recv(self._control_socket,0)
1042 if self.debug:
1077 if self.debug:
1043 pprint(msg)
1078 pprint(msg)
1044 if msg['content']['status'] != 'ok':
1079 if msg['content']['status'] != 'ok':
1045 error = self._unwrap_exception(msg['content'])
1080 error = self._unwrap_exception(msg['content'])
1046 else:
1081 else:
1047 self._ignored_control_replies += len(targets)
1082 self._ignored_control_replies += len(targets)
1048 if error:
1083 if error:
1049 raise error
1084 raise error
1050
1085
1051
1086
1052 @spin_first
1087 @spin_first
1053 def abort(self, jobs=None, targets=None, block=None):
1088 def abort(self, jobs=None, targets=None, block=None):
1054 """Abort specific jobs from the execution queues of target(s).
1089 """Abort specific jobs from the execution queues of target(s).
1055
1090
1056 This is a mechanism to prevent jobs that have already been submitted
1091 This is a mechanism to prevent jobs that have already been submitted
1057 from executing.
1092 from executing.
1058
1093
1059 Parameters
1094 Parameters
1060 ----------
1095 ----------
1061
1096
1062 jobs : msg_id, list of msg_ids, or AsyncResult
1097 jobs : msg_id, list of msg_ids, or AsyncResult
1063 The jobs to be aborted
1098 The jobs to be aborted
1064
1099
1065 If unspecified/None: abort all outstanding jobs.
1100 If unspecified/None: abort all outstanding jobs.
1066
1101
1067 """
1102 """
1068 block = self.block if block is None else block
1103 block = self.block if block is None else block
1069 jobs = jobs if jobs is not None else list(self.outstanding)
1104 jobs = jobs if jobs is not None else list(self.outstanding)
1070 targets = self._build_targets(targets)[0]
1105 targets = self._build_targets(targets)[0]
1071
1106
1072 msg_ids = []
1107 msg_ids = []
1073 if isinstance(jobs, (basestring,AsyncResult)):
1108 if isinstance(jobs, (basestring,AsyncResult)):
1074 jobs = [jobs]
1109 jobs = [jobs]
1075 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1110 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1076 if bad_ids:
1111 if bad_ids:
1077 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1112 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1078 for j in jobs:
1113 for j in jobs:
1079 if isinstance(j, AsyncResult):
1114 if isinstance(j, AsyncResult):
1080 msg_ids.extend(j.msg_ids)
1115 msg_ids.extend(j.msg_ids)
1081 else:
1116 else:
1082 msg_ids.append(j)
1117 msg_ids.append(j)
1083 content = dict(msg_ids=msg_ids)
1118 content = dict(msg_ids=msg_ids)
1084 for t in targets:
1119 for t in targets:
1085 self.session.send(self._control_socket, 'abort_request',
1120 self.session.send(self._control_socket, 'abort_request',
1086 content=content, ident=t)
1121 content=content, ident=t)
1087 error = False
1122 error = False
1088 if block:
1123 if block:
1089 self._flush_ignored_control()
1124 self._flush_ignored_control()
1090 for i in range(len(targets)):
1125 for i in range(len(targets)):
1091 idents,msg = self.session.recv(self._control_socket,0)
1126 idents,msg = self.session.recv(self._control_socket,0)
1092 if self.debug:
1127 if self.debug:
1093 pprint(msg)
1128 pprint(msg)
1094 if msg['content']['status'] != 'ok':
1129 if msg['content']['status'] != 'ok':
1095 error = self._unwrap_exception(msg['content'])
1130 error = self._unwrap_exception(msg['content'])
1096 else:
1131 else:
1097 self._ignored_control_replies += len(targets)
1132 self._ignored_control_replies += len(targets)
1098 if error:
1133 if error:
1099 raise error
1134 raise error
1100
1135
1101 @spin_first
1136 @spin_first
1102 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1137 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1103 """Terminates one or more engine processes, optionally including the hub."""
1138 """Terminates one or more engine processes, optionally including the hub."""
1104 block = self.block if block is None else block
1139 block = self.block if block is None else block
1105 if hub:
1140 if hub:
1106 targets = 'all'
1141 targets = 'all'
1107 targets = self._build_targets(targets)[0]
1142 targets = self._build_targets(targets)[0]
1108 for t in targets:
1143 for t in targets:
1109 self.session.send(self._control_socket, 'shutdown_request',
1144 self.session.send(self._control_socket, 'shutdown_request',
1110 content={'restart':restart},ident=t)
1145 content={'restart':restart},ident=t)
1111 error = False
1146 error = False
1112 if block or hub:
1147 if block or hub:
1113 self._flush_ignored_control()
1148 self._flush_ignored_control()
1114 for i in range(len(targets)):
1149 for i in range(len(targets)):
1115 idents,msg = self.session.recv(self._control_socket, 0)
1150 idents,msg = self.session.recv(self._control_socket, 0)
1116 if self.debug:
1151 if self.debug:
1117 pprint(msg)
1152 pprint(msg)
1118 if msg['content']['status'] != 'ok':
1153 if msg['content']['status'] != 'ok':
1119 error = self._unwrap_exception(msg['content'])
1154 error = self._unwrap_exception(msg['content'])
1120 else:
1155 else:
1121 self._ignored_control_replies += len(targets)
1156 self._ignored_control_replies += len(targets)
1122
1157
1123 if hub:
1158 if hub:
1124 time.sleep(0.25)
1159 time.sleep(0.25)
1125 self.session.send(self._query_socket, 'shutdown_request')
1160 self.session.send(self._query_socket, 'shutdown_request')
1126 idents,msg = self.session.recv(self._query_socket, 0)
1161 idents,msg = self.session.recv(self._query_socket, 0)
1127 if self.debug:
1162 if self.debug:
1128 pprint(msg)
1163 pprint(msg)
1129 if msg['content']['status'] != 'ok':
1164 if msg['content']['status'] != 'ok':
1130 error = self._unwrap_exception(msg['content'])
1165 error = self._unwrap_exception(msg['content'])
1131
1166
1132 if error:
1167 if error:
1133 raise error
1168 raise error
1134
1169
1135 #--------------------------------------------------------------------------
1170 #--------------------------------------------------------------------------
1136 # Execution related methods
1171 # Execution related methods
1137 #--------------------------------------------------------------------------
1172 #--------------------------------------------------------------------------
1138
1173
1139 def _maybe_raise(self, result):
1174 def _maybe_raise(self, result):
1140 """wrapper for maybe raising an exception if apply failed."""
1175 """wrapper for maybe raising an exception if apply failed."""
1141 if isinstance(result, error.RemoteError):
1176 if isinstance(result, error.RemoteError):
1142 raise result
1177 raise result
1143
1178
1144 return result
1179 return result
1145
1180
1146 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1181 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1147 ident=None):
1182 ident=None):
1148 """construct and send an apply message via a socket.
1183 """construct and send an apply message via a socket.
1149
1184
1150 This is the principal method with which all engine execution is performed by views.
1185 This is the principal method with which all engine execution is performed by views.
1151 """
1186 """
1152
1187
1153 if self._closed:
1188 if self._closed:
1154 raise RuntimeError("Client cannot be used after its sockets have been closed")
1189 raise RuntimeError("Client cannot be used after its sockets have been closed")
1155
1190
1156 # defaults:
1191 # defaults:
1157 args = args if args is not None else []
1192 args = args if args is not None else []
1158 kwargs = kwargs if kwargs is not None else {}
1193 kwargs = kwargs if kwargs is not None else {}
1159 subheader = subheader if subheader is not None else {}
1194 subheader = subheader if subheader is not None else {}
1160
1195
1161 # validate arguments
1196 # validate arguments
1162 if not callable(f) and not isinstance(f, Reference):
1197 if not callable(f) and not isinstance(f, Reference):
1163 raise TypeError("f must be callable, not %s"%type(f))
1198 raise TypeError("f must be callable, not %s"%type(f))
1164 if not isinstance(args, (tuple, list)):
1199 if not isinstance(args, (tuple, list)):
1165 raise TypeError("args must be tuple or list, not %s"%type(args))
1200 raise TypeError("args must be tuple or list, not %s"%type(args))
1166 if not isinstance(kwargs, dict):
1201 if not isinstance(kwargs, dict):
1167 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1202 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1168 if not isinstance(subheader, dict):
1203 if not isinstance(subheader, dict):
1169 raise TypeError("subheader must be dict, not %s"%type(subheader))
1204 raise TypeError("subheader must be dict, not %s"%type(subheader))
1170
1205
1171 bufs = util.pack_apply_message(f,args,kwargs)
1206 bufs = util.pack_apply_message(f,args,kwargs)
1172
1207
1173 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1208 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1174 subheader=subheader, track=track)
1209 subheader=subheader, track=track)
1175
1210
1176 msg_id = msg['header']['msg_id']
1211 msg_id = msg['header']['msg_id']
1177 self.outstanding.add(msg_id)
1212 self.outstanding.add(msg_id)
1178 if ident:
1213 if ident:
1179 # possibly routed to a specific engine
1214 # possibly routed to a specific engine
1180 if isinstance(ident, list):
1215 if isinstance(ident, list):
1181 ident = ident[-1]
1216 ident = ident[-1]
1182 if ident in self._engines.values():
1217 if ident in self._engines.values():
1183 # save for later, in case of engine death
1218 # save for later, in case of engine death
1184 self._outstanding_dict[ident].add(msg_id)
1219 self._outstanding_dict[ident].add(msg_id)
1185 self.history.append(msg_id)
1220 self.history.append(msg_id)
1186 self.metadata[msg_id]['submitted'] = datetime.now()
1221 self.metadata[msg_id]['submitted'] = datetime.now()
1187
1222
1188 return msg
1223 return msg
1189
1224
1190 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1225 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1191 """construct and send an execute request via a socket.
1226 """construct and send an execute request via a socket.
1192
1227
1193 """
1228 """
1194
1229
1195 if self._closed:
1230 if self._closed:
1196 raise RuntimeError("Client cannot be used after its sockets have been closed")
1231 raise RuntimeError("Client cannot be used after its sockets have been closed")
1197
1232
1198 # defaults:
1233 # defaults:
1199 subheader = subheader if subheader is not None else {}
1234 subheader = subheader if subheader is not None else {}
1200
1235
1201 # validate arguments
1236 # validate arguments
1202 if not isinstance(code, basestring):
1237 if not isinstance(code, basestring):
1203 raise TypeError("code must be text, not %s" % type(code))
1238 raise TypeError("code must be text, not %s" % type(code))
1204 if not isinstance(subheader, dict):
1239 if not isinstance(subheader, dict):
1205 raise TypeError("subheader must be dict, not %s" % type(subheader))
1240 raise TypeError("subheader must be dict, not %s" % type(subheader))
1206
1241
1207 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1242 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1208
1243
1209
1244
1210 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1245 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1211 subheader=subheader)
1246 subheader=subheader)
1212
1247
1213 msg_id = msg['header']['msg_id']
1248 msg_id = msg['header']['msg_id']
1214 self.outstanding.add(msg_id)
1249 self.outstanding.add(msg_id)
1215 if ident:
1250 if ident:
1216 # possibly routed to a specific engine
1251 # possibly routed to a specific engine
1217 if isinstance(ident, list):
1252 if isinstance(ident, list):
1218 ident = ident[-1]
1253 ident = ident[-1]
1219 if ident in self._engines.values():
1254 if ident in self._engines.values():
1220 # save for later, in case of engine death
1255 # save for later, in case of engine death
1221 self._outstanding_dict[ident].add(msg_id)
1256 self._outstanding_dict[ident].add(msg_id)
1222 self.history.append(msg_id)
1257 self.history.append(msg_id)
1223 self.metadata[msg_id]['submitted'] = datetime.now()
1258 self.metadata[msg_id]['submitted'] = datetime.now()
1224
1259
1225 return msg
1260 return msg
1226
1261
1227 #--------------------------------------------------------------------------
1262 #--------------------------------------------------------------------------
1228 # construct a View object
1263 # construct a View object
1229 #--------------------------------------------------------------------------
1264 #--------------------------------------------------------------------------
1230
1265
1231 def load_balanced_view(self, targets=None):
1266 def load_balanced_view(self, targets=None):
1232 """construct a DirectView object.
1267 """construct a DirectView object.
1233
1268
1234 If no arguments are specified, create a LoadBalancedView
1269 If no arguments are specified, create a LoadBalancedView
1235 using all engines.
1270 using all engines.
1236
1271
1237 Parameters
1272 Parameters
1238 ----------
1273 ----------
1239
1274
1240 targets: list,slice,int,etc. [default: use all engines]
1275 targets: list,slice,int,etc. [default: use all engines]
1241 The subset of engines across which to load-balance
1276 The subset of engines across which to load-balance
1242 """
1277 """
1243 if targets == 'all':
1278 if targets == 'all':
1244 targets = None
1279 targets = None
1245 if targets is not None:
1280 if targets is not None:
1246 targets = self._build_targets(targets)[1]
1281 targets = self._build_targets(targets)[1]
1247 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1282 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1248
1283
1249 def direct_view(self, targets='all'):
1284 def direct_view(self, targets='all'):
1250 """construct a DirectView object.
1285 """construct a DirectView object.
1251
1286
1252 If no targets are specified, create a DirectView using all engines.
1287 If no targets are specified, create a DirectView using all engines.
1253
1288
1254 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1289 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1255 evaluate the target engines at each execution, whereas rc[:] will connect to
1290 evaluate the target engines at each execution, whereas rc[:] will connect to
1256 all *current* engines, and that list will not change.
1291 all *current* engines, and that list will not change.
1257
1292
1258 That is, 'all' will always use all engines, whereas rc[:] will not use
1293 That is, 'all' will always use all engines, whereas rc[:] will not use
1259 engines added after the DirectView is constructed.
1294 engines added after the DirectView is constructed.
1260
1295
1261 Parameters
1296 Parameters
1262 ----------
1297 ----------
1263
1298
1264 targets: list,slice,int,etc. [default: use all engines]
1299 targets: list,slice,int,etc. [default: use all engines]
1265 The engines to use for the View
1300 The engines to use for the View
1266 """
1301 """
1267 single = isinstance(targets, int)
1302 single = isinstance(targets, int)
1268 # allow 'all' to be lazily evaluated at each execution
1303 # allow 'all' to be lazily evaluated at each execution
1269 if targets != 'all':
1304 if targets != 'all':
1270 targets = self._build_targets(targets)[1]
1305 targets = self._build_targets(targets)[1]
1271 if single:
1306 if single:
1272 targets = targets[0]
1307 targets = targets[0]
1273 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1308 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1274
1309
1275 #--------------------------------------------------------------------------
1310 #--------------------------------------------------------------------------
1276 # Query methods
1311 # Query methods
1277 #--------------------------------------------------------------------------
1312 #--------------------------------------------------------------------------
1278
1313
1279 @spin_first
1314 @spin_first
1280 def get_result(self, indices_or_msg_ids=None, block=None):
1315 def get_result(self, indices_or_msg_ids=None, block=None):
1281 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1316 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1282
1317
1283 If the client already has the results, no request to the Hub will be made.
1318 If the client already has the results, no request to the Hub will be made.
1284
1319
1285 This is a convenient way to construct AsyncResult objects, which are wrappers
1320 This is a convenient way to construct AsyncResult objects, which are wrappers
1286 that include metadata about execution, and allow for awaiting results that
1321 that include metadata about execution, and allow for awaiting results that
1287 were not submitted by this Client.
1322 were not submitted by this Client.
1288
1323
1289 It can also be a convenient way to retrieve the metadata associated with
1324 It can also be a convenient way to retrieve the metadata associated with
1290 blocking execution, since it always retrieves
1325 blocking execution, since it always retrieves
1291
1326
1292 Examples
1327 Examples
1293 --------
1328 --------
1294 ::
1329 ::
1295
1330
1296 In [10]: r = client.apply()
1331 In [10]: r = client.apply()
1297
1332
1298 Parameters
1333 Parameters
1299 ----------
1334 ----------
1300
1335
1301 indices_or_msg_ids : integer history index, str msg_id, or list of either
1336 indices_or_msg_ids : integer history index, str msg_id, or list of either
1302 The indices or msg_ids of indices to be retrieved
1337 The indices or msg_ids of indices to be retrieved
1303
1338
1304 block : bool
1339 block : bool
1305 Whether to wait for the result to be done
1340 Whether to wait for the result to be done
1306
1341
1307 Returns
1342 Returns
1308 -------
1343 -------
1309
1344
1310 AsyncResult
1345 AsyncResult
1311 A single AsyncResult object will always be returned.
1346 A single AsyncResult object will always be returned.
1312
1347
1313 AsyncHubResult
1348 AsyncHubResult
1314 A subclass of AsyncResult that retrieves results from the Hub
1349 A subclass of AsyncResult that retrieves results from the Hub
1315
1350
1316 """
1351 """
1317 block = self.block if block is None else block
1352 block = self.block if block is None else block
1318 if indices_or_msg_ids is None:
1353 if indices_or_msg_ids is None:
1319 indices_or_msg_ids = -1
1354 indices_or_msg_ids = -1
1320
1355
1321 if not isinstance(indices_or_msg_ids, (list,tuple)):
1356 if not isinstance(indices_or_msg_ids, (list,tuple)):
1322 indices_or_msg_ids = [indices_or_msg_ids]
1357 indices_or_msg_ids = [indices_or_msg_ids]
1323
1358
1324 theids = []
1359 theids = []
1325 for id in indices_or_msg_ids:
1360 for id in indices_or_msg_ids:
1326 if isinstance(id, int):
1361 if isinstance(id, int):
1327 id = self.history[id]
1362 id = self.history[id]
1328 if not isinstance(id, basestring):
1363 if not isinstance(id, basestring):
1329 raise TypeError("indices must be str or int, not %r"%id)
1364 raise TypeError("indices must be str or int, not %r"%id)
1330 theids.append(id)
1365 theids.append(id)
1331
1366
1332 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1367 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1333 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1368 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1334
1369
1335 if remote_ids:
1370 if remote_ids:
1336 ar = AsyncHubResult(self, msg_ids=theids)
1371 ar = AsyncHubResult(self, msg_ids=theids)
1337 else:
1372 else:
1338 ar = AsyncResult(self, msg_ids=theids)
1373 ar = AsyncResult(self, msg_ids=theids)
1339
1374
1340 if block:
1375 if block:
1341 ar.wait()
1376 ar.wait()
1342
1377
1343 return ar
1378 return ar
1344
1379
    @spin_first
    def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
        """Resubmit one or more tasks.

        in-flight tasks may not be resubmitted.

        NOTE(review): `subheader` is accepted but never used below — confirm
        whether it was meant to be forwarded with the resubmit_request.

        Parameters
        ----------

        indices_or_msg_ids : integer history index, str msg_id, or list of either
            The indices or msg_ids of indices to be retrieved

        block : bool
            Whether to wait for the result to be done

        Returns
        -------

        AsyncHubResult
            A subclass of AsyncResult that retrieves results from the Hub

        """
        block = self.block if block is None else block
        if indices_or_msg_ids is None:
            # default to the most recent submission
            indices_or_msg_ids = -1

        if not isinstance(indices_or_msg_ids, (list,tuple)):
            indices_or_msg_ids = [indices_or_msg_ids]

        # resolve history indices to msg_ids; reject anything else
        theids = []
        for id in indices_or_msg_ids:
            if isinstance(id, int):
                id = self.history[id]
            if not isinstance(id, basestring):
                raise TypeError("indices must be str or int, not %r"%id)
            theids.append(id)

        content = dict(msg_ids = theids)

        self.session.send(self._query_socket, 'resubmit_request', content)

        # block until the Hub's reply is ready, then do a non-blocking recv
        zmq.select([self._query_socket], [], [])
        idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)
        # the Hub maps each old msg_id to the msg_id of its resubmission
        mapping = content['resubmitted']
        new_ids = [ mapping[msg_id] for msg_id in theids ]

        ar = AsyncHubResult(self, msg_ids=new_ids)

        if block:
            ar.wait()

        return ar
1402
1437
1403 @spin_first
1438 @spin_first
1404 def result_status(self, msg_ids, status_only=True):
1439 def result_status(self, msg_ids, status_only=True):
1405 """Check on the status of the result(s) of the apply request with `msg_ids`.
1440 """Check on the status of the result(s) of the apply request with `msg_ids`.
1406
1441
1407 If status_only is False, then the actual results will be retrieved, else
1442 If status_only is False, then the actual results will be retrieved, else
1408 only the status of the results will be checked.
1443 only the status of the results will be checked.
1409
1444
1410 Parameters
1445 Parameters
1411 ----------
1446 ----------
1412
1447
1413 msg_ids : list of msg_ids
1448 msg_ids : list of msg_ids
1414 if int:
1449 if int:
1415 Passed as index to self.history for convenience.
1450 Passed as index to self.history for convenience.
1416 status_only : bool (default: True)
1451 status_only : bool (default: True)
1417 if False:
1452 if False:
1418 Retrieve the actual results of completed tasks.
1453 Retrieve the actual results of completed tasks.
1419
1454
1420 Returns
1455 Returns
1421 -------
1456 -------
1422
1457
1423 results : dict
1458 results : dict
1424 There will always be the keys 'pending' and 'completed', which will
1459 There will always be the keys 'pending' and 'completed', which will
1425 be lists of msg_ids that are incomplete or complete. If `status_only`
1460 be lists of msg_ids that are incomplete or complete. If `status_only`
1426 is False, then completed results will be keyed by their `msg_id`.
1461 is False, then completed results will be keyed by their `msg_id`.
1427 """
1462 """
1428 if not isinstance(msg_ids, (list,tuple)):
1463 if not isinstance(msg_ids, (list,tuple)):
1429 msg_ids = [msg_ids]
1464 msg_ids = [msg_ids]
1430
1465
1431 theids = []
1466 theids = []
1432 for msg_id in msg_ids:
1467 for msg_id in msg_ids:
1433 if isinstance(msg_id, int):
1468 if isinstance(msg_id, int):
1434 msg_id = self.history[msg_id]
1469 msg_id = self.history[msg_id]
1435 if not isinstance(msg_id, basestring):
1470 if not isinstance(msg_id, basestring):
1436 raise TypeError("msg_ids must be str, not %r"%msg_id)
1471 raise TypeError("msg_ids must be str, not %r"%msg_id)
1437 theids.append(msg_id)
1472 theids.append(msg_id)
1438
1473
1439 completed = []
1474 completed = []
1440 local_results = {}
1475 local_results = {}
1441
1476
1442 # comment this block out to temporarily disable local shortcut:
1477 # comment this block out to temporarily disable local shortcut:
1443 for msg_id in theids:
1478 for msg_id in theids:
1444 if msg_id in self.results:
1479 if msg_id in self.results:
1445 completed.append(msg_id)
1480 completed.append(msg_id)
1446 local_results[msg_id] = self.results[msg_id]
1481 local_results[msg_id] = self.results[msg_id]
1447 theids.remove(msg_id)
1482 theids.remove(msg_id)
1448
1483
1449 if theids: # some not locally cached
1484 if theids: # some not locally cached
1450 content = dict(msg_ids=theids, status_only=status_only)
1485 content = dict(msg_ids=theids, status_only=status_only)
1451 msg = self.session.send(self._query_socket, "result_request", content=content)
1486 msg = self.session.send(self._query_socket, "result_request", content=content)
1452 zmq.select([self._query_socket], [], [])
1487 zmq.select([self._query_socket], [], [])
1453 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1488 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1454 if self.debug:
1489 if self.debug:
1455 pprint(msg)
1490 pprint(msg)
1456 content = msg['content']
1491 content = msg['content']
1457 if content['status'] != 'ok':
1492 if content['status'] != 'ok':
1458 raise self._unwrap_exception(content)
1493 raise self._unwrap_exception(content)
1459 buffers = msg['buffers']
1494 buffers = msg['buffers']
1460 else:
1495 else:
1461 content = dict(completed=[],pending=[])
1496 content = dict(completed=[],pending=[])
1462
1497
1463 content['completed'].extend(completed)
1498 content['completed'].extend(completed)
1464
1499
1465 if status_only:
1500 if status_only:
1466 return content
1501 return content
1467
1502
1468 failures = []
1503 failures = []
1469 # load cached results into result:
1504 # load cached results into result:
1470 content.update(local_results)
1505 content.update(local_results)
1471
1506
1472 # update cache with results:
1507 # update cache with results:
1473 for msg_id in sorted(theids):
1508 for msg_id in sorted(theids):
1474 if msg_id in content['completed']:
1509 if msg_id in content['completed']:
1475 rec = content[msg_id]
1510 rec = content[msg_id]
1476 parent = rec['header']
1511 parent = rec['header']
1477 header = rec['result_header']
1512 header = rec['result_header']
1478 rcontent = rec['result_content']
1513 rcontent = rec['result_content']
1479 iodict = rec['io']
1514 iodict = rec['io']
1480 if isinstance(rcontent, str):
1515 if isinstance(rcontent, str):
1481 rcontent = self.session.unpack(rcontent)
1516 rcontent = self.session.unpack(rcontent)
1482
1517
1483 md = self.metadata[msg_id]
1518 md = self.metadata[msg_id]
1484 md.update(self._extract_metadata(header, parent, rcontent))
1519 md.update(self._extract_metadata(header, parent, rcontent))
1485 if rec.get('received'):
1520 if rec.get('received'):
1486 md['received'] = rec['received']
1521 md['received'] = rec['received']
1487 md.update(iodict)
1522 md.update(iodict)
1488
1523
1489 if rcontent['status'] == 'ok':
1524 if rcontent['status'] == 'ok':
1490 res,buffers = util.unserialize_object(buffers)
1525 res,buffers = util.unserialize_object(buffers)
1491 else:
1526 else:
1492 print rcontent
1527 print rcontent
1493 res = self._unwrap_exception(rcontent)
1528 res = self._unwrap_exception(rcontent)
1494 failures.append(res)
1529 failures.append(res)
1495
1530
1496 self.results[msg_id] = res
1531 self.results[msg_id] = res
1497 content[msg_id] = res
1532 content[msg_id] = res
1498
1533
1499 if len(theids) == 1 and failures:
1534 if len(theids) == 1 and failures:
1500 raise failures[0]
1535 raise failures[0]
1501
1536
1502 error.collect_exceptions(failures, "result_status")
1537 error.collect_exceptions(failures, "result_status")
1503 return content
1538 return content
1504
1539
1505 @spin_first
1540 @spin_first
1506 def queue_status(self, targets='all', verbose=False):
1541 def queue_status(self, targets='all', verbose=False):
1507 """Fetch the status of engine queues.
1542 """Fetch the status of engine queues.
1508
1543
1509 Parameters
1544 Parameters
1510 ----------
1545 ----------
1511
1546
1512 targets : int/str/list of ints/strs
1547 targets : int/str/list of ints/strs
1513 the engines whose states are to be queried.
1548 the engines whose states are to be queried.
1514 default : all
1549 default : all
1515 verbose : bool
1550 verbose : bool
1516 Whether to return lengths only, or lists of ids for each element
1551 Whether to return lengths only, or lists of ids for each element
1517 """
1552 """
1518 if targets == 'all':
1553 if targets == 'all':
1519 # allow 'all' to be evaluated on the engine
1554 # allow 'all' to be evaluated on the engine
1520 engine_ids = None
1555 engine_ids = None
1521 else:
1556 else:
1522 engine_ids = self._build_targets(targets)[1]
1557 engine_ids = self._build_targets(targets)[1]
1523 content = dict(targets=engine_ids, verbose=verbose)
1558 content = dict(targets=engine_ids, verbose=verbose)
1524 self.session.send(self._query_socket, "queue_request", content=content)
1559 self.session.send(self._query_socket, "queue_request", content=content)
1525 idents,msg = self.session.recv(self._query_socket, 0)
1560 idents,msg = self.session.recv(self._query_socket, 0)
1526 if self.debug:
1561 if self.debug:
1527 pprint(msg)
1562 pprint(msg)
1528 content = msg['content']
1563 content = msg['content']
1529 status = content.pop('status')
1564 status = content.pop('status')
1530 if status != 'ok':
1565 if status != 'ok':
1531 raise self._unwrap_exception(content)
1566 raise self._unwrap_exception(content)
1532 content = rekey(content)
1567 content = rekey(content)
1533 if isinstance(targets, int):
1568 if isinstance(targets, int):
1534 return content[targets]
1569 return content[targets]
1535 else:
1570 else:
1536 return content
1571 return content
1537
1572
1538 @spin_first
1573 @spin_first
1539 def purge_results(self, jobs=[], targets=[]):
1574 def purge_results(self, jobs=[], targets=[]):
1540 """Tell the Hub to forget results.
1575 """Tell the Hub to forget results.
1541
1576
1542 Individual results can be purged by msg_id, or the entire
1577 Individual results can be purged by msg_id, or the entire
1543 history of specific targets can be purged.
1578 history of specific targets can be purged.
1544
1579
1545 Use `purge_results('all')` to scrub everything from the Hub's db.
1580 Use `purge_results('all')` to scrub everything from the Hub's db.
1546
1581
1547 Parameters
1582 Parameters
1548 ----------
1583 ----------
1549
1584
1550 jobs : str or list of str or AsyncResult objects
1585 jobs : str or list of str or AsyncResult objects
1551 the msg_ids whose results should be forgotten.
1586 the msg_ids whose results should be forgotten.
1552 targets : int/str/list of ints/strs
1587 targets : int/str/list of ints/strs
1553 The targets, by int_id, whose entire history is to be purged.
1588 The targets, by int_id, whose entire history is to be purged.
1554
1589
1555 default : None
1590 default : None
1556 """
1591 """
1557 if not targets and not jobs:
1592 if not targets and not jobs:
1558 raise ValueError("Must specify at least one of `targets` and `jobs`")
1593 raise ValueError("Must specify at least one of `targets` and `jobs`")
1559 if targets:
1594 if targets:
1560 targets = self._build_targets(targets)[1]
1595 targets = self._build_targets(targets)[1]
1561
1596
1562 # construct msg_ids from jobs
1597 # construct msg_ids from jobs
1563 if jobs == 'all':
1598 if jobs == 'all':
1564 msg_ids = jobs
1599 msg_ids = jobs
1565 else:
1600 else:
1566 msg_ids = []
1601 msg_ids = []
1567 if isinstance(jobs, (basestring,AsyncResult)):
1602 if isinstance(jobs, (basestring,AsyncResult)):
1568 jobs = [jobs]
1603 jobs = [jobs]
1569 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1604 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1570 if bad_ids:
1605 if bad_ids:
1571 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1606 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1572 for j in jobs:
1607 for j in jobs:
1573 if isinstance(j, AsyncResult):
1608 if isinstance(j, AsyncResult):
1574 msg_ids.extend(j.msg_ids)
1609 msg_ids.extend(j.msg_ids)
1575 else:
1610 else:
1576 msg_ids.append(j)
1611 msg_ids.append(j)
1577
1612
1578 content = dict(engine_ids=targets, msg_ids=msg_ids)
1613 content = dict(engine_ids=targets, msg_ids=msg_ids)
1579 self.session.send(self._query_socket, "purge_request", content=content)
1614 self.session.send(self._query_socket, "purge_request", content=content)
1580 idents, msg = self.session.recv(self._query_socket, 0)
1615 idents, msg = self.session.recv(self._query_socket, 0)
1581 if self.debug:
1616 if self.debug:
1582 pprint(msg)
1617 pprint(msg)
1583 content = msg['content']
1618 content = msg['content']
1584 if content['status'] != 'ok':
1619 if content['status'] != 'ok':
1585 raise self._unwrap_exception(content)
1620 raise self._unwrap_exception(content)
1586
1621
1587 @spin_first
1622 @spin_first
1588 def hub_history(self):
1623 def hub_history(self):
1589 """Get the Hub's history
1624 """Get the Hub's history
1590
1625
1591 Just like the Client, the Hub has a history, which is a list of msg_ids.
1626 Just like the Client, the Hub has a history, which is a list of msg_ids.
1592 This will contain the history of all clients, and, depending on configuration,
1627 This will contain the history of all clients, and, depending on configuration,
1593 may contain history across multiple cluster sessions.
1628 may contain history across multiple cluster sessions.
1594
1629
1595 Any msg_id returned here is a valid argument to `get_result`.
1630 Any msg_id returned here is a valid argument to `get_result`.
1596
1631
1597 Returns
1632 Returns
1598 -------
1633 -------
1599
1634
1600 msg_ids : list of strs
1635 msg_ids : list of strs
1601 list of all msg_ids, ordered by task submission time.
1636 list of all msg_ids, ordered by task submission time.
1602 """
1637 """
1603
1638
1604 self.session.send(self._query_socket, "history_request", content={})
1639 self.session.send(self._query_socket, "history_request", content={})
1605 idents, msg = self.session.recv(self._query_socket, 0)
1640 idents, msg = self.session.recv(self._query_socket, 0)
1606
1641
1607 if self.debug:
1642 if self.debug:
1608 pprint(msg)
1643 pprint(msg)
1609 content = msg['content']
1644 content = msg['content']
1610 if content['status'] != 'ok':
1645 if content['status'] != 'ok':
1611 raise self._unwrap_exception(content)
1646 raise self._unwrap_exception(content)
1612 else:
1647 else:
1613 return content['history']
1648 return content['history']
1614
1649
1615 @spin_first
1650 @spin_first
1616 def db_query(self, query, keys=None):
1651 def db_query(self, query, keys=None):
1617 """Query the Hub's TaskRecord database
1652 """Query the Hub's TaskRecord database
1618
1653
1619 This will return a list of task record dicts that match `query`
1654 This will return a list of task record dicts that match `query`
1620
1655
1621 Parameters
1656 Parameters
1622 ----------
1657 ----------
1623
1658
1624 query : mongodb query dict
1659 query : mongodb query dict
1625 The search dict. See mongodb query docs for details.
1660 The search dict. See mongodb query docs for details.
1626 keys : list of strs [optional]
1661 keys : list of strs [optional]
1627 The subset of keys to be returned. The default is to fetch everything but buffers.
1662 The subset of keys to be returned. The default is to fetch everything but buffers.
1628 'msg_id' will *always* be included.
1663 'msg_id' will *always* be included.
1629 """
1664 """
1630 if isinstance(keys, basestring):
1665 if isinstance(keys, basestring):
1631 keys = [keys]
1666 keys = [keys]
1632 content = dict(query=query, keys=keys)
1667 content = dict(query=query, keys=keys)
1633 self.session.send(self._query_socket, "db_request", content=content)
1668 self.session.send(self._query_socket, "db_request", content=content)
1634 idents, msg = self.session.recv(self._query_socket, 0)
1669 idents, msg = self.session.recv(self._query_socket, 0)
1635 if self.debug:
1670 if self.debug:
1636 pprint(msg)
1671 pprint(msg)
1637 content = msg['content']
1672 content = msg['content']
1638 if content['status'] != 'ok':
1673 if content['status'] != 'ok':
1639 raise self._unwrap_exception(content)
1674 raise self._unwrap_exception(content)
1640
1675
1641 records = content['records']
1676 records = content['records']
1642
1677
1643 buffer_lens = content['buffer_lens']
1678 buffer_lens = content['buffer_lens']
1644 result_buffer_lens = content['result_buffer_lens']
1679 result_buffer_lens = content['result_buffer_lens']
1645 buffers = msg['buffers']
1680 buffers = msg['buffers']
1646 has_bufs = buffer_lens is not None
1681 has_bufs = buffer_lens is not None
1647 has_rbufs = result_buffer_lens is not None
1682 has_rbufs = result_buffer_lens is not None
1648 for i,rec in enumerate(records):
1683 for i,rec in enumerate(records):
1649 # relink buffers
1684 # relink buffers
1650 if has_bufs:
1685 if has_bufs:
1651 blen = buffer_lens[i]
1686 blen = buffer_lens[i]
1652 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1687 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1653 if has_rbufs:
1688 if has_rbufs:
1654 blen = result_buffer_lens[i]
1689 blen = result_buffer_lens[i]
1655 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1690 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1656
1691
1657 return records
1692 return records
1658
1693
1659 __all__ = [ 'Client' ]
1694 __all__ = [ 'Client' ]
@@ -1,390 +1,411 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 =============
3 =============
4 parallelmagic
4 parallelmagic
5 =============
5 =============
6
6
7 Magic command interface for interactive parallel work.
7 Magic command interface for interactive parallel work.
8
8
9 Usage
9 Usage
10 =====
10 =====
11
11
12 ``%autopx``
12 ``%autopx``
13
13
14 {AUTOPX_DOC}
14 {AUTOPX_DOC}
15
15
16 ``%px``
16 ``%px``
17
17
18 {PX_DOC}
18 {PX_DOC}
19
19
20 ``%result``
20 ``%pxresult``
21
21
22 {RESULT_DOC}
22 {RESULT_DOC}
23
23
24 ``%pxconfig``
25
26 {CONFIG_DOC}
27
24 """
28 """
25
29
26 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
27 # Copyright (C) 2008 The IPython Development Team
31 # Copyright (C) 2008 The IPython Development Team
28 #
32 #
29 # Distributed under the terms of the BSD License. The full license is in
33 # Distributed under the terms of the BSD License. The full license is in
30 # the file COPYING, distributed as part of this software.
34 # the file COPYING, distributed as part of this software.
31 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
32
36
33 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
34 # Imports
38 # Imports
35 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
36
40
37 import ast
41 import ast
38 import re
42 import re
39
43
40 from IPython.core.error import UsageError
44 from IPython.core.error import UsageError
41 from IPython.core.magic import Magics, magics_class, line_magic, cell_magic
45 from IPython.core.magic import Magics
46 from IPython.core import magic_arguments
42 from IPython.testing.skipdoctest import skip_doctest
47 from IPython.testing.skipdoctest import skip_doctest
43
48
44 #-----------------------------------------------------------------------------
49 #-----------------------------------------------------------------------------
45 # Definitions of magic functions for use with IPython
50 # Definitions of magic functions for use with IPython
46 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
47
52
48
53
49 NO_ACTIVE_VIEW = "Use activate() on a DirectView object to use it with magics."
54 NO_LAST_RESULT = "%pxresult recalls last %px result, which has not yet been used."
50 NO_LAST_RESULT = "%result recalls last %px result, which has not yet been used."
51
55
56 def exec_args(f):
57 """decorator for adding block/targets args for execution
58
59 applied to %pxconfig and %%px
60 """
61 args = [
62 magic_arguments.argument('-b', '--block', action="store_const",
63 const=True, dest='block',
64 help="use blocking (sync) execution"
65 ),
66 magic_arguments.argument('-a', '--noblock', action="store_const",
67 const=False, dest='block',
68 help="use non-blocking (async) execution"
69 ),
70 magic_arguments.argument('-t', '--targets', type=str,
71 help="specify the targets on which to execute"
72 ),
73 ]
74 for a in args:
75 f = a(f)
76 return f
77
78 def output_args(f):
79 """decorator for output-formatting args
80
81 applied to %pxresult and %%px
82 """
83 args = [
84 magic_arguments.argument('-r', action="store_const", dest='groupby',
85 const='order',
86 help="collate outputs in order (same as group-outputs=order)"
87 ),
88 magic_arguments.argument('-e', action="store_const", dest='groupby',
89 const='engine',
90 help="group outputs by engine (same as group-outputs=engine)"
91 ),
92 magic_arguments.argument('--group-outputs', dest='groupby', type=str,
93 choices=['engine', 'order', 'type'], default='type',
94 help="""Group the outputs in a particular way.
95
96 Choices are:
97
98 type: group outputs of all engines by type (stdout, stderr, displaypub, etc.).
99
100 engine: display all output for each engine together.
101
102 order: like type, but individual displaypub output from each engine is collated.
103 For example, if multiple plots are generated by each engine, the first
104 figure of each engine will be displayed, then the second of each, etc.
105 """
106 ),
107 magic_arguments.argument('-o', '--out', dest='save_name', type=str,
108 help="""store the AsyncResult object for this computation
109 in the global namespace under this name.
110 """
111 ),
112 ]
113 for a in args:
114 f = a(f)
115 return f
52
116
53 @magics_class
54 class ParallelMagics(Magics):
117 class ParallelMagics(Magics):
55 """A set of magics useful when controlling a parallel IPython cluster.
118 """A set of magics useful when controlling a parallel IPython cluster.
56 """
119 """
57
120
121 # magic-related
122 magics = None
123 registered = True
124
125 # suffix for magics
126 suffix = ''
58 # A flag showing if autopx is activated or not
127 # A flag showing if autopx is activated or not
59 _autopx = False
128 _autopx = False
60 # the current view used by the magics:
129 # the current view used by the magics:
61 active_view = None
130 view = None
62 # last result cache for %result
131 # last result cache for %pxresult
63 last_result = None
132 last_result = None
64
133
65 @skip_doctest
134 def __init__(self, shell, view, suffix=''):
66 @line_magic
135 self.view = view
67 def result(self, line=''):
136 self.suffix = suffix
68 """Print the result of the last asynchronous %px command.
69
70 Usage:
71
137
72 %result [-o] [-e] [--group-options=type|engine|order]
138 # register magics
139 self.magics = dict(cell={},line={})
140 line_magics = self.magics['line']
73
141
74 Options:
142 px = 'px' + suffix
143 if not suffix:
144 # keep %result for legacy compatibility
145 line_magics['result'] = self.result
75
146
76 -o: collate outputs in order (same as group-outputs=order)
147 line_magics['pxresult' + suffix] = self.result
148 line_magics[px] = self.px
149 line_magics['pxconfig' + suffix] = self.pxconfig
150 line_magics['auto' + px] = self.autopx
77
151
78 -e: group outputs by engine (same as group-outputs=engine)
152 self.magics['cell'][px] = self.cell_px
79
153
80 --group-outputs=type [default behavior]:
154 super(ParallelMagics, self).__init__(shell=shell)
81 each output type (stdout, stderr, displaypub) for all engines
155
82 displayed together.
156 def _eval_target_str(self, ts):
83
157 if ':' in ts:
84 --group-outputs=order:
158 targets = eval("self.view.client.ids[%s]" % ts)
85 The same as 'type', but individual displaypub outputs (e.g. plots)
159 elif 'all' in ts:
86 will be interleaved, so it will display all of the first plots,
160 targets = 'all'
87 then all of the second plots, etc.
161 else:
88
162 targets = eval(ts)
89 --group-outputs=engine:
163 return targets
90 All of an engine's output is displayed before moving on to the next.
164
91
165 @magic_arguments.magic_arguments()
92 To use this a :class:`DirectView` instance must be created
166 @exec_args
93 and then activated by calling its :meth:`activate` method.
167 def pxconfig(self, line):
168 """configure default targets/blocking for %px magics"""
169 args = magic_arguments.parse_argstring(self.pxconfig, line)
170 if args.targets:
171 self.view.targets = self._eval_target_str(args.targets)
172 if args.block is not None:
173 self.view.block = args.block
174
175 @magic_arguments.magic_arguments()
176 @output_args
177 @skip_doctest
178 def result(self, line=''):
179 """Print the result of the last asynchronous %px command.
94
180
95 This lets you recall the results of %px computations after
181 This lets you recall the results of %px computations after
96 asynchronous submission (view.block=False).
182 asynchronous submission (block=False).
97
183
98 Then you can do the following::
184 Examples
185 --------
186 ::
99
187
100 In [23]: %px os.getpid()
188 In [23]: %px os.getpid()
101 Async parallel execution on engine(s): all
189 Async parallel execution on engine(s): all
102
190
103 In [24]: %result
191 In [24]: %pxresult
104 [ 8] Out[10]: 60920
192 [ 8] Out[10]: 60920
105 [ 9] Out[10]: 60921
193 [ 9] Out[10]: 60921
106 [10] Out[10]: 60922
194 [10] Out[10]: 60922
107 [11] Out[10]: 60923
195 [11] Out[10]: 60923
108 """
196 """
109 opts, _ = self.parse_options(line, 'oe', 'group-outputs=')
197 args = magic_arguments.parse_argstring(self.result, line)
110
111 if 'group-outputs' in opts:
112 groupby = opts['group-outputs']
113 elif 'o' in opts:
114 groupby = 'order'
115 elif 'e' in opts:
116 groupby = 'engine'
117 else:
118 groupby = 'type'
119
120 if self.active_view is None:
121 raise UsageError(NO_ACTIVE_VIEW)
122
198
123 if self.last_result is None:
199 if self.last_result is None:
124 raise UsageError(NO_LAST_RESULT)
200 raise UsageError(NO_LAST_RESULT)
125
201
126 self.last_result.get()
202 self.last_result.get()
127 self.last_result.display_outputs(groupby=groupby)
203 self.last_result.display_outputs(groupby=args.groupby)
128
204
129 @skip_doctest
205 @skip_doctest
130 @line_magic
206 def px(self, line=''):
131 def px(self, parameter_s=''):
132 """Executes the given python command in parallel.
207 """Executes the given python command in parallel.
133
208
134 To use this a :class:`DirectView` instance must be created
209 Examples
135 and then activated by calling its :meth:`activate` method.
210 --------
136
211 ::
137 Then you can do the following::
138
212
139 In [24]: %px a = os.getpid()
213 In [24]: %px a = os.getpid()
140 Parallel execution on engine(s): all
214 Parallel execution on engine(s): all
141
215
142 In [25]: %px print a
216 In [25]: %px print a
143 [stdout:0] 1234
217 [stdout:0] 1234
144 [stdout:1] 1235
218 [stdout:1] 1235
145 [stdout:2] 1236
219 [stdout:2] 1236
146 [stdout:3] 1237
220 [stdout:3] 1237
147 """
221 """
148 return self.parallel_execute(parameter_s)
222 return self.parallel_execute(line)
149
223
150 def parallel_execute(self, cell, block=None, groupby='type', save_name=None):
224 def parallel_execute(self, cell, block=None, groupby='type', save_name=None):
151 """implementation used by %px and %%parallel"""
225 """implementation used by %px and %%parallel"""
152
226
153 if self.active_view is None:
154 raise UsageError(NO_ACTIVE_VIEW)
155
156 # defaults:
227 # defaults:
157 block = self.active_view.block if block is None else block
228 block = self.view.block if block is None else block
158
229
159 base = "Parallel" if block else "Async parallel"
230 base = "Parallel" if block else "Async parallel"
160
231
161 targets = self.active_view.targets
232 targets = self.view.targets
162 if isinstance(targets, list) and len(targets) > 10:
233 if isinstance(targets, list) and len(targets) > 10:
163 str_targets = str(targets[:4])[:-1] + ', ..., ' + str(targets[-4:])[1:]
234 str_targets = str(targets[:4])[:-1] + ', ..., ' + str(targets[-4:])[1:]
164 else:
235 else:
165 str_targets = str(targets)
236 str_targets = str(targets)
166 print base + " execution on engine(s): %s" % str_targets
237 print base + " execution on engine(s): %s" % str_targets
167
238
168 result = self.active_view.execute(cell, silent=False, block=False)
239 result = self.view.execute(cell, silent=False, block=False)
169 self.last_result = result
240 self.last_result = result
170
241
171 if save_name:
242 if save_name:
172 self.shell.user_ns[save_name] = result
243 self.shell.user_ns[save_name] = result
173
244
174 if block:
245 if block:
175 result.get()
246 result.get()
176 result.display_outputs(groupby)
247 result.display_outputs(groupby)
177 else:
248 else:
178 # return AsyncResult only on non-blocking submission
249 # return AsyncResult only on non-blocking submission
179 return result
250 return result
180
251
252 @magic_arguments.magic_arguments()
253 @exec_args
254 @output_args
181 @skip_doctest
255 @skip_doctest
182 @cell_magic('px')
183 def cell_px(self, line='', cell=None):
256 def cell_px(self, line='', cell=None):
184 """Executes the given python command in parallel.
257 """Executes the cell in parallel.
185
186 Cell magic usage:
187
188 %%px [-o] [-e] [--group-options=type|engine|order] [--[no]block] [--out name]
189
190 Options:
191
192 --out <name>: store the AsyncResult object for this computation
193 in the global namespace.
194
195 -o: collate outputs in order (same as group-outputs=order)
196
197 -e: group outputs by engine (same as group-outputs=engine)
198
199 --group-outputs=type [default behavior]:
200 each output type (stdout, stderr, displaypub) for all engines
201 displayed together.
202
203 --group-outputs=order:
204 The same as 'type', but individual displaypub outputs (e.g. plots)
205 will be interleaved, so it will display all of the first plots,
206 then all of the second plots, etc.
207
208 --group-outputs=engine:
209 All of an engine's output is displayed before moving on to the next.
210
211 --[no]block:
212 Whether or not to block for the execution to complete
213 (and display the results). If unspecified, the active view's
214
258
215
259 Examples
216 To use this a :class:`DirectView` instance must be created
260 --------
217 and then activated by calling its :meth:`activate` method.
261 ::
218
219 Then you can do the following::
220
262
221 In [24]: %%px --noblock
263 In [24]: %%px --noblock
222 ....: a = os.getpid()
264 ....: a = os.getpid()
223 Async parallel execution on engine(s): all
265 Async parallel execution on engine(s): all
224
266
225 In [25]: %%px
267 In [25]: %%px
226 ....: print a
268 ....: print a
227 [stdout:0] 1234
269 [stdout:0] 1234
228 [stdout:1] 1235
270 [stdout:1] 1235
229 [stdout:2] 1236
271 [stdout:2] 1236
230 [stdout:3] 1237
272 [stdout:3] 1237
231 """
273 """
232
274
233 block = None
275 args = magic_arguments.parse_argstring(self.cell_px, line)
234 groupby = 'type'
276
235 # as a cell magic, we accept args
277 if args.targets:
236 opts, _ = self.parse_options(line, 'oe', 'group-outputs=', 'out=', 'block', 'noblock')
278 save_targets = self.view.targets
237
279 self.view.targets = self._eval_target_str(args.targets)
238 if 'group-outputs' in opts:
280 try:
239 groupby = opts['group-outputs']
281 return self.parallel_execute(cell, block=args.block,
240 elif 'o' in opts:
282 groupby=args.groupby,
241 groupby = 'order'
283 save_name=args.save_name,
242 elif 'e' in opts:
284 )
243 groupby = 'engine'
285 finally:
244
286 if args.targets:
245 if 'block' in opts:
287 self.view.targets = save_targets
246 block = True
288
247 elif 'noblock' in opts:
248 block = False
249
250 save_name = opts.get('out')
251
252 return self.parallel_execute(cell, block=block, groupby=groupby, save_name=save_name)
253
254 @skip_doctest
289 @skip_doctest
255 @line_magic
290 def autopx(self, line=''):
256 def autopx(self, parameter_s=''):
257 """Toggles auto parallel mode.
291 """Toggles auto parallel mode.
258
292
259 To use this a :class:`DirectView` instance must be created
293 Once this is called, all commands typed at the command line are send to
260 and then activated by calling its :meth:`activate` method. Once this
294 the engines to be executed in parallel. To control which engine are
261 is called, all commands typed at the command line are send to
295 used, the ``targets`` attribute of the view before
262 the engines to be executed in parallel. To control which engine
296 entering ``%autopx`` mode.
263 are used, set the ``targets`` attributed of the multiengine client
297
264 before entering ``%autopx`` mode.
265
298
266 Then you can do the following::
299 Then you can do the following::
267
300
268 In [25]: %autopx
301 In [25]: %autopx
269 %autopx to enabled
302 %autopx to enabled
270
303
271 In [26]: a = 10
304 In [26]: a = 10
272 Parallel execution on engine(s): [0,1,2,3]
305 Parallel execution on engine(s): [0,1,2,3]
273 In [27]: print a
306 In [27]: print a
274 Parallel execution on engine(s): [0,1,2,3]
307 Parallel execution on engine(s): [0,1,2,3]
275 [stdout:0] 10
308 [stdout:0] 10
276 [stdout:1] 10
309 [stdout:1] 10
277 [stdout:2] 10
310 [stdout:2] 10
278 [stdout:3] 10
311 [stdout:3] 10
279
312
280
313
281 In [27]: %autopx
314 In [27]: %autopx
282 %autopx disabled
315 %autopx disabled
283 """
316 """
284 if self._autopx:
317 if self._autopx:
285 self._disable_autopx()
318 self._disable_autopx()
286 else:
319 else:
287 self._enable_autopx()
320 self._enable_autopx()
288
321
289 def _enable_autopx(self):
322 def _enable_autopx(self):
290 """Enable %autopx mode by saving the original run_cell and installing
323 """Enable %autopx mode by saving the original run_cell and installing
291 pxrun_cell.
324 pxrun_cell.
292 """
325 """
293 if self.active_view is None:
294 raise UsageError(NO_ACTIVE_VIEW)
295
296 # override run_cell
326 # override run_cell
297 self._original_run_cell = self.shell.run_cell
327 self._original_run_cell = self.shell.run_cell
298 self.shell.run_cell = self.pxrun_cell
328 self.shell.run_cell = self.pxrun_cell
299
329
300 self._autopx = True
330 self._autopx = True
301 print "%autopx enabled"
331 print "%autopx enabled"
302
332
303 def _disable_autopx(self):
333 def _disable_autopx(self):
304 """Disable %autopx by restoring the original InteractiveShell.run_cell.
334 """Disable %autopx by restoring the original InteractiveShell.run_cell.
305 """
335 """
306 if self._autopx:
336 if self._autopx:
307 self.shell.run_cell = self._original_run_cell
337 self.shell.run_cell = self._original_run_cell
308 self._autopx = False
338 self._autopx = False
309 print "%autopx disabled"
339 print "%autopx disabled"
310
340
311 def pxrun_cell(self, raw_cell, store_history=False, silent=False):
341 def pxrun_cell(self, raw_cell, store_history=False, silent=False):
312 """drop-in replacement for InteractiveShell.run_cell.
342 """drop-in replacement for InteractiveShell.run_cell.
313
343
314 This executes code remotely, instead of in the local namespace.
344 This executes code remotely, instead of in the local namespace.
315
345
316 See InteractiveShell.run_cell for details.
346 See InteractiveShell.run_cell for details.
317 """
347 """
318
348
319 if (not raw_cell) or raw_cell.isspace():
349 if (not raw_cell) or raw_cell.isspace():
320 return
350 return
321
351
322 ipself = self.shell
352 ipself = self.shell
323
353
324 with ipself.builtin_trap:
354 with ipself.builtin_trap:
325 cell = ipself.prefilter_manager.prefilter_lines(raw_cell)
355 cell = ipself.prefilter_manager.prefilter_lines(raw_cell)
326
356
327 # Store raw and processed history
357 # Store raw and processed history
328 if store_history:
358 if store_history:
329 ipself.history_manager.store_inputs(ipself.execution_count,
359 ipself.history_manager.store_inputs(ipself.execution_count,
330 cell, raw_cell)
360 cell, raw_cell)
331
361
332 # ipself.logger.log(cell, raw_cell)
362 # ipself.logger.log(cell, raw_cell)
333
363
334 cell_name = ipself.compile.cache(cell, ipself.execution_count)
364 cell_name = ipself.compile.cache(cell, ipself.execution_count)
335
365
336 try:
366 try:
337 ast.parse(cell, filename=cell_name)
367 ast.parse(cell, filename=cell_name)
338 except (OverflowError, SyntaxError, ValueError, TypeError,
368 except (OverflowError, SyntaxError, ValueError, TypeError,
339 MemoryError):
369 MemoryError):
340 # Case 1
370 # Case 1
341 ipself.showsyntaxerror()
371 ipself.showsyntaxerror()
342 ipself.execution_count += 1
372 ipself.execution_count += 1
343 return None
373 return None
344 except NameError:
374 except NameError:
345 # ignore name errors, because we don't know the remote keys
375 # ignore name errors, because we don't know the remote keys
346 pass
376 pass
347
377
348 if store_history:
378 if store_history:
349 # Write output to the database. Does nothing unless
379 # Write output to the database. Does nothing unless
350 # history output logging is enabled.
380 # history output logging is enabled.
351 ipself.history_manager.store_output(ipself.execution_count)
381 ipself.history_manager.store_output(ipself.execution_count)
352 # Each cell is a *single* input, regardless of how many lines it has
382 # Each cell is a *single* input, regardless of how many lines it has
353 ipself.execution_count += 1
383 ipself.execution_count += 1
354 if re.search(r'get_ipython\(\)\.magic\(u?["\']%?autopx', cell):
384 if re.search(r'get_ipython\(\)\.magic\(u?["\']%?autopx', cell):
355 self._disable_autopx()
385 self._disable_autopx()
356 return False
386 return False
357 else:
387 else:
358 try:
388 try:
359 result = self.active_view.execute(cell, silent=False, block=False)
389 result = self.view.execute(cell, silent=False, block=False)
360 except:
390 except:
361 ipself.showtraceback()
391 ipself.showtraceback()
362 return True
392 return True
363 else:
393 else:
364 if self.active_view.block:
394 if self.view.block:
365 try:
395 try:
366 result.get()
396 result.get()
367 except:
397 except:
368 self.shell.showtraceback()
398 self.shell.showtraceback()
369 return True
399 return True
370 else:
400 else:
371 with ipself.builtin_trap:
401 with ipself.builtin_trap:
372 result.display_outputs()
402 result.display_outputs()
373 return False
403 return False
374
404
375
405
376 __doc__ = __doc__.format(
406 __doc__ = __doc__.format(
377 AUTOPX_DOC = ' '*8 + ParallelMagics.autopx.__doc__,
407 AUTOPX_DOC = ' '*8 + ParallelMagics.autopx.__doc__,
378 PX_DOC = ' '*8 + ParallelMagics.px.__doc__,
408 PX_DOC = ' '*8 + ParallelMagics.px.__doc__,
379 RESULT_DOC = ' '*8 + ParallelMagics.result.__doc__
409 RESULT_DOC = ' '*8 + ParallelMagics.result.__doc__,
410 CONFIG_DOC = ' '*8 + ParallelMagics.pxconfig.__doc__,
380 )
411 )
381
382 _loaded = False
383
384
385 def load_ipython_extension(ip):
386 """Load the extension in IPython."""
387 global _loaded
388 if not _loaded:
389 ip.register_magics(ParallelMagics)
390 _loaded = True
@@ -1,1100 +1,1104 b''
1 """Views of remote engines.
1 """Views of remote engines.
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import imp
18 import imp
19 import sys
19 import sys
20 import warnings
20 import warnings
21 from contextlib import contextmanager
21 from contextlib import contextmanager
22 from types import ModuleType
22 from types import ModuleType
23
23
24 import zmq
24 import zmq
25
25
26 from IPython.testing.skipdoctest import skip_doctest
26 from IPython.testing.skipdoctest import skip_doctest
27 from IPython.utils.traitlets import (
27 from IPython.utils.traitlets import (
28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
28 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
29 )
29 )
30 from IPython.external.decorator import decorator
30 from IPython.external.decorator import decorator
31
31
32 from IPython.parallel import util
32 from IPython.parallel import util
33 from IPython.parallel.controller.dependency import Dependency, dependent
33 from IPython.parallel.controller.dependency import Dependency, dependent
34
34
35 from . import map as Map
35 from . import map as Map
36 from .asyncresult import AsyncResult, AsyncMapResult
36 from .asyncresult import AsyncResult, AsyncMapResult
37 from .remotefunction import ParallelFunction, parallel, remote, getname
37 from .remotefunction import ParallelFunction, parallel, remote, getname
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Decorators
40 # Decorators
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43 @decorator
43 @decorator
44 def save_ids(f, self, *args, **kwargs):
44 def save_ids(f, self, *args, **kwargs):
45 """Keep our history and outstanding attributes up to date after a method call."""
45 """Keep our history and outstanding attributes up to date after a method call."""
46 n_previous = len(self.client.history)
46 n_previous = len(self.client.history)
47 try:
47 try:
48 ret = f(self, *args, **kwargs)
48 ret = f(self, *args, **kwargs)
49 finally:
49 finally:
50 nmsgs = len(self.client.history) - n_previous
50 nmsgs = len(self.client.history) - n_previous
51 msg_ids = self.client.history[-nmsgs:]
51 msg_ids = self.client.history[-nmsgs:]
52 self.history.extend(msg_ids)
52 self.history.extend(msg_ids)
53 map(self.outstanding.add, msg_ids)
53 map(self.outstanding.add, msg_ids)
54 return ret
54 return ret
55
55
56 @decorator
56 @decorator
57 def sync_results(f, self, *args, **kwargs):
57 def sync_results(f, self, *args, **kwargs):
58 """sync relevant results from self.client to our results attribute."""
58 """sync relevant results from self.client to our results attribute."""
59 ret = f(self, *args, **kwargs)
59 ret = f(self, *args, **kwargs)
60 delta = self.outstanding.difference(self.client.outstanding)
60 delta = self.outstanding.difference(self.client.outstanding)
61 completed = self.outstanding.intersection(delta)
61 completed = self.outstanding.intersection(delta)
62 self.outstanding = self.outstanding.difference(completed)
62 self.outstanding = self.outstanding.difference(completed)
63 for msg_id in completed:
63 for msg_id in completed:
64 self.results[msg_id] = self.client.results[msg_id]
64 self.results[msg_id] = self.client.results[msg_id]
65 return ret
65 return ret
66
66
67 @decorator
67 @decorator
68 def spin_after(f, self, *args, **kwargs):
68 def spin_after(f, self, *args, **kwargs):
69 """call spin after the method."""
69 """call spin after the method."""
70 ret = f(self, *args, **kwargs)
70 ret = f(self, *args, **kwargs)
71 self.spin()
71 self.spin()
72 return ret
72 return ret
73
73
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75 # Classes
75 # Classes
76 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
77
77
78 @skip_doctest
78 @skip_doctest
79 class View(HasTraits):
79 class View(HasTraits):
80 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
80 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
81
81
82 Don't use this class, use subclasses.
82 Don't use this class, use subclasses.
83
83
84 Methods
84 Methods
85 -------
85 -------
86
86
87 spin
87 spin
88 flushes incoming results and registration state changes
88 flushes incoming results and registration state changes
89 control methods spin, and requesting `ids` also ensures up to date
89 control methods spin, and requesting `ids` also ensures up to date
90
90
91 wait
91 wait
92 wait on one or more msg_ids
92 wait on one or more msg_ids
93
93
94 execution methods
94 execution methods
95 apply
95 apply
96 legacy: execute, run
96 legacy: execute, run
97
97
98 data movement
98 data movement
99 push, pull, scatter, gather
99 push, pull, scatter, gather
100
100
101 query methods
101 query methods
102 get_result, queue_status, purge_results, result_status
102 get_result, queue_status, purge_results, result_status
103
103
104 control methods
104 control methods
105 abort, shutdown
105 abort, shutdown
106
106
107 """
107 """
108 # flags
108 # flags
109 block=Bool(False)
109 block=Bool(False)
110 track=Bool(True)
110 track=Bool(True)
111 targets = Any()
111 targets = Any()
112
112
113 history=List()
113 history=List()
114 outstanding = Set()
114 outstanding = Set()
115 results = Dict()
115 results = Dict()
116 client = Instance('IPython.parallel.Client')
116 client = Instance('IPython.parallel.Client')
117
117
118 _socket = Instance('zmq.Socket')
118 _socket = Instance('zmq.Socket')
119 _flag_names = List(['targets', 'block', 'track'])
119 _flag_names = List(['targets', 'block', 'track'])
120 _targets = Any()
120 _targets = Any()
121 _idents = Any()
121 _idents = Any()
122
122
123 def __init__(self, client=None, socket=None, **flags):
123 def __init__(self, client=None, socket=None, **flags):
124 super(View, self).__init__(client=client, _socket=socket)
124 super(View, self).__init__(client=client, _socket=socket)
125 self.block = client.block
125 self.block = client.block
126
126
127 self.set_flags(**flags)
127 self.set_flags(**flags)
128
128
129 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
129 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
130
130
131 def __repr__(self):
131 def __repr__(self):
132 strtargets = str(self.targets)
132 strtargets = str(self.targets)
133 if len(strtargets) > 16:
133 if len(strtargets) > 16:
134 strtargets = strtargets[:12]+'...]'
134 strtargets = strtargets[:12]+'...]'
135 return "<%s %s>"%(self.__class__.__name__, strtargets)
135 return "<%s %s>"%(self.__class__.__name__, strtargets)
136
136
137 def __len__(self):
137 def __len__(self):
138 if isinstance(self.targets, list):
138 if isinstance(self.targets, list):
139 return len(self.targets)
139 return len(self.targets)
140 elif isinstance(self.targets, int):
140 elif isinstance(self.targets, int):
141 return 1
141 return 1
142 else:
142 else:
143 return len(self.client)
143 return len(self.client)
144
144
145 def set_flags(self, **kwargs):
145 def set_flags(self, **kwargs):
146 """set my attribute flags by keyword.
146 """set my attribute flags by keyword.
147
147
148 Views determine behavior with a few attributes (`block`, `track`, etc.).
148 Views determine behavior with a few attributes (`block`, `track`, etc.).
149 These attributes can be set all at once by name with this method.
149 These attributes can be set all at once by name with this method.
150
150
151 Parameters
151 Parameters
152 ----------
152 ----------
153
153
154 block : bool
154 block : bool
155 whether to wait for results
155 whether to wait for results
156 track : bool
156 track : bool
157 whether to create a MessageTracker to allow the user to
157 whether to create a MessageTracker to allow the user to
158 safely edit after arrays and buffers during non-copying
158 safely edit after arrays and buffers during non-copying
159 sends.
159 sends.
160 """
160 """
161 for name, value in kwargs.iteritems():
161 for name, value in kwargs.iteritems():
162 if name not in self._flag_names:
162 if name not in self._flag_names:
163 raise KeyError("Invalid name: %r"%name)
163 raise KeyError("Invalid name: %r"%name)
164 else:
164 else:
165 setattr(self, name, value)
165 setattr(self, name, value)
166
166
167 @contextmanager
167 @contextmanager
168 def temp_flags(self, **kwargs):
168 def temp_flags(self, **kwargs):
169 """temporarily set flags, for use in `with` statements.
169 """temporarily set flags, for use in `with` statements.
170
170
171 See set_flags for permanent setting of flags
171 See set_flags for permanent setting of flags
172
172
173 Examples
173 Examples
174 --------
174 --------
175
175
176 >>> view.track=False
176 >>> view.track=False
177 ...
177 ...
178 >>> with view.temp_flags(track=True):
178 >>> with view.temp_flags(track=True):
179 ... ar = view.apply(dostuff, my_big_array)
179 ... ar = view.apply(dostuff, my_big_array)
180 ... ar.tracker.wait() # wait for send to finish
180 ... ar.tracker.wait() # wait for send to finish
181 >>> view.track
181 >>> view.track
182 False
182 False
183
183
184 """
184 """
185 # preflight: save flags, and set temporaries
185 # preflight: save flags, and set temporaries
186 saved_flags = {}
186 saved_flags = {}
187 for f in self._flag_names:
187 for f in self._flag_names:
188 saved_flags[f] = getattr(self, f)
188 saved_flags[f] = getattr(self, f)
189 self.set_flags(**kwargs)
189 self.set_flags(**kwargs)
190 # yield to the with-statement block
190 # yield to the with-statement block
191 try:
191 try:
192 yield
192 yield
193 finally:
193 finally:
194 # postflight: restore saved flags
194 # postflight: restore saved flags
195 self.set_flags(**saved_flags)
195 self.set_flags(**saved_flags)
196
196
197
197
198 #----------------------------------------------------------------
198 #----------------------------------------------------------------
199 # apply
199 # apply
200 #----------------------------------------------------------------
200 #----------------------------------------------------------------
201
201
202 @sync_results
202 @sync_results
203 @save_ids
203 @save_ids
204 def _really_apply(self, f, args, kwargs, block=None, **options):
204 def _really_apply(self, f, args, kwargs, block=None, **options):
205 """wrapper for client.send_apply_request"""
205 """wrapper for client.send_apply_request"""
206 raise NotImplementedError("Implement in subclasses")
206 raise NotImplementedError("Implement in subclasses")
207
207
208 def apply(self, f, *args, **kwargs):
208 def apply(self, f, *args, **kwargs):
209 """calls f(*args, **kwargs) on remote engines, returning the result.
209 """calls f(*args, **kwargs) on remote engines, returning the result.
210
210
211 This method sets all apply flags via this View's attributes.
211 This method sets all apply flags via this View's attributes.
212
212
213 if self.block is False:
213 if self.block is False:
214 returns AsyncResult
214 returns AsyncResult
215 else:
215 else:
216 returns actual result of f(*args, **kwargs)
216 returns actual result of f(*args, **kwargs)
217 """
217 """
218 return self._really_apply(f, args, kwargs)
218 return self._really_apply(f, args, kwargs)
219
219
220 def apply_async(self, f, *args, **kwargs):
220 def apply_async(self, f, *args, **kwargs):
221 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
221 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
222
222
223 returns AsyncResult
223 returns AsyncResult
224 """
224 """
225 return self._really_apply(f, args, kwargs, block=False)
225 return self._really_apply(f, args, kwargs, block=False)
226
226
227 @spin_after
227 @spin_after
228 def apply_sync(self, f, *args, **kwargs):
228 def apply_sync(self, f, *args, **kwargs):
229 """calls f(*args, **kwargs) on remote engines in a blocking manner,
229 """calls f(*args, **kwargs) on remote engines in a blocking manner,
230 returning the result.
230 returning the result.
231
231
232 returns: actual result of f(*args, **kwargs)
232 returns: actual result of f(*args, **kwargs)
233 """
233 """
234 return self._really_apply(f, args, kwargs, block=True)
234 return self._really_apply(f, args, kwargs, block=True)
235
235
236 #----------------------------------------------------------------
236 #----------------------------------------------------------------
237 # wrappers for client and control methods
237 # wrappers for client and control methods
238 #----------------------------------------------------------------
238 #----------------------------------------------------------------
239 @sync_results
239 @sync_results
240 def spin(self):
240 def spin(self):
241 """spin the client, and sync"""
241 """spin the client, and sync"""
242 self.client.spin()
242 self.client.spin()
243
243
244 @sync_results
244 @sync_results
245 def wait(self, jobs=None, timeout=-1):
245 def wait(self, jobs=None, timeout=-1):
246 """waits on one or more `jobs`, for up to `timeout` seconds.
246 """waits on one or more `jobs`, for up to `timeout` seconds.
247
247
248 Parameters
248 Parameters
249 ----------
249 ----------
250
250
251 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
251 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
252 ints are indices to self.history
252 ints are indices to self.history
253 strs are msg_ids
253 strs are msg_ids
254 default: wait on all outstanding messages
254 default: wait on all outstanding messages
255 timeout : float
255 timeout : float
256 a time in seconds, after which to give up.
256 a time in seconds, after which to give up.
257 default is -1, which means no timeout
257 default is -1, which means no timeout
258
258
259 Returns
259 Returns
260 -------
260 -------
261
261
262 True : when all msg_ids are done
262 True : when all msg_ids are done
263 False : timeout reached, some msg_ids still outstanding
263 False : timeout reached, some msg_ids still outstanding
264 """
264 """
265 if jobs is None:
265 if jobs is None:
266 jobs = self.history
266 jobs = self.history
267 return self.client.wait(jobs, timeout)
267 return self.client.wait(jobs, timeout)
268
268
269 def abort(self, jobs=None, targets=None, block=None):
269 def abort(self, jobs=None, targets=None, block=None):
270 """Abort jobs on my engines.
270 """Abort jobs on my engines.
271
271
272 Parameters
272 Parameters
273 ----------
273 ----------
274
274
275 jobs : None, str, list of strs, optional
275 jobs : None, str, list of strs, optional
276 if None: abort all jobs.
276 if None: abort all jobs.
277 else: abort specific msg_id(s).
277 else: abort specific msg_id(s).
278 """
278 """
279 block = block if block is not None else self.block
279 block = block if block is not None else self.block
280 targets = targets if targets is not None else self.targets
280 targets = targets if targets is not None else self.targets
281 jobs = jobs if jobs is not None else list(self.outstanding)
281 jobs = jobs if jobs is not None else list(self.outstanding)
282
282
283 return self.client.abort(jobs=jobs, targets=targets, block=block)
283 return self.client.abort(jobs=jobs, targets=targets, block=block)
284
284
285 def queue_status(self, targets=None, verbose=False):
285 def queue_status(self, targets=None, verbose=False):
286 """Fetch the Queue status of my engines"""
286 """Fetch the Queue status of my engines"""
287 targets = targets if targets is not None else self.targets
287 targets = targets if targets is not None else self.targets
288 return self.client.queue_status(targets=targets, verbose=verbose)
288 return self.client.queue_status(targets=targets, verbose=verbose)
289
289
290 def purge_results(self, jobs=[], targets=[]):
290 def purge_results(self, jobs=[], targets=[]):
291 """Instruct the controller to forget specific results."""
291 """Instruct the controller to forget specific results."""
292 if targets is None or targets == 'all':
292 if targets is None or targets == 'all':
293 targets = self.targets
293 targets = self.targets
294 return self.client.purge_results(jobs=jobs, targets=targets)
294 return self.client.purge_results(jobs=jobs, targets=targets)
295
295
296 def shutdown(self, targets=None, restart=False, hub=False, block=None):
296 def shutdown(self, targets=None, restart=False, hub=False, block=None):
297 """Terminates one or more engine processes, optionally including the hub.
297 """Terminates one or more engine processes, optionally including the hub.
298 """
298 """
299 block = self.block if block is None else block
299 block = self.block if block is None else block
300 if targets is None or targets == 'all':
300 if targets is None or targets == 'all':
301 targets = self.targets
301 targets = self.targets
302 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
302 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
303
303
304 @spin_after
304 @spin_after
305 def get_result(self, indices_or_msg_ids=None):
305 def get_result(self, indices_or_msg_ids=None):
306 """return one or more results, specified by history index or msg_id.
306 """return one or more results, specified by history index or msg_id.
307
307
308 See client.get_result for details.
308 See client.get_result for details.
309
309
310 """
310 """
311
311
312 if indices_or_msg_ids is None:
312 if indices_or_msg_ids is None:
313 indices_or_msg_ids = -1
313 indices_or_msg_ids = -1
314 if isinstance(indices_or_msg_ids, int):
314 if isinstance(indices_or_msg_ids, int):
315 indices_or_msg_ids = self.history[indices_or_msg_ids]
315 indices_or_msg_ids = self.history[indices_or_msg_ids]
316 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
316 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
317 indices_or_msg_ids = list(indices_or_msg_ids)
317 indices_or_msg_ids = list(indices_or_msg_ids)
318 for i,index in enumerate(indices_or_msg_ids):
318 for i,index in enumerate(indices_or_msg_ids):
319 if isinstance(index, int):
319 if isinstance(index, int):
320 indices_or_msg_ids[i] = self.history[index]
320 indices_or_msg_ids[i] = self.history[index]
321 return self.client.get_result(indices_or_msg_ids)
321 return self.client.get_result(indices_or_msg_ids)
322
322
323 #-------------------------------------------------------------------
323 #-------------------------------------------------------------------
324 # Map
324 # Map
325 #-------------------------------------------------------------------
325 #-------------------------------------------------------------------
326
326
327 def map(self, f, *sequences, **kwargs):
327 def map(self, f, *sequences, **kwargs):
328 """override in subclasses"""
328 """override in subclasses"""
329 raise NotImplementedError
329 raise NotImplementedError
330
330
331 def map_async(self, f, *sequences, **kwargs):
331 def map_async(self, f, *sequences, **kwargs):
332 """Parallel version of builtin `map`, using this view's engines.
332 """Parallel version of builtin `map`, using this view's engines.
333
333
334 This is equivalent to map(...block=False)
334 This is equivalent to map(...block=False)
335
335
336 See `self.map` for details.
336 See `self.map` for details.
337 """
337 """
338 if 'block' in kwargs:
338 if 'block' in kwargs:
339 raise TypeError("map_async doesn't take a `block` keyword argument.")
339 raise TypeError("map_async doesn't take a `block` keyword argument.")
340 kwargs['block'] = False
340 kwargs['block'] = False
341 return self.map(f,*sequences,**kwargs)
341 return self.map(f,*sequences,**kwargs)
342
342
343 def map_sync(self, f, *sequences, **kwargs):
343 def map_sync(self, f, *sequences, **kwargs):
344 """Parallel version of builtin `map`, using this view's engines.
344 """Parallel version of builtin `map`, using this view's engines.
345
345
346 This is equivalent to map(...block=True)
346 This is equivalent to map(...block=True)
347
347
348 See `self.map` for details.
348 See `self.map` for details.
349 """
349 """
350 if 'block' in kwargs:
350 if 'block' in kwargs:
351 raise TypeError("map_sync doesn't take a `block` keyword argument.")
351 raise TypeError("map_sync doesn't take a `block` keyword argument.")
352 kwargs['block'] = True
352 kwargs['block'] = True
353 return self.map(f,*sequences,**kwargs)
353 return self.map(f,*sequences,**kwargs)
354
354
355 def imap(self, f, *sequences, **kwargs):
355 def imap(self, f, *sequences, **kwargs):
356 """Parallel version of `itertools.imap`.
356 """Parallel version of `itertools.imap`.
357
357
358 See `self.map` for details.
358 See `self.map` for details.
359
359
360 """
360 """
361
361
362 return iter(self.map_async(f,*sequences, **kwargs))
362 return iter(self.map_async(f,*sequences, **kwargs))
363
363
364 #-------------------------------------------------------------------
364 #-------------------------------------------------------------------
365 # Decorators
365 # Decorators
366 #-------------------------------------------------------------------
366 #-------------------------------------------------------------------
367
367
368 def remote(self, block=True, **flags):
368 def remote(self, block=True, **flags):
369 """Decorator for making a RemoteFunction"""
369 """Decorator for making a RemoteFunction"""
370 block = self.block if block is None else block
370 block = self.block if block is None else block
371 return remote(self, block=block, **flags)
371 return remote(self, block=block, **flags)
372
372
373 def parallel(self, dist='b', block=None, **flags):
373 def parallel(self, dist='b', block=None, **flags):
374 """Decorator for making a ParallelFunction"""
374 """Decorator for making a ParallelFunction"""
375 block = self.block if block is None else block
375 block = self.block if block is None else block
376 return parallel(self, dist=dist, block=block, **flags)
376 return parallel(self, dist=dist, block=block, **flags)
377
377
378 @skip_doctest
378 @skip_doctest
379 class DirectView(View):
379 class DirectView(View):
380 """Direct Multiplexer View of one or more engines.
380 """Direct Multiplexer View of one or more engines.
381
381
382 These are created via indexed access to a client:
382 These are created via indexed access to a client:
383
383
384 >>> dv_1 = client[1]
384 >>> dv_1 = client[1]
385 >>> dv_all = client[:]
385 >>> dv_all = client[:]
386 >>> dv_even = client[::2]
386 >>> dv_even = client[::2]
387 >>> dv_some = client[1:3]
387 >>> dv_some = client[1:3]
388
388
389 This object provides dictionary access to engine namespaces:
389 This object provides dictionary access to engine namespaces:
390
390
391 # push a=5:
391 # push a=5:
392 >>> dv['a'] = 5
392 >>> dv['a'] = 5
393 # pull 'foo':
393 # pull 'foo':
394 >>> db['foo']
394 >>> db['foo']
395
395
396 """
396 """
397
397
398 def __init__(self, client=None, socket=None, targets=None):
398 def __init__(self, client=None, socket=None, targets=None):
399 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
399 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
400
400
401 @property
401 @property
402 def importer(self):
402 def importer(self):
403 """sync_imports(local=True) as a property.
403 """sync_imports(local=True) as a property.
404
404
405 See sync_imports for details.
405 See sync_imports for details.
406
406
407 """
407 """
408 return self.sync_imports(True)
408 return self.sync_imports(True)
409
409
410 @contextmanager
410 @contextmanager
411 def sync_imports(self, local=True, quiet=False):
411 def sync_imports(self, local=True, quiet=False):
412 """Context Manager for performing simultaneous local and remote imports.
412 """Context Manager for performing simultaneous local and remote imports.
413
413
414 'import x as y' will *not* work. The 'as y' part will simply be ignored.
414 'import x as y' will *not* work. The 'as y' part will simply be ignored.
415
415
416 If `local=True`, then the package will also be imported locally.
416 If `local=True`, then the package will also be imported locally.
417
417
418 If `quiet=True`, no output will be produced when attempting remote
418 If `quiet=True`, no output will be produced when attempting remote
419 imports.
419 imports.
420
420
421 Note that remote-only (`local=False`) imports have not been implemented.
421 Note that remote-only (`local=False`) imports have not been implemented.
422
422
423 >>> with view.sync_imports():
423 >>> with view.sync_imports():
424 ... from numpy import recarray
424 ... from numpy import recarray
425 importing recarray from numpy on engine(s)
425 importing recarray from numpy on engine(s)
426
426
427 """
427 """
428 import __builtin__
428 import __builtin__
429 local_import = __builtin__.__import__
429 local_import = __builtin__.__import__
430 modules = set()
430 modules = set()
431 results = []
431 results = []
432 @util.interactive
432 @util.interactive
433 def remote_import(name, fromlist, level):
433 def remote_import(name, fromlist, level):
434 """the function to be passed to apply, that actually performs the import
434 """the function to be passed to apply, that actually performs the import
435 on the engine, and loads up the user namespace.
435 on the engine, and loads up the user namespace.
436 """
436 """
437 import sys
437 import sys
438 user_ns = globals()
438 user_ns = globals()
439 mod = __import__(name, fromlist=fromlist, level=level)
439 mod = __import__(name, fromlist=fromlist, level=level)
440 if fromlist:
440 if fromlist:
441 for key in fromlist:
441 for key in fromlist:
442 user_ns[key] = getattr(mod, key)
442 user_ns[key] = getattr(mod, key)
443 else:
443 else:
444 user_ns[name] = sys.modules[name]
444 user_ns[name] = sys.modules[name]
445
445
446 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
446 def view_import(name, globals={}, locals={}, fromlist=[], level=-1):
447 """the drop-in replacement for __import__, that optionally imports
447 """the drop-in replacement for __import__, that optionally imports
448 locally as well.
448 locally as well.
449 """
449 """
450 # don't override nested imports
450 # don't override nested imports
451 save_import = __builtin__.__import__
451 save_import = __builtin__.__import__
452 __builtin__.__import__ = local_import
452 __builtin__.__import__ = local_import
453
453
454 if imp.lock_held():
454 if imp.lock_held():
455 # this is a side-effect import, don't do it remotely, or even
455 # this is a side-effect import, don't do it remotely, or even
456 # ignore the local effects
456 # ignore the local effects
457 return local_import(name, globals, locals, fromlist, level)
457 return local_import(name, globals, locals, fromlist, level)
458
458
459 imp.acquire_lock()
459 imp.acquire_lock()
460 if local:
460 if local:
461 mod = local_import(name, globals, locals, fromlist, level)
461 mod = local_import(name, globals, locals, fromlist, level)
462 else:
462 else:
463 raise NotImplementedError("remote-only imports not yet implemented")
463 raise NotImplementedError("remote-only imports not yet implemented")
464 imp.release_lock()
464 imp.release_lock()
465
465
466 key = name+':'+','.join(fromlist or [])
466 key = name+':'+','.join(fromlist or [])
467 if level == -1 and key not in modules:
467 if level == -1 and key not in modules:
468 modules.add(key)
468 modules.add(key)
469 if not quiet:
469 if not quiet:
470 if fromlist:
470 if fromlist:
471 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
471 print "importing %s from %s on engine(s)"%(','.join(fromlist), name)
472 else:
472 else:
473 print "importing %s on engine(s)"%name
473 print "importing %s on engine(s)"%name
474 results.append(self.apply_async(remote_import, name, fromlist, level))
474 results.append(self.apply_async(remote_import, name, fromlist, level))
475 # restore override
475 # restore override
476 __builtin__.__import__ = save_import
476 __builtin__.__import__ = save_import
477
477
478 return mod
478 return mod
479
479
480 # override __import__
480 # override __import__
481 __builtin__.__import__ = view_import
481 __builtin__.__import__ = view_import
482 try:
482 try:
483 # enter the block
483 # enter the block
484 yield
484 yield
485 except ImportError:
485 except ImportError:
486 if local:
486 if local:
487 raise
487 raise
488 else:
488 else:
489 # ignore import errors if not doing local imports
489 # ignore import errors if not doing local imports
490 pass
490 pass
491 finally:
491 finally:
492 # always restore __import__
492 # always restore __import__
493 __builtin__.__import__ = local_import
493 __builtin__.__import__ = local_import
494
494
495 for r in results:
495 for r in results:
496 # raise possible remote ImportErrors here
496 # raise possible remote ImportErrors here
497 r.get()
497 r.get()
498
498
499
499
500 @sync_results
500 @sync_results
501 @save_ids
501 @save_ids
502 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
502 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
503 """calls f(*args, **kwargs) on remote engines, returning the result.
503 """calls f(*args, **kwargs) on remote engines, returning the result.
504
504
505 This method sets all of `apply`'s flags via this View's attributes.
505 This method sets all of `apply`'s flags via this View's attributes.
506
506
507 Parameters
507 Parameters
508 ----------
508 ----------
509
509
510 f : callable
510 f : callable
511
511
512 args : list [default: empty]
512 args : list [default: empty]
513
513
514 kwargs : dict [default: empty]
514 kwargs : dict [default: empty]
515
515
516 targets : target list [default: self.targets]
516 targets : target list [default: self.targets]
517 where to run
517 where to run
518 block : bool [default: self.block]
518 block : bool [default: self.block]
519 whether to block
519 whether to block
520 track : bool [default: self.track]
520 track : bool [default: self.track]
521 whether to ask zmq to track the message, for safe non-copying sends
521 whether to ask zmq to track the message, for safe non-copying sends
522
522
523 Returns
523 Returns
524 -------
524 -------
525
525
526 if self.block is False:
526 if self.block is False:
527 returns AsyncResult
527 returns AsyncResult
528 else:
528 else:
529 returns actual result of f(*args, **kwargs) on the engine(s)
529 returns actual result of f(*args, **kwargs) on the engine(s)
530 This will be a list of self.targets is also a list (even length 1), or
530 This will be a list of self.targets is also a list (even length 1), or
531 the single result if self.targets is an integer engine id
531 the single result if self.targets is an integer engine id
532 """
532 """
533 args = [] if args is None else args
533 args = [] if args is None else args
534 kwargs = {} if kwargs is None else kwargs
534 kwargs = {} if kwargs is None else kwargs
535 block = self.block if block is None else block
535 block = self.block if block is None else block
536 track = self.track if track is None else track
536 track = self.track if track is None else track
537 targets = self.targets if targets is None else targets
537 targets = self.targets if targets is None else targets
538
538
539 _idents = self.client._build_targets(targets)[0]
539 _idents = self.client._build_targets(targets)[0]
540 msg_ids = []
540 msg_ids = []
541 trackers = []
541 trackers = []
542 for ident in _idents:
542 for ident in _idents:
543 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
543 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
544 ident=ident)
544 ident=ident)
545 if track:
545 if track:
546 trackers.append(msg['tracker'])
546 trackers.append(msg['tracker'])
547 msg_ids.append(msg['header']['msg_id'])
547 msg_ids.append(msg['header']['msg_id'])
548 tracker = None if track is False else zmq.MessageTracker(*trackers)
548 tracker = None if track is False else zmq.MessageTracker(*trackers)
549 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=targets, tracker=tracker)
549 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=targets, tracker=tracker)
550 if block:
550 if block:
551 try:
551 try:
552 return ar.get()
552 return ar.get()
553 except KeyboardInterrupt:
553 except KeyboardInterrupt:
554 pass
554 pass
555 return ar
555 return ar
556
556
557
557
558 @spin_after
558 @spin_after
559 def map(self, f, *sequences, **kwargs):
559 def map(self, f, *sequences, **kwargs):
560 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
560 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
561
561
562 Parallel version of builtin `map`, using this View's `targets`.
562 Parallel version of builtin `map`, using this View's `targets`.
563
563
564 There will be one task per target, so work will be chunked
564 There will be one task per target, so work will be chunked
565 if the sequences are longer than `targets`.
565 if the sequences are longer than `targets`.
566
566
567 Results can be iterated as they are ready, but will become available in chunks.
567 Results can be iterated as they are ready, but will become available in chunks.
568
568
569 Parameters
569 Parameters
570 ----------
570 ----------
571
571
572 f : callable
572 f : callable
573 function to be mapped
573 function to be mapped
574 *sequences: one or more sequences of matching length
574 *sequences: one or more sequences of matching length
575 the sequences to be distributed and passed to `f`
575 the sequences to be distributed and passed to `f`
576 block : bool
576 block : bool
577 whether to wait for the result or not [default self.block]
577 whether to wait for the result or not [default self.block]
578
578
579 Returns
579 Returns
580 -------
580 -------
581
581
582 if block=False:
582 if block=False:
583 AsyncMapResult
583 AsyncMapResult
584 An object like AsyncResult, but which reassembles the sequence of results
584 An object like AsyncResult, but which reassembles the sequence of results
585 into a single list. AsyncMapResults can be iterated through before all
585 into a single list. AsyncMapResults can be iterated through before all
586 results are complete.
586 results are complete.
587 else:
587 else:
588 list
588 list
589 the result of map(f,*sequences)
589 the result of map(f,*sequences)
590 """
590 """
591
591
592 block = kwargs.pop('block', self.block)
592 block = kwargs.pop('block', self.block)
593 for k in kwargs.keys():
593 for k in kwargs.keys():
594 if k not in ['block', 'track']:
594 if k not in ['block', 'track']:
595 raise TypeError("invalid keyword arg, %r"%k)
595 raise TypeError("invalid keyword arg, %r"%k)
596
596
597 assert len(sequences) > 0, "must have some sequences to map onto!"
597 assert len(sequences) > 0, "must have some sequences to map onto!"
598 pf = ParallelFunction(self, f, block=block, **kwargs)
598 pf = ParallelFunction(self, f, block=block, **kwargs)
599 return pf.map(*sequences)
599 return pf.map(*sequences)
600
600
601 @sync_results
601 @sync_results
602 @save_ids
602 @save_ids
603 def execute(self, code, silent=True, targets=None, block=None):
603 def execute(self, code, silent=True, targets=None, block=None):
604 """Executes `code` on `targets` in blocking or nonblocking manner.
604 """Executes `code` on `targets` in blocking or nonblocking manner.
605
605
606 ``execute`` is always `bound` (affects engine namespace)
606 ``execute`` is always `bound` (affects engine namespace)
607
607
608 Parameters
608 Parameters
609 ----------
609 ----------
610
610
611 code : str
611 code : str
612 the code string to be executed
612 the code string to be executed
613 block : bool
613 block : bool
614 whether or not to wait until done to return
614 whether or not to wait until done to return
615 default: self.block
615 default: self.block
616 """
616 """
617 block = self.block if block is None else block
617 block = self.block if block is None else block
618 targets = self.targets if targets is None else targets
618 targets = self.targets if targets is None else targets
619
619
620 _idents = self.client._build_targets(targets)[0]
620 _idents = self.client._build_targets(targets)[0]
621 msg_ids = []
621 msg_ids = []
622 trackers = []
622 trackers = []
623 for ident in _idents:
623 for ident in _idents:
624 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
624 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
625 msg_ids.append(msg['header']['msg_id'])
625 msg_ids.append(msg['header']['msg_id'])
626 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets)
626 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets)
627 if block:
627 if block:
628 try:
628 try:
629 ar.get()
629 ar.get()
630 except KeyboardInterrupt:
630 except KeyboardInterrupt:
631 pass
631 pass
632 return ar
632 return ar
633
633
634 def run(self, filename, targets=None, block=None):
634 def run(self, filename, targets=None, block=None):
635 """Execute contents of `filename` on my engine(s).
635 """Execute contents of `filename` on my engine(s).
636
636
637 This simply reads the contents of the file and calls `execute`.
637 This simply reads the contents of the file and calls `execute`.
638
638
639 Parameters
639 Parameters
640 ----------
640 ----------
641
641
642 filename : str
642 filename : str
643 The path to the file
643 The path to the file
644 targets : int/str/list of ints/strs
644 targets : int/str/list of ints/strs
645 the engines on which to execute
645 the engines on which to execute
646 default : all
646 default : all
647 block : bool
647 block : bool
648 whether or not to wait until done
648 whether or not to wait until done
649 default: self.block
649 default: self.block
650
650
651 """
651 """
652 with open(filename, 'r') as f:
652 with open(filename, 'r') as f:
653 # add newline in case of trailing indented whitespace
653 # add newline in case of trailing indented whitespace
654 # which will cause SyntaxError
654 # which will cause SyntaxError
655 code = f.read()+'\n'
655 code = f.read()+'\n'
656 return self.execute(code, block=block, targets=targets)
656 return self.execute(code, block=block, targets=targets)
657
657
658 def update(self, ns):
658 def update(self, ns):
659 """update remote namespace with dict `ns`
659 """update remote namespace with dict `ns`
660
660
661 See `push` for details.
661 See `push` for details.
662 """
662 """
663 return self.push(ns, block=self.block, track=self.track)
663 return self.push(ns, block=self.block, track=self.track)
664
664
665 def push(self, ns, targets=None, block=None, track=None):
665 def push(self, ns, targets=None, block=None, track=None):
666 """update remote namespace with dict `ns`
666 """update remote namespace with dict `ns`
667
667
668 Parameters
668 Parameters
669 ----------
669 ----------
670
670
671 ns : dict
671 ns : dict
672 dict of keys with which to update engine namespace(s)
672 dict of keys with which to update engine namespace(s)
673 block : bool [default : self.block]
673 block : bool [default : self.block]
674 whether to wait to be notified of engine receipt
674 whether to wait to be notified of engine receipt
675
675
676 """
676 """
677
677
678 block = block if block is not None else self.block
678 block = block if block is not None else self.block
679 track = track if track is not None else self.track
679 track = track if track is not None else self.track
680 targets = targets if targets is not None else self.targets
680 targets = targets if targets is not None else self.targets
681 # applier = self.apply_sync if block else self.apply_async
681 # applier = self.apply_sync if block else self.apply_async
682 if not isinstance(ns, dict):
682 if not isinstance(ns, dict):
683 raise TypeError("Must be a dict, not %s"%type(ns))
683 raise TypeError("Must be a dict, not %s"%type(ns))
684 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
684 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
685
685
686 def get(self, key_s):
686 def get(self, key_s):
687 """get object(s) by `key_s` from remote namespace
687 """get object(s) by `key_s` from remote namespace
688
688
689 see `pull` for details.
689 see `pull` for details.
690 """
690 """
691 # block = block if block is not None else self.block
691 # block = block if block is not None else self.block
692 return self.pull(key_s, block=True)
692 return self.pull(key_s, block=True)
693
693
694 def pull(self, names, targets=None, block=None):
694 def pull(self, names, targets=None, block=None):
695 """get object(s) by `name` from remote namespace
695 """get object(s) by `name` from remote namespace
696
696
697 will return one object if it is a key.
697 will return one object if it is a key.
698 can also take a list of keys, in which case it will return a list of objects.
698 can also take a list of keys, in which case it will return a list of objects.
699 """
699 """
700 block = block if block is not None else self.block
700 block = block if block is not None else self.block
701 targets = targets if targets is not None else self.targets
701 targets = targets if targets is not None else self.targets
702 applier = self.apply_sync if block else self.apply_async
702 applier = self.apply_sync if block else self.apply_async
703 if isinstance(names, basestring):
703 if isinstance(names, basestring):
704 pass
704 pass
705 elif isinstance(names, (list,tuple,set)):
705 elif isinstance(names, (list,tuple,set)):
706 for key in names:
706 for key in names:
707 if not isinstance(key, basestring):
707 if not isinstance(key, basestring):
708 raise TypeError("keys must be str, not type %r"%type(key))
708 raise TypeError("keys must be str, not type %r"%type(key))
709 else:
709 else:
710 raise TypeError("names must be strs, not %r"%names)
710 raise TypeError("names must be strs, not %r"%names)
711 return self._really_apply(util._pull, (names,), block=block, targets=targets)
711 return self._really_apply(util._pull, (names,), block=block, targets=targets)
712
712
713 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
713 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
714 """
714 """
715 Partition a Python sequence and send the partitions to a set of engines.
715 Partition a Python sequence and send the partitions to a set of engines.
716 """
716 """
717 block = block if block is not None else self.block
717 block = block if block is not None else self.block
718 track = track if track is not None else self.track
718 track = track if track is not None else self.track
719 targets = targets if targets is not None else self.targets
719 targets = targets if targets is not None else self.targets
720
720
721 # construct integer ID list:
721 # construct integer ID list:
722 targets = self.client._build_targets(targets)[1]
722 targets = self.client._build_targets(targets)[1]
723
723
724 mapObject = Map.dists[dist]()
724 mapObject = Map.dists[dist]()
725 nparts = len(targets)
725 nparts = len(targets)
726 msg_ids = []
726 msg_ids = []
727 trackers = []
727 trackers = []
728 for index, engineid in enumerate(targets):
728 for index, engineid in enumerate(targets):
729 partition = mapObject.getPartition(seq, index, nparts)
729 partition = mapObject.getPartition(seq, index, nparts)
730 if flatten and len(partition) == 1:
730 if flatten and len(partition) == 1:
731 ns = {key: partition[0]}
731 ns = {key: partition[0]}
732 else:
732 else:
733 ns = {key: partition}
733 ns = {key: partition}
734 r = self.push(ns, block=False, track=track, targets=engineid)
734 r = self.push(ns, block=False, track=track, targets=engineid)
735 msg_ids.extend(r.msg_ids)
735 msg_ids.extend(r.msg_ids)
736 if track:
736 if track:
737 trackers.append(r._tracker)
737 trackers.append(r._tracker)
738
738
739 if track:
739 if track:
740 tracker = zmq.MessageTracker(*trackers)
740 tracker = zmq.MessageTracker(*trackers)
741 else:
741 else:
742 tracker = None
742 tracker = None
743
743
744 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
744 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
745 if block:
745 if block:
746 r.wait()
746 r.wait()
747 else:
747 else:
748 return r
748 return r
749
749
750 @sync_results
750 @sync_results
751 @save_ids
751 @save_ids
752 def gather(self, key, dist='b', targets=None, block=None):
752 def gather(self, key, dist='b', targets=None, block=None):
753 """
753 """
754 Gather a partitioned sequence on a set of engines as a single local seq.
754 Gather a partitioned sequence on a set of engines as a single local seq.
755 """
755 """
756 block = block if block is not None else self.block
756 block = block if block is not None else self.block
757 targets = targets if targets is not None else self.targets
757 targets = targets if targets is not None else self.targets
758 mapObject = Map.dists[dist]()
758 mapObject = Map.dists[dist]()
759 msg_ids = []
759 msg_ids = []
760
760
761 # construct integer ID list:
761 # construct integer ID list:
762 targets = self.client._build_targets(targets)[1]
762 targets = self.client._build_targets(targets)[1]
763
763
764 for index, engineid in enumerate(targets):
764 for index, engineid in enumerate(targets):
765 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
765 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
766
766
767 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
767 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
768
768
769 if block:
769 if block:
770 try:
770 try:
771 return r.get()
771 return r.get()
772 except KeyboardInterrupt:
772 except KeyboardInterrupt:
773 pass
773 pass
774 return r
774 return r
775
775
776 def __getitem__(self, key):
776 def __getitem__(self, key):
777 return self.get(key)
777 return self.get(key)
778
778
779 def __setitem__(self,key, value):
779 def __setitem__(self,key, value):
780 self.update({key:value})
780 self.update({key:value})
781
781
782 def clear(self, targets=None, block=False):
782 def clear(self, targets=None, block=False):
783 """Clear the remote namespaces on my engines."""
783 """Clear the remote namespaces on my engines."""
784 block = block if block is not None else self.block
784 block = block if block is not None else self.block
785 targets = targets if targets is not None else self.targets
785 targets = targets if targets is not None else self.targets
786 return self.client.clear(targets=targets, block=block)
786 return self.client.clear(targets=targets, block=block)
787
787
788 def kill(self, targets=None, block=True):
788 def kill(self, targets=None, block=True):
789 """Kill my engines."""
789 """Kill my engines."""
790 block = block if block is not None else self.block
790 block = block if block is not None else self.block
791 targets = targets if targets is not None else self.targets
791 targets = targets if targets is not None else self.targets
792 return self.client.kill(targets=targets, block=block)
792 return self.client.kill(targets=targets, block=block)
793
793
794 #----------------------------------------
794 #----------------------------------------
795 # activate for %px,%autopx magics
795 # activate for %px, %autopx, etc. magics
796 #----------------------------------------
796 #----------------------------------------
797 def activate(self):
798 """Make this `View` active for parallel magic commands.
799
797
800 IPython has a magic command syntax to work with `MultiEngineClient` objects.
798 def activate(self, suffix=''):
801 In a given IPython session there is a single active one. While
799 """Activate IPython magics associated with this View
802 there can be many `Views` created and used by the user,
800
803 there is only one active one. The active `View` is used whenever
801 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
804 the magic commands %px and %autopx are used.
802
805
803 Parameters
806 The activate() method is called on a given `View` to make it
804 ----------
807 active. Once this has been done, the magic commands can be used.
805
806 suffix: str [default: '']
807 The suffix, if any, for the magics. This allows you to have
808 multiple views associated with parallel magics at the same time.
809
810 e.g. ``rc[::2].activate(suffix='_even')`` will give you
811 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
812 on the even engines.
808 """
813 """
809
814
815 from IPython.parallel.client.magics import ParallelMagics
816
810 try:
817 try:
811 # This is injected into __builtins__.
818 # This is injected into __builtins__.
812 ip = get_ipython()
819 ip = get_ipython()
813 except NameError:
820 except NameError:
814 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
821 print "The IPython parallel magics (%px, etc.) only work within IPython."
815 else:
822 return
816 pmagic = ip.magics_manager.registry.get('ParallelMagics')
823
817 if pmagic is None:
824 M = ParallelMagics(ip, self, suffix)
818 ip.magic('load_ext parallelmagic')
825 ip.magics_manager.register(M)
819 pmagic = ip.magics_manager.registry.get('ParallelMagics')
820
821 pmagic.active_view = self
822
826
823
827
824 @skip_doctest
828 @skip_doctest
825 class LoadBalancedView(View):
829 class LoadBalancedView(View):
826 """An load-balancing View that only executes via the Task scheduler.
830 """An load-balancing View that only executes via the Task scheduler.
827
831
828 Load-balanced views can be created with the client's `view` method:
832 Load-balanced views can be created with the client's `view` method:
829
833
830 >>> v = client.load_balanced_view()
834 >>> v = client.load_balanced_view()
831
835
832 or targets can be specified, to restrict the potential destinations:
836 or targets can be specified, to restrict the potential destinations:
833
837
834 >>> v = client.client.load_balanced_view([1,3])
838 >>> v = client.client.load_balanced_view([1,3])
835
839
836 which would restrict loadbalancing to between engines 1 and 3.
840 which would restrict loadbalancing to between engines 1 and 3.
837
841
838 """
842 """
839
843
840 follow=Any()
844 follow=Any()
841 after=Any()
845 after=Any()
842 timeout=CFloat()
846 timeout=CFloat()
843 retries = Integer(0)
847 retries = Integer(0)
844
848
845 _task_scheme = Any()
849 _task_scheme = Any()
846 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
850 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
847
851
848 def __init__(self, client=None, socket=None, **flags):
852 def __init__(self, client=None, socket=None, **flags):
849 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
853 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
850 self._task_scheme=client._task_scheme
854 self._task_scheme=client._task_scheme
851
855
852 def _validate_dependency(self, dep):
856 def _validate_dependency(self, dep):
853 """validate a dependency.
857 """validate a dependency.
854
858
855 For use in `set_flags`.
859 For use in `set_flags`.
856 """
860 """
857 if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
861 if dep is None or isinstance(dep, (basestring, AsyncResult, Dependency)):
858 return True
862 return True
859 elif isinstance(dep, (list,set, tuple)):
863 elif isinstance(dep, (list,set, tuple)):
860 for d in dep:
864 for d in dep:
861 if not isinstance(d, (basestring, AsyncResult)):
865 if not isinstance(d, (basestring, AsyncResult)):
862 return False
866 return False
863 elif isinstance(dep, dict):
867 elif isinstance(dep, dict):
864 if set(dep.keys()) != set(Dependency().as_dict().keys()):
868 if set(dep.keys()) != set(Dependency().as_dict().keys()):
865 return False
869 return False
866 if not isinstance(dep['msg_ids'], list):
870 if not isinstance(dep['msg_ids'], list):
867 return False
871 return False
868 for d in dep['msg_ids']:
872 for d in dep['msg_ids']:
869 if not isinstance(d, basestring):
873 if not isinstance(d, basestring):
870 return False
874 return False
871 else:
875 else:
872 return False
876 return False
873
877
874 return True
878 return True
875
879
876 def _render_dependency(self, dep):
880 def _render_dependency(self, dep):
877 """helper for building jsonable dependencies from various input forms."""
881 """helper for building jsonable dependencies from various input forms."""
878 if isinstance(dep, Dependency):
882 if isinstance(dep, Dependency):
879 return dep.as_dict()
883 return dep.as_dict()
880 elif isinstance(dep, AsyncResult):
884 elif isinstance(dep, AsyncResult):
881 return dep.msg_ids
885 return dep.msg_ids
882 elif dep is None:
886 elif dep is None:
883 return []
887 return []
884 else:
888 else:
885 # pass to Dependency constructor
889 # pass to Dependency constructor
886 return list(Dependency(dep))
890 return list(Dependency(dep))
887
891
888 def set_flags(self, **kwargs):
892 def set_flags(self, **kwargs):
889 """set my attribute flags by keyword.
893 """set my attribute flags by keyword.
890
894
891 A View is a wrapper for the Client's apply method, but with attributes
895 A View is a wrapper for the Client's apply method, but with attributes
892 that specify keyword arguments, those attributes can be set by keyword
896 that specify keyword arguments, those attributes can be set by keyword
893 argument with this method.
897 argument with this method.
894
898
895 Parameters
899 Parameters
896 ----------
900 ----------
897
901
898 block : bool
902 block : bool
899 whether to wait for results
903 whether to wait for results
900 track : bool
904 track : bool
901 whether to create a MessageTracker to allow the user to
905 whether to create a MessageTracker to allow the user to
902 safely edit after arrays and buffers during non-copying
906 safely edit after arrays and buffers during non-copying
903 sends.
907 sends.
904
908
905 after : Dependency or collection of msg_ids
909 after : Dependency or collection of msg_ids
906 Only for load-balanced execution (targets=None)
910 Only for load-balanced execution (targets=None)
907 Specify a list of msg_ids as a time-based dependency.
911 Specify a list of msg_ids as a time-based dependency.
908 This job will only be run *after* the dependencies
912 This job will only be run *after* the dependencies
909 have been met.
913 have been met.
910
914
911 follow : Dependency or collection of msg_ids
915 follow : Dependency or collection of msg_ids
912 Only for load-balanced execution (targets=None)
916 Only for load-balanced execution (targets=None)
913 Specify a list of msg_ids as a location-based dependency.
917 Specify a list of msg_ids as a location-based dependency.
914 This job will only be run on an engine where this dependency
918 This job will only be run on an engine where this dependency
915 is met.
919 is met.
916
920
917 timeout : float/int or None
921 timeout : float/int or None
918 Only for load-balanced execution (targets=None)
922 Only for load-balanced execution (targets=None)
919 Specify an amount of time (in seconds) for the scheduler to
923 Specify an amount of time (in seconds) for the scheduler to
920 wait for dependencies to be met before failing with a
924 wait for dependencies to be met before failing with a
921 DependencyTimeout.
925 DependencyTimeout.
922
926
923 retries : int
927 retries : int
924 Number of times a task will be retried on failure.
928 Number of times a task will be retried on failure.
925 """
929 """
926
930
927 super(LoadBalancedView, self).set_flags(**kwargs)
931 super(LoadBalancedView, self).set_flags(**kwargs)
928 for name in ('follow', 'after'):
932 for name in ('follow', 'after'):
929 if name in kwargs:
933 if name in kwargs:
930 value = kwargs[name]
934 value = kwargs[name]
931 if self._validate_dependency(value):
935 if self._validate_dependency(value):
932 setattr(self, name, value)
936 setattr(self, name, value)
933 else:
937 else:
934 raise ValueError("Invalid dependency: %r"%value)
938 raise ValueError("Invalid dependency: %r"%value)
935 if 'timeout' in kwargs:
939 if 'timeout' in kwargs:
936 t = kwargs['timeout']
940 t = kwargs['timeout']
937 if not isinstance(t, (int, long, float, type(None))):
941 if not isinstance(t, (int, long, float, type(None))):
938 raise TypeError("Invalid type for timeout: %r"%type(t))
942 raise TypeError("Invalid type for timeout: %r"%type(t))
939 if t is not None:
943 if t is not None:
940 if t < 0:
944 if t < 0:
941 raise ValueError("Invalid timeout: %s"%t)
945 raise ValueError("Invalid timeout: %s"%t)
942 self.timeout = t
946 self.timeout = t
943
947
944 @sync_results
948 @sync_results
945 @save_ids
949 @save_ids
946 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
950 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
947 after=None, follow=None, timeout=None,
951 after=None, follow=None, timeout=None,
948 targets=None, retries=None):
952 targets=None, retries=None):
949 """calls f(*args, **kwargs) on a remote engine, returning the result.
953 """calls f(*args, **kwargs) on a remote engine, returning the result.
950
954
951 This method temporarily sets all of `apply`'s flags for a single call.
955 This method temporarily sets all of `apply`'s flags for a single call.
952
956
953 Parameters
957 Parameters
954 ----------
958 ----------
955
959
956 f : callable
960 f : callable
957
961
958 args : list [default: empty]
962 args : list [default: empty]
959
963
960 kwargs : dict [default: empty]
964 kwargs : dict [default: empty]
961
965
962 block : bool [default: self.block]
966 block : bool [default: self.block]
963 whether to block
967 whether to block
964 track : bool [default: self.track]
968 track : bool [default: self.track]
965 whether to ask zmq to track the message, for safe non-copying sends
969 whether to ask zmq to track the message, for safe non-copying sends
966
970
967 !!!!!! TODO: THE REST HERE !!!!
971 !!!!!! TODO: THE REST HERE !!!!
968
972
969 Returns
973 Returns
970 -------
974 -------
971
975
972 if self.block is False:
976 if self.block is False:
973 returns AsyncResult
977 returns AsyncResult
974 else:
978 else:
975 returns actual result of f(*args, **kwargs) on the engine(s)
979 returns actual result of f(*args, **kwargs) on the engine(s)
976 This will be a list of self.targets is also a list (even length 1), or
980 This will be a list of self.targets is also a list (even length 1), or
977 the single result if self.targets is an integer engine id
981 the single result if self.targets is an integer engine id
978 """
982 """
979
983
980 # validate whether we can run
984 # validate whether we can run
981 if self._socket.closed:
985 if self._socket.closed:
982 msg = "Task farming is disabled"
986 msg = "Task farming is disabled"
983 if self._task_scheme == 'pure':
987 if self._task_scheme == 'pure':
984 msg += " because the pure ZMQ scheduler cannot handle"
988 msg += " because the pure ZMQ scheduler cannot handle"
985 msg += " disappearing engines."
989 msg += " disappearing engines."
986 raise RuntimeError(msg)
990 raise RuntimeError(msg)
987
991
988 if self._task_scheme == 'pure':
992 if self._task_scheme == 'pure':
989 # pure zmq scheme doesn't support extra features
993 # pure zmq scheme doesn't support extra features
990 msg = "Pure ZMQ scheduler doesn't support the following flags:"
994 msg = "Pure ZMQ scheduler doesn't support the following flags:"
991 "follow, after, retries, targets, timeout"
995 "follow, after, retries, targets, timeout"
992 if (follow or after or retries or targets or timeout):
996 if (follow or after or retries or targets or timeout):
993 # hard fail on Scheduler flags
997 # hard fail on Scheduler flags
994 raise RuntimeError(msg)
998 raise RuntimeError(msg)
995 if isinstance(f, dependent):
999 if isinstance(f, dependent):
996 # soft warn on functional dependencies
1000 # soft warn on functional dependencies
997 warnings.warn(msg, RuntimeWarning)
1001 warnings.warn(msg, RuntimeWarning)
998
1002
999 # build args
1003 # build args
1000 args = [] if args is None else args
1004 args = [] if args is None else args
1001 kwargs = {} if kwargs is None else kwargs
1005 kwargs = {} if kwargs is None else kwargs
1002 block = self.block if block is None else block
1006 block = self.block if block is None else block
1003 track = self.track if track is None else track
1007 track = self.track if track is None else track
1004 after = self.after if after is None else after
1008 after = self.after if after is None else after
1005 retries = self.retries if retries is None else retries
1009 retries = self.retries if retries is None else retries
1006 follow = self.follow if follow is None else follow
1010 follow = self.follow if follow is None else follow
1007 timeout = self.timeout if timeout is None else timeout
1011 timeout = self.timeout if timeout is None else timeout
1008 targets = self.targets if targets is None else targets
1012 targets = self.targets if targets is None else targets
1009
1013
1010 if not isinstance(retries, int):
1014 if not isinstance(retries, int):
1011 raise TypeError('retries must be int, not %r'%type(retries))
1015 raise TypeError('retries must be int, not %r'%type(retries))
1012
1016
1013 if targets is None:
1017 if targets is None:
1014 idents = []
1018 idents = []
1015 else:
1019 else:
1016 idents = self.client._build_targets(targets)[0]
1020 idents = self.client._build_targets(targets)[0]
1017 # ensure *not* bytes
1021 # ensure *not* bytes
1018 idents = [ ident.decode() for ident in idents ]
1022 idents = [ ident.decode() for ident in idents ]
1019
1023
1020 after = self._render_dependency(after)
1024 after = self._render_dependency(after)
1021 follow = self._render_dependency(follow)
1025 follow = self._render_dependency(follow)
1022 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1026 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1023
1027
1024 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1028 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1025 subheader=subheader)
1029 subheader=subheader)
1026 tracker = None if track is False else msg['tracker']
1030 tracker = None if track is False else msg['tracker']
1027
1031
1028 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1032 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1029
1033
1030 if block:
1034 if block:
1031 try:
1035 try:
1032 return ar.get()
1036 return ar.get()
1033 except KeyboardInterrupt:
1037 except KeyboardInterrupt:
1034 pass
1038 pass
1035 return ar
1039 return ar
1036
1040
1037 @spin_after
1041 @spin_after
1038 @save_ids
1042 @save_ids
1039 def map(self, f, *sequences, **kwargs):
1043 def map(self, f, *sequences, **kwargs):
1040 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1044 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1041
1045
1042 Parallel version of builtin `map`, load-balanced by this View.
1046 Parallel version of builtin `map`, load-balanced by this View.
1043
1047
1044 `block`, and `chunksize` can be specified by keyword only.
1048 `block`, and `chunksize` can be specified by keyword only.
1045
1049
1046 Each `chunksize` elements will be a separate task, and will be
1050 Each `chunksize` elements will be a separate task, and will be
1047 load-balanced. This lets individual elements be available for iteration
1051 load-balanced. This lets individual elements be available for iteration
1048 as soon as they arrive.
1052 as soon as they arrive.
1049
1053
1050 Parameters
1054 Parameters
1051 ----------
1055 ----------
1052
1056
1053 f : callable
1057 f : callable
1054 function to be mapped
1058 function to be mapped
1055 *sequences: one or more sequences of matching length
1059 *sequences: one or more sequences of matching length
1056 the sequences to be distributed and passed to `f`
1060 the sequences to be distributed and passed to `f`
1057 block : bool [default self.block]
1061 block : bool [default self.block]
1058 whether to wait for the result or not
1062 whether to wait for the result or not
1059 track : bool
1063 track : bool
1060 whether to create a MessageTracker to allow the user to
1064 whether to create a MessageTracker to allow the user to
1061 safely edit after arrays and buffers during non-copying
1065 safely edit after arrays and buffers during non-copying
1062 sends.
1066 sends.
1063 chunksize : int [default 1]
1067 chunksize : int [default 1]
1064 how many elements should be in each task.
1068 how many elements should be in each task.
1065 ordered : bool [default True]
1069 ordered : bool [default True]
1066 Whether the results should be gathered as they arrive, or enforce
1070 Whether the results should be gathered as they arrive, or enforce
1067 the order of submission.
1071 the order of submission.
1068
1072
1069 Only applies when iterating through AsyncMapResult as results arrive.
1073 Only applies when iterating through AsyncMapResult as results arrive.
1070 Has no effect when block=True.
1074 Has no effect when block=True.
1071
1075
1072 Returns
1076 Returns
1073 -------
1077 -------
1074
1078
1075 if block=False:
1079 if block=False:
1076 AsyncMapResult
1080 AsyncMapResult
1077 An object like AsyncResult, but which reassembles the sequence of results
1081 An object like AsyncResult, but which reassembles the sequence of results
1078 into a single list. AsyncMapResults can be iterated through before all
1082 into a single list. AsyncMapResults can be iterated through before all
1079 results are complete.
1083 results are complete.
1080 else:
1084 else:
1081 the result of map(f,*sequences)
1085 the result of map(f,*sequences)
1082
1086
1083 """
1087 """
1084
1088
1085 # default
1089 # default
1086 block = kwargs.get('block', self.block)
1090 block = kwargs.get('block', self.block)
1087 chunksize = kwargs.get('chunksize', 1)
1091 chunksize = kwargs.get('chunksize', 1)
1088 ordered = kwargs.get('ordered', True)
1092 ordered = kwargs.get('ordered', True)
1089
1093
1090 keyset = set(kwargs.keys())
1094 keyset = set(kwargs.keys())
1091 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1095 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1092 if extra_keys:
1096 if extra_keys:
1093 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1097 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1094
1098
1095 assert len(sequences) > 0, "must have some sequences to map onto!"
1099 assert len(sequences) > 0, "must have some sequences to map onto!"
1096
1100
1097 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1101 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1098 return pf.map(*sequences)
1102 return pf.map(*sequences)
1099
1103
1100 __all__ = ['LoadBalancedView', 'DirectView']
1104 __all__ = ['LoadBalancedView', 'DirectView']
@@ -1,422 +1,436 b''
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23 from tempfile import mktemp
23 from tempfile import mktemp
24
24
25 import zmq
25 import zmq
26
26
27 from IPython import parallel
27 from IPython.parallel.client import client as clientmod
28 from IPython.parallel.client import client as clientmod
28 from IPython.parallel import error
29 from IPython.parallel import error
29 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import LoadBalancedView, DirectView
31 from IPython.parallel import LoadBalancedView, DirectView
31
32
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
33 from clienttest import ClusterTestCase, segfault, wait, add_engines
33
34
34 def setup():
35 def setup():
35 add_engines(4, total=True)
36 add_engines(4, total=True)
36
37
37 class TestClient(ClusterTestCase):
38 class TestClient(ClusterTestCase):
38
39
39 def test_ids(self):
40 def test_ids(self):
40 n = len(self.client.ids)
41 n = len(self.client.ids)
41 self.add_engines(2)
42 self.add_engines(2)
42 self.assertEquals(len(self.client.ids), n+2)
43 self.assertEquals(len(self.client.ids), n+2)
43
44
44 def test_view_indexing(self):
45 def test_view_indexing(self):
45 """test index access for views"""
46 """test index access for views"""
46 self.minimum_engines(4)
47 self.minimum_engines(4)
47 targets = self.client._build_targets('all')[-1]
48 targets = self.client._build_targets('all')[-1]
48 v = self.client[:]
49 v = self.client[:]
49 self.assertEquals(v.targets, targets)
50 self.assertEquals(v.targets, targets)
50 t = self.client.ids[2]
51 t = self.client.ids[2]
51 v = self.client[t]
52 v = self.client[t]
52 self.assert_(isinstance(v, DirectView))
53 self.assert_(isinstance(v, DirectView))
53 self.assertEquals(v.targets, t)
54 self.assertEquals(v.targets, t)
54 t = self.client.ids[2:4]
55 t = self.client.ids[2:4]
55 v = self.client[t]
56 v = self.client[t]
56 self.assert_(isinstance(v, DirectView))
57 self.assert_(isinstance(v, DirectView))
57 self.assertEquals(v.targets, t)
58 self.assertEquals(v.targets, t)
58 v = self.client[::2]
59 v = self.client[::2]
59 self.assert_(isinstance(v, DirectView))
60 self.assert_(isinstance(v, DirectView))
60 self.assertEquals(v.targets, targets[::2])
61 self.assertEquals(v.targets, targets[::2])
61 v = self.client[1::3]
62 v = self.client[1::3]
62 self.assert_(isinstance(v, DirectView))
63 self.assert_(isinstance(v, DirectView))
63 self.assertEquals(v.targets, targets[1::3])
64 self.assertEquals(v.targets, targets[1::3])
64 v = self.client[:-3]
65 v = self.client[:-3]
65 self.assert_(isinstance(v, DirectView))
66 self.assert_(isinstance(v, DirectView))
66 self.assertEquals(v.targets, targets[:-3])
67 self.assertEquals(v.targets, targets[:-3])
67 v = self.client[-1]
68 v = self.client[-1]
68 self.assert_(isinstance(v, DirectView))
69 self.assert_(isinstance(v, DirectView))
69 self.assertEquals(v.targets, targets[-1])
70 self.assertEquals(v.targets, targets[-1])
70 self.assertRaises(TypeError, lambda : self.client[None])
71 self.assertRaises(TypeError, lambda : self.client[None])
71
72
72 def test_lbview_targets(self):
73 def test_lbview_targets(self):
73 """test load_balanced_view targets"""
74 """test load_balanced_view targets"""
74 v = self.client.load_balanced_view()
75 v = self.client.load_balanced_view()
75 self.assertEquals(v.targets, None)
76 self.assertEquals(v.targets, None)
76 v = self.client.load_balanced_view(-1)
77 v = self.client.load_balanced_view(-1)
77 self.assertEquals(v.targets, [self.client.ids[-1]])
78 self.assertEquals(v.targets, [self.client.ids[-1]])
78 v = self.client.load_balanced_view('all')
79 v = self.client.load_balanced_view('all')
79 self.assertEquals(v.targets, None)
80 self.assertEquals(v.targets, None)
80
81
81 def test_dview_targets(self):
82 def test_dview_targets(self):
82 """test direct_view targets"""
83 """test direct_view targets"""
83 v = self.client.direct_view()
84 v = self.client.direct_view()
84 self.assertEquals(v.targets, 'all')
85 self.assertEquals(v.targets, 'all')
85 v = self.client.direct_view('all')
86 v = self.client.direct_view('all')
86 self.assertEquals(v.targets, 'all')
87 self.assertEquals(v.targets, 'all')
87 v = self.client.direct_view(-1)
88 v = self.client.direct_view(-1)
88 self.assertEquals(v.targets, self.client.ids[-1])
89 self.assertEquals(v.targets, self.client.ids[-1])
89
90
90 def test_lazy_all_targets(self):
91 def test_lazy_all_targets(self):
91 """test lazy evaluation of rc.direct_view('all')"""
92 """test lazy evaluation of rc.direct_view('all')"""
92 v = self.client.direct_view()
93 v = self.client.direct_view()
93 self.assertEquals(v.targets, 'all')
94 self.assertEquals(v.targets, 'all')
94
95
95 def double(x):
96 def double(x):
96 return x*2
97 return x*2
97 seq = range(100)
98 seq = range(100)
98 ref = [ double(x) for x in seq ]
99 ref = [ double(x) for x in seq ]
99
100
100 # add some engines, which should be used
101 # add some engines, which should be used
101 self.add_engines(1)
102 self.add_engines(1)
102 n1 = len(self.client.ids)
103 n1 = len(self.client.ids)
103
104
104 # simple apply
105 # simple apply
105 r = v.apply_sync(lambda : 1)
106 r = v.apply_sync(lambda : 1)
106 self.assertEquals(r, [1] * n1)
107 self.assertEquals(r, [1] * n1)
107
108
108 # map goes through remotefunction
109 # map goes through remotefunction
109 r = v.map_sync(double, seq)
110 r = v.map_sync(double, seq)
110 self.assertEquals(r, ref)
111 self.assertEquals(r, ref)
111
112
112 # add a couple more engines, and try again
113 # add a couple more engines, and try again
113 self.add_engines(2)
114 self.add_engines(2)
114 n2 = len(self.client.ids)
115 n2 = len(self.client.ids)
115 self.assertNotEquals(n2, n1)
116 self.assertNotEquals(n2, n1)
116
117
117 # apply
118 # apply
118 r = v.apply_sync(lambda : 1)
119 r = v.apply_sync(lambda : 1)
119 self.assertEquals(r, [1] * n2)
120 self.assertEquals(r, [1] * n2)
120
121
121 # map
122 # map
122 r = v.map_sync(double, seq)
123 r = v.map_sync(double, seq)
123 self.assertEquals(r, ref)
124 self.assertEquals(r, ref)
124
125
125 def test_targets(self):
126 def test_targets(self):
126 """test various valid targets arguments"""
127 """test various valid targets arguments"""
127 build = self.client._build_targets
128 build = self.client._build_targets
128 ids = self.client.ids
129 ids = self.client.ids
129 idents,targets = build(None)
130 idents,targets = build(None)
130 self.assertEquals(ids, targets)
131 self.assertEquals(ids, targets)
131
132
132 def test_clear(self):
133 def test_clear(self):
133 """test clear behavior"""
134 """test clear behavior"""
134 self.minimum_engines(2)
135 self.minimum_engines(2)
135 v = self.client[:]
136 v = self.client[:]
136 v.block=True
137 v.block=True
137 v.push(dict(a=5))
138 v.push(dict(a=5))
138 v.pull('a')
139 v.pull('a')
139 id0 = self.client.ids[-1]
140 id0 = self.client.ids[-1]
140 self.client.clear(targets=id0, block=True)
141 self.client.clear(targets=id0, block=True)
141 a = self.client[:-1].get('a')
142 a = self.client[:-1].get('a')
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.client.clear(block=True)
144 self.client.clear(block=True)
144 for i in self.client.ids:
145 for i in self.client.ids:
145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146
147
147 def test_get_result(self):
148 def test_get_result(self):
148 """test getting results from the Hub."""
149 """test getting results from the Hub."""
149 c = clientmod.Client(profile='iptest')
150 c = clientmod.Client(profile='iptest')
150 t = c.ids[-1]
151 t = c.ids[-1]
151 ar = c[t].apply_async(wait, 1)
152 ar = c[t].apply_async(wait, 1)
152 # give the monitor time to notice the message
153 # give the monitor time to notice the message
153 time.sleep(.25)
154 time.sleep(.25)
154 ahr = self.client.get_result(ar.msg_ids)
155 ahr = self.client.get_result(ar.msg_ids)
155 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertEquals(ahr.get(), ar.get())
157 self.assertEquals(ahr.get(), ar.get())
157 ar2 = self.client.get_result(ar.msg_ids)
158 ar2 = self.client.get_result(ar.msg_ids)
158 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 c.close()
160 c.close()
160
161
161 def test_ids_list(self):
162 def test_ids_list(self):
162 """test client.ids"""
163 """test client.ids"""
163 ids = self.client.ids
164 ids = self.client.ids
164 self.assertEquals(ids, self.client._ids)
165 self.assertEquals(ids, self.client._ids)
165 self.assertFalse(ids is self.client._ids)
166 self.assertFalse(ids is self.client._ids)
166 ids.remove(ids[-1])
167 ids.remove(ids[-1])
167 self.assertNotEquals(ids, self.client._ids)
168 self.assertNotEquals(ids, self.client._ids)
168
169
169 def test_queue_status(self):
170 def test_queue_status(self):
170 ids = self.client.ids
171 ids = self.client.ids
171 id0 = ids[0]
172 id0 = ids[0]
172 qs = self.client.queue_status(targets=id0)
173 qs = self.client.queue_status(targets=id0)
173 self.assertTrue(isinstance(qs, dict))
174 self.assertTrue(isinstance(qs, dict))
174 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
175 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
175 allqs = self.client.queue_status()
176 allqs = self.client.queue_status()
176 self.assertTrue(isinstance(allqs, dict))
177 self.assertTrue(isinstance(allqs, dict))
177 intkeys = list(allqs.keys())
178 intkeys = list(allqs.keys())
178 intkeys.remove('unassigned')
179 intkeys.remove('unassigned')
179 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
180 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
180 unassigned = allqs.pop('unassigned')
181 unassigned = allqs.pop('unassigned')
181 for eid,qs in allqs.items():
182 for eid,qs in allqs.items():
182 self.assertTrue(isinstance(qs, dict))
183 self.assertTrue(isinstance(qs, dict))
183 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
184 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
184
185
185 def test_shutdown(self):
186 def test_shutdown(self):
186 ids = self.client.ids
187 ids = self.client.ids
187 id0 = ids[0]
188 id0 = ids[0]
188 self.client.shutdown(id0, block=True)
189 self.client.shutdown(id0, block=True)
189 while id0 in self.client.ids:
190 while id0 in self.client.ids:
190 time.sleep(0.1)
191 time.sleep(0.1)
191 self.client.spin()
192 self.client.spin()
192
193
193 self.assertRaises(IndexError, lambda : self.client[id0])
194 self.assertRaises(IndexError, lambda : self.client[id0])
194
195
195 def test_result_status(self):
196 def test_result_status(self):
196 pass
197 pass
197 # to be written
198 # to be written
198
199
199 def test_db_query_dt(self):
200 def test_db_query_dt(self):
200 """test db query by date"""
201 """test db query by date"""
201 hist = self.client.hub_history()
202 hist = self.client.hub_history()
202 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
203 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
203 tic = middle['submitted']
204 tic = middle['submitted']
204 before = self.client.db_query({'submitted' : {'$lt' : tic}})
205 before = self.client.db_query({'submitted' : {'$lt' : tic}})
205 after = self.client.db_query({'submitted' : {'$gte' : tic}})
206 after = self.client.db_query({'submitted' : {'$gte' : tic}})
206 self.assertEquals(len(before)+len(after),len(hist))
207 self.assertEquals(len(before)+len(after),len(hist))
207 for b in before:
208 for b in before:
208 self.assertTrue(b['submitted'] < tic)
209 self.assertTrue(b['submitted'] < tic)
209 for a in after:
210 for a in after:
210 self.assertTrue(a['submitted'] >= tic)
211 self.assertTrue(a['submitted'] >= tic)
211 same = self.client.db_query({'submitted' : tic})
212 same = self.client.db_query({'submitted' : tic})
212 for s in same:
213 for s in same:
213 self.assertTrue(s['submitted'] == tic)
214 self.assertTrue(s['submitted'] == tic)
214
215
215 def test_db_query_keys(self):
216 def test_db_query_keys(self):
216 """test extracting subset of record keys"""
217 """test extracting subset of record keys"""
217 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
218 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
218 for rec in found:
219 for rec in found:
219 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
220 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
220
221
221 def test_db_query_default_keys(self):
222 def test_db_query_default_keys(self):
222 """default db_query excludes buffers"""
223 """default db_query excludes buffers"""
223 found = self.client.db_query({'msg_id': {'$ne' : ''}})
224 found = self.client.db_query({'msg_id': {'$ne' : ''}})
224 for rec in found:
225 for rec in found:
225 keys = set(rec.keys())
226 keys = set(rec.keys())
226 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
227 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
227 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
228 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
228
229
229 def test_db_query_msg_id(self):
230 def test_db_query_msg_id(self):
230 """ensure msg_id is always in db queries"""
231 """ensure msg_id is always in db queries"""
231 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
232 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
232 for rec in found:
233 for rec in found:
233 self.assertTrue('msg_id' in rec.keys())
234 self.assertTrue('msg_id' in rec.keys())
234 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
235 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
235 for rec in found:
236 for rec in found:
236 self.assertTrue('msg_id' in rec.keys())
237 self.assertTrue('msg_id' in rec.keys())
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
238 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
238 for rec in found:
239 for rec in found:
239 self.assertTrue('msg_id' in rec.keys())
240 self.assertTrue('msg_id' in rec.keys())
240
241
241 def test_db_query_get_result(self):
242 def test_db_query_get_result(self):
242 """pop in db_query shouldn't pop from result itself"""
243 """pop in db_query shouldn't pop from result itself"""
243 self.client[:].apply_sync(lambda : 1)
244 self.client[:].apply_sync(lambda : 1)
244 found = self.client.db_query({'msg_id': {'$ne' : ''}})
245 found = self.client.db_query({'msg_id': {'$ne' : ''}})
245 rc2 = clientmod.Client(profile='iptest')
246 rc2 = clientmod.Client(profile='iptest')
246 # If this bug is not fixed, this call will hang:
247 # If this bug is not fixed, this call will hang:
247 ar = rc2.get_result(self.client.history[-1])
248 ar = rc2.get_result(self.client.history[-1])
248 ar.wait(2)
249 ar.wait(2)
249 self.assertTrue(ar.ready())
250 self.assertTrue(ar.ready())
250 ar.get()
251 ar.get()
251 rc2.close()
252 rc2.close()
252
253
253 def test_db_query_in(self):
254 def test_db_query_in(self):
254 """test db query with '$in','$nin' operators"""
255 """test db query with '$in','$nin' operators"""
255 hist = self.client.hub_history()
256 hist = self.client.hub_history()
256 even = hist[::2]
257 even = hist[::2]
257 odd = hist[1::2]
258 odd = hist[1::2]
258 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
259 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
259 found = [ r['msg_id'] for r in recs ]
260 found = [ r['msg_id'] for r in recs ]
260 self.assertEquals(set(even), set(found))
261 self.assertEquals(set(even), set(found))
261 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
262 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
262 found = [ r['msg_id'] for r in recs ]
263 found = [ r['msg_id'] for r in recs ]
263 self.assertEquals(set(odd), set(found))
264 self.assertEquals(set(odd), set(found))
264
265
265 def test_hub_history(self):
266 def test_hub_history(self):
266 hist = self.client.hub_history()
267 hist = self.client.hub_history()
267 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
268 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
268 recdict = {}
269 recdict = {}
269 for rec in recs:
270 for rec in recs:
270 recdict[rec['msg_id']] = rec
271 recdict[rec['msg_id']] = rec
271
272
272 latest = datetime(1984,1,1)
273 latest = datetime(1984,1,1)
273 for msg_id in hist:
274 for msg_id in hist:
274 rec = recdict[msg_id]
275 rec = recdict[msg_id]
275 newt = rec['submitted']
276 newt = rec['submitted']
276 self.assertTrue(newt >= latest)
277 self.assertTrue(newt >= latest)
277 latest = newt
278 latest = newt
278 ar = self.client[-1].apply_async(lambda : 1)
279 ar = self.client[-1].apply_async(lambda : 1)
279 ar.get()
280 ar.get()
280 time.sleep(0.25)
281 time.sleep(0.25)
281 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
282 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
282
283
283 def _wait_for_idle(self):
284 def _wait_for_idle(self):
284 """wait for an engine to become idle, according to the Hub"""
285 """wait for an engine to become idle, according to the Hub"""
285 rc = self.client
286 rc = self.client
286
287
287 # timeout 2s, polling every 100ms
288 # timeout 2s, polling every 100ms
288 for i in range(20):
289 for i in range(20):
289 qs = rc.queue_status()
290 qs = rc.queue_status()
290 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
291 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
291 time.sleep(0.1)
292 time.sleep(0.1)
292 else:
293 else:
293 break
294 break
294
295
295 # ensure Hub up to date:
296 # ensure Hub up to date:
296 qs = rc.queue_status()
297 qs = rc.queue_status()
297 self.assertEquals(qs['unassigned'], 0)
298 self.assertEquals(qs['unassigned'], 0)
298 for eid in rc.ids:
299 for eid in rc.ids:
299 self.assertEquals(qs[eid]['tasks'], 0)
300 self.assertEquals(qs[eid]['tasks'], 0)
300
301
301
302
302 def test_resubmit(self):
303 def test_resubmit(self):
303 def f():
304 def f():
304 import random
305 import random
305 return random.random()
306 return random.random()
306 v = self.client.load_balanced_view()
307 v = self.client.load_balanced_view()
307 ar = v.apply_async(f)
308 ar = v.apply_async(f)
308 r1 = ar.get(1)
309 r1 = ar.get(1)
309 # give the Hub a chance to notice:
310 # give the Hub a chance to notice:
310 self._wait_for_idle()
311 self._wait_for_idle()
311 ahr = self.client.resubmit(ar.msg_ids)
312 ahr = self.client.resubmit(ar.msg_ids)
312 r2 = ahr.get(1)
313 r2 = ahr.get(1)
313 self.assertFalse(r1 == r2)
314 self.assertFalse(r1 == r2)
314
315
315 def test_resubmit_chain(self):
316 def test_resubmit_chain(self):
316 """resubmit resubmitted tasks"""
317 """resubmit resubmitted tasks"""
317 v = self.client.load_balanced_view()
318 v = self.client.load_balanced_view()
318 ar = v.apply_async(lambda x: x, 'x'*1024)
319 ar = v.apply_async(lambda x: x, 'x'*1024)
319 ar.get()
320 ar.get()
320 self._wait_for_idle()
321 self._wait_for_idle()
321 ars = [ar]
322 ars = [ar]
322
323
323 for i in range(10):
324 for i in range(10):
324 ar = ars[-1]
325 ar = ars[-1]
325 ar2 = self.client.resubmit(ar.msg_ids)
326 ar2 = self.client.resubmit(ar.msg_ids)
326
327
327 [ ar.get() for ar in ars ]
328 [ ar.get() for ar in ars ]
328
329
329 def test_resubmit_header(self):
330 def test_resubmit_header(self):
330 """resubmit shouldn't clobber the whole header"""
331 """resubmit shouldn't clobber the whole header"""
331 def f():
332 def f():
332 import random
333 import random
333 return random.random()
334 return random.random()
334 v = self.client.load_balanced_view()
335 v = self.client.load_balanced_view()
335 v.retries = 1
336 v.retries = 1
336 ar = v.apply_async(f)
337 ar = v.apply_async(f)
337 r1 = ar.get(1)
338 r1 = ar.get(1)
338 # give the Hub a chance to notice:
339 # give the Hub a chance to notice:
339 self._wait_for_idle()
340 self._wait_for_idle()
340 ahr = self.client.resubmit(ar.msg_ids)
341 ahr = self.client.resubmit(ar.msg_ids)
341 ahr.get(1)
342 ahr.get(1)
342 time.sleep(0.5)
343 time.sleep(0.5)
343 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
344 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
344 h1,h2 = [ r['header'] for r in records ]
345 h1,h2 = [ r['header'] for r in records ]
345 for key in set(h1.keys()).union(set(h2.keys())):
346 for key in set(h1.keys()).union(set(h2.keys())):
346 if key in ('msg_id', 'date'):
347 if key in ('msg_id', 'date'):
347 self.assertNotEquals(h1[key], h2[key])
348 self.assertNotEquals(h1[key], h2[key])
348 else:
349 else:
349 self.assertEquals(h1[key], h2[key])
350 self.assertEquals(h1[key], h2[key])
350
351
351 def test_resubmit_aborted(self):
352 def test_resubmit_aborted(self):
352 def f():
353 def f():
353 import random
354 import random
354 return random.random()
355 return random.random()
355 v = self.client.load_balanced_view()
356 v = self.client.load_balanced_view()
356 # restrict to one engine, so we can put a sleep
357 # restrict to one engine, so we can put a sleep
357 # ahead of the task, so it will get aborted
358 # ahead of the task, so it will get aborted
358 eid = self.client.ids[-1]
359 eid = self.client.ids[-1]
359 v.targets = [eid]
360 v.targets = [eid]
360 sleep = v.apply_async(time.sleep, 0.5)
361 sleep = v.apply_async(time.sleep, 0.5)
361 ar = v.apply_async(f)
362 ar = v.apply_async(f)
362 ar.abort()
363 ar.abort()
363 self.assertRaises(error.TaskAborted, ar.get)
364 self.assertRaises(error.TaskAborted, ar.get)
364 # Give the Hub a chance to get up to date:
365 # Give the Hub a chance to get up to date:
365 self._wait_for_idle()
366 self._wait_for_idle()
366 ahr = self.client.resubmit(ar.msg_ids)
367 ahr = self.client.resubmit(ar.msg_ids)
367 r2 = ahr.get(1)
368 r2 = ahr.get(1)
368
369
369 def test_resubmit_inflight(self):
370 def test_resubmit_inflight(self):
370 """resubmit of inflight task"""
371 """resubmit of inflight task"""
371 v = self.client.load_balanced_view()
372 v = self.client.load_balanced_view()
372 ar = v.apply_async(time.sleep,1)
373 ar = v.apply_async(time.sleep,1)
373 # give the message a chance to arrive
374 # give the message a chance to arrive
374 time.sleep(0.2)
375 time.sleep(0.2)
375 ahr = self.client.resubmit(ar.msg_ids)
376 ahr = self.client.resubmit(ar.msg_ids)
376 ar.get(2)
377 ar.get(2)
377 ahr.get(2)
378 ahr.get(2)
378
379
379 def test_resubmit_badkey(self):
380 def test_resubmit_badkey(self):
380 """ensure KeyError on resubmit of nonexistant task"""
381 """ensure KeyError on resubmit of nonexistant task"""
381 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
382 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
382
383
383 def test_purge_results(self):
384 def test_purge_results(self):
384 # ensure there are some tasks
385 # ensure there are some tasks
385 for i in range(5):
386 for i in range(5):
386 self.client[:].apply_sync(lambda : 1)
387 self.client[:].apply_sync(lambda : 1)
387 # Wait for the Hub to realise the result is done:
388 # Wait for the Hub to realise the result is done:
388 # This prevents a race condition, where we
389 # This prevents a race condition, where we
389 # might purge a result the Hub still thinks is pending.
390 # might purge a result the Hub still thinks is pending.
390 time.sleep(0.1)
391 time.sleep(0.1)
391 rc2 = clientmod.Client(profile='iptest')
392 rc2 = clientmod.Client(profile='iptest')
392 hist = self.client.hub_history()
393 hist = self.client.hub_history()
393 ahr = rc2.get_result([hist[-1]])
394 ahr = rc2.get_result([hist[-1]])
394 ahr.wait(10)
395 ahr.wait(10)
395 self.client.purge_results(hist[-1])
396 self.client.purge_results(hist[-1])
396 newhist = self.client.hub_history()
397 newhist = self.client.hub_history()
397 self.assertEquals(len(newhist)+1,len(hist))
398 self.assertEquals(len(newhist)+1,len(hist))
398 rc2.spin()
399 rc2.spin()
399 rc2.close()
400 rc2.close()
400
401
401 def test_purge_all_results(self):
402 def test_purge_all_results(self):
402 self.client.purge_results('all')
403 self.client.purge_results('all')
403 hist = self.client.hub_history()
404 hist = self.client.hub_history()
404 self.assertEquals(len(hist), 0)
405 self.assertEquals(len(hist), 0)
405
406
406 def test_spin_thread(self):
407 def test_spin_thread(self):
407 self.client.spin_thread(0.01)
408 self.client.spin_thread(0.01)
408 ar = self.client[-1].apply_async(lambda : 1)
409 ar = self.client[-1].apply_async(lambda : 1)
409 time.sleep(0.1)
410 time.sleep(0.1)
410 self.assertTrue(ar.wall_time < 0.1,
411 self.assertTrue(ar.wall_time < 0.1,
411 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
412 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
412 )
413 )
413
414
414 def test_stop_spin_thread(self):
415 def test_stop_spin_thread(self):
415 self.client.spin_thread(0.01)
416 self.client.spin_thread(0.01)
416 self.client.stop_spin_thread()
417 self.client.stop_spin_thread()
417 ar = self.client[-1].apply_async(lambda : 1)
418 ar = self.client[-1].apply_async(lambda : 1)
418 time.sleep(0.15)
419 time.sleep(0.15)
419 self.assertTrue(ar.wall_time > 0.1,
420 self.assertTrue(ar.wall_time > 0.1,
420 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
421 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
421 )
422 )
422
423
424 def test_activate(self):
425 ip = get_ipython()
426 magics = ip.magics_manager.magics
427 self.assertTrue('px' in magics['line'])
428 self.assertTrue('px' in magics['cell'])
429 v0 = self.client.activate(-1, '0')
430 self.assertTrue('px0' in magics['line'])
431 self.assertTrue('px0' in magics['cell'])
432 self.assertEquals(v0.targets, self.client.ids[-1])
433 v0 = self.client.activate('all', 'all')
434 self.assertTrue('pxall' in magics['line'])
435 self.assertTrue('pxall' in magics['cell'])
436 self.assertEquals(v0.targets, 'all')
@@ -1,340 +1,345 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """Test Parallel magics
2 """Test Parallel magics
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import re
19 import re
20 import sys
20 import sys
21 import time
21 import time
22
22
23 import zmq
23 import zmq
24 from nose import SkipTest
24 from nose import SkipTest
25
25
26 from IPython.testing import decorators as dec
26 from IPython.testing import decorators as dec
27 from IPython.testing.ipunittest import ParametricTestCase
27 from IPython.testing.ipunittest import ParametricTestCase
28 from IPython.utils.io import capture_output
28 from IPython.utils.io import capture_output
29
29
30 from IPython import parallel as pmod
30 from IPython import parallel as pmod
31 from IPython.parallel import error
31 from IPython.parallel import error
32 from IPython.parallel import AsyncResult
32 from IPython.parallel import AsyncResult
33 from IPython.parallel.util import interactive
33 from IPython.parallel.util import interactive
34
34
35 from IPython.parallel.tests import add_engines
35 from IPython.parallel.tests import add_engines
36
36
37 from .clienttest import ClusterTestCase, generate_output
37 from .clienttest import ClusterTestCase, generate_output
38
38
39 def setup():
39 def setup():
40 add_engines(3, total=True)
40 add_engines(3, total=True)
41
41
42 class TestParallelMagics(ClusterTestCase, ParametricTestCase):
42 class TestParallelMagics(ClusterTestCase, ParametricTestCase):
43
43
44 def test_px_blocking(self):
44 def test_px_blocking(self):
45 ip = get_ipython()
45 ip = get_ipython()
46 v = self.client[-1:]
46 v = self.client[-1:]
47 v.activate()
47 v.activate()
48 v.block=True
48 v.block=True
49
49
50 ip.magic('px a=5')
50 ip.magic('px a=5')
51 self.assertEquals(v['a'], [5])
51 self.assertEquals(v['a'], [5])
52 ip.magic('px a=10')
52 ip.magic('px a=10')
53 self.assertEquals(v['a'], [10])
53 self.assertEquals(v['a'], [10])
54 # just 'print a' works ~99% of the time, but this ensures that
54 # just 'print a' works ~99% of the time, but this ensures that
55 # the stdout message has arrived when the result is finished:
55 # the stdout message has arrived when the result is finished:
56 with capture_output() as io:
56 with capture_output() as io:
57 ip.magic(
57 ip.magic(
58 'px import sys,time;print(a);sys.stdout.flush();time.sleep(0.2)'
58 'px import sys,time;print(a);sys.stdout.flush();time.sleep(0.2)'
59 )
59 )
60 out = io.stdout
60 out = io.stdout
61 self.assertTrue('[stdout:' in out, out)
61 self.assertTrue('[stdout:' in out, out)
62 self.assertFalse('\n\n' in out)
62 self.assertFalse('\n\n' in out)
63 self.assertTrue(out.rstrip().endswith('10'))
63 self.assertTrue(out.rstrip().endswith('10'))
64 self.assertRaisesRemote(ZeroDivisionError, ip.magic, 'px 1/0')
64 self.assertRaisesRemote(ZeroDivisionError, ip.magic, 'px 1/0')
65
65
66 def _check_generated_stderr(self, stderr, n):
66 def _check_generated_stderr(self, stderr, n):
67 expected = [
67 expected = [
68 r'\[stderr:\d+\]',
68 r'\[stderr:\d+\]',
69 '^stderr$',
69 '^stderr$',
70 '^stderr2$',
70 '^stderr2$',
71 ] * n
71 ] * n
72
72
73 self.assertFalse('\n\n' in stderr, stderr)
73 self.assertFalse('\n\n' in stderr, stderr)
74 lines = stderr.splitlines()
74 lines = stderr.splitlines()
75 self.assertEquals(len(lines), len(expected), stderr)
75 self.assertEquals(len(lines), len(expected), stderr)
76 for line,expect in zip(lines, expected):
76 for line,expect in zip(lines, expected):
77 if isinstance(expect, str):
77 if isinstance(expect, str):
78 expect = [expect]
78 expect = [expect]
79 for ex in expect:
79 for ex in expect:
80 self.assertTrue(re.search(ex, line) is not None, "Expected %r in %r" % (ex, line))
80 self.assertTrue(re.search(ex, line) is not None, "Expected %r in %r" % (ex, line))
81
81
82 def test_cellpx_block_args(self):
82 def test_cellpx_block_args(self):
83 """%%px --[no]block flags work"""
83 """%%px --[no]block flags work"""
84 ip = get_ipython()
84 ip = get_ipython()
85 v = self.client[-1:]
85 v = self.client[-1:]
86 v.activate()
86 v.activate()
87 v.block=False
87 v.block=False
88
88
89 for block in (True, False):
89 for block in (True, False):
90 v.block = block
90 v.block = block
91
91
92 with capture_output() as io:
92 with capture_output() as io:
93 ip.run_cell_magic("px", "", "1")
93 ip.run_cell_magic("px", "", "1")
94 if block:
94 if block:
95 self.assertTrue(io.stdout.startswith("Parallel"), io.stdout)
95 self.assertTrue(io.stdout.startswith("Parallel"), io.stdout)
96 else:
96 else:
97 self.assertTrue(io.stdout.startswith("Async"), io.stdout)
97 self.assertTrue(io.stdout.startswith("Async"), io.stdout)
98
98
99 with capture_output() as io:
99 with capture_output() as io:
100 ip.run_cell_magic("px", "--block", "1")
100 ip.run_cell_magic("px", "--block", "1")
101 self.assertTrue(io.stdout.startswith("Parallel"), io.stdout)
101 self.assertTrue(io.stdout.startswith("Parallel"), io.stdout)
102
102
103 with capture_output() as io:
103 with capture_output() as io:
104 ip.run_cell_magic("px", "--noblock", "1")
104 ip.run_cell_magic("px", "--noblock", "1")
105 self.assertTrue(io.stdout.startswith("Async"), io.stdout)
105 self.assertTrue(io.stdout.startswith("Async"), io.stdout)
106
106
107 def test_cellpx_groupby_engine(self):
107 def test_cellpx_groupby_engine(self):
108 """%%px --group-outputs=engine"""
108 """%%px --group-outputs=engine"""
109 ip = get_ipython()
109 ip = get_ipython()
110 v = self.client[:]
110 v = self.client[:]
111 v.block = True
111 v.block = True
112 v.activate()
112 v.activate()
113
113
114 v['generate_output'] = generate_output
114 v['generate_output'] = generate_output
115
115
116 with capture_output() as io:
116 with capture_output() as io:
117 ip.run_cell_magic('px', '--group-outputs=engine', 'generate_output()')
117 ip.run_cell_magic('px', '--group-outputs=engine', 'generate_output()')
118
118
119 self.assertFalse('\n\n' in io.stdout)
119 self.assertFalse('\n\n' in io.stdout)
120 lines = io.stdout.splitlines()[1:]
120 lines = io.stdout.splitlines()[1:]
121 expected = [
121 expected = [
122 r'\[stdout:\d+\]',
122 r'\[stdout:\d+\]',
123 'stdout',
123 'stdout',
124 'stdout2',
124 'stdout2',
125 r'\[output:\d+\]',
125 r'\[output:\d+\]',
126 r'IPython\.core\.display\.HTML',
126 r'IPython\.core\.display\.HTML',
127 r'IPython\.core\.display\.Math',
127 r'IPython\.core\.display\.Math',
128 r'Out\[\d+:\d+\]:.*IPython\.core\.display\.Math',
128 r'Out\[\d+:\d+\]:.*IPython\.core\.display\.Math',
129 ] * len(v)
129 ] * len(v)
130
130
131 self.assertEquals(len(lines), len(expected), io.stdout)
131 self.assertEquals(len(lines), len(expected), io.stdout)
132 for line,expect in zip(lines, expected):
132 for line,expect in zip(lines, expected):
133 if isinstance(expect, str):
133 if isinstance(expect, str):
134 expect = [expect]
134 expect = [expect]
135 for ex in expect:
135 for ex in expect:
136 self.assertTrue(re.search(ex, line) is not None, "Expected %r in %r" % (ex, line))
136 self.assertTrue(re.search(ex, line) is not None, "Expected %r in %r" % (ex, line))
137
137
138 self._check_generated_stderr(io.stderr, len(v))
138 self._check_generated_stderr(io.stderr, len(v))
139
139
140
140
141 def test_cellpx_groupby_order(self):
141 def test_cellpx_groupby_order(self):
142 """%%px --group-outputs=order"""
142 """%%px --group-outputs=order"""
143 ip = get_ipython()
143 ip = get_ipython()
144 v = self.client[:]
144 v = self.client[:]
145 v.block = True
145 v.block = True
146 v.activate()
146 v.activate()
147
147
148 v['generate_output'] = generate_output
148 v['generate_output'] = generate_output
149
149
150 with capture_output() as io:
150 with capture_output() as io:
151 ip.run_cell_magic('px', '--group-outputs=order', 'generate_output()')
151 ip.run_cell_magic('px', '--group-outputs=order', 'generate_output()')
152
152
153 self.assertFalse('\n\n' in io.stdout)
153 self.assertFalse('\n\n' in io.stdout)
154 lines = io.stdout.splitlines()[1:]
154 lines = io.stdout.splitlines()[1:]
155 expected = []
155 expected = []
156 expected.extend([
156 expected.extend([
157 r'\[stdout:\d+\]',
157 r'\[stdout:\d+\]',
158 'stdout',
158 'stdout',
159 'stdout2',
159 'stdout2',
160 ] * len(v))
160 ] * len(v))
161 expected.extend([
161 expected.extend([
162 r'\[output:\d+\]',
162 r'\[output:\d+\]',
163 'IPython.core.display.HTML',
163 'IPython.core.display.HTML',
164 ] * len(v))
164 ] * len(v))
165 expected.extend([
165 expected.extend([
166 r'\[output:\d+\]',
166 r'\[output:\d+\]',
167 'IPython.core.display.Math',
167 'IPython.core.display.Math',
168 ] * len(v))
168 ] * len(v))
169 expected.extend([
169 expected.extend([
170 r'Out\[\d+:\d+\]:.*IPython\.core\.display\.Math'
170 r'Out\[\d+:\d+\]:.*IPython\.core\.display\.Math'
171 ] * len(v))
171 ] * len(v))
172
172
173 self.assertEquals(len(lines), len(expected), io.stdout)
173 self.assertEquals(len(lines), len(expected), io.stdout)
174 for line,expect in zip(lines, expected):
174 for line,expect in zip(lines, expected):
175 if isinstance(expect, str):
175 if isinstance(expect, str):
176 expect = [expect]
176 expect = [expect]
177 for ex in expect:
177 for ex in expect:
178 self.assertTrue(re.search(ex, line) is not None, "Expected %r in %r" % (ex, line))
178 self.assertTrue(re.search(ex, line) is not None, "Expected %r in %r" % (ex, line))
179
179
180 self._check_generated_stderr(io.stderr, len(v))
180 self._check_generated_stderr(io.stderr, len(v))
181
181
182 def test_cellpx_groupby_type(self):
182 def test_cellpx_groupby_type(self):
183 """%%px --group-outputs=type"""
183 """%%px --group-outputs=type"""
184 ip = get_ipython()
184 ip = get_ipython()
185 v = self.client[:]
185 v = self.client[:]
186 v.block = True
186 v.block = True
187 v.activate()
187 v.activate()
188
188
189 v['generate_output'] = generate_output
189 v['generate_output'] = generate_output
190
190
191 with capture_output() as io:
191 with capture_output() as io:
192 ip.run_cell_magic('px', '--group-outputs=type', 'generate_output()')
192 ip.run_cell_magic('px', '--group-outputs=type', 'generate_output()')
193
193
194 self.assertFalse('\n\n' in io.stdout)
194 self.assertFalse('\n\n' in io.stdout)
195 lines = io.stdout.splitlines()[1:]
195 lines = io.stdout.splitlines()[1:]
196
196
197 expected = []
197 expected = []
198 expected.extend([
198 expected.extend([
199 r'\[stdout:\d+\]',
199 r'\[stdout:\d+\]',
200 'stdout',
200 'stdout',
201 'stdout2',
201 'stdout2',
202 ] * len(v))
202 ] * len(v))
203 expected.extend([
203 expected.extend([
204 r'\[output:\d+\]',
204 r'\[output:\d+\]',
205 r'IPython\.core\.display\.HTML',
205 r'IPython\.core\.display\.HTML',
206 r'IPython\.core\.display\.Math',
206 r'IPython\.core\.display\.Math',
207 ] * len(v))
207 ] * len(v))
208 expected.extend([
208 expected.extend([
209 (r'Out\[\d+:\d+\]', r'IPython\.core\.display\.Math')
209 (r'Out\[\d+:\d+\]', r'IPython\.core\.display\.Math')
210 ] * len(v))
210 ] * len(v))
211
211
212 self.assertEquals(len(lines), len(expected), io.stdout)
212 self.assertEquals(len(lines), len(expected), io.stdout)
213 for line,expect in zip(lines, expected):
213 for line,expect in zip(lines, expected):
214 if isinstance(expect, str):
214 if isinstance(expect, str):
215 expect = [expect]
215 expect = [expect]
216 for ex in expect:
216 for ex in expect:
217 self.assertTrue(re.search(ex, line) is not None, "Expected %r in %r" % (ex, line))
217 self.assertTrue(re.search(ex, line) is not None, "Expected %r in %r" % (ex, line))
218
218
219 self._check_generated_stderr(io.stderr, len(v))
219 self._check_generated_stderr(io.stderr, len(v))
220
220
221
221
222 def test_px_nonblocking(self):
222 def test_px_nonblocking(self):
223 ip = get_ipython()
223 ip = get_ipython()
224 v = self.client[-1:]
224 v = self.client[-1:]
225 v.activate()
225 v.activate()
226 v.block=False
226 v.block=False
227
227
228 ip.magic('px a=5')
228 ip.magic('px a=5')
229 self.assertEquals(v['a'], [5])
229 self.assertEquals(v['a'], [5])
230 ip.magic('px a=10')
230 ip.magic('px a=10')
231 self.assertEquals(v['a'], [10])
231 self.assertEquals(v['a'], [10])
232 with capture_output() as io:
232 with capture_output() as io:
233 ar = ip.magic('px print (a)')
233 ar = ip.magic('px print (a)')
234 self.assertTrue(isinstance(ar, AsyncResult))
234 self.assertTrue(isinstance(ar, AsyncResult))
235 self.assertTrue('Async' in io.stdout)
235 self.assertTrue('Async' in io.stdout)
236 self.assertFalse('[stdout:' in io.stdout)
236 self.assertFalse('[stdout:' in io.stdout)
237 self.assertFalse('\n\n' in io.stdout)
237 self.assertFalse('\n\n' in io.stdout)
238
238
239 ar = ip.magic('px 1/0')
239 ar = ip.magic('px 1/0')
240 self.assertRaisesRemote(ZeroDivisionError, ar.get)
240 self.assertRaisesRemote(ZeroDivisionError, ar.get)
241
241
242 def test_autopx_blocking(self):
242 def test_autopx_blocking(self):
243 ip = get_ipython()
243 ip = get_ipython()
244 v = self.client[-1]
244 v = self.client[-1]
245 v.activate()
245 v.activate()
246 v.block=True
246 v.block=True
247
247
248 with capture_output() as io:
248 with capture_output() as io:
249 ip.magic('autopx')
249 ip.magic('autopx')
250 ip.run_cell('\n'.join(('a=5','b=12345','c=0')))
250 ip.run_cell('\n'.join(('a=5','b=12345','c=0')))
251 ip.run_cell('b*=2')
251 ip.run_cell('b*=2')
252 ip.run_cell('print (b)')
252 ip.run_cell('print (b)')
253 ip.run_cell('b')
253 ip.run_cell('b')
254 ip.run_cell("b/c")
254 ip.run_cell("b/c")
255 ip.magic('autopx')
255 ip.magic('autopx')
256
256
257 output = io.stdout
257 output = io.stdout
258
258
259 self.assertTrue(output.startswith('%autopx enabled'), output)
259 self.assertTrue(output.startswith('%autopx enabled'), output)
260 self.assertTrue(output.rstrip().endswith('%autopx disabled'), output)
260 self.assertTrue(output.rstrip().endswith('%autopx disabled'), output)
261 self.assertTrue('ZeroDivisionError' in output, output)
261 self.assertTrue('ZeroDivisionError' in output, output)
262 self.assertTrue('\nOut[' in output, output)
262 self.assertTrue('\nOut[' in output, output)
263 self.assertTrue(': 24690' in output, output)
263 self.assertTrue(': 24690' in output, output)
264 ar = v.get_result(-1)
264 ar = v.get_result(-1)
265 self.assertEquals(v['a'], 5)
265 self.assertEquals(v['a'], 5)
266 self.assertEquals(v['b'], 24690)
266 self.assertEquals(v['b'], 24690)
267 self.assertRaisesRemote(ZeroDivisionError, ar.get)
267 self.assertRaisesRemote(ZeroDivisionError, ar.get)
268
268
269 def test_autopx_nonblocking(self):
269 def test_autopx_nonblocking(self):
270 ip = get_ipython()
270 ip = get_ipython()
271 v = self.client[-1]
271 v = self.client[-1]
272 v.activate()
272 v.activate()
273 v.block=False
273 v.block=False
274
274
275 with capture_output() as io:
275 with capture_output() as io:
276 ip.magic('autopx')
276 ip.magic('autopx')
277 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
277 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
278 ip.run_cell('print (b)')
278 ip.run_cell('print (b)')
279 ip.run_cell('import time; time.sleep(0.1)')
279 ip.run_cell('import time; time.sleep(0.1)')
280 ip.run_cell("b/c")
280 ip.run_cell("b/c")
281 ip.run_cell('b*=2')
281 ip.run_cell('b*=2')
282 ip.magic('autopx')
282 ip.magic('autopx')
283
283
284 output = io.stdout.rstrip()
284 output = io.stdout.rstrip()
285
285
286 self.assertTrue(output.startswith('%autopx enabled'))
286 self.assertTrue(output.startswith('%autopx enabled'))
287 self.assertTrue(output.endswith('%autopx disabled'))
287 self.assertTrue(output.endswith('%autopx disabled'))
288 self.assertFalse('ZeroDivisionError' in output)
288 self.assertFalse('ZeroDivisionError' in output)
289 ar = v.get_result(-2)
289 ar = v.get_result(-2)
290 self.assertRaisesRemote(ZeroDivisionError, ar.get)
290 self.assertRaisesRemote(ZeroDivisionError, ar.get)
291 # prevent TaskAborted on pulls, due to ZeroDivisionError
291 # prevent TaskAborted on pulls, due to ZeroDivisionError
292 time.sleep(0.5)
292 time.sleep(0.5)
293 self.assertEquals(v['a'], 5)
293 self.assertEquals(v['a'], 5)
294 # b*=2 will not fire, due to abort
294 # b*=2 will not fire, due to abort
295 self.assertEquals(v['b'], 10)
295 self.assertEquals(v['b'], 10)
296
296
297 def test_result(self):
297 def test_result(self):
298 ip = get_ipython()
298 ip = get_ipython()
299 v = self.client[-1]
299 v = self.client[-1]
300 v.activate()
300 v.activate()
301 data = dict(a=111,b=222)
301 data = dict(a=111,b=222)
302 v.push(data, block=True)
302 v.push(data, block=True)
303
303
304 ip.magic('px a')
304 for name in ('a', 'b'):
305 ip.magic('px b')
305 ip.magic('px ' + name)
306 for idx, name in [
307 ('', 'b'),
308 ('-1', 'b'),
309 ('2', 'b'),
310 ('1', 'a'),
311 ('-2', 'a'),
312 ]:
313 with capture_output() as io:
306 with capture_output() as io:
314 ip.magic('result ' + idx)
307 ip.magic('pxresult')
315 output = io.stdout
308 output = io.stdout
316 msg = "expected %s output to include %s, but got: %s" % \
309 msg = "expected %s output to include %s, but got: %s" % \
317 ('%result '+idx, str(data[name]), output)
310 ('%pxresult', str(data[name]), output)
318 self.assertTrue(str(data[name]) in output, msg)
311 self.assertTrue(str(data[name]) in output, msg)
319
312
320 @dec.skipif_not_matplotlib
313 @dec.skipif_not_matplotlib
321 def test_px_pylab(self):
314 def test_px_pylab(self):
322 """%pylab works on engines"""
315 """%pylab works on engines"""
323 ip = get_ipython()
316 ip = get_ipython()
324 v = self.client[-1]
317 v = self.client[-1]
325 v.block = True
318 v.block = True
326 v.activate()
319 v.activate()
327
320
328 with capture_output() as io:
321 with capture_output() as io:
329 ip.magic("px %pylab inline")
322 ip.magic("px %pylab inline")
330
323
331 self.assertTrue("Welcome to pylab" in io.stdout, io.stdout)
324 self.assertTrue("Welcome to pylab" in io.stdout, io.stdout)
332 self.assertTrue("backend_inline" in io.stdout, io.stdout)
325 self.assertTrue("backend_inline" in io.stdout, io.stdout)
333
326
334 with capture_output() as io:
327 with capture_output() as io:
335 ip.magic("px plot(rand(100))")
328 ip.magic("px plot(rand(100))")
336
329
337 self.assertTrue('Out[' in io.stdout, io.stdout)
330 self.assertTrue('Out[' in io.stdout, io.stdout)
338 self.assertTrue('matplotlib.lines' in io.stdout, io.stdout)
331 self.assertTrue('matplotlib.lines' in io.stdout, io.stdout)
332
333 def test_pxconfig(self):
334 ip = get_ipython()
335 rc = self.client
336 v = rc.activate(-1, '_tst')
337 self.assertEquals(v.targets, rc.ids[-1])
338 ip.magic("%pxconfig_tst -t :")
339 self.assertEquals(v.targets, rc.ids)
340 ip.magic("%pxconfig_tst --block")
341 self.assertEquals(v.block, True)
342 ip.magic("%pxconfig_tst --noblock")
343 self.assertEquals(v.block, False)
339
344
340
345
General Comments 0
You need to be logged in to leave comments. Login now