##// END OF EJS Templates
protect LBView.targets, AsyncResult._msg_ids -> .msg_ds
MinRK -
Show More
@@ -1,112 +1,112 b''
1 """AsyncResult objects for the client"""
1 """AsyncResult objects for the client"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import error
13 import error
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Classes
16 # Classes
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 class AsyncResult(object):
19 class AsyncResult(object):
20 """Class for representing results of non-blocking calls.
20 """Class for representing results of non-blocking calls.
21
21
22 Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
22 Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
23 """
23 """
24 def __init__(self, client, msg_ids):
24 def __init__(self, client, msg_ids):
25 self._client = client
25 self._client = client
26 self._msg_ids = msg_ids
26 self.msg_ids = msg_ids
27 self._ready = False
27 self._ready = False
28 self._success = None
28 self._success = None
29
29
30 def __repr__(self):
30 def __repr__(self):
31 if self._ready:
31 if self._ready:
32 return "<%s: finished>"%(self.__class__.__name__)
32 return "<%s: finished>"%(self.__class__.__name__)
33 else:
33 else:
34 return "<%s: %r>"%(self.__class__.__name__,self._msg_ids)
34 return "<%s: %r>"%(self.__class__.__name__,self.msg_ids)
35
35
36
36
37 def _reconstruct_result(self, res):
37 def _reconstruct_result(self, res):
38 """
38 """
39 Override me in subclasses for turning a list of results
39 Override me in subclasses for turning a list of results
40 into the expected form.
40 into the expected form.
41 """
41 """
42 if len(res) == 1:
42 if len(res) == 1:
43 return res[0]
43 return res[0]
44 else:
44 else:
45 return res
45 return res
46
46
47 def get(self, timeout=-1):
47 def get(self, timeout=-1):
48 """Return the result when it arrives.
48 """Return the result when it arrives.
49
49
50 If `timeout` is not ``None`` and the result does not arrive within
50 If `timeout` is not ``None`` and the result does not arrive within
51 `timeout` seconds then ``TimeoutError`` is raised. If the
51 `timeout` seconds then ``TimeoutError`` is raised. If the
52 remote call raised an exception then that exception will be reraised
52 remote call raised an exception then that exception will be reraised
53 by get().
53 by get().
54 """
54 """
55 if not self.ready():
55 if not self.ready():
56 self.wait(timeout)
56 self.wait(timeout)
57
57
58 if self._ready:
58 if self._ready:
59 if self._success:
59 if self._success:
60 return self._result
60 return self._result
61 else:
61 else:
62 raise self._exception
62 raise self._exception
63 else:
63 else:
64 raise error.TimeoutError("Result not ready.")
64 raise error.TimeoutError("Result not ready.")
65
65
66 def ready(self):
66 def ready(self):
67 """Return whether the call has completed."""
67 """Return whether the call has completed."""
68 if not self._ready:
68 if not self._ready:
69 self.wait(0)
69 self.wait(0)
70 return self._ready
70 return self._ready
71
71
72 def wait(self, timeout=-1):
72 def wait(self, timeout=-1):
73 """Wait until the result is available or until `timeout` seconds pass.
73 """Wait until the result is available or until `timeout` seconds pass.
74 """
74 """
75 if self._ready:
75 if self._ready:
76 return
76 return
77 self._ready = self._client.barrier(self._msg_ids, timeout)
77 self._ready = self._client.barrier(self.msg_ids, timeout)
78 if self._ready:
78 if self._ready:
79 try:
79 try:
80 results = map(self._client.results.get, self._msg_ids)
80 results = map(self._client.results.get, self.msg_ids)
81 results = error.collect_exceptions(results, 'get')
81 results = error.collect_exceptions(results, 'get')
82 self._result = self._reconstruct_result(results)
82 self._result = self._reconstruct_result(results)
83 except Exception, e:
83 except Exception, e:
84 self._exception = e
84 self._exception = e
85 self._success = False
85 self._success = False
86 else:
86 else:
87 self._success = True
87 self._success = True
88
88
89
89
90 def successful(self):
90 def successful(self):
91 """Return whether the call completed without raising an exception.
91 """Return whether the call completed without raising an exception.
92
92
93 Will raise ``AssertionError`` if the result is not ready.
93 Will raise ``AssertionError`` if the result is not ready.
94 """
94 """
95 assert self._ready
95 assert self._ready
96 return self._success
96 return self._success
97
97
98 class AsyncMapResult(AsyncResult):
98 class AsyncMapResult(AsyncResult):
99 """Class for representing results of non-blocking gathers.
99 """Class for representing results of non-blocking gathers.
100
100
101 This will properly reconstruct the gather.
101 This will properly reconstruct the gather.
102 """
102 """
103
103
104 def __init__(self, client, msg_ids, mapObject):
104 def __init__(self, client, msg_ids, mapObject):
105 self._mapObject = mapObject
105 self._mapObject = mapObject
106 AsyncResult.__init__(self, client, msg_ids)
106 AsyncResult.__init__(self, client, msg_ids)
107
107
108 def _reconstruct_result(self, res):
108 def _reconstruct_result(self, res):
109 """Perform the gather on the actual results."""
109 """Perform the gather on the actual results."""
110 return self._mapObject.joinPartitions(res)
110 return self._mapObject.joinPartitions(res)
111
111
112
112
@@ -1,1053 +1,1053 b''
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import time
14 import time
15 from getpass import getpass
15 from getpass import getpass
16 from pprint import pprint
16 from pprint import pprint
17
17
18 import zmq
18 import zmq
19 from zmq.eventloop import ioloop, zmqstream
19 from zmq.eventloop import ioloop, zmqstream
20
20
21 from IPython.external.decorator import decorator
21 from IPython.external.decorator import decorator
22 from IPython.zmq import tunnel
22 from IPython.zmq import tunnel
23
23
24 import streamsession as ss
24 import streamsession as ss
25 # from remotenamespace import RemoteNamespace
25 # from remotenamespace import RemoteNamespace
26 from view import DirectView, LoadBalancedView
26 from view import DirectView, LoadBalancedView
27 from dependency import Dependency, depend, require
27 from dependency import Dependency, depend, require
28 import error
28 import error
29 import map as Map
29 import map as Map
30 from asyncresult import AsyncResult, AsyncMapResult
30 from asyncresult import AsyncResult, AsyncMapResult
31 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
31 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
32
32
33 #--------------------------------------------------------------------------
33 #--------------------------------------------------------------------------
34 # helpers for implementing old MEC API via client.apply
34 # helpers for implementing old MEC API via client.apply
35 #--------------------------------------------------------------------------
35 #--------------------------------------------------------------------------
36
36
37 def _push(ns):
37 def _push(ns):
38 """helper method for implementing `client.push` via `client.apply`"""
38 """helper method for implementing `client.push` via `client.apply`"""
39 globals().update(ns)
39 globals().update(ns)
40
40
41 def _pull(keys):
41 def _pull(keys):
42 """helper method for implementing `client.pull` via `client.apply`"""
42 """helper method for implementing `client.pull` via `client.apply`"""
43 g = globals()
43 g = globals()
44 if isinstance(keys, (list,tuple, set)):
44 if isinstance(keys, (list,tuple, set)):
45 for key in keys:
45 for key in keys:
46 if not g.has_key(key):
46 if not g.has_key(key):
47 raise NameError("name '%s' is not defined"%key)
47 raise NameError("name '%s' is not defined"%key)
48 return map(g.get, keys)
48 return map(g.get, keys)
49 else:
49 else:
50 if not g.has_key(keys):
50 if not g.has_key(keys):
51 raise NameError("name '%s' is not defined"%keys)
51 raise NameError("name '%s' is not defined"%keys)
52 return g.get(keys)
52 return g.get(keys)
53
53
54 def _clear():
54 def _clear():
55 """helper method for implementing `client.clear` via `client.apply`"""
55 """helper method for implementing `client.clear` via `client.apply`"""
56 globals().clear()
56 globals().clear()
57
57
58 def _execute(code):
58 def _execute(code):
59 """helper method for implementing `client.execute` via `client.apply`"""
59 """helper method for implementing `client.execute` via `client.apply`"""
60 exec code in globals()
60 exec code in globals()
61
61
62
62
63 #--------------------------------------------------------------------------
63 #--------------------------------------------------------------------------
64 # Decorators for Client methods
64 # Decorators for Client methods
65 #--------------------------------------------------------------------------
65 #--------------------------------------------------------------------------
66
66
67 @decorator
67 @decorator
68 def spinfirst(f, self, *args, **kwargs):
68 def spinfirst(f, self, *args, **kwargs):
69 """Call spin() to sync state prior to calling the method."""
69 """Call spin() to sync state prior to calling the method."""
70 self.spin()
70 self.spin()
71 return f(self, *args, **kwargs)
71 return f(self, *args, **kwargs)
72
72
73 @decorator
73 @decorator
74 def defaultblock(f, self, *args, **kwargs):
74 def defaultblock(f, self, *args, **kwargs):
75 """Default to self.block; preserve self.block."""
75 """Default to self.block; preserve self.block."""
76 block = kwargs.get('block',None)
76 block = kwargs.get('block',None)
77 block = self.block if block is None else block
77 block = self.block if block is None else block
78 saveblock = self.block
78 saveblock = self.block
79 self.block = block
79 self.block = block
80 try:
80 try:
81 ret = f(self, *args, **kwargs)
81 ret = f(self, *args, **kwargs)
82 finally:
82 finally:
83 self.block = saveblock
83 self.block = saveblock
84 return ret
84 return ret
85
85
86
86
87 class AbortedTask(object):
87 class AbortedTask(object):
88 """A basic wrapper object describing an aborted task."""
88 """A basic wrapper object describing an aborted task."""
89 def __init__(self, msg_id):
89 def __init__(self, msg_id):
90 self.msg_id = msg_id
90 self.msg_id = msg_id
91
91
92 class ResultDict(dict):
92 class ResultDict(dict):
93 """A subclass of dict that raises errors if it has them."""
93 """A subclass of dict that raises errors if it has them."""
94 def __getitem__(self, key):
94 def __getitem__(self, key):
95 res = dict.__getitem__(self, key)
95 res = dict.__getitem__(self, key)
96 if isinstance(res, error.KernelError):
96 if isinstance(res, error.KernelError):
97 raise res
97 raise res
98 return res
98 return res
99
99
100 class Client(object):
100 class Client(object):
101 """A semi-synchronous client to the IPython ZMQ controller
101 """A semi-synchronous client to the IPython ZMQ controller
102
102
103 Parameters
103 Parameters
104 ----------
104 ----------
105
105
106 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
106 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
107 The address of the controller's registration socket.
107 The address of the controller's registration socket.
108 [Default: 'tcp://127.0.0.1:10101']
108 [Default: 'tcp://127.0.0.1:10101']
109 context : zmq.Context
109 context : zmq.Context
110 Pass an existing zmq.Context instance, otherwise the client will create its own
110 Pass an existing zmq.Context instance, otherwise the client will create its own
111 username : bytes
111 username : bytes
112 set username to be passed to the Session object
112 set username to be passed to the Session object
113 debug : bool
113 debug : bool
114 flag for lots of message printing for debug purposes
114 flag for lots of message printing for debug purposes
115
115
116 #-------------- ssh related args ----------------
116 #-------------- ssh related args ----------------
117 # These are args for configuring the ssh tunnel to be used
117 # These are args for configuring the ssh tunnel to be used
118 # credentials are used to forward connections over ssh to the Controller
118 # credentials are used to forward connections over ssh to the Controller
119 # Note that the ip given in `addr` needs to be relative to sshserver
119 # Note that the ip given in `addr` needs to be relative to sshserver
120 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
120 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
121 # and set sshserver as the same machine the Controller is on. However,
121 # and set sshserver as the same machine the Controller is on. However,
122 # the only requirement is that sshserver is able to see the Controller
122 # the only requirement is that sshserver is able to see the Controller
123 # (i.e. is within the same trusted network).
123 # (i.e. is within the same trusted network).
124
124
125 sshserver : str
125 sshserver : str
126 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
126 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
127 If keyfile or password is specified, and this is not, it will default to
127 If keyfile or password is specified, and this is not, it will default to
128 the ip given in addr.
128 the ip given in addr.
129 sshkey : str; path to public ssh key file
129 sshkey : str; path to public ssh key file
130 This specifies a key to be used in ssh login, default None.
130 This specifies a key to be used in ssh login, default None.
131 Regular default ssh keys will be used without specifying this argument.
131 Regular default ssh keys will be used without specifying this argument.
132 password : str;
132 password : str;
133 Your ssh password to sshserver. Note that if this is left None,
133 Your ssh password to sshserver. Note that if this is left None,
134 you will be prompted for it if passwordless key based login is unavailable.
134 you will be prompted for it if passwordless key based login is unavailable.
135
135
136 #------- exec authentication args -------
136 #------- exec authentication args -------
137 # If even localhost is untrusted, you can have some protection against
137 # If even localhost is untrusted, you can have some protection against
138 # unauthorized execution by using a key. Messages are still sent
138 # unauthorized execution by using a key. Messages are still sent
139 # as cleartext, so if someone can snoop your loopback traffic this will
139 # as cleartext, so if someone can snoop your loopback traffic this will
140 # not help anything.
140 # not help anything.
141
141
142 exec_key : str
142 exec_key : str
143 an authentication key or file containing a key
143 an authentication key or file containing a key
144 default: None
144 default: None
145
145
146
146
147 Attributes
147 Attributes
148 ----------
148 ----------
149 ids : set of int engine IDs
149 ids : set of int engine IDs
150 requesting the ids attribute always synchronizes
150 requesting the ids attribute always synchronizes
151 the registration state. To request ids without synchronization,
151 the registration state. To request ids without synchronization,
152 use semi-private _ids attributes.
152 use semi-private _ids attributes.
153
153
154 history : list of msg_ids
154 history : list of msg_ids
155 a list of msg_ids, keeping track of all the execution
155 a list of msg_ids, keeping track of all the execution
156 messages you have submitted in order.
156 messages you have submitted in order.
157
157
158 outstanding : set of msg_ids
158 outstanding : set of msg_ids
159 a set of msg_ids that have been submitted, but whose
159 a set of msg_ids that have been submitted, but whose
160 results have not yet been received.
160 results have not yet been received.
161
161
162 results : dict
162 results : dict
163 a dict of all our results, keyed by msg_id
163 a dict of all our results, keyed by msg_id
164
164
165 block : bool
165 block : bool
166 determines default behavior when block not specified
166 determines default behavior when block not specified
167 in execution methods
167 in execution methods
168
168
169 Methods
169 Methods
170 -------
170 -------
171 spin : flushes incoming results and registration state changes
171 spin : flushes incoming results and registration state changes
172 control methods spin, and requesting `ids` also ensures up to date
172 control methods spin, and requesting `ids` also ensures up to date
173
173
174 barrier : wait on one or more msg_ids
174 barrier : wait on one or more msg_ids
175
175
176 execution methods: apply/apply_bound/apply_to/apply_bound
176 execution methods: apply/apply_bound/apply_to/apply_bound
177 legacy: execute, run
177 legacy: execute, run
178
178
179 query methods: queue_status, get_result, purge
179 query methods: queue_status, get_result, purge
180
180
181 control methods: abort, kill
181 control methods: abort, kill
182
182
183 """
183 """
184
184
185
185
186 _connected=False
186 _connected=False
187 _ssh=False
187 _ssh=False
188 _engines=None
188 _engines=None
189 _addr='tcp://127.0.0.1:10101'
189 _addr='tcp://127.0.0.1:10101'
190 _registration_socket=None
190 _registration_socket=None
191 _query_socket=None
191 _query_socket=None
192 _control_socket=None
192 _control_socket=None
193 _notification_socket=None
193 _notification_socket=None
194 _mux_socket=None
194 _mux_socket=None
195 _task_socket=None
195 _task_socket=None
196 block = False
196 block = False
197 outstanding=None
197 outstanding=None
198 results = None
198 results = None
199 history = None
199 history = None
200 debug = False
200 debug = False
201 targets = None
201 targets = None
202
202
203 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
203 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
204 sshserver=None, sshkey=None, password=None, paramiko=None,
204 sshserver=None, sshkey=None, password=None, paramiko=None,
205 exec_key=None,):
205 exec_key=None,):
206 if context is None:
206 if context is None:
207 context = zmq.Context()
207 context = zmq.Context()
208 self.context = context
208 self.context = context
209 self.targets = 'all'
209 self.targets = 'all'
210 self._addr = addr
210 self._addr = addr
211 self._ssh = bool(sshserver or sshkey or password)
211 self._ssh = bool(sshserver or sshkey or password)
212 if self._ssh and sshserver is None:
212 if self._ssh and sshserver is None:
213 # default to the same
213 # default to the same
214 sshserver = addr.split('://')[1].split(':')[0]
214 sshserver = addr.split('://')[1].split(':')[0]
215 if self._ssh and password is None:
215 if self._ssh and password is None:
216 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
216 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
217 password=False
217 password=False
218 else:
218 else:
219 password = getpass("SSH Password for %s: "%sshserver)
219 password = getpass("SSH Password for %s: "%sshserver)
220 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
220 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
221
221
222 if exec_key is not None and os.path.isfile(exec_key):
222 if exec_key is not None and os.path.isfile(exec_key):
223 arg = 'keyfile'
223 arg = 'keyfile'
224 else:
224 else:
225 arg = 'key'
225 arg = 'key'
226 key_arg = {arg:exec_key}
226 key_arg = {arg:exec_key}
227 if username is None:
227 if username is None:
228 self.session = ss.StreamSession(**key_arg)
228 self.session = ss.StreamSession(**key_arg)
229 else:
229 else:
230 self.session = ss.StreamSession(username, **key_arg)
230 self.session = ss.StreamSession(username, **key_arg)
231 self._registration_socket = self.context.socket(zmq.XREQ)
231 self._registration_socket = self.context.socket(zmq.XREQ)
232 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
232 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
233 if self._ssh:
233 if self._ssh:
234 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
234 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
235 else:
235 else:
236 self._registration_socket.connect(addr)
236 self._registration_socket.connect(addr)
237 self._engines = {}
237 self._engines = {}
238 self._ids = set()
238 self._ids = set()
239 self.outstanding=set()
239 self.outstanding=set()
240 self.results = {}
240 self.results = {}
241 self.history = []
241 self.history = []
242 self.debug = debug
242 self.debug = debug
243 self.session.debug = debug
243 self.session.debug = debug
244
244
245 self._notification_handlers = {'registration_notification' : self._register_engine,
245 self._notification_handlers = {'registration_notification' : self._register_engine,
246 'unregistration_notification' : self._unregister_engine,
246 'unregistration_notification' : self._unregister_engine,
247 }
247 }
248 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
248 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
249 'apply_reply' : self._handle_apply_reply}
249 'apply_reply' : self._handle_apply_reply}
250 self._connect(sshserver, ssh_kwargs)
250 self._connect(sshserver, ssh_kwargs)
251
251
252
252
253 @property
253 @property
254 def ids(self):
254 def ids(self):
255 """Always up to date ids property."""
255 """Always up to date ids property."""
256 self._flush_notifications()
256 self._flush_notifications()
257 return self._ids
257 return self._ids
258
258
259 def _update_engines(self, engines):
259 def _update_engines(self, engines):
260 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
260 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
261 for k,v in engines.iteritems():
261 for k,v in engines.iteritems():
262 eid = int(k)
262 eid = int(k)
263 self._engines[eid] = bytes(v) # force not unicode
263 self._engines[eid] = bytes(v) # force not unicode
264 self._ids.add(eid)
264 self._ids.add(eid)
265
265
266 def _build_targets(self, targets):
266 def _build_targets(self, targets):
267 """Turn valid target IDs or 'all' into two lists:
267 """Turn valid target IDs or 'all' into two lists:
268 (int_ids, uuids).
268 (int_ids, uuids).
269 """
269 """
270 if targets is None:
270 if targets is None:
271 targets = self._ids
271 targets = self._ids
272 elif isinstance(targets, str):
272 elif isinstance(targets, str):
273 if targets.lower() == 'all':
273 if targets.lower() == 'all':
274 targets = self._ids
274 targets = self._ids
275 else:
275 else:
276 raise TypeError("%r not valid str target, must be 'all'"%(targets))
276 raise TypeError("%r not valid str target, must be 'all'"%(targets))
277 elif isinstance(targets, int):
277 elif isinstance(targets, int):
278 targets = [targets]
278 targets = [targets]
279 return [self._engines[t] for t in targets], list(targets)
279 return [self._engines[t] for t in targets], list(targets)
280
280
281 def _connect(self, sshserver, ssh_kwargs):
281 def _connect(self, sshserver, ssh_kwargs):
282 """setup all our socket connections to the controller. This is called from
282 """setup all our socket connections to the controller. This is called from
283 __init__."""
283 __init__."""
284 if self._connected:
284 if self._connected:
285 return
285 return
286 self._connected=True
286 self._connected=True
287
287
288 def connect_socket(s, addr):
288 def connect_socket(s, addr):
289 if self._ssh:
289 if self._ssh:
290 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
290 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
291 else:
291 else:
292 return s.connect(addr)
292 return s.connect(addr)
293
293
294 self.session.send(self._registration_socket, 'connection_request')
294 self.session.send(self._registration_socket, 'connection_request')
295 idents,msg = self.session.recv(self._registration_socket,mode=0)
295 idents,msg = self.session.recv(self._registration_socket,mode=0)
296 if self.debug:
296 if self.debug:
297 pprint(msg)
297 pprint(msg)
298 msg = ss.Message(msg)
298 msg = ss.Message(msg)
299 content = msg.content
299 content = msg.content
300 if content.status == 'ok':
300 if content.status == 'ok':
301 if content.queue:
301 if content.queue:
302 self._mux_socket = self.context.socket(zmq.PAIR)
302 self._mux_socket = self.context.socket(zmq.PAIR)
303 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
303 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
304 connect_socket(self._mux_socket, content.queue)
304 connect_socket(self._mux_socket, content.queue)
305 if content.task:
305 if content.task:
306 self._task_socket = self.context.socket(zmq.PAIR)
306 self._task_socket = self.context.socket(zmq.PAIR)
307 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
307 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
308 connect_socket(self._task_socket, content.task)
308 connect_socket(self._task_socket, content.task)
309 if content.notification:
309 if content.notification:
310 self._notification_socket = self.context.socket(zmq.SUB)
310 self._notification_socket = self.context.socket(zmq.SUB)
311 connect_socket(self._notification_socket, content.notification)
311 connect_socket(self._notification_socket, content.notification)
312 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
312 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
313 if content.query:
313 if content.query:
314 self._query_socket = self.context.socket(zmq.PAIR)
314 self._query_socket = self.context.socket(zmq.PAIR)
315 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
315 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
316 connect_socket(self._query_socket, content.query)
316 connect_socket(self._query_socket, content.query)
317 if content.control:
317 if content.control:
318 self._control_socket = self.context.socket(zmq.PAIR)
318 self._control_socket = self.context.socket(zmq.PAIR)
319 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
319 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
320 connect_socket(self._control_socket, content.control)
320 connect_socket(self._control_socket, content.control)
321 self._update_engines(dict(content.engines))
321 self._update_engines(dict(content.engines))
322
322
323 else:
323 else:
324 self._connected = False
324 self._connected = False
325 raise Exception("Failed to connect!")
325 raise Exception("Failed to connect!")
326
326
327 #--------------------------------------------------------------------------
327 #--------------------------------------------------------------------------
328 # handlers and callbacks for incoming messages
328 # handlers and callbacks for incoming messages
329 #--------------------------------------------------------------------------
329 #--------------------------------------------------------------------------
330
330
331 def _register_engine(self, msg):
331 def _register_engine(self, msg):
332 """Register a new engine, and update our connection info."""
332 """Register a new engine, and update our connection info."""
333 content = msg['content']
333 content = msg['content']
334 eid = content['id']
334 eid = content['id']
335 d = {eid : content['queue']}
335 d = {eid : content['queue']}
336 self._update_engines(d)
336 self._update_engines(d)
337 self._ids.add(int(eid))
337 self._ids.add(int(eid))
338
338
339 def _unregister_engine(self, msg):
339 def _unregister_engine(self, msg):
340 """Unregister an engine that has died."""
340 """Unregister an engine that has died."""
341 content = msg['content']
341 content = msg['content']
342 eid = int(content['id'])
342 eid = int(content['id'])
343 if eid in self._ids:
343 if eid in self._ids:
344 self._ids.remove(eid)
344 self._ids.remove(eid)
345 self._engines.pop(eid)
345 self._engines.pop(eid)
346
346
347 def _handle_execute_reply(self, msg):
347 def _handle_execute_reply(self, msg):
348 """Save the reply to an execute_request into our results."""
348 """Save the reply to an execute_request into our results."""
349 parent = msg['parent_header']
349 parent = msg['parent_header']
350 msg_id = parent['msg_id']
350 msg_id = parent['msg_id']
351 if msg_id not in self.outstanding:
351 if msg_id not in self.outstanding:
352 print("got unknown result: %s"%msg_id)
352 print("got unknown result: %s"%msg_id)
353 else:
353 else:
354 self.outstanding.remove(msg_id)
354 self.outstanding.remove(msg_id)
355 self.results[msg_id] = ss.unwrap_exception(msg['content'])
355 self.results[msg_id] = ss.unwrap_exception(msg['content'])
356
356
357 def _handle_apply_reply(self, msg):
357 def _handle_apply_reply(self, msg):
358 """Save the reply to an apply_request into our results."""
358 """Save the reply to an apply_request into our results."""
359 parent = msg['parent_header']
359 parent = msg['parent_header']
360 msg_id = parent['msg_id']
360 msg_id = parent['msg_id']
361 if msg_id not in self.outstanding:
361 if msg_id not in self.outstanding:
362 print ("got unknown result: %s"%msg_id)
362 print ("got unknown result: %s"%msg_id)
363 else:
363 else:
364 self.outstanding.remove(msg_id)
364 self.outstanding.remove(msg_id)
365 content = msg['content']
365 content = msg['content']
366 if content['status'] == 'ok':
366 if content['status'] == 'ok':
367 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
367 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
368 elif content['status'] == 'aborted':
368 elif content['status'] == 'aborted':
369 self.results[msg_id] = error.AbortedTask(msg_id)
369 self.results[msg_id] = error.AbortedTask(msg_id)
370 elif content['status'] == 'resubmitted':
370 elif content['status'] == 'resubmitted':
371 # TODO: handle resubmission
371 # TODO: handle resubmission
372 pass
372 pass
373 else:
373 else:
374 e = ss.unwrap_exception(content)
374 e = ss.unwrap_exception(content)
375 e_uuid = e.engine_info['engineid']
375 e_uuid = e.engine_info['engineid']
376 for k,v in self._engines.iteritems():
376 for k,v in self._engines.iteritems():
377 if v == e_uuid:
377 if v == e_uuid:
378 e.engine_info['engineid'] = k
378 e.engine_info['engineid'] = k
379 break
379 break
380 self.results[msg_id] = e
380 self.results[msg_id] = e
381
381
382 def _flush_notifications(self):
382 def _flush_notifications(self):
383 """Flush notifications of engine registrations waiting
383 """Flush notifications of engine registrations waiting
384 in ZMQ queue."""
384 in ZMQ queue."""
385 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
385 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
386 while msg is not None:
386 while msg is not None:
387 if self.debug:
387 if self.debug:
388 pprint(msg)
388 pprint(msg)
389 msg = msg[-1]
389 msg = msg[-1]
390 msg_type = msg['msg_type']
390 msg_type = msg['msg_type']
391 handler = self._notification_handlers.get(msg_type, None)
391 handler = self._notification_handlers.get(msg_type, None)
392 if handler is None:
392 if handler is None:
393 raise Exception("Unhandled message type: %s"%msg.msg_type)
393 raise Exception("Unhandled message type: %s"%msg.msg_type)
394 else:
394 else:
395 handler(msg)
395 handler(msg)
396 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
396 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
397
397
398 def _flush_results(self, sock):
398 def _flush_results(self, sock):
399 """Flush task or queue results waiting in ZMQ queue."""
399 """Flush task or queue results waiting in ZMQ queue."""
400 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
400 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
401 while msg is not None:
401 while msg is not None:
402 if self.debug:
402 if self.debug:
403 pprint(msg)
403 pprint(msg)
404 msg = msg[-1]
404 msg = msg[-1]
405 msg_type = msg['msg_type']
405 msg_type = msg['msg_type']
406 handler = self._queue_handlers.get(msg_type, None)
406 handler = self._queue_handlers.get(msg_type, None)
407 if handler is None:
407 if handler is None:
408 raise Exception("Unhandled message type: %s"%msg.msg_type)
408 raise Exception("Unhandled message type: %s"%msg.msg_type)
409 else:
409 else:
410 handler(msg)
410 handler(msg)
411 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
411 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
412
412
413 def _flush_control(self, sock):
413 def _flush_control(self, sock):
414 """Flush replies from the control channel waiting
414 """Flush replies from the control channel waiting
415 in the ZMQ queue.
415 in the ZMQ queue.
416
416
417 Currently: ignore them."""
417 Currently: ignore them."""
418 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
418 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
419 while msg is not None:
419 while msg is not None:
420 if self.debug:
420 if self.debug:
421 pprint(msg)
421 pprint(msg)
422 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
422 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
423
423
424 #--------------------------------------------------------------------------
424 #--------------------------------------------------------------------------
425 # getitem
425 # getitem
426 #--------------------------------------------------------------------------
426 #--------------------------------------------------------------------------
427
427
428 def __getitem__(self, key):
428 def __getitem__(self, key):
429 """Dict access returns DirectView multiplexer objects or,
429 """Dict access returns DirectView multiplexer objects or,
430 if key is None, a LoadBalancedView."""
430 if key is None, a LoadBalancedView."""
431 if key is None:
431 if key is None:
432 return LoadBalancedView(self)
432 return LoadBalancedView(self)
433 if isinstance(key, int):
433 if isinstance(key, int):
434 if key not in self.ids:
434 if key not in self.ids:
435 raise IndexError("No such engine: %i"%key)
435 raise IndexError("No such engine: %i"%key)
436 return DirectView(self, key)
436 return DirectView(self, key)
437
437
438 if isinstance(key, slice):
438 if isinstance(key, slice):
439 indices = range(len(self.ids))[key]
439 indices = range(len(self.ids))[key]
440 ids = sorted(self._ids)
440 ids = sorted(self._ids)
441 key = [ ids[i] for i in indices ]
441 key = [ ids[i] for i in indices ]
442 # newkeys = sorted(self._ids)[thekeys[k]]
442 # newkeys = sorted(self._ids)[thekeys[k]]
443
443
444 if isinstance(key, (tuple, list, xrange)):
444 if isinstance(key, (tuple, list, xrange)):
445 _,targets = self._build_targets(list(key))
445 _,targets = self._build_targets(list(key))
446 return DirectView(self, targets)
446 return DirectView(self, targets)
447 else:
447 else:
448 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
448 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
449
449
450 #--------------------------------------------------------------------------
450 #--------------------------------------------------------------------------
451 # Begin public methods
451 # Begin public methods
452 #--------------------------------------------------------------------------
452 #--------------------------------------------------------------------------
453
453
454 @property
454 @property
455 def remote(self):
455 def remote(self):
456 """property for convenient RemoteFunction generation.
456 """property for convenient RemoteFunction generation.
457
457
458 >>> @client.remote
458 >>> @client.remote
459 ... def f():
459 ... def f():
460 import os
460 import os
461 print (os.getpid())
461 print (os.getpid())
462 """
462 """
463 return remote(self, block=self.block)
463 return remote(self, block=self.block)
464
464
465 def spin(self):
465 def spin(self):
466 """Flush any registration notifications and execution results
466 """Flush any registration notifications and execution results
467 waiting in the ZMQ queue.
467 waiting in the ZMQ queue.
468 """
468 """
469 if self._notification_socket:
469 if self._notification_socket:
470 self._flush_notifications()
470 self._flush_notifications()
471 if self._mux_socket:
471 if self._mux_socket:
472 self._flush_results(self._mux_socket)
472 self._flush_results(self._mux_socket)
473 if self._task_socket:
473 if self._task_socket:
474 self._flush_results(self._task_socket)
474 self._flush_results(self._task_socket)
475 if self._control_socket:
475 if self._control_socket:
476 self._flush_control(self._control_socket)
476 self._flush_control(self._control_socket)
477
477
478 def barrier(self, msg_ids=None, timeout=-1):
478 def barrier(self, msg_ids=None, timeout=-1):
479 """waits on one or more `msg_ids`, for up to `timeout` seconds.
479 """waits on one or more `msg_ids`, for up to `timeout` seconds.
480
480
481 Parameters
481 Parameters
482 ----------
482 ----------
483 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
483 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
484 ints are indices to self.history
484 ints are indices to self.history
485 strs are msg_ids
485 strs are msg_ids
486 default: wait on all outstanding messages
486 default: wait on all outstanding messages
487 timeout : float
487 timeout : float
488 a time in seconds, after which to give up.
488 a time in seconds, after which to give up.
489 default is -1, which means no timeout
489 default is -1, which means no timeout
490
490
491 Returns
491 Returns
492 -------
492 -------
493 True : when all msg_ids are done
493 True : when all msg_ids are done
494 False : timeout reached, some msg_ids still outstanding
494 False : timeout reached, some msg_ids still outstanding
495 """
495 """
496 tic = time.time()
496 tic = time.time()
497 if msg_ids is None:
497 if msg_ids is None:
498 theids = self.outstanding
498 theids = self.outstanding
499 else:
499 else:
500 if isinstance(msg_ids, (int, str, AsyncResult)):
500 if isinstance(msg_ids, (int, str, AsyncResult)):
501 msg_ids = [msg_ids]
501 msg_ids = [msg_ids]
502 theids = set()
502 theids = set()
503 for msg_id in msg_ids:
503 for msg_id in msg_ids:
504 if isinstance(msg_id, int):
504 if isinstance(msg_id, int):
505 msg_id = self.history[msg_id]
505 msg_id = self.history[msg_id]
506 elif isinstance(msg_id, AsyncResult):
506 elif isinstance(msg_id, AsyncResult):
507 map(theids.add, msg_id._msg_ids)
507 map(theids.add, msg_id.msg_ids)
508 continue
508 continue
509 theids.add(msg_id)
509 theids.add(msg_id)
510 if not theids.intersection(self.outstanding):
510 if not theids.intersection(self.outstanding):
511 return True
511 return True
512 self.spin()
512 self.spin()
513 while theids.intersection(self.outstanding):
513 while theids.intersection(self.outstanding):
514 if timeout >= 0 and ( time.time()-tic ) > timeout:
514 if timeout >= 0 and ( time.time()-tic ) > timeout:
515 break
515 break
516 time.sleep(1e-3)
516 time.sleep(1e-3)
517 self.spin()
517 self.spin()
518 return len(theids.intersection(self.outstanding)) == 0
518 return len(theids.intersection(self.outstanding)) == 0
519
519
520 #--------------------------------------------------------------------------
520 #--------------------------------------------------------------------------
521 # Control methods
521 # Control methods
522 #--------------------------------------------------------------------------
522 #--------------------------------------------------------------------------
523
523
524 @spinfirst
524 @spinfirst
525 @defaultblock
525 @defaultblock
526 def clear(self, targets=None, block=None):
526 def clear(self, targets=None, block=None):
527 """Clear the namespace in target(s)."""
527 """Clear the namespace in target(s)."""
528 targets = self._build_targets(targets)[0]
528 targets = self._build_targets(targets)[0]
529 for t in targets:
529 for t in targets:
530 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
530 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
531 error = False
531 error = False
532 if self.block:
532 if self.block:
533 for i in range(len(targets)):
533 for i in range(len(targets)):
534 idents,msg = self.session.recv(self._control_socket,0)
534 idents,msg = self.session.recv(self._control_socket,0)
535 if self.debug:
535 if self.debug:
536 pprint(msg)
536 pprint(msg)
537 if msg['content']['status'] != 'ok':
537 if msg['content']['status'] != 'ok':
538 error = ss.unwrap_exception(msg['content'])
538 error = ss.unwrap_exception(msg['content'])
539 if error:
539 if error:
540 return error
540 return error
541
541
542
542
543 @spinfirst
543 @spinfirst
544 @defaultblock
544 @defaultblock
545 def abort(self, msg_ids = None, targets=None, block=None):
545 def abort(self, msg_ids = None, targets=None, block=None):
546 """Abort the execution queues of target(s)."""
546 """Abort the execution queues of target(s)."""
547 targets = self._build_targets(targets)[0]
547 targets = self._build_targets(targets)[0]
548 if isinstance(msg_ids, basestring):
548 if isinstance(msg_ids, basestring):
549 msg_ids = [msg_ids]
549 msg_ids = [msg_ids]
550 content = dict(msg_ids=msg_ids)
550 content = dict(msg_ids=msg_ids)
551 for t in targets:
551 for t in targets:
552 self.session.send(self._control_socket, 'abort_request',
552 self.session.send(self._control_socket, 'abort_request',
553 content=content, ident=t)
553 content=content, ident=t)
554 error = False
554 error = False
555 if self.block:
555 if self.block:
556 for i in range(len(targets)):
556 for i in range(len(targets)):
557 idents,msg = self.session.recv(self._control_socket,0)
557 idents,msg = self.session.recv(self._control_socket,0)
558 if self.debug:
558 if self.debug:
559 pprint(msg)
559 pprint(msg)
560 if msg['content']['status'] != 'ok':
560 if msg['content']['status'] != 'ok':
561 error = ss.unwrap_exception(msg['content'])
561 error = ss.unwrap_exception(msg['content'])
562 if error:
562 if error:
563 return error
563 return error
564
564
565 @spinfirst
565 @spinfirst
566 @defaultblock
566 @defaultblock
567 def shutdown(self, targets=None, restart=False, controller=False, block=None):
567 def shutdown(self, targets=None, restart=False, controller=False, block=None):
568 """Terminates one or more engine processes, optionally including the controller."""
568 """Terminates one or more engine processes, optionally including the controller."""
569 if controller:
569 if controller:
570 targets = 'all'
570 targets = 'all'
571 targets = self._build_targets(targets)[0]
571 targets = self._build_targets(targets)[0]
572 for t in targets:
572 for t in targets:
573 self.session.send(self._control_socket, 'shutdown_request',
573 self.session.send(self._control_socket, 'shutdown_request',
574 content={'restart':restart},ident=t)
574 content={'restart':restart},ident=t)
575 error = False
575 error = False
576 if block or controller:
576 if block or controller:
577 for i in range(len(targets)):
577 for i in range(len(targets)):
578 idents,msg = self.session.recv(self._control_socket,0)
578 idents,msg = self.session.recv(self._control_socket,0)
579 if self.debug:
579 if self.debug:
580 pprint(msg)
580 pprint(msg)
581 if msg['content']['status'] != 'ok':
581 if msg['content']['status'] != 'ok':
582 error = ss.unwrap_exception(msg['content'])
582 error = ss.unwrap_exception(msg['content'])
583
583
584 if controller:
584 if controller:
585 time.sleep(0.25)
585 time.sleep(0.25)
586 self.session.send(self._query_socket, 'shutdown_request')
586 self.session.send(self._query_socket, 'shutdown_request')
587 idents,msg = self.session.recv(self._query_socket, 0)
587 idents,msg = self.session.recv(self._query_socket, 0)
588 if self.debug:
588 if self.debug:
589 pprint(msg)
589 pprint(msg)
590 if msg['content']['status'] != 'ok':
590 if msg['content']['status'] != 'ok':
591 error = ss.unwrap_exception(msg['content'])
591 error = ss.unwrap_exception(msg['content'])
592
592
593 if error:
593 if error:
594 raise error
594 raise error
595
595
596 #--------------------------------------------------------------------------
596 #--------------------------------------------------------------------------
597 # Execution methods
597 # Execution methods
598 #--------------------------------------------------------------------------
598 #--------------------------------------------------------------------------
599
599
600 @defaultblock
600 @defaultblock
601 def execute(self, code, targets='all', block=None):
601 def execute(self, code, targets='all', block=None):
602 """Executes `code` on `targets` in blocking or nonblocking manner.
602 """Executes `code` on `targets` in blocking or nonblocking manner.
603
603
604 ``execute`` is always `bound` (affects engine namespace)
604 ``execute`` is always `bound` (affects engine namespace)
605
605
606 Parameters
606 Parameters
607 ----------
607 ----------
608 code : str
608 code : str
609 the code string to be executed
609 the code string to be executed
610 targets : int/str/list of ints/strs
610 targets : int/str/list of ints/strs
611 the engines on which to execute
611 the engines on which to execute
612 default : all
612 default : all
613 block : bool
613 block : bool
614 whether or not to wait until done to return
614 whether or not to wait until done to return
615 default: self.block
615 default: self.block
616 """
616 """
617 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
617 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
618 return result
618 return result
619
619
620 def run(self, code, block=None):
620 def run(self, code, block=None):
621 """Runs `code` on an engine.
621 """Runs `code` on an engine.
622
622
623 Calls to this are load-balanced.
623 Calls to this are load-balanced.
624
624
625 ``run`` is never `bound` (no effect on engine namespace)
625 ``run`` is never `bound` (no effect on engine namespace)
626
626
627 Parameters
627 Parameters
628 ----------
628 ----------
629 code : str
629 code : str
630 the code string to be executed
630 the code string to be executed
631 block : bool
631 block : bool
632 whether or not to wait until done
632 whether or not to wait until done
633
633
634 """
634 """
635 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
635 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
636 return result
636 return result
637
637
638 def _maybe_raise(self, result):
638 def _maybe_raise(self, result):
639 """wrapper for maybe raising an exception if apply failed."""
639 """wrapper for maybe raising an exception if apply failed."""
640 if isinstance(result, error.RemoteError):
640 if isinstance(result, error.RemoteError):
641 raise result
641 raise result
642
642
643 return result
643 return result
644
644
645 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
645 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
646 after=None, follow=None):
646 after=None, follow=None):
647 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
647 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
648
648
649 This is the central execution command for the client.
649 This is the central execution command for the client.
650
650
651 Parameters
651 Parameters
652 ----------
652 ----------
653
653
654 f : function
654 f : function
655 The fuction to be called remotely
655 The fuction to be called remotely
656 args : tuple/list
656 args : tuple/list
657 The positional arguments passed to `f`
657 The positional arguments passed to `f`
658 kwargs : dict
658 kwargs : dict
659 The keyword arguments passed to `f`
659 The keyword arguments passed to `f`
660 bound : bool (default: True)
660 bound : bool (default: True)
661 Whether to execute in the Engine(s) namespace, or in a clean
661 Whether to execute in the Engine(s) namespace, or in a clean
662 namespace not affecting the engine.
662 namespace not affecting the engine.
663 block : bool (default: self.block)
663 block : bool (default: self.block)
664 Whether to wait for the result, or return immediately.
664 Whether to wait for the result, or return immediately.
665 False:
665 False:
666 returns msg_id(s)
666 returns msg_id(s)
667 if multiple targets:
667 if multiple targets:
668 list of ids
668 list of ids
669 True:
669 True:
670 returns actual result(s) of f(*args, **kwargs)
670 returns actual result(s) of f(*args, **kwargs)
671 if multiple targets:
671 if multiple targets:
672 dict of results, by engine ID
672 dict of results, by engine ID
673 targets : int,list of ints, 'all', None
673 targets : int,list of ints, 'all', None
674 Specify the destination of the job.
674 Specify the destination of the job.
675 if None:
675 if None:
676 Submit via Task queue for load-balancing.
676 Submit via Task queue for load-balancing.
677 if 'all':
677 if 'all':
678 Run on all active engines
678 Run on all active engines
679 if list:
679 if list:
680 Run on each specified engine
680 Run on each specified engine
681 if int:
681 if int:
682 Run on single engine
682 Run on single engine
683
683
684 after : Dependency or collection of msg_ids
684 after : Dependency or collection of msg_ids
685 Only for load-balanced execution (targets=None)
685 Only for load-balanced execution (targets=None)
686 Specify a list of msg_ids as a time-based dependency.
686 Specify a list of msg_ids as a time-based dependency.
687 This job will only be run *after* the dependencies
687 This job will only be run *after* the dependencies
688 have been met.
688 have been met.
689
689
690 follow : Dependency or collection of msg_ids
690 follow : Dependency or collection of msg_ids
691 Only for load-balanced execution (targets=None)
691 Only for load-balanced execution (targets=None)
692 Specify a list of msg_ids as a location-based dependency.
692 Specify a list of msg_ids as a location-based dependency.
693 This job will only be run on an engine where this dependency
693 This job will only be run on an engine where this dependency
694 is met.
694 is met.
695
695
696 Returns
696 Returns
697 -------
697 -------
698 if block is False:
698 if block is False:
699 if single target:
699 if single target:
700 return msg_id
700 return msg_id
701 else:
701 else:
702 return list of msg_ids
702 return list of msg_ids
703 ? (should this be dict like block=True) ?
703 ? (should this be dict like block=True) ?
704 else:
704 else:
705 if single target:
705 if single target:
706 return result of f(*args, **kwargs)
706 return result of f(*args, **kwargs)
707 else:
707 else:
708 return dict of results, keyed by engine
708 return dict of results, keyed by engine
709 """
709 """
710
710
711 # defaults:
711 # defaults:
712 block = block if block is not None else self.block
712 block = block if block is not None else self.block
713 args = args if args is not None else []
713 args = args if args is not None else []
714 kwargs = kwargs if kwargs is not None else {}
714 kwargs = kwargs if kwargs is not None else {}
715
715
716 # enforce types of f,args,kwrags
716 # enforce types of f,args,kwrags
717 if not callable(f):
717 if not callable(f):
718 raise TypeError("f must be callable, not %s"%type(f))
718 raise TypeError("f must be callable, not %s"%type(f))
719 if not isinstance(args, (tuple, list)):
719 if not isinstance(args, (tuple, list)):
720 raise TypeError("args must be tuple or list, not %s"%type(args))
720 raise TypeError("args must be tuple or list, not %s"%type(args))
721 if not isinstance(kwargs, dict):
721 if not isinstance(kwargs, dict):
722 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
722 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
723
723
724 options = dict(bound=bound, block=block, after=after, follow=follow)
724 options = dict(bound=bound, block=block, after=after, follow=follow)
725
725
726 if targets is None:
726 if targets is None:
727 return self._apply_balanced(f, args, kwargs, **options)
727 return self._apply_balanced(f, args, kwargs, **options)
728 else:
728 else:
729 return self._apply_direct(f, args, kwargs, targets=targets, **options)
729 return self._apply_direct(f, args, kwargs, targets=targets, **options)
730
730
731 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
731 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
732 after=None, follow=None):
732 after=None, follow=None):
733 """The underlying method for applying functions in a load balanced
733 """The underlying method for applying functions in a load balanced
734 manner, via the task queue."""
734 manner, via the task queue."""
735 if isinstance(after, Dependency):
735 if isinstance(after, Dependency):
736 after = after.as_dict()
736 after = after.as_dict()
737 elif after is None:
737 elif after is None:
738 after = []
738 after = []
739 if isinstance(follow, Dependency):
739 if isinstance(follow, Dependency):
740 follow = follow.as_dict()
740 follow = follow.as_dict()
741 elif follow is None:
741 elif follow is None:
742 follow = []
742 follow = []
743 subheader = dict(after=after, follow=follow)
743 subheader = dict(after=after, follow=follow)
744
744
745 bufs = ss.pack_apply_message(f,args,kwargs)
745 bufs = ss.pack_apply_message(f,args,kwargs)
746 content = dict(bound=bound)
746 content = dict(bound=bound)
747 msg = self.session.send(self._task_socket, "apply_request",
747 msg = self.session.send(self._task_socket, "apply_request",
748 content=content, buffers=bufs, subheader=subheader)
748 content=content, buffers=bufs, subheader=subheader)
749 msg_id = msg['msg_id']
749 msg_id = msg['msg_id']
750 self.outstanding.add(msg_id)
750 self.outstanding.add(msg_id)
751 self.history.append(msg_id)
751 self.history.append(msg_id)
752 if block:
752 if block:
753 self.barrier(msg_id)
753 self.barrier(msg_id)
754 return self._maybe_raise(self.results[msg_id])
754 return self._maybe_raise(self.results[msg_id])
755 else:
755 else:
756 return AsyncResult(self, [msg_id])
756 return AsyncResult(self, [msg_id])
757
757
758 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
758 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
759 after=None, follow=None):
759 after=None, follow=None):
760 """Then underlying method for applying functions to specific engines
760 """Then underlying method for applying functions to specific engines
761 via the MUX queue."""
761 via the MUX queue."""
762
762
763 queues,targets = self._build_targets(targets)
763 queues,targets = self._build_targets(targets)
764 bufs = ss.pack_apply_message(f,args,kwargs)
764 bufs = ss.pack_apply_message(f,args,kwargs)
765 if isinstance(after, Dependency):
765 if isinstance(after, Dependency):
766 after = after.as_dict()
766 after = after.as_dict()
767 elif after is None:
767 elif after is None:
768 after = []
768 after = []
769 if isinstance(follow, Dependency):
769 if isinstance(follow, Dependency):
770 follow = follow.as_dict()
770 follow = follow.as_dict()
771 elif follow is None:
771 elif follow is None:
772 follow = []
772 follow = []
773 subheader = dict(after=after, follow=follow)
773 subheader = dict(after=after, follow=follow)
774 content = dict(bound=bound)
774 content = dict(bound=bound)
775 msg_ids = []
775 msg_ids = []
776 for queue in queues:
776 for queue in queues:
777 msg = self.session.send(self._mux_socket, "apply_request",
777 msg = self.session.send(self._mux_socket, "apply_request",
778 content=content, buffers=bufs,ident=queue, subheader=subheader)
778 content=content, buffers=bufs,ident=queue, subheader=subheader)
779 msg_id = msg['msg_id']
779 msg_id = msg['msg_id']
780 self.outstanding.add(msg_id)
780 self.outstanding.add(msg_id)
781 self.history.append(msg_id)
781 self.history.append(msg_id)
782 msg_ids.append(msg_id)
782 msg_ids.append(msg_id)
783 if block:
783 if block:
784 self.barrier(msg_ids)
784 self.barrier(msg_ids)
785 else:
785 else:
786 return AsyncResult(self, msg_ids)
786 return AsyncResult(self, msg_ids)
787 if len(msg_ids) == 1:
787 if len(msg_ids) == 1:
788 return self._maybe_raise(self.results[msg_ids[0]])
788 return self._maybe_raise(self.results[msg_ids[0]])
789 else:
789 else:
790 result = {}
790 result = {}
791 for target,mid in zip(targets, msg_ids):
791 for target,mid in zip(targets, msg_ids):
792 result[target] = self.results[mid]
792 result[target] = self.results[mid]
793 return error.collect_exceptions(result, f.__name__)
793 return error.collect_exceptions(result, f.__name__)
794
794
795 #--------------------------------------------------------------------------
795 #--------------------------------------------------------------------------
796 # Map and decorators
796 # Map and decorators
797 #--------------------------------------------------------------------------
797 #--------------------------------------------------------------------------
798
798
799 def map(self, f, *sequences):
799 def map(self, f, *sequences):
800 """Parallel version of builtin `map`, using all our engines."""
800 """Parallel version of builtin `map`, using all our engines."""
801 pf = ParallelFunction(self, f, block=self.block,
801 pf = ParallelFunction(self, f, block=self.block,
802 bound=True, targets='all')
802 bound=True, targets='all')
803 return pf.map(*sequences)
803 return pf.map(*sequences)
804
804
805 def parallel(self, bound=True, targets='all', block=True):
805 def parallel(self, bound=True, targets='all', block=True):
806 """Decorator for making a ParallelFunction."""
806 """Decorator for making a ParallelFunction."""
807 return parallel(self, bound=bound, targets=targets, block=block)
807 return parallel(self, bound=bound, targets=targets, block=block)
808
808
809 def remote(self, bound=True, targets='all', block=True):
809 def remote(self, bound=True, targets='all', block=True):
810 """Decorator for making a RemoteFunction."""
810 """Decorator for making a RemoteFunction."""
811 return remote(self, bound=bound, targets=targets, block=block)
811 return remote(self, bound=bound, targets=targets, block=block)
812
812
813 #--------------------------------------------------------------------------
813 #--------------------------------------------------------------------------
814 # Data movement
814 # Data movement
815 #--------------------------------------------------------------------------
815 #--------------------------------------------------------------------------
816
816
817 @defaultblock
817 @defaultblock
818 def push(self, ns, targets='all', block=None):
818 def push(self, ns, targets='all', block=None):
819 """Push the contents of `ns` into the namespace on `target`"""
819 """Push the contents of `ns` into the namespace on `target`"""
820 if not isinstance(ns, dict):
820 if not isinstance(ns, dict):
821 raise TypeError("Must be a dict, not %s"%type(ns))
821 raise TypeError("Must be a dict, not %s"%type(ns))
822 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
822 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
823 return result
823 return result
824
824
825 @defaultblock
825 @defaultblock
826 def pull(self, keys, targets='all', block=None):
826 def pull(self, keys, targets='all', block=None):
827 """Pull objects from `target`'s namespace by `keys`"""
827 """Pull objects from `target`'s namespace by `keys`"""
828 if isinstance(keys, str):
828 if isinstance(keys, str):
829 pass
829 pass
830 elif isinstance(keys, (list,tuple,set)):
830 elif isinstance(keys, (list,tuple,set)):
831 for key in keys:
831 for key in keys:
832 if not isinstance(key, str):
832 if not isinstance(key, str):
833 raise TypeError
833 raise TypeError
834 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
834 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
835 return result
835 return result
836
836
837 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
837 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
838 """
838 """
839 Partition a Python sequence and send the partitions to a set of engines.
839 Partition a Python sequence and send the partitions to a set of engines.
840 """
840 """
841 block = block if block is not None else self.block
841 block = block if block is not None else self.block
842 targets = self._build_targets(targets)[-1]
842 targets = self._build_targets(targets)[-1]
843 mapObject = Map.dists[dist]()
843 mapObject = Map.dists[dist]()
844 nparts = len(targets)
844 nparts = len(targets)
845 msg_ids = []
845 msg_ids = []
846 for index, engineid in enumerate(targets):
846 for index, engineid in enumerate(targets):
847 partition = mapObject.getPartition(seq, index, nparts)
847 partition = mapObject.getPartition(seq, index, nparts)
848 if flatten and len(partition) == 1:
848 if flatten and len(partition) == 1:
849 r = self.push({key: partition[0]}, targets=engineid, block=False)
849 r = self.push({key: partition[0]}, targets=engineid, block=False)
850 else:
850 else:
851 r = self.push({key: partition}, targets=engineid, block=False)
851 r = self.push({key: partition}, targets=engineid, block=False)
852 msg_ids.extend(r._msg_ids)
852 msg_ids.extend(r.msg_ids)
853 r = AsyncResult(self, msg_ids)
853 r = AsyncResult(self, msg_ids)
854 if block:
854 if block:
855 return r.get()
855 return r.get()
856 else:
856 else:
857 return r
857 return r
858
858
859 def gather(self, key, dist='b', targets='all', block=None):
859 def gather(self, key, dist='b', targets='all', block=None):
860 """
860 """
861 Gather a partitioned sequence on a set of engines as a single local seq.
861 Gather a partitioned sequence on a set of engines as a single local seq.
862 """
862 """
863 block = block if block is not None else self.block
863 block = block if block is not None else self.block
864
864
865 targets = self._build_targets(targets)[-1]
865 targets = self._build_targets(targets)[-1]
866 mapObject = Map.dists[dist]()
866 mapObject = Map.dists[dist]()
867 msg_ids = []
867 msg_ids = []
868 for index, engineid in enumerate(targets):
868 for index, engineid in enumerate(targets):
869 msg_ids.extend(self.pull(key, targets=engineid,block=False)._msg_ids)
869 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
870
870
871 r = AsyncMapResult(self, msg_ids, mapObject)
871 r = AsyncMapResult(self, msg_ids, mapObject)
872 if block:
872 if block:
873 return r.get()
873 return r.get()
874 else:
874 else:
875 return r
875 return r
876
876
877 #--------------------------------------------------------------------------
877 #--------------------------------------------------------------------------
878 # Query methods
878 # Query methods
879 #--------------------------------------------------------------------------
879 #--------------------------------------------------------------------------
880
880
881 @spinfirst
881 @spinfirst
882 def get_results(self, msg_ids, status_only=False):
882 def get_results(self, msg_ids, status_only=False):
883 """Returns the result of the execute or task request with `msg_ids`.
883 """Returns the result of the execute or task request with `msg_ids`.
884
884
885 Parameters
885 Parameters
886 ----------
886 ----------
887 msg_ids : list of ints or msg_ids
887 msg_ids : list of ints or msg_ids
888 if int:
888 if int:
889 Passed as index to self.history for convenience.
889 Passed as index to self.history for convenience.
890 status_only : bool (default: False)
890 status_only : bool (default: False)
891 if False:
891 if False:
892 return the actual results
892 return the actual results
893 """
893 """
894 if not isinstance(msg_ids, (list,tuple)):
894 if not isinstance(msg_ids, (list,tuple)):
895 msg_ids = [msg_ids]
895 msg_ids = [msg_ids]
896 theids = []
896 theids = []
897 for msg_id in msg_ids:
897 for msg_id in msg_ids:
898 if isinstance(msg_id, int):
898 if isinstance(msg_id, int):
899 msg_id = self.history[msg_id]
899 msg_id = self.history[msg_id]
900 if not isinstance(msg_id, str):
900 if not isinstance(msg_id, str):
901 raise TypeError("msg_ids must be str, not %r"%msg_id)
901 raise TypeError("msg_ids must be str, not %r"%msg_id)
902 theids.append(msg_id)
902 theids.append(msg_id)
903
903
904 completed = []
904 completed = []
905 local_results = {}
905 local_results = {}
906 for msg_id in list(theids):
906 for msg_id in list(theids):
907 if msg_id in self.results:
907 if msg_id in self.results:
908 completed.append(msg_id)
908 completed.append(msg_id)
909 local_results[msg_id] = self.results[msg_id]
909 local_results[msg_id] = self.results[msg_id]
910 theids.remove(msg_id)
910 theids.remove(msg_id)
911
911
912 if theids: # some not locally cached
912 if theids: # some not locally cached
913 content = dict(msg_ids=theids, status_only=status_only)
913 content = dict(msg_ids=theids, status_only=status_only)
914 msg = self.session.send(self._query_socket, "result_request", content=content)
914 msg = self.session.send(self._query_socket, "result_request", content=content)
915 zmq.select([self._query_socket], [], [])
915 zmq.select([self._query_socket], [], [])
916 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
916 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
917 if self.debug:
917 if self.debug:
918 pprint(msg)
918 pprint(msg)
919 content = msg['content']
919 content = msg['content']
920 if content['status'] != 'ok':
920 if content['status'] != 'ok':
921 raise ss.unwrap_exception(content)
921 raise ss.unwrap_exception(content)
922 else:
922 else:
923 content = dict(completed=[],pending=[])
923 content = dict(completed=[],pending=[])
924 if not status_only:
924 if not status_only:
925 # load cached results into result:
925 # load cached results into result:
926 content['completed'].extend(completed)
926 content['completed'].extend(completed)
927 content.update(local_results)
927 content.update(local_results)
928 # update cache with results:
928 # update cache with results:
929 for msg_id in msg_ids:
929 for msg_id in msg_ids:
930 if msg_id in content['completed']:
930 if msg_id in content['completed']:
931 self.results[msg_id] = content[msg_id]
931 self.results[msg_id] = content[msg_id]
932 return content
932 return content
933
933
934 @spinfirst
934 @spinfirst
935 def queue_status(self, targets=None, verbose=False):
935 def queue_status(self, targets=None, verbose=False):
936 """Fetch the status of engine queues.
936 """Fetch the status of engine queues.
937
937
938 Parameters
938 Parameters
939 ----------
939 ----------
940 targets : int/str/list of ints/strs
940 targets : int/str/list of ints/strs
941 the engines on which to execute
941 the engines on which to execute
942 default : all
942 default : all
943 verbose : bool
943 verbose : bool
944 Whether to return lengths only, or lists of ids for each element
944 Whether to return lengths only, or lists of ids for each element
945 """
945 """
946 targets = self._build_targets(targets)[1]
946 targets = self._build_targets(targets)[1]
947 content = dict(targets=targets, verbose=verbose)
947 content = dict(targets=targets, verbose=verbose)
948 self.session.send(self._query_socket, "queue_request", content=content)
948 self.session.send(self._query_socket, "queue_request", content=content)
949 idents,msg = self.session.recv(self._query_socket, 0)
949 idents,msg = self.session.recv(self._query_socket, 0)
950 if self.debug:
950 if self.debug:
951 pprint(msg)
951 pprint(msg)
952 content = msg['content']
952 content = msg['content']
953 status = content.pop('status')
953 status = content.pop('status')
954 if status != 'ok':
954 if status != 'ok':
955 raise ss.unwrap_exception(content)
955 raise ss.unwrap_exception(content)
956 return content
956 return content
957
957
958 @spinfirst
958 @spinfirst
959 def purge_results(self, msg_ids=[], targets=[]):
959 def purge_results(self, msg_ids=[], targets=[]):
960 """Tell the controller to forget results.
960 """Tell the controller to forget results.
961
961
962 Individual results can be purged by msg_id, or the entire
962 Individual results can be purged by msg_id, or the entire
963 history of specific targets can be purged.
963 history of specific targets can be purged.
964
964
965 Parameters
965 Parameters
966 ----------
966 ----------
967 msg_ids : str or list of strs
967 msg_ids : str or list of strs
968 the msg_ids whose results should be forgotten.
968 the msg_ids whose results should be forgotten.
969 targets : int/str/list of ints/strs
969 targets : int/str/list of ints/strs
970 The targets, by uuid or int_id, whose entire history is to be purged.
970 The targets, by uuid or int_id, whose entire history is to be purged.
971 Use `targets='all'` to scrub everything from the controller's memory.
971 Use `targets='all'` to scrub everything from the controller's memory.
972
972
973 default : None
973 default : None
974 """
974 """
975 if not targets and not msg_ids:
975 if not targets and not msg_ids:
976 raise ValueError
976 raise ValueError
977 if targets:
977 if targets:
978 targets = self._build_targets(targets)[1]
978 targets = self._build_targets(targets)[1]
979 content = dict(targets=targets, msg_ids=msg_ids)
979 content = dict(targets=targets, msg_ids=msg_ids)
980 self.session.send(self._query_socket, "purge_request", content=content)
980 self.session.send(self._query_socket, "purge_request", content=content)
981 idents, msg = self.session.recv(self._query_socket, 0)
981 idents, msg = self.session.recv(self._query_socket, 0)
982 if self.debug:
982 if self.debug:
983 pprint(msg)
983 pprint(msg)
984 content = msg['content']
984 content = msg['content']
985 if content['status'] != 'ok':
985 if content['status'] != 'ok':
986 raise ss.unwrap_exception(content)
986 raise ss.unwrap_exception(content)
987
987
988 #----------------------------------------
988 #----------------------------------------
989 # activate for %px,%autopx magics
989 # activate for %px,%autopx magics
990 #----------------------------------------
990 #----------------------------------------
991 def activate(self):
991 def activate(self):
992 """Make this `View` active for parallel magic commands.
992 """Make this `View` active for parallel magic commands.
993
993
994 IPython has a magic command syntax to work with `MultiEngineClient` objects.
994 IPython has a magic command syntax to work with `MultiEngineClient` objects.
995 In a given IPython session there is a single active one. While
995 In a given IPython session there is a single active one. While
996 there can be many `Views` created and used by the user,
996 there can be many `Views` created and used by the user,
997 there is only one active one. The active `View` is used whenever
997 there is only one active one. The active `View` is used whenever
998 the magic commands %px and %autopx are used.
998 the magic commands %px and %autopx are used.
999
999
1000 The activate() method is called on a given `View` to make it
1000 The activate() method is called on a given `View` to make it
1001 active. Once this has been done, the magic commands can be used.
1001 active. Once this has been done, the magic commands can be used.
1002 """
1002 """
1003
1003
1004 try:
1004 try:
1005 # This is injected into __builtins__.
1005 # This is injected into __builtins__.
1006 ip = get_ipython()
1006 ip = get_ipython()
1007 except NameError:
1007 except NameError:
1008 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1008 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1009 else:
1009 else:
1010 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1010 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1011 if pmagic is not None:
1011 if pmagic is not None:
1012 pmagic.active_multiengine_client = self
1012 pmagic.active_multiengine_client = self
1013 else:
1013 else:
1014 print "You must first load the parallelmagic extension " \
1014 print "You must first load the parallelmagic extension " \
1015 "by doing '%load_ext parallelmagic'"
1015 "by doing '%load_ext parallelmagic'"
1016
1016
1017 class AsynClient(Client):
1017 class AsynClient(Client):
1018 """An Asynchronous client, using the Tornado Event Loop.
1018 """An Asynchronous client, using the Tornado Event Loop.
1019 !!!unfinished!!!"""
1019 !!!unfinished!!!"""
1020 io_loop = None
1020 io_loop = None
1021 _queue_stream = None
1021 _queue_stream = None
1022 _notifier_stream = None
1022 _notifier_stream = None
1023 _task_stream = None
1023 _task_stream = None
1024 _control_stream = None
1024 _control_stream = None
1025
1025
1026 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1026 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1027 Client.__init__(self, addr, context, username, debug)
1027 Client.__init__(self, addr, context, username, debug)
1028 if io_loop is None:
1028 if io_loop is None:
1029 io_loop = ioloop.IOLoop.instance()
1029 io_loop = ioloop.IOLoop.instance()
1030 self.io_loop = io_loop
1030 self.io_loop = io_loop
1031
1031
1032 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1032 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1033 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1033 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1034 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1034 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1035 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1035 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1036
1036
1037 def spin(self):
1037 def spin(self):
1038 for stream in (self.queue_stream, self.notifier_stream,
1038 for stream in (self.queue_stream, self.notifier_stream,
1039 self.task_stream, self.control_stream):
1039 self.task_stream, self.control_stream):
1040 stream.flush()
1040 stream.flush()
1041
1041
1042 __all__ = [ 'Client',
1042 __all__ = [ 'Client',
1043 'depend',
1043 'depend',
1044 'require',
1044 'require',
1045 'remote',
1045 'remote',
1046 'parallel',
1046 'parallel',
1047 'RemoteFunction',
1047 'RemoteFunction',
1048 'ParallelFunction',
1048 'ParallelFunction',
1049 'DirectView',
1049 'DirectView',
1050 'LoadBalancedView',
1050 'LoadBalancedView',
1051 'AsyncResult',
1051 'AsyncResult',
1052 'AsyncMapResult'
1052 'AsyncMapResult'
1053 ]
1053 ]
@@ -1,347 +1,355 b''
1 """Views of remote engines"""
1 """Views of remote engines"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from IPython.external.decorator import decorator
13 from IPython.external.decorator import decorator
14 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
14 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Decorators
17 # Decorators
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 @decorator
20 @decorator
21 def myblock(f, self, *args, **kwargs):
21 def myblock(f, self, *args, **kwargs):
22 """override client.block with self.block during a call"""
22 """override client.block with self.block during a call"""
23 block = self.client.block
23 block = self.client.block
24 self.client.block = self.block
24 self.client.block = self.block
25 try:
25 try:
26 ret = f(self, *args, **kwargs)
26 ret = f(self, *args, **kwargs)
27 finally:
27 finally:
28 self.client.block = block
28 self.client.block = block
29 return ret
29 return ret
30
30
31 @decorator
31 @decorator
32 def save_ids(f, self, *args, **kwargs):
32 def save_ids(f, self, *args, **kwargs):
33 """Keep our history and outstanding attributes up to date after a method call."""
33 """Keep our history and outstanding attributes up to date after a method call."""
34 n_previous = len(self.client.history)
34 n_previous = len(self.client.history)
35 ret = f(self, *args, **kwargs)
35 ret = f(self, *args, **kwargs)
36 nmsgs = len(self.client.history) - n_previous
36 nmsgs = len(self.client.history) - n_previous
37 msg_ids = self.client.history[-nmsgs:]
37 msg_ids = self.client.history[-nmsgs:]
38 self.history.extend(msg_ids)
38 self.history.extend(msg_ids)
39 map(self.outstanding.add, msg_ids)
39 map(self.outstanding.add, msg_ids)
40 return ret
40 return ret
41
41
42 @decorator
42 @decorator
43 def sync_results(f, self, *args, **kwargs):
43 def sync_results(f, self, *args, **kwargs):
44 """sync relevant results from self.client to our results attribute."""
44 """sync relevant results from self.client to our results attribute."""
45 ret = f(self, *args, **kwargs)
45 ret = f(self, *args, **kwargs)
46 delta = self.outstanding.difference(self.client.outstanding)
46 delta = self.outstanding.difference(self.client.outstanding)
47 completed = self.outstanding.intersection(delta)
47 completed = self.outstanding.intersection(delta)
48 self.outstanding = self.outstanding.difference(completed)
48 self.outstanding = self.outstanding.difference(completed)
49 for msg_id in completed:
49 for msg_id in completed:
50 self.results[msg_id] = self.client.results[msg_id]
50 self.results[msg_id] = self.client.results[msg_id]
51 return ret
51 return ret
52
52
53 @decorator
53 @decorator
54 def spin_after(f, self, *args, **kwargs):
54 def spin_after(f, self, *args, **kwargs):
55 """call spin after the method."""
55 """call spin after the method."""
56 ret = f(self, *args, **kwargs)
56 ret = f(self, *args, **kwargs)
57 self.spin()
57 self.spin()
58 return ret
58 return ret
59
59
60 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
61 # Classes
61 # Classes
62 #-----------------------------------------------------------------------------
62 #-----------------------------------------------------------------------------
63
63
64 class View(object):
64 class View(object):
65 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
65 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
66
66
67 Don't use this class, use subclasses.
67 Don't use this class, use subclasses.
68 """
68 """
69 _targets = None
69 _targets = None
70 block=None
70 block=None
71 bound=None
71 bound=None
72 history=None
72 history=None
73
73
74 def __init__(self, client, targets=None):
74 def __init__(self, client, targets=None):
75 self.client = client
75 self.client = client
76 self._targets = targets
76 self._targets = targets
77 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
77 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
78 self.block = client.block
78 self.block = client.block
79 self.bound=False
79 self.bound=False
80 self.history = []
80 self.history = []
81 self.outstanding = set()
81 self.outstanding = set()
82 self.results = {}
82 self.results = {}
83
83
84 def __repr__(self):
84 def __repr__(self):
85 strtargets = str(self._targets)
85 strtargets = str(self._targets)
86 if len(strtargets) > 16:
86 if len(strtargets) > 16:
87 strtargets = strtargets[:12]+'...]'
87 strtargets = strtargets[:12]+'...]'
88 return "<%s %s>"%(self.__class__.__name__, strtargets)
88 return "<%s %s>"%(self.__class__.__name__, strtargets)
89
89
90 @property
90 @property
91 def targets(self):
91 def targets(self):
92 return self._targets
92 return self._targets
93
93
94 @targets.setter
94 @targets.setter
95 def targets(self, value):
95 def targets(self, value):
96 self._targets = value
96 self._targets = value
97 # raise AttributeError("Cannot set my targets argument after construction!")
97 # raise AttributeError("Cannot set my targets argument after construction!")
98
98
99 @sync_results
99 @sync_results
100 def spin(self):
100 def spin(self):
101 """spin the client, and sync"""
101 """spin the client, and sync"""
102 self.client.spin()
102 self.client.spin()
103
103
104 @sync_results
104 @sync_results
105 @save_ids
105 @save_ids
106 def apply(self, f, *args, **kwargs):
106 def apply(self, f, *args, **kwargs):
107 """calls f(*args, **kwargs) on remote engines, returning the result.
107 """calls f(*args, **kwargs) on remote engines, returning the result.
108
108
109 This method does not involve the engine's namespace.
109 This method does not involve the engine's namespace.
110
110
111 if self.block is False:
111 if self.block is False:
112 returns msg_id
112 returns msg_id
113 else:
113 else:
114 returns actual result of f(*args, **kwargs)
114 returns actual result of f(*args, **kwargs)
115 """
115 """
116 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
116 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
117
117
118 @save_ids
118 @save_ids
119 def apply_async(self, f, *args, **kwargs):
119 def apply_async(self, f, *args, **kwargs):
120 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
120 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
121
121
122 This method does not involve the engine's namespace.
122 This method does not involve the engine's namespace.
123
123
124 returns msg_id
124 returns msg_id
125 """
125 """
126 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
126 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
127
127
128 @spin_after
128 @spin_after
129 @save_ids
129 @save_ids
130 def apply_sync(self, f, *args, **kwargs):
130 def apply_sync(self, f, *args, **kwargs):
131 """calls f(*args, **kwargs) on remote engines in a blocking manner,
131 """calls f(*args, **kwargs) on remote engines in a blocking manner,
132 returning the result.
132 returning the result.
133
133
134 This method does not involve the engine's namespace.
134 This method does not involve the engine's namespace.
135
135
136 returns: actual result of f(*args, **kwargs)
136 returns: actual result of f(*args, **kwargs)
137 """
137 """
138 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
138 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
139
139
140 @sync_results
140 @sync_results
141 @save_ids
141 @save_ids
142 def apply_bound(self, f, *args, **kwargs):
142 def apply_bound(self, f, *args, **kwargs):
143 """calls f(*args, **kwargs) bound to engine namespace(s).
143 """calls f(*args, **kwargs) bound to engine namespace(s).
144
144
145 if self.block is False:
145 if self.block is False:
146 returns msg_id
146 returns msg_id
147 else:
147 else:
148 returns actual result of f(*args, **kwargs)
148 returns actual result of f(*args, **kwargs)
149
149
150 This method has access to the targets' globals
150 This method has access to the targets' globals
151
151
152 """
152 """
153 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
153 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
154
154
155 @sync_results
155 @sync_results
156 @save_ids
156 @save_ids
157 def apply_async_bound(self, f, *args, **kwargs):
157 def apply_async_bound(self, f, *args, **kwargs):
158 """calls f(*args, **kwargs) bound to engine namespace(s)
158 """calls f(*args, **kwargs) bound to engine namespace(s)
159 in a nonblocking manner.
159 in a nonblocking manner.
160
160
161 returns: msg_id
161 returns: msg_id
162
162
163 This method has access to the targets' globals
163 This method has access to the targets' globals
164
164
165 """
165 """
166 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
166 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
167
167
168 @spin_after
168 @spin_after
169 @save_ids
169 @save_ids
170 def apply_sync_bound(self, f, *args, **kwargs):
170 def apply_sync_bound(self, f, *args, **kwargs):
171 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
171 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
172
172
173 returns: actual result of f(*args, **kwargs)
173 returns: actual result of f(*args, **kwargs)
174
174
175 This method has access to the targets' globals
175 This method has access to the targets' globals
176
176
177 """
177 """
178 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
178 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
179
179
180 @spin_after
180 @spin_after
181 @save_ids
181 @save_ids
182 def map(self, f, *sequences):
182 def map(self, f, *sequences):
183 """Parallel version of builtin `map`, using this view's engines."""
183 """Parallel version of builtin `map`, using this view's engines."""
184 if isinstance(self.targets, int):
184 if isinstance(self.targets, int):
185 targets = [self.targets]
185 targets = [self.targets]
186 pf = ParallelFunction(self.client, f, block=self.block,
186 pf = ParallelFunction(self.client, f, block=self.block,
187 bound=True, targets=targets)
187 bound=True, targets=targets)
188 return pf.map(*sequences)
188 return pf.map(*sequences)
189
189
190 def parallel(self, bound=True, block=True):
190 def parallel(self, bound=True, block=True):
191 """Decorator for making a ParallelFunction"""
191 """Decorator for making a ParallelFunction"""
192 return parallel(self.client, bound=bound, targets=self.targets, block=block)
192 return parallel(self.client, bound=bound, targets=self.targets, block=block)
193
193
194 def abort(self, msg_ids=None, block=None):
194 def abort(self, msg_ids=None, block=None):
195 """Abort jobs on my engines.
195 """Abort jobs on my engines.
196
196
197 Parameters
197 Parameters
198 ----------
198 ----------
199
199
200 msg_ids : None, str, list of strs, optional
200 msg_ids : None, str, list of strs, optional
201 if None: abort all jobs.
201 if None: abort all jobs.
202 else: abort specific msg_id(s).
202 else: abort specific msg_id(s).
203 """
203 """
204 block = block if block is not None else self.block
204 block = block if block is not None else self.block
205 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
205 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
206
206
207 def queue_status(self, verbose=False):
207 def queue_status(self, verbose=False):
208 """Fetch the Queue status of my engines"""
208 """Fetch the Queue status of my engines"""
209 return self.client.queue_status(targets=self.targets, verbose=verbose)
209 return self.client.queue_status(targets=self.targets, verbose=verbose)
210
210
211 def purge_results(self, msg_ids=[], targets=[]):
211 def purge_results(self, msg_ids=[], targets=[]):
212 """Instruct the controller to forget specific results."""
212 """Instruct the controller to forget specific results."""
213 if targets is None or targets == 'all':
213 if targets is None or targets == 'all':
214 targets = self.targets
214 targets = self.targets
215 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
215 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
216
216
217
217
218
218
219 class DirectView(View):
219 class DirectView(View):
220 """Direct Multiplexer View of one or more engines.
220 """Direct Multiplexer View of one or more engines.
221
221
222 These are created via indexed access to a client:
222 These are created via indexed access to a client:
223
223
224 >>> dv_1 = client[1]
224 >>> dv_1 = client[1]
225 >>> dv_all = client[:]
225 >>> dv_all = client[:]
226 >>> dv_even = client[::2]
226 >>> dv_even = client[::2]
227 >>> dv_some = client[1:3]
227 >>> dv_some = client[1:3]
228
228
229 This object provides dictionary access
229 This object provides dictionary access
230
230
231 """
231 """
232
232
233 @sync_results
233 @sync_results
234 @save_ids
234 @save_ids
235 def execute(self, code, block=True):
235 def execute(self, code, block=True):
236 """execute some code on my targets."""
236 """execute some code on my targets."""
237 return self.client.execute(code, block=self.block, targets=self.targets)
237 return self.client.execute(code, block=self.block, targets=self.targets)
238
238
239 def update(self, ns):
239 def update(self, ns):
240 """update remote namespace with dict `ns`"""
240 """update remote namespace with dict `ns`"""
241 return self.client.push(ns, targets=self.targets, block=self.block)
241 return self.client.push(ns, targets=self.targets, block=self.block)
242
242
243 push = update
243 push = update
244
244
245 def get(self, key_s):
245 def get(self, key_s):
246 """get object(s) by `key_s` from remote namespace
246 """get object(s) by `key_s` from remote namespace
247 will return one object if it is a key.
247 will return one object if it is a key.
248 It also takes a list of keys, and will return a list of objects."""
248 It also takes a list of keys, and will return a list of objects."""
249 # block = block if block is not None else self.block
249 # block = block if block is not None else self.block
250 return self.client.pull(key_s, block=True, targets=self.targets)
250 return self.client.pull(key_s, block=True, targets=self.targets)
251
251
252 @sync_results
252 @sync_results
253 @save_ids
253 @save_ids
254 def pull(self, key_s, block=True):
254 def pull(self, key_s, block=True):
255 """get object(s) by `key_s` from remote namespace
255 """get object(s) by `key_s` from remote namespace
256 will return one object if it is a key.
256 will return one object if it is a key.
257 It also takes a list of keys, and will return a list of objects."""
257 It also takes a list of keys, and will return a list of objects."""
258 block = block if block is not None else self.block
258 block = block if block is not None else self.block
259 return self.client.pull(key_s, block=block, targets=self.targets)
259 return self.client.pull(key_s, block=block, targets=self.targets)
260
260
261 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
261 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
262 """
262 """
263 Partition a Python sequence and send the partitions to a set of engines.
263 Partition a Python sequence and send the partitions to a set of engines.
264 """
264 """
265 block = block if block is not None else self.block
265 block = block if block is not None else self.block
266 if targets is None:
266 if targets is None:
267 targets = self.targets
267 targets = self.targets
268
268
269 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
269 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
270 targets=targets, block=block)
270 targets=targets, block=block)
271
271
272 @sync_results
272 @sync_results
273 @save_ids
273 @save_ids
274 def gather(self, key, dist='b', targets=None, block=True):
274 def gather(self, key, dist='b', targets=None, block=True):
275 """
275 """
276 Gather a partitioned sequence on a set of engines as a single local seq.
276 Gather a partitioned sequence on a set of engines as a single local seq.
277 """
277 """
278 block = block if block is not None else self.block
278 block = block if block is not None else self.block
279 if targets is None:
279 if targets is None:
280 targets = self.targets
280 targets = self.targets
281
281
282 return self.client.gather(key, dist=dist, targets=targets, block=block)
282 return self.client.gather(key, dist=dist, targets=targets, block=block)
283
283
284 def __getitem__(self, key):
284 def __getitem__(self, key):
285 return self.get(key)
285 return self.get(key)
286
286
287 def __setitem__(self,key, value):
287 def __setitem__(self,key, value):
288 self.update({key:value})
288 self.update({key:value})
289
289
290 def clear(self, block=False):
290 def clear(self, block=False):
291 """Clear the remote namespaces on my engines."""
291 """Clear the remote namespaces on my engines."""
292 block = block if block is not None else self.block
292 block = block if block is not None else self.block
293 return self.client.clear(targets=self.targets, block=block)
293 return self.client.clear(targets=self.targets, block=block)
294
294
295 def kill(self, block=True):
295 def kill(self, block=True):
296 """Kill my engines."""
296 """Kill my engines."""
297 block = block if block is not None else self.block
297 block = block if block is not None else self.block
298 return self.client.kill(targets=self.targets, block=block)
298 return self.client.kill(targets=self.targets, block=block)
299
299
300 #----------------------------------------
300 #----------------------------------------
301 # activate for %px,%autopx magics
301 # activate for %px,%autopx magics
302 #----------------------------------------
302 #----------------------------------------
303 def activate(self):
303 def activate(self):
304 """Make this `View` active for parallel magic commands.
304 """Make this `View` active for parallel magic commands.
305
305
306 IPython has a magic command syntax to work with `MultiEngineClient` objects.
306 IPython has a magic command syntax to work with `MultiEngineClient` objects.
307 In a given IPython session there is a single active one. While
307 In a given IPython session there is a single active one. While
308 there can be many `Views` created and used by the user,
308 there can be many `Views` created and used by the user,
309 there is only one active one. The active `View` is used whenever
309 there is only one active one. The active `View` is used whenever
310 the magic commands %px and %autopx are used.
310 the magic commands %px and %autopx are used.
311
311
312 The activate() method is called on a given `View` to make it
312 The activate() method is called on a given `View` to make it
313 active. Once this has been done, the magic commands can be used.
313 active. Once this has been done, the magic commands can be used.
314 """
314 """
315
315
316 try:
316 try:
317 # This is injected into __builtins__.
317 # This is injected into __builtins__.
318 ip = get_ipython()
318 ip = get_ipython()
319 except NameError:
319 except NameError:
320 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
320 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
321 else:
321 else:
322 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
322 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
323 if pmagic is not None:
323 if pmagic is not None:
324 pmagic.active_multiengine_client = self
324 pmagic.active_multiengine_client = self
325 else:
325 else:
326 print "You must first load the parallelmagic extension " \
326 print "You must first load the parallelmagic extension " \
327 "by doing '%load_ext parallelmagic'"
327 "by doing '%load_ext parallelmagic'"
328
328
329
329
330 class LoadBalancedView(View):
330 class LoadBalancedView(View):
331 """An engine-agnostic View that only executes via the Task queue.
331 """An engine-agnostic View that only executes via the Task queue.
332
332
333 Typically created via:
333 Typically created via:
334
334
335 >>> lbv = client[None]
335 >>> lbv = client[None]
336 <LoadBalancedView tcp://127.0.0.1:12345>
336 <LoadBalancedView tcp://127.0.0.1:12345>
337
337
338 but can also be created with:
338 but can also be created with:
339
339
340 >>> lbc = LoadBalancedView(client)
340 >>> lbc = LoadBalancedView(client)
341
341
342 TODO: allow subset of engines across which to balance.
342 TODO: allow subset of engines across which to balance.
343 """
343 """
344 def __repr__(self):
344 def __repr__(self):
345 return "<%s %s>"%(self.__class__.__name__, self.client._addr)
345 return "<%s %s>"%(self.__class__.__name__, self.client._addr)
346
346
347 @property
348 def targets(self):
349 return None
350
351 @targets.setter
352 def targets(self, value):
353 raise AttributeError("Cannot set targets for LoadbalancedView!")
354
347 No newline at end of file
355
General Comments 0
You need to be logged in to leave comments. Login now