##// END OF EJS Templates
tweaks related to docs + add activate() for magics
MinRK -
Show More
@@ -1,1019 +1,1053 b''
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from __future__ import print_function
14
15 import os
13 import os
16 import time
14 import time
17 from getpass import getpass
15 from getpass import getpass
18 from pprint import pprint
16 from pprint import pprint
19
17
20 import zmq
18 import zmq
21 from zmq.eventloop import ioloop, zmqstream
19 from zmq.eventloop import ioloop, zmqstream
22
20
23 from IPython.external.decorator import decorator
21 from IPython.external.decorator import decorator
24 from IPython.zmq import tunnel
22 from IPython.zmq import tunnel
25
23
26 import streamsession as ss
24 import streamsession as ss
27 # from remotenamespace import RemoteNamespace
25 # from remotenamespace import RemoteNamespace
28 from view import DirectView, LoadBalancedView
26 from view import DirectView, LoadBalancedView
29 from dependency import Dependency, depend, require
27 from dependency import Dependency, depend, require
30 import error
28 import error
31 import map as Map
29 import map as Map
32 from asyncresult import AsyncResult, AsyncMapResult
30 from asyncresult import AsyncResult, AsyncMapResult
33 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
31 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
34
32
35 #--------------------------------------------------------------------------
33 #--------------------------------------------------------------------------
36 # helpers for implementing old MEC API via client.apply
34 # helpers for implementing old MEC API via client.apply
37 #--------------------------------------------------------------------------
35 #--------------------------------------------------------------------------
38
36
39 def _push(ns):
37 def _push(ns):
40 """helper method for implementing `client.push` via `client.apply`"""
38 """helper method for implementing `client.push` via `client.apply`"""
41 globals().update(ns)
39 globals().update(ns)
42
40
43 def _pull(keys):
41 def _pull(keys):
44 """helper method for implementing `client.pull` via `client.apply`"""
42 """helper method for implementing `client.pull` via `client.apply`"""
45 g = globals()
43 g = globals()
46 if isinstance(keys, (list,tuple, set)):
44 if isinstance(keys, (list,tuple, set)):
47 for key in keys:
45 for key in keys:
48 if not g.has_key(key):
46 if not g.has_key(key):
49 raise NameError("name '%s' is not defined"%key)
47 raise NameError("name '%s' is not defined"%key)
50 return map(g.get, keys)
48 return map(g.get, keys)
51 else:
49 else:
52 if not g.has_key(keys):
50 if not g.has_key(keys):
53 raise NameError("name '%s' is not defined"%keys)
51 raise NameError("name '%s' is not defined"%keys)
54 return g.get(keys)
52 return g.get(keys)
55
53
56 def _clear():
54 def _clear():
57 """helper method for implementing `client.clear` via `client.apply`"""
55 """helper method for implementing `client.clear` via `client.apply`"""
58 globals().clear()
56 globals().clear()
59
57
60 def execute(code):
58 def _execute(code):
61 """helper method for implementing `client.execute` via `client.apply`"""
59 """helper method for implementing `client.execute` via `client.apply`"""
62 exec code in globals()
60 exec code in globals()
63
61
64
62
65 #--------------------------------------------------------------------------
63 #--------------------------------------------------------------------------
66 # Decorators for Client methods
64 # Decorators for Client methods
67 #--------------------------------------------------------------------------
65 #--------------------------------------------------------------------------
68
66
69 @decorator
67 @decorator
70 def spinfirst(f, self, *args, **kwargs):
68 def spinfirst(f, self, *args, **kwargs):
71 """Call spin() to sync state prior to calling the method."""
69 """Call spin() to sync state prior to calling the method."""
72 self.spin()
70 self.spin()
73 return f(self, *args, **kwargs)
71 return f(self, *args, **kwargs)
74
72
75 @decorator
73 @decorator
76 def defaultblock(f, self, *args, **kwargs):
74 def defaultblock(f, self, *args, **kwargs):
77 """Default to self.block; preserve self.block."""
75 """Default to self.block; preserve self.block."""
78 block = kwargs.get('block',None)
76 block = kwargs.get('block',None)
79 block = self.block if block is None else block
77 block = self.block if block is None else block
80 saveblock = self.block
78 saveblock = self.block
81 self.block = block
79 self.block = block
82 ret = f(self, *args, **kwargs)
80 try:
83 self.block = saveblock
81 ret = f(self, *args, **kwargs)
82 finally:
83 self.block = saveblock
84 return ret
84 return ret
85
85
86
86
87 class AbortedTask(object):
87 class AbortedTask(object):
88 """A basic wrapper object describing an aborted task."""
88 """A basic wrapper object describing an aborted task."""
89 def __init__(self, msg_id):
89 def __init__(self, msg_id):
90 self.msg_id = msg_id
90 self.msg_id = msg_id
91
91
92 class ResultDict(dict):
92 class ResultDict(dict):
93 """A subclass of dict that raises errors if it has them."""
93 """A subclass of dict that raises errors if it has them."""
94 def __getitem__(self, key):
94 def __getitem__(self, key):
95 res = dict.__getitem__(self, key)
95 res = dict.__getitem__(self, key)
96 if isinstance(res, error.KernelError):
96 if isinstance(res, error.KernelError):
97 raise res
97 raise res
98 return res
98 return res
99
99
100 class Client(object):
100 class Client(object):
101 """A semi-synchronous client to the IPython ZMQ controller
101 """A semi-synchronous client to the IPython ZMQ controller
102
102
103 Parameters
103 Parameters
104 ----------
104 ----------
105
105
106 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
106 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
107 The address of the controller's registration socket.
107 The address of the controller's registration socket.
108 [Default: 'tcp://127.0.0.1:10101']
108 [Default: 'tcp://127.0.0.1:10101']
109 context : zmq.Context
109 context : zmq.Context
110 Pass an existing zmq.Context instance, otherwise the client will create its own
110 Pass an existing zmq.Context instance, otherwise the client will create its own
111 username : bytes
111 username : bytes
112 set username to be passed to the Session object
112 set username to be passed to the Session object
113 debug : bool
113 debug : bool
114 flag for lots of message printing for debug purposes
114 flag for lots of message printing for debug purposes
115
115
116 #-------------- ssh related args ----------------
116 #-------------- ssh related args ----------------
117 # These are args for configuring the ssh tunnel to be used
117 # These are args for configuring the ssh tunnel to be used
118 # credentials are used to forward connections over ssh to the Controller
118 # credentials are used to forward connections over ssh to the Controller
119 # Note that the ip given in `addr` needs to be relative to sshserver
119 # Note that the ip given in `addr` needs to be relative to sshserver
120 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
120 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
121 # and set sshserver as the same machine the Controller is on. However,
121 # and set sshserver as the same machine the Controller is on. However,
122 # the only requirement is that sshserver is able to see the Controller
122 # the only requirement is that sshserver is able to see the Controller
123 # (i.e. is within the same trusted network).
123 # (i.e. is within the same trusted network).
124
124
125 sshserver : str
125 sshserver : str
126 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
126 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
127 If keyfile or password is specified, and this is not, it will default to
127 If keyfile or password is specified, and this is not, it will default to
128 the ip given in addr.
128 the ip given in addr.
129 sshkey : str; path to public ssh key file
129 sshkey : str; path to public ssh key file
130 This specifies a key to be used in ssh login, default None.
130 This specifies a key to be used in ssh login, default None.
131 Regular default ssh keys will be used without specifying this argument.
131 Regular default ssh keys will be used without specifying this argument.
132 password : str;
132 password : str;
133 Your ssh password to sshserver. Note that if this is left None,
133 Your ssh password to sshserver. Note that if this is left None,
134 you will be prompted for it if passwordless key based login is unavailable.
134 you will be prompted for it if passwordless key based login is unavailable.
135
135
136 #------- exec authentication args -------
136 #------- exec authentication args -------
137 # If even localhost is untrusted, you can have some protection against
137 # If even localhost is untrusted, you can have some protection against
138 # unauthorized execution by using a key. Messages are still sent
138 # unauthorized execution by using a key. Messages are still sent
139 # as cleartext, so if someone can snoop your loopback traffic this will
139 # as cleartext, so if someone can snoop your loopback traffic this will
140 # not help anything.
140 # not help anything.
141
141
142 exec_key : str
142 exec_key : str
143 an authentication key or file containing a key
143 an authentication key or file containing a key
144 default: None
144 default: None
145
145
146
146
147 Attributes
147 Attributes
148 ----------
148 ----------
149 ids : set of int engine IDs
149 ids : set of int engine IDs
150 requesting the ids attribute always synchronizes
150 requesting the ids attribute always synchronizes
151 the registration state. To request ids without synchronization,
151 the registration state. To request ids without synchronization,
152 use semi-private _ids attributes.
152 use semi-private _ids attributes.
153
153
154 history : list of msg_ids
154 history : list of msg_ids
155 a list of msg_ids, keeping track of all the execution
155 a list of msg_ids, keeping track of all the execution
156 messages you have submitted in order.
156 messages you have submitted in order.
157
157
158 outstanding : set of msg_ids
158 outstanding : set of msg_ids
159 a set of msg_ids that have been submitted, but whose
159 a set of msg_ids that have been submitted, but whose
160 results have not yet been received.
160 results have not yet been received.
161
161
162 results : dict
162 results : dict
163 a dict of all our results, keyed by msg_id
163 a dict of all our results, keyed by msg_id
164
164
165 block : bool
165 block : bool
166 determines default behavior when block not specified
166 determines default behavior when block not specified
167 in execution methods
167 in execution methods
168
168
169 Methods
169 Methods
170 -------
170 -------
171 spin : flushes incoming results and registration state changes
171 spin : flushes incoming results and registration state changes
172 control methods spin, and requesting `ids` also ensures up to date
172 control methods spin, and requesting `ids` also ensures up to date
173
173
174 barrier : wait on one or more msg_ids
174 barrier : wait on one or more msg_ids
175
175
176 execution methods: apply/apply_bound/apply_to/apply_bound
176 execution methods: apply/apply_bound/apply_to/apply_bound
177 legacy: execute, run
177 legacy: execute, run
178
178
179 query methods: queue_status, get_result, purge
179 query methods: queue_status, get_result, purge
180
180
181 control methods: abort, kill
181 control methods: abort, kill
182
182
183 """
183 """
184
184
185
185
186 _connected=False
186 _connected=False
187 _ssh=False
187 _ssh=False
188 _engines=None
188 _engines=None
189 _addr='tcp://127.0.0.1:10101'
189 _addr='tcp://127.0.0.1:10101'
190 _registration_socket=None
190 _registration_socket=None
191 _query_socket=None
191 _query_socket=None
192 _control_socket=None
192 _control_socket=None
193 _notification_socket=None
193 _notification_socket=None
194 _mux_socket=None
194 _mux_socket=None
195 _task_socket=None
195 _task_socket=None
196 block = False
196 block = False
197 outstanding=None
197 outstanding=None
198 results = None
198 results = None
199 history = None
199 history = None
200 debug = False
200 debug = False
201 targets = None
201
202
202 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
203 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
203 sshserver=None, sshkey=None, password=None, paramiko=None,
204 sshserver=None, sshkey=None, password=None, paramiko=None,
204 exec_key=None,):
205 exec_key=None,):
205 if context is None:
206 if context is None:
206 context = zmq.Context()
207 context = zmq.Context()
207 self.context = context
208 self.context = context
209 self.targets = 'all'
208 self._addr = addr
210 self._addr = addr
209 self._ssh = bool(sshserver or sshkey or password)
211 self._ssh = bool(sshserver or sshkey or password)
210 if self._ssh and sshserver is None:
212 if self._ssh and sshserver is None:
211 # default to the same
213 # default to the same
212 sshserver = addr.split('://')[1].split(':')[0]
214 sshserver = addr.split('://')[1].split(':')[0]
213 if self._ssh and password is None:
215 if self._ssh and password is None:
214 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
216 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
215 password=False
217 password=False
216 else:
218 else:
217 password = getpass("SSH Password for %s: "%sshserver)
219 password = getpass("SSH Password for %s: "%sshserver)
218 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
220 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
219
221
220 if exec_key is not None and os.path.isfile(exec_key):
222 if exec_key is not None and os.path.isfile(exec_key):
221 arg = 'keyfile'
223 arg = 'keyfile'
222 else:
224 else:
223 arg = 'key'
225 arg = 'key'
224 key_arg = {arg:exec_key}
226 key_arg = {arg:exec_key}
225 if username is None:
227 if username is None:
226 self.session = ss.StreamSession(**key_arg)
228 self.session = ss.StreamSession(**key_arg)
227 else:
229 else:
228 self.session = ss.StreamSession(username, **key_arg)
230 self.session = ss.StreamSession(username, **key_arg)
229 self._registration_socket = self.context.socket(zmq.XREQ)
231 self._registration_socket = self.context.socket(zmq.XREQ)
230 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
232 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
231 if self._ssh:
233 if self._ssh:
232 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
234 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
233 else:
235 else:
234 self._registration_socket.connect(addr)
236 self._registration_socket.connect(addr)
235 self._engines = {}
237 self._engines = {}
236 self._ids = set()
238 self._ids = set()
237 self.outstanding=set()
239 self.outstanding=set()
238 self.results = {}
240 self.results = {}
239 self.history = []
241 self.history = []
240 self.debug = debug
242 self.debug = debug
241 self.session.debug = debug
243 self.session.debug = debug
242
244
243 self._notification_handlers = {'registration_notification' : self._register_engine,
245 self._notification_handlers = {'registration_notification' : self._register_engine,
244 'unregistration_notification' : self._unregister_engine,
246 'unregistration_notification' : self._unregister_engine,
245 }
247 }
246 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
248 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
247 'apply_reply' : self._handle_apply_reply}
249 'apply_reply' : self._handle_apply_reply}
248 self._connect(sshserver, ssh_kwargs)
250 self._connect(sshserver, ssh_kwargs)
249
251
250
252
251 @property
253 @property
252 def ids(self):
254 def ids(self):
253 """Always up to date ids property."""
255 """Always up to date ids property."""
254 self._flush_notifications()
256 self._flush_notifications()
255 return self._ids
257 return self._ids
256
258
257 def _update_engines(self, engines):
259 def _update_engines(self, engines):
258 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
260 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
259 for k,v in engines.iteritems():
261 for k,v in engines.iteritems():
260 eid = int(k)
262 eid = int(k)
261 self._engines[eid] = bytes(v) # force not unicode
263 self._engines[eid] = bytes(v) # force not unicode
262 self._ids.add(eid)
264 self._ids.add(eid)
263
265
264 def _build_targets(self, targets):
266 def _build_targets(self, targets):
265 """Turn valid target IDs or 'all' into two lists:
267 """Turn valid target IDs or 'all' into two lists:
266 (int_ids, uuids).
268 (int_ids, uuids).
267 """
269 """
268 if targets is None:
270 if targets is None:
269 targets = self._ids
271 targets = self._ids
270 elif isinstance(targets, str):
272 elif isinstance(targets, str):
271 if targets.lower() == 'all':
273 if targets.lower() == 'all':
272 targets = self._ids
274 targets = self._ids
273 else:
275 else:
274 raise TypeError("%r not valid str target, must be 'all'"%(targets))
276 raise TypeError("%r not valid str target, must be 'all'"%(targets))
275 elif isinstance(targets, int):
277 elif isinstance(targets, int):
276 targets = [targets]
278 targets = [targets]
277 return [self._engines[t] for t in targets], list(targets)
279 return [self._engines[t] for t in targets], list(targets)
278
280
279 def _connect(self, sshserver, ssh_kwargs):
281 def _connect(self, sshserver, ssh_kwargs):
280 """setup all our socket connections to the controller. This is called from
282 """setup all our socket connections to the controller. This is called from
281 __init__."""
283 __init__."""
282 if self._connected:
284 if self._connected:
283 return
285 return
284 self._connected=True
286 self._connected=True
285
287
286 def connect_socket(s, addr):
288 def connect_socket(s, addr):
287 if self._ssh:
289 if self._ssh:
288 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
290 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
289 else:
291 else:
290 return s.connect(addr)
292 return s.connect(addr)
291
293
292 self.session.send(self._registration_socket, 'connection_request')
294 self.session.send(self._registration_socket, 'connection_request')
293 idents,msg = self.session.recv(self._registration_socket,mode=0)
295 idents,msg = self.session.recv(self._registration_socket,mode=0)
294 if self.debug:
296 if self.debug:
295 pprint(msg)
297 pprint(msg)
296 msg = ss.Message(msg)
298 msg = ss.Message(msg)
297 content = msg.content
299 content = msg.content
298 if content.status == 'ok':
300 if content.status == 'ok':
299 if content.queue:
301 if content.queue:
300 self._mux_socket = self.context.socket(zmq.PAIR)
302 self._mux_socket = self.context.socket(zmq.PAIR)
301 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
303 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
302 connect_socket(self._mux_socket, content.queue)
304 connect_socket(self._mux_socket, content.queue)
303 if content.task:
305 if content.task:
304 self._task_socket = self.context.socket(zmq.PAIR)
306 self._task_socket = self.context.socket(zmq.PAIR)
305 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
307 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
306 connect_socket(self._task_socket, content.task)
308 connect_socket(self._task_socket, content.task)
307 if content.notification:
309 if content.notification:
308 self._notification_socket = self.context.socket(zmq.SUB)
310 self._notification_socket = self.context.socket(zmq.SUB)
309 connect_socket(self._notification_socket, content.notification)
311 connect_socket(self._notification_socket, content.notification)
310 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
312 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
311 if content.query:
313 if content.query:
312 self._query_socket = self.context.socket(zmq.PAIR)
314 self._query_socket = self.context.socket(zmq.PAIR)
313 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
315 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
314 connect_socket(self._query_socket, content.query)
316 connect_socket(self._query_socket, content.query)
315 if content.control:
317 if content.control:
316 self._control_socket = self.context.socket(zmq.PAIR)
318 self._control_socket = self.context.socket(zmq.PAIR)
317 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
319 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
318 connect_socket(self._control_socket, content.control)
320 connect_socket(self._control_socket, content.control)
319 self._update_engines(dict(content.engines))
321 self._update_engines(dict(content.engines))
320
322
321 else:
323 else:
322 self._connected = False
324 self._connected = False
323 raise Exception("Failed to connect!")
325 raise Exception("Failed to connect!")
324
326
325 #--------------------------------------------------------------------------
327 #--------------------------------------------------------------------------
326 # handlers and callbacks for incoming messages
328 # handlers and callbacks for incoming messages
327 #--------------------------------------------------------------------------
329 #--------------------------------------------------------------------------
328
330
329 def _register_engine(self, msg):
331 def _register_engine(self, msg):
330 """Register a new engine, and update our connection info."""
332 """Register a new engine, and update our connection info."""
331 content = msg['content']
333 content = msg['content']
332 eid = content['id']
334 eid = content['id']
333 d = {eid : content['queue']}
335 d = {eid : content['queue']}
334 self._update_engines(d)
336 self._update_engines(d)
335 self._ids.add(int(eid))
337 self._ids.add(int(eid))
336
338
337 def _unregister_engine(self, msg):
339 def _unregister_engine(self, msg):
338 """Unregister an engine that has died."""
340 """Unregister an engine that has died."""
339 content = msg['content']
341 content = msg['content']
340 eid = int(content['id'])
342 eid = int(content['id'])
341 if eid in self._ids:
343 if eid in self._ids:
342 self._ids.remove(eid)
344 self._ids.remove(eid)
343 self._engines.pop(eid)
345 self._engines.pop(eid)
344
346
345 def _handle_execute_reply(self, msg):
347 def _handle_execute_reply(self, msg):
346 """Save the reply to an execute_request into our results."""
348 """Save the reply to an execute_request into our results."""
347 parent = msg['parent_header']
349 parent = msg['parent_header']
348 msg_id = parent['msg_id']
350 msg_id = parent['msg_id']
349 if msg_id not in self.outstanding:
351 if msg_id not in self.outstanding:
350 print("got unknown result: %s"%msg_id)
352 print("got unknown result: %s"%msg_id)
351 else:
353 else:
352 self.outstanding.remove(msg_id)
354 self.outstanding.remove(msg_id)
353 self.results[msg_id] = ss.unwrap_exception(msg['content'])
355 self.results[msg_id] = ss.unwrap_exception(msg['content'])
354
356
355 def _handle_apply_reply(self, msg):
357 def _handle_apply_reply(self, msg):
356 """Save the reply to an apply_request into our results."""
358 """Save the reply to an apply_request into our results."""
357 parent = msg['parent_header']
359 parent = msg['parent_header']
358 msg_id = parent['msg_id']
360 msg_id = parent['msg_id']
359 if msg_id not in self.outstanding:
361 if msg_id not in self.outstanding:
360 print ("got unknown result: %s"%msg_id)
362 print ("got unknown result: %s"%msg_id)
361 else:
363 else:
362 self.outstanding.remove(msg_id)
364 self.outstanding.remove(msg_id)
363 content = msg['content']
365 content = msg['content']
364 if content['status'] == 'ok':
366 if content['status'] == 'ok':
365 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
367 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
366 elif content['status'] == 'aborted':
368 elif content['status'] == 'aborted':
367 self.results[msg_id] = error.AbortedTask(msg_id)
369 self.results[msg_id] = error.AbortedTask(msg_id)
368 elif content['status'] == 'resubmitted':
370 elif content['status'] == 'resubmitted':
369 # TODO: handle resubmission
371 # TODO: handle resubmission
370 pass
372 pass
371 else:
373 else:
372 e = ss.unwrap_exception(content)
374 e = ss.unwrap_exception(content)
373 e_uuid = e.engine_info['engineid']
375 e_uuid = e.engine_info['engineid']
374 for k,v in self._engines.iteritems():
376 for k,v in self._engines.iteritems():
375 if v == e_uuid:
377 if v == e_uuid:
376 e.engine_info['engineid'] = k
378 e.engine_info['engineid'] = k
377 break
379 break
378 self.results[msg_id] = e
380 self.results[msg_id] = e
379
381
380 def _flush_notifications(self):
382 def _flush_notifications(self):
381 """Flush notifications of engine registrations waiting
383 """Flush notifications of engine registrations waiting
382 in ZMQ queue."""
384 in ZMQ queue."""
383 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
385 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
384 while msg is not None:
386 while msg is not None:
385 if self.debug:
387 if self.debug:
386 pprint(msg)
388 pprint(msg)
387 msg = msg[-1]
389 msg = msg[-1]
388 msg_type = msg['msg_type']
390 msg_type = msg['msg_type']
389 handler = self._notification_handlers.get(msg_type, None)
391 handler = self._notification_handlers.get(msg_type, None)
390 if handler is None:
392 if handler is None:
391 raise Exception("Unhandled message type: %s"%msg.msg_type)
393 raise Exception("Unhandled message type: %s"%msg.msg_type)
392 else:
394 else:
393 handler(msg)
395 handler(msg)
394 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
396 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
395
397
396 def _flush_results(self, sock):
398 def _flush_results(self, sock):
397 """Flush task or queue results waiting in ZMQ queue."""
399 """Flush task or queue results waiting in ZMQ queue."""
398 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
400 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
399 while msg is not None:
401 while msg is not None:
400 if self.debug:
402 if self.debug:
401 pprint(msg)
403 pprint(msg)
402 msg = msg[-1]
404 msg = msg[-1]
403 msg_type = msg['msg_type']
405 msg_type = msg['msg_type']
404 handler = self._queue_handlers.get(msg_type, None)
406 handler = self._queue_handlers.get(msg_type, None)
405 if handler is None:
407 if handler is None:
406 raise Exception("Unhandled message type: %s"%msg.msg_type)
408 raise Exception("Unhandled message type: %s"%msg.msg_type)
407 else:
409 else:
408 handler(msg)
410 handler(msg)
409 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
411 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
410
412
411 def _flush_control(self, sock):
413 def _flush_control(self, sock):
412 """Flush replies from the control channel waiting
414 """Flush replies from the control channel waiting
413 in the ZMQ queue.
415 in the ZMQ queue.
414
416
415 Currently: ignore them."""
417 Currently: ignore them."""
416 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
418 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
417 while msg is not None:
419 while msg is not None:
418 if self.debug:
420 if self.debug:
419 pprint(msg)
421 pprint(msg)
420 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
422 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
421
423
422 #--------------------------------------------------------------------------
424 #--------------------------------------------------------------------------
423 # getitem
425 # getitem
424 #--------------------------------------------------------------------------
426 #--------------------------------------------------------------------------
425
427
426 def __getitem__(self, key):
428 def __getitem__(self, key):
427 """Dict access returns DirectView multiplexer objects or,
429 """Dict access returns DirectView multiplexer objects or,
428 if key is None, a LoadBalancedView."""
430 if key is None, a LoadBalancedView."""
429 if key is None:
431 if key is None:
430 return LoadBalancedView(self)
432 return LoadBalancedView(self)
431 if isinstance(key, int):
433 if isinstance(key, int):
432 if key not in self.ids:
434 if key not in self.ids:
433 raise IndexError("No such engine: %i"%key)
435 raise IndexError("No such engine: %i"%key)
434 return DirectView(self, key)
436 return DirectView(self, key)
435
437
436 if isinstance(key, slice):
438 if isinstance(key, slice):
437 indices = range(len(self.ids))[key]
439 indices = range(len(self.ids))[key]
438 ids = sorted(self._ids)
440 ids = sorted(self._ids)
439 key = [ ids[i] for i in indices ]
441 key = [ ids[i] for i in indices ]
440 # newkeys = sorted(self._ids)[thekeys[k]]
442 # newkeys = sorted(self._ids)[thekeys[k]]
441
443
442 if isinstance(key, (tuple, list, xrange)):
444 if isinstance(key, (tuple, list, xrange)):
443 _,targets = self._build_targets(list(key))
445 _,targets = self._build_targets(list(key))
444 return DirectView(self, targets)
446 return DirectView(self, targets)
445 else:
447 else:
446 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
448 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
447
449
448 #--------------------------------------------------------------------------
450 #--------------------------------------------------------------------------
449 # Begin public methods
451 # Begin public methods
450 #--------------------------------------------------------------------------
452 #--------------------------------------------------------------------------
451
453
452 @property
454 @property
453 def remote(self):
455 def remote(self):
454 """property for convenient RemoteFunction generation.
456 """property for convenient RemoteFunction generation.
455
457
456 >>> @client.remote
458 >>> @client.remote
457 ... def f():
459 ... def f():
458 import os
460 import os
459 print (os.getpid())
461 print (os.getpid())
460 """
462 """
461 return remote(self, block=self.block)
463 return remote(self, block=self.block)
462
464
463 def spin(self):
465 def spin(self):
464 """Flush any registration notifications and execution results
466 """Flush any registration notifications and execution results
465 waiting in the ZMQ queue.
467 waiting in the ZMQ queue.
466 """
468 """
467 if self._notification_socket:
469 if self._notification_socket:
468 self._flush_notifications()
470 self._flush_notifications()
469 if self._mux_socket:
471 if self._mux_socket:
470 self._flush_results(self._mux_socket)
472 self._flush_results(self._mux_socket)
471 if self._task_socket:
473 if self._task_socket:
472 self._flush_results(self._task_socket)
474 self._flush_results(self._task_socket)
473 if self._control_socket:
475 if self._control_socket:
474 self._flush_control(self._control_socket)
476 self._flush_control(self._control_socket)
475
477
476 def barrier(self, msg_ids=None, timeout=-1):
478 def barrier(self, msg_ids=None, timeout=-1):
477 """waits on one or more `msg_ids`, for up to `timeout` seconds.
479 """waits on one or more `msg_ids`, for up to `timeout` seconds.
478
480
479 Parameters
481 Parameters
480 ----------
482 ----------
481 msg_ids : int, str, or list of ints and/or strs
483 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
482 ints are indices to self.history
484 ints are indices to self.history
483 strs are msg_ids
485 strs are msg_ids
484 default: wait on all outstanding messages
486 default: wait on all outstanding messages
485 timeout : float
487 timeout : float
486 a time in seconds, after which to give up.
488 a time in seconds, after which to give up.
487 default is -1, which means no timeout
489 default is -1, which means no timeout
488
490
489 Returns
491 Returns
490 -------
492 -------
491 True : when all msg_ids are done
493 True : when all msg_ids are done
492 False : timeout reached, some msg_ids still outstanding
494 False : timeout reached, some msg_ids still outstanding
493 """
495 """
494 tic = time.time()
496 tic = time.time()
495 if msg_ids is None:
497 if msg_ids is None:
496 theids = self.outstanding
498 theids = self.outstanding
497 else:
499 else:
498 if isinstance(msg_ids, (int, str)):
500 if isinstance(msg_ids, (int, str, AsyncResult)):
499 msg_ids = [msg_ids]
501 msg_ids = [msg_ids]
500 theids = set()
502 theids = set()
501 for msg_id in msg_ids:
503 for msg_id in msg_ids:
502 if isinstance(msg_id, int):
504 if isinstance(msg_id, int):
503 msg_id = self.history[msg_id]
505 msg_id = self.history[msg_id]
506 elif isinstance(msg_id, AsyncResult):
507 map(theids.add, msg_id._msg_ids)
508 continue
504 theids.add(msg_id)
509 theids.add(msg_id)
510 if not theids.intersection(self.outstanding):
511 return True
505 self.spin()
512 self.spin()
506 while theids.intersection(self.outstanding):
513 while theids.intersection(self.outstanding):
507 if timeout >= 0 and ( time.time()-tic ) > timeout:
514 if timeout >= 0 and ( time.time()-tic ) > timeout:
508 break
515 break
509 time.sleep(1e-3)
516 time.sleep(1e-3)
510 self.spin()
517 self.spin()
511 return len(theids.intersection(self.outstanding)) == 0
518 return len(theids.intersection(self.outstanding)) == 0
512
519
513 #--------------------------------------------------------------------------
520 #--------------------------------------------------------------------------
514 # Control methods
521 # Control methods
515 #--------------------------------------------------------------------------
522 #--------------------------------------------------------------------------
516
523
517 @spinfirst
524 @spinfirst
518 @defaultblock
525 @defaultblock
519 def clear(self, targets=None, block=None):
526 def clear(self, targets=None, block=None):
520 """Clear the namespace in target(s)."""
527 """Clear the namespace in target(s)."""
521 targets = self._build_targets(targets)[0]
528 targets = self._build_targets(targets)[0]
522 for t in targets:
529 for t in targets:
523 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
530 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
524 error = False
531 error = False
525 if self.block:
532 if self.block:
526 for i in range(len(targets)):
533 for i in range(len(targets)):
527 idents,msg = self.session.recv(self._control_socket,0)
534 idents,msg = self.session.recv(self._control_socket,0)
528 if self.debug:
535 if self.debug:
529 pprint(msg)
536 pprint(msg)
530 if msg['content']['status'] != 'ok':
537 if msg['content']['status'] != 'ok':
531 error = ss.unwrap_exception(msg['content'])
538 error = ss.unwrap_exception(msg['content'])
532 if error:
539 if error:
533 return error
540 return error
534
541
535
542
536 @spinfirst
543 @spinfirst
537 @defaultblock
544 @defaultblock
538 def abort(self, msg_ids = None, targets=None, block=None):
545 def abort(self, msg_ids = None, targets=None, block=None):
539 """Abort the execution queues of target(s)."""
546 """Abort the execution queues of target(s)."""
540 targets = self._build_targets(targets)[0]
547 targets = self._build_targets(targets)[0]
541 if isinstance(msg_ids, basestring):
548 if isinstance(msg_ids, basestring):
542 msg_ids = [msg_ids]
549 msg_ids = [msg_ids]
543 content = dict(msg_ids=msg_ids)
550 content = dict(msg_ids=msg_ids)
544 for t in targets:
551 for t in targets:
545 self.session.send(self._control_socket, 'abort_request',
552 self.session.send(self._control_socket, 'abort_request',
546 content=content, ident=t)
553 content=content, ident=t)
547 error = False
554 error = False
548 if self.block:
555 if self.block:
549 for i in range(len(targets)):
556 for i in range(len(targets)):
550 idents,msg = self.session.recv(self._control_socket,0)
557 idents,msg = self.session.recv(self._control_socket,0)
551 if self.debug:
558 if self.debug:
552 pprint(msg)
559 pprint(msg)
553 if msg['content']['status'] != 'ok':
560 if msg['content']['status'] != 'ok':
554 error = ss.unwrap_exception(msg['content'])
561 error = ss.unwrap_exception(msg['content'])
555 if error:
562 if error:
556 return error
563 return error
557
564
558 @spinfirst
565 @spinfirst
559 @defaultblock
566 @defaultblock
560 def shutdown(self, targets=None, restart=False, controller=False, block=None):
567 def shutdown(self, targets=None, restart=False, controller=False, block=None):
561 """Terminates one or more engine processes, optionally including the controller."""
568 """Terminates one or more engine processes, optionally including the controller."""
562 if controller:
569 if controller:
563 targets = 'all'
570 targets = 'all'
564 targets = self._build_targets(targets)[0]
571 targets = self._build_targets(targets)[0]
565 for t in targets:
572 for t in targets:
566 self.session.send(self._control_socket, 'shutdown_request',
573 self.session.send(self._control_socket, 'shutdown_request',
567 content={'restart':restart},ident=t)
574 content={'restart':restart},ident=t)
568 error = False
575 error = False
569 if block or controller:
576 if block or controller:
570 for i in range(len(targets)):
577 for i in range(len(targets)):
571 idents,msg = self.session.recv(self._control_socket,0)
578 idents,msg = self.session.recv(self._control_socket,0)
572 if self.debug:
579 if self.debug:
573 pprint(msg)
580 pprint(msg)
574 if msg['content']['status'] != 'ok':
581 if msg['content']['status'] != 'ok':
575 error = ss.unwrap_exception(msg['content'])
582 error = ss.unwrap_exception(msg['content'])
576
583
577 if controller:
584 if controller:
578 time.sleep(0.25)
585 time.sleep(0.25)
579 self.session.send(self._query_socket, 'shutdown_request')
586 self.session.send(self._query_socket, 'shutdown_request')
580 idents,msg = self.session.recv(self._query_socket, 0)
587 idents,msg = self.session.recv(self._query_socket, 0)
581 if self.debug:
588 if self.debug:
582 pprint(msg)
589 pprint(msg)
583 if msg['content']['status'] != 'ok':
590 if msg['content']['status'] != 'ok':
584 error = ss.unwrap_exception(msg['content'])
591 error = ss.unwrap_exception(msg['content'])
585
592
586 if error:
593 if error:
587 raise error
594 raise error
588
595
589 #--------------------------------------------------------------------------
596 #--------------------------------------------------------------------------
590 # Execution methods
597 # Execution methods
591 #--------------------------------------------------------------------------
598 #--------------------------------------------------------------------------
592
599
593 @defaultblock
600 @defaultblock
594 def execute(self, code, targets='all', block=None):
601 def execute(self, code, targets='all', block=None):
595 """Executes `code` on `targets` in blocking or nonblocking manner.
602 """Executes `code` on `targets` in blocking or nonblocking manner.
596
603
597 ``execute`` is always `bound` (affects engine namespace)
604 ``execute`` is always `bound` (affects engine namespace)
598
605
599 Parameters
606 Parameters
600 ----------
607 ----------
601 code : str
608 code : str
602 the code string to be executed
609 the code string to be executed
603 targets : int/str/list of ints/strs
610 targets : int/str/list of ints/strs
604 the engines on which to execute
611 the engines on which to execute
605 default : all
612 default : all
606 block : bool
613 block : bool
607 whether or not to wait until done to return
614 whether or not to wait until done to return
608 default: self.block
615 default: self.block
609 """
616 """
610 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
617 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
611 return result
618 return result
612
619
613 def run(self, code, block=None):
620 def run(self, code, block=None):
614 """Runs `code` on an engine.
621 """Runs `code` on an engine.
615
622
616 Calls to this are load-balanced.
623 Calls to this are load-balanced.
617
624
618 ``run`` is never `bound` (no effect on engine namespace)
625 ``run`` is never `bound` (no effect on engine namespace)
619
626
620 Parameters
627 Parameters
621 ----------
628 ----------
622 code : str
629 code : str
623 the code string to be executed
630 the code string to be executed
624 block : bool
631 block : bool
625 whether or not to wait until done
632 whether or not to wait until done
626
633
627 """
634 """
628 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
635 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
629 return result
636 return result
630
637
631 def _maybe_raise(self, result):
638 def _maybe_raise(self, result):
632 """wrapper for maybe raising an exception if apply failed."""
639 """wrapper for maybe raising an exception if apply failed."""
633 if isinstance(result, error.RemoteError):
640 if isinstance(result, error.RemoteError):
634 raise result
641 raise result
635
642
636 return result
643 return result
637
644
638 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
645 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
639 after=None, follow=None):
646 after=None, follow=None):
640 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
647 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
641
648
642 This is the central execution command for the client.
649 This is the central execution command for the client.
643
650
644 Parameters
651 Parameters
645 ----------
652 ----------
646
653
647 f : function
654 f : function
648 The fuction to be called remotely
655 The fuction to be called remotely
649 args : tuple/list
656 args : tuple/list
650 The positional arguments passed to `f`
657 The positional arguments passed to `f`
651 kwargs : dict
658 kwargs : dict
652 The keyword arguments passed to `f`
659 The keyword arguments passed to `f`
653 bound : bool (default: True)
660 bound : bool (default: True)
654 Whether to execute in the Engine(s) namespace, or in a clean
661 Whether to execute in the Engine(s) namespace, or in a clean
655 namespace not affecting the engine.
662 namespace not affecting the engine.
656 block : bool (default: self.block)
663 block : bool (default: self.block)
657 Whether to wait for the result, or return immediately.
664 Whether to wait for the result, or return immediately.
658 False:
665 False:
659 returns msg_id(s)
666 returns msg_id(s)
660 if multiple targets:
667 if multiple targets:
661 list of ids
668 list of ids
662 True:
669 True:
663 returns actual result(s) of f(*args, **kwargs)
670 returns actual result(s) of f(*args, **kwargs)
664 if multiple targets:
671 if multiple targets:
665 dict of results, by engine ID
672 dict of results, by engine ID
666 targets : int,list of ints, 'all', None
673 targets : int,list of ints, 'all', None
667 Specify the destination of the job.
674 Specify the destination of the job.
668 if None:
675 if None:
669 Submit via Task queue for load-balancing.
676 Submit via Task queue for load-balancing.
670 if 'all':
677 if 'all':
671 Run on all active engines
678 Run on all active engines
672 if list:
679 if list:
673 Run on each specified engine
680 Run on each specified engine
674 if int:
681 if int:
675 Run on single engine
682 Run on single engine
676
683
677 after : Dependency or collection of msg_ids
684 after : Dependency or collection of msg_ids
678 Only for load-balanced execution (targets=None)
685 Only for load-balanced execution (targets=None)
679 Specify a list of msg_ids as a time-based dependency.
686 Specify a list of msg_ids as a time-based dependency.
680 This job will only be run *after* the dependencies
687 This job will only be run *after* the dependencies
681 have been met.
688 have been met.
682
689
683 follow : Dependency or collection of msg_ids
690 follow : Dependency or collection of msg_ids
684 Only for load-balanced execution (targets=None)
691 Only for load-balanced execution (targets=None)
685 Specify a list of msg_ids as a location-based dependency.
692 Specify a list of msg_ids as a location-based dependency.
686 This job will only be run on an engine where this dependency
693 This job will only be run on an engine where this dependency
687 is met.
694 is met.
688
695
689 Returns
696 Returns
690 -------
697 -------
691 if block is False:
698 if block is False:
692 if single target:
699 if single target:
693 return msg_id
700 return msg_id
694 else:
701 else:
695 return list of msg_ids
702 return list of msg_ids
696 ? (should this be dict like block=True) ?
703 ? (should this be dict like block=True) ?
697 else:
704 else:
698 if single target:
705 if single target:
699 return result of f(*args, **kwargs)
706 return result of f(*args, **kwargs)
700 else:
707 else:
701 return dict of results, keyed by engine
708 return dict of results, keyed by engine
702 """
709 """
703
710
704 # defaults:
711 # defaults:
705 block = block if block is not None else self.block
712 block = block if block is not None else self.block
706 args = args if args is not None else []
713 args = args if args is not None else []
707 kwargs = kwargs if kwargs is not None else {}
714 kwargs = kwargs if kwargs is not None else {}
708
715
709 # enforce types of f,args,kwrags
716 # enforce types of f,args,kwrags
710 if not callable(f):
717 if not callable(f):
711 raise TypeError("f must be callable, not %s"%type(f))
718 raise TypeError("f must be callable, not %s"%type(f))
712 if not isinstance(args, (tuple, list)):
719 if not isinstance(args, (tuple, list)):
713 raise TypeError("args must be tuple or list, not %s"%type(args))
720 raise TypeError("args must be tuple or list, not %s"%type(args))
714 if not isinstance(kwargs, dict):
721 if not isinstance(kwargs, dict):
715 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
722 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
716
723
717 options = dict(bound=bound, block=block, after=after, follow=follow)
724 options = dict(bound=bound, block=block, after=after, follow=follow)
718
725
719 if targets is None:
726 if targets is None:
720 return self._apply_balanced(f, args, kwargs, **options)
727 return self._apply_balanced(f, args, kwargs, **options)
721 else:
728 else:
722 return self._apply_direct(f, args, kwargs, targets=targets, **options)
729 return self._apply_direct(f, args, kwargs, targets=targets, **options)
723
730
724 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
731 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
725 after=None, follow=None):
732 after=None, follow=None):
726 """The underlying method for applying functions in a load balanced
733 """The underlying method for applying functions in a load balanced
727 manner, via the task queue."""
734 manner, via the task queue."""
728 if isinstance(after, Dependency):
735 if isinstance(after, Dependency):
729 after = after.as_dict()
736 after = after.as_dict()
730 elif after is None:
737 elif after is None:
731 after = []
738 after = []
732 if isinstance(follow, Dependency):
739 if isinstance(follow, Dependency):
733 follow = follow.as_dict()
740 follow = follow.as_dict()
734 elif follow is None:
741 elif follow is None:
735 follow = []
742 follow = []
736 subheader = dict(after=after, follow=follow)
743 subheader = dict(after=after, follow=follow)
737
744
738 bufs = ss.pack_apply_message(f,args,kwargs)
745 bufs = ss.pack_apply_message(f,args,kwargs)
739 content = dict(bound=bound)
746 content = dict(bound=bound)
740 msg = self.session.send(self._task_socket, "apply_request",
747 msg = self.session.send(self._task_socket, "apply_request",
741 content=content, buffers=bufs, subheader=subheader)
748 content=content, buffers=bufs, subheader=subheader)
742 msg_id = msg['msg_id']
749 msg_id = msg['msg_id']
743 self.outstanding.add(msg_id)
750 self.outstanding.add(msg_id)
744 self.history.append(msg_id)
751 self.history.append(msg_id)
745 if block:
752 if block:
746 self.barrier(msg_id)
753 self.barrier(msg_id)
747 return self._maybe_raise(self.results[msg_id])
754 return self._maybe_raise(self.results[msg_id])
748 else:
755 else:
749 return AsyncResult(self, [msg_id])
756 return AsyncResult(self, [msg_id])
750
757
751 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
758 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
752 after=None, follow=None):
759 after=None, follow=None):
753 """Then underlying method for applying functions to specific engines
760 """Then underlying method for applying functions to specific engines
754 via the MUX queue."""
761 via the MUX queue."""
755
762
756 queues,targets = self._build_targets(targets)
763 queues,targets = self._build_targets(targets)
757 bufs = ss.pack_apply_message(f,args,kwargs)
764 bufs = ss.pack_apply_message(f,args,kwargs)
758 if isinstance(after, Dependency):
765 if isinstance(after, Dependency):
759 after = after.as_dict()
766 after = after.as_dict()
760 elif after is None:
767 elif after is None:
761 after = []
768 after = []
762 if isinstance(follow, Dependency):
769 if isinstance(follow, Dependency):
763 follow = follow.as_dict()
770 follow = follow.as_dict()
764 elif follow is None:
771 elif follow is None:
765 follow = []
772 follow = []
766 subheader = dict(after=after, follow=follow)
773 subheader = dict(after=after, follow=follow)
767 content = dict(bound=bound)
774 content = dict(bound=bound)
768 msg_ids = []
775 msg_ids = []
769 for queue in queues:
776 for queue in queues:
770 msg = self.session.send(self._mux_socket, "apply_request",
777 msg = self.session.send(self._mux_socket, "apply_request",
771 content=content, buffers=bufs,ident=queue, subheader=subheader)
778 content=content, buffers=bufs,ident=queue, subheader=subheader)
772 msg_id = msg['msg_id']
779 msg_id = msg['msg_id']
773 self.outstanding.add(msg_id)
780 self.outstanding.add(msg_id)
774 self.history.append(msg_id)
781 self.history.append(msg_id)
775 msg_ids.append(msg_id)
782 msg_ids.append(msg_id)
776 if block:
783 if block:
777 self.barrier(msg_ids)
784 self.barrier(msg_ids)
778 else:
785 else:
779 return AsyncResult(self, msg_ids)
786 return AsyncResult(self, msg_ids)
780 if len(msg_ids) == 1:
787 if len(msg_ids) == 1:
781 return self._maybe_raise(self.results[msg_ids[0]])
788 return self._maybe_raise(self.results[msg_ids[0]])
782 else:
789 else:
783 result = {}
790 result = {}
784 for target,mid in zip(targets, msg_ids):
791 for target,mid in zip(targets, msg_ids):
785 result[target] = self.results[mid]
792 result[target] = self.results[mid]
786 return error.collect_exceptions(result, f.__name__)
793 return error.collect_exceptions(result, f.__name__)
787
794
788 #--------------------------------------------------------------------------
795 #--------------------------------------------------------------------------
789 # Map and decorators
796 # Map and decorators
790 #--------------------------------------------------------------------------
797 #--------------------------------------------------------------------------
791
798
792 def map(self, f, *sequences):
799 def map(self, f, *sequences):
793 """Parallel version of builtin `map`, using all our engines."""
800 """Parallel version of builtin `map`, using all our engines."""
794 pf = ParallelFunction(self, f, block=self.block,
801 pf = ParallelFunction(self, f, block=self.block,
795 bound=True, targets='all')
802 bound=True, targets='all')
796 return pf.map(*sequences)
803 return pf.map(*sequences)
797
804
798 def parallel(self, bound=True, targets='all', block=True):
805 def parallel(self, bound=True, targets='all', block=True):
799 """Decorator for making a ParallelFunction"""
806 """Decorator for making a ParallelFunction."""
800 return parallel(self, bound=bound, targets=targets, block=block)
807 return parallel(self, bound=bound, targets=targets, block=block)
801
808
802 def remote(self, bound=True, targets='all', block=True):
809 def remote(self, bound=True, targets='all', block=True):
803 """Decorator for making a RemoteFunction"""
810 """Decorator for making a RemoteFunction."""
804 return remote(self, bound=bound, targets=targets, block=block)
811 return remote(self, bound=bound, targets=targets, block=block)
805
812
806 #--------------------------------------------------------------------------
813 #--------------------------------------------------------------------------
807 # Data movement
814 # Data movement
808 #--------------------------------------------------------------------------
815 #--------------------------------------------------------------------------
809
816
810 @defaultblock
817 @defaultblock
811 def push(self, ns, targets='all', block=None):
818 def push(self, ns, targets='all', block=None):
812 """Push the contents of `ns` into the namespace on `target`"""
819 """Push the contents of `ns` into the namespace on `target`"""
813 if not isinstance(ns, dict):
820 if not isinstance(ns, dict):
814 raise TypeError("Must be a dict, not %s"%type(ns))
821 raise TypeError("Must be a dict, not %s"%type(ns))
815 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
822 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
816 return result
823 return result
817
824
818 @defaultblock
825 @defaultblock
819 def pull(self, keys, targets='all', block=True):
826 def pull(self, keys, targets='all', block=None):
820 """Pull objects from `target`'s namespace by `keys`"""
827 """Pull objects from `target`'s namespace by `keys`"""
821 if isinstance(keys, str):
828 if isinstance(keys, str):
822 pass
829 pass
823 elif isinstance(keys, (list,tuple,set)):
830 elif isinstance(keys, (list,tuple,set)):
824 for key in keys:
831 for key in keys:
825 if not isinstance(key, str):
832 if not isinstance(key, str):
826 raise TypeError
833 raise TypeError
827 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
834 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
828 return result
835 return result
829
836
830 @defaultblock
831 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
837 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
832 """
838 """
833 Partition a Python sequence and send the partitions to a set of engines.
839 Partition a Python sequence and send the partitions to a set of engines.
834 """
840 """
841 block = block if block is not None else self.block
835 targets = self._build_targets(targets)[-1]
842 targets = self._build_targets(targets)[-1]
836 mapObject = Map.dists[dist]()
843 mapObject = Map.dists[dist]()
837 nparts = len(targets)
844 nparts = len(targets)
838 msg_ids = []
845 msg_ids = []
839 for index, engineid in enumerate(targets):
846 for index, engineid in enumerate(targets):
840 partition = mapObject.getPartition(seq, index, nparts)
847 partition = mapObject.getPartition(seq, index, nparts)
841 if flatten and len(partition) == 1:
848 if flatten and len(partition) == 1:
842 mid = self.push({key: partition[0]}, targets=engineid, block=False)
849 r = self.push({key: partition[0]}, targets=engineid, block=False)
843 else:
850 else:
844 mid = self.push({key: partition}, targets=engineid, block=False)
851 r = self.push({key: partition}, targets=engineid, block=False)
845 msg_ids.append(mid)
852 msg_ids.extend(r._msg_ids)
846 r = AsyncResult(self, msg_ids)
853 r = AsyncResult(self, msg_ids)
847 if block:
854 if block:
848 r.wait()
855 return r.get()
849 return
850 else:
856 else:
851 return r
857 return r
852
858
853 @defaultblock
859 def gather(self, key, dist='b', targets='all', block=None):
854 def gather(self, key, dist='b', targets='all', block=True):
855 """
860 """
856 Gather a partitioned sequence on a set of engines as a single local seq.
861 Gather a partitioned sequence on a set of engines as a single local seq.
857 """
862 """
863 block = block if block is not None else self.block
858
864
859 targets = self._build_targets(targets)[-1]
865 targets = self._build_targets(targets)[-1]
860 mapObject = Map.dists[dist]()
866 mapObject = Map.dists[dist]()
861 msg_ids = []
867 msg_ids = []
862 for index, engineid in enumerate(targets):
868 for index, engineid in enumerate(targets):
863 msg_ids.append(self.pull(key, targets=engineid,block=False))
869 msg_ids.extend(self.pull(key, targets=engineid,block=False)._msg_ids)
864
870
865 r = AsyncMapResult(self, msg_ids, mapObject)
871 r = AsyncMapResult(self, msg_ids, mapObject)
866 if block:
872 if block:
867 r.wait()
873 return r.get()
868 return r.result
869 else:
874 else:
870 return r
875 return r
871
876
872 #--------------------------------------------------------------------------
877 #--------------------------------------------------------------------------
873 # Query methods
878 # Query methods
874 #--------------------------------------------------------------------------
879 #--------------------------------------------------------------------------
875
880
876 @spinfirst
881 @spinfirst
877 def get_results(self, msg_ids, status_only=False):
882 def get_results(self, msg_ids, status_only=False):
878 """Returns the result of the execute or task request with `msg_ids`.
883 """Returns the result of the execute or task request with `msg_ids`.
879
884
880 Parameters
885 Parameters
881 ----------
886 ----------
882 msg_ids : list of ints or msg_ids
887 msg_ids : list of ints or msg_ids
883 if int:
888 if int:
884 Passed as index to self.history for convenience.
889 Passed as index to self.history for convenience.
885 status_only : bool (default: False)
890 status_only : bool (default: False)
886 if False:
891 if False:
887 return the actual results
892 return the actual results
888 """
893 """
889 if not isinstance(msg_ids, (list,tuple)):
894 if not isinstance(msg_ids, (list,tuple)):
890 msg_ids = [msg_ids]
895 msg_ids = [msg_ids]
891 theids = []
896 theids = []
892 for msg_id in msg_ids:
897 for msg_id in msg_ids:
893 if isinstance(msg_id, int):
898 if isinstance(msg_id, int):
894 msg_id = self.history[msg_id]
899 msg_id = self.history[msg_id]
895 if not isinstance(msg_id, str):
900 if not isinstance(msg_id, str):
896 raise TypeError("msg_ids must be str, not %r"%msg_id)
901 raise TypeError("msg_ids must be str, not %r"%msg_id)
897 theids.append(msg_id)
902 theids.append(msg_id)
898
903
899 completed = []
904 completed = []
900 local_results = {}
905 local_results = {}
901 for msg_id in list(theids):
906 for msg_id in list(theids):
902 if msg_id in self.results:
907 if msg_id in self.results:
903 completed.append(msg_id)
908 completed.append(msg_id)
904 local_results[msg_id] = self.results[msg_id]
909 local_results[msg_id] = self.results[msg_id]
905 theids.remove(msg_id)
910 theids.remove(msg_id)
906
911
907 if theids: # some not locally cached
912 if theids: # some not locally cached
908 content = dict(msg_ids=theids, status_only=status_only)
913 content = dict(msg_ids=theids, status_only=status_only)
909 msg = self.session.send(self._query_socket, "result_request", content=content)
914 msg = self.session.send(self._query_socket, "result_request", content=content)
910 zmq.select([self._query_socket], [], [])
915 zmq.select([self._query_socket], [], [])
911 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
916 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
912 if self.debug:
917 if self.debug:
913 pprint(msg)
918 pprint(msg)
914 content = msg['content']
919 content = msg['content']
915 if content['status'] != 'ok':
920 if content['status'] != 'ok':
916 raise ss.unwrap_exception(content)
921 raise ss.unwrap_exception(content)
917 else:
922 else:
918 content = dict(completed=[],pending=[])
923 content = dict(completed=[],pending=[])
919 if not status_only:
924 if not status_only:
920 # load cached results into result:
925 # load cached results into result:
921 content['completed'].extend(completed)
926 content['completed'].extend(completed)
922 content.update(local_results)
927 content.update(local_results)
923 # update cache with results:
928 # update cache with results:
924 for msg_id in msg_ids:
929 for msg_id in msg_ids:
925 if msg_id in content['completed']:
930 if msg_id in content['completed']:
926 self.results[msg_id] = content[msg_id]
931 self.results[msg_id] = content[msg_id]
927 return content
932 return content
928
933
929 @spinfirst
934 @spinfirst
930 def queue_status(self, targets=None, verbose=False):
935 def queue_status(self, targets=None, verbose=False):
931 """Fetch the status of engine queues.
936 """Fetch the status of engine queues.
932
937
933 Parameters
938 Parameters
934 ----------
939 ----------
935 targets : int/str/list of ints/strs
940 targets : int/str/list of ints/strs
936 the engines on which to execute
941 the engines on which to execute
937 default : all
942 default : all
938 verbose : bool
943 verbose : bool
939 Whether to return lengths only, or lists of ids for each element
944 Whether to return lengths only, or lists of ids for each element
940 """
945 """
941 targets = self._build_targets(targets)[1]
946 targets = self._build_targets(targets)[1]
942 content = dict(targets=targets, verbose=verbose)
947 content = dict(targets=targets, verbose=verbose)
943 self.session.send(self._query_socket, "queue_request", content=content)
948 self.session.send(self._query_socket, "queue_request", content=content)
944 idents,msg = self.session.recv(self._query_socket, 0)
949 idents,msg = self.session.recv(self._query_socket, 0)
945 if self.debug:
950 if self.debug:
946 pprint(msg)
951 pprint(msg)
947 content = msg['content']
952 content = msg['content']
948 status = content.pop('status')
953 status = content.pop('status')
949 if status != 'ok':
954 if status != 'ok':
950 raise ss.unwrap_exception(content)
955 raise ss.unwrap_exception(content)
951 return content
956 return content
952
957
953 @spinfirst
958 @spinfirst
954 def purge_results(self, msg_ids=[], targets=[]):
959 def purge_results(self, msg_ids=[], targets=[]):
955 """Tell the controller to forget results.
960 """Tell the controller to forget results.
956
961
957 Individual results can be purged by msg_id, or the entire
962 Individual results can be purged by msg_id, or the entire
958 history of specific targets can be purged.
963 history of specific targets can be purged.
959
964
960 Parameters
965 Parameters
961 ----------
966 ----------
962 msg_ids : str or list of strs
967 msg_ids : str or list of strs
963 the msg_ids whose results should be forgotten.
968 the msg_ids whose results should be forgotten.
964 targets : int/str/list of ints/strs
969 targets : int/str/list of ints/strs
965 The targets, by uuid or int_id, whose entire history is to be purged.
970 The targets, by uuid or int_id, whose entire history is to be purged.
966 Use `targets='all'` to scrub everything from the controller's memory.
971 Use `targets='all'` to scrub everything from the controller's memory.
967
972
968 default : None
973 default : None
969 """
974 """
970 if not targets and not msg_ids:
975 if not targets and not msg_ids:
971 raise ValueError
976 raise ValueError
972 if targets:
977 if targets:
973 targets = self._build_targets(targets)[1]
978 targets = self._build_targets(targets)[1]
974 content = dict(targets=targets, msg_ids=msg_ids)
979 content = dict(targets=targets, msg_ids=msg_ids)
975 self.session.send(self._query_socket, "purge_request", content=content)
980 self.session.send(self._query_socket, "purge_request", content=content)
976 idents, msg = self.session.recv(self._query_socket, 0)
981 idents, msg = self.session.recv(self._query_socket, 0)
977 if self.debug:
982 if self.debug:
978 pprint(msg)
983 pprint(msg)
979 content = msg['content']
984 content = msg['content']
980 if content['status'] != 'ok':
985 if content['status'] != 'ok':
981 raise ss.unwrap_exception(content)
986 raise ss.unwrap_exception(content)
982
987
988 #----------------------------------------
989 # activate for %px,%autopx magics
990 #----------------------------------------
991 def activate(self):
992 """Make this `View` active for parallel magic commands.
993
994 IPython has a magic command syntax to work with `MultiEngineClient` objects.
995 In a given IPython session there is a single active one. While
996 there can be many `Views` created and used by the user,
997 there is only one active one. The active `View` is used whenever
998 the magic commands %px and %autopx are used.
999
1000 The activate() method is called on a given `View` to make it
1001 active. Once this has been done, the magic commands can be used.
1002 """
1003
1004 try:
1005 # This is injected into __builtins__.
1006 ip = get_ipython()
1007 except NameError:
1008 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1009 else:
1010 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1011 if pmagic is not None:
1012 pmagic.active_multiengine_client = self
1013 else:
1014 print "You must first load the parallelmagic extension " \
1015 "by doing '%load_ext parallelmagic'"
1016
983 class AsynClient(Client):
1017 class AsynClient(Client):
984 """An Asynchronous client, using the Tornado Event Loop.
1018 """An Asynchronous client, using the Tornado Event Loop.
985 !!!unfinished!!!"""
1019 !!!unfinished!!!"""
986 io_loop = None
1020 io_loop = None
987 _queue_stream = None
1021 _queue_stream = None
988 _notifier_stream = None
1022 _notifier_stream = None
989 _task_stream = None
1023 _task_stream = None
990 _control_stream = None
1024 _control_stream = None
991
1025
992 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1026 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
993 Client.__init__(self, addr, context, username, debug)
1027 Client.__init__(self, addr, context, username, debug)
994 if io_loop is None:
1028 if io_loop is None:
995 io_loop = ioloop.IOLoop.instance()
1029 io_loop = ioloop.IOLoop.instance()
996 self.io_loop = io_loop
1030 self.io_loop = io_loop
997
1031
998 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1032 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
999 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1033 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1000 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1034 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1001 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1035 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1002
1036
1003 def spin(self):
1037 def spin(self):
1004 for stream in (self.queue_stream, self.notifier_stream,
1038 for stream in (self.queue_stream, self.notifier_stream,
1005 self.task_stream, self.control_stream):
1039 self.task_stream, self.control_stream):
1006 stream.flush()
1040 stream.flush()
1007
1041
1008 __all__ = [ 'Client',
1042 __all__ = [ 'Client',
1009 'depend',
1043 'depend',
1010 'require',
1044 'require',
1011 'remote',
1045 'remote',
1012 'parallel',
1046 'parallel',
1013 'RemoteFunction',
1047 'RemoteFunction',
1014 'ParallelFunction',
1048 'ParallelFunction',
1015 'DirectView',
1049 'DirectView',
1016 'LoadBalancedView',
1050 'LoadBalancedView',
1017 'AsyncResult',
1051 'AsyncResult',
1018 'AsyncMapResult'
1052 'AsyncMapResult'
1019 ]
1053 ]
@@ -1,298 +1,347 b''
1 """Views of remote engines"""
1 """Views of remote engines"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from IPython.external.decorator import decorator
13 from IPython.external.decorator import decorator
14 from IPython.zmq.parallel.remotefunction import ParallelFunction
14 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Decorators
17 # Decorators
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 @decorator
20 @decorator
21 def myblock(f, self, *args, **kwargs):
21 def myblock(f, self, *args, **kwargs):
22 """override client.block with self.block during a call"""
22 """override client.block with self.block during a call"""
23 block = self.client.block
23 block = self.client.block
24 self.client.block = self.block
24 self.client.block = self.block
25 ret = f(self, *args, **kwargs)
25 try:
26 self.client.block = block
26 ret = f(self, *args, **kwargs)
27 finally:
28 self.client.block = block
27 return ret
29 return ret
28
30
29 @decorator
31 @decorator
30 def save_ids(f, self, *args, **kwargs):
32 def save_ids(f, self, *args, **kwargs):
31 """Keep our history and outstanding attributes up to date after a method call."""
33 """Keep our history and outstanding attributes up to date after a method call."""
32 n_previous = len(self.client.history)
34 n_previous = len(self.client.history)
33 ret = f(self, *args, **kwargs)
35 ret = f(self, *args, **kwargs)
34 nmsgs = len(self.client.history) - n_previous
36 nmsgs = len(self.client.history) - n_previous
35 msg_ids = self.client.history[-nmsgs:]
37 msg_ids = self.client.history[-nmsgs:]
36 self.history.extend(msg_ids)
38 self.history.extend(msg_ids)
37 map(self.outstanding.add, msg_ids)
39 map(self.outstanding.add, msg_ids)
38 return ret
40 return ret
39
41
40 @decorator
42 @decorator
41 def sync_results(f, self, *args, **kwargs):
43 def sync_results(f, self, *args, **kwargs):
42 """sync relevant results from self.client to our results attribute."""
44 """sync relevant results from self.client to our results attribute."""
43 ret = f(self, *args, **kwargs)
45 ret = f(self, *args, **kwargs)
44 delta = self.outstanding.difference(self.client.outstanding)
46 delta = self.outstanding.difference(self.client.outstanding)
45 completed = self.outstanding.intersection(delta)
47 completed = self.outstanding.intersection(delta)
46 self.outstanding = self.outstanding.difference(completed)
48 self.outstanding = self.outstanding.difference(completed)
47 for msg_id in completed:
49 for msg_id in completed:
48 self.results[msg_id] = self.client.results[msg_id]
50 self.results[msg_id] = self.client.results[msg_id]
49 return ret
51 return ret
50
52
51 @decorator
53 @decorator
52 def spin_after(f, self, *args, **kwargs):
54 def spin_after(f, self, *args, **kwargs):
53 """call spin after the method."""
55 """call spin after the method."""
54 ret = f(self, *args, **kwargs)
56 ret = f(self, *args, **kwargs)
55 self.spin()
57 self.spin()
56 return ret
58 return ret
57
59
58 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
59 # Classes
61 # Classes
60 #-----------------------------------------------------------------------------
62 #-----------------------------------------------------------------------------
61
63
62 class View(object):
64 class View(object):
63 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
65 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
64
66
65 Don't use this class, use subclasses.
67 Don't use this class, use subclasses.
66 """
68 """
67 _targets = None
69 _targets = None
68 _ntargets = None
69 block=None
70 block=None
70 bound=None
71 bound=None
71 history=None
72 history=None
72
73
73 def __init__(self, client, targets=None):
74 def __init__(self, client, targets=None):
74 self.client = client
75 self.client = client
75 self._targets = targets
76 self._targets = targets
76 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
77 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
77 self.block = client.block
78 self.block = client.block
78 self.bound=True
79 self.bound=False
79 self.history = []
80 self.history = []
80 self.outstanding = set()
81 self.outstanding = set()
81 self.results = {}
82 self.results = {}
82
83
83 def __repr__(self):
84 def __repr__(self):
84 strtargets = str(self._targets)
85 strtargets = str(self._targets)
85 if len(strtargets) > 16:
86 if len(strtargets) > 16:
86 strtargets = strtargets[:12]+'...]'
87 strtargets = strtargets[:12]+'...]'
87 return "<%s %s>"%(self.__class__.__name__, strtargets)
88 return "<%s %s>"%(self.__class__.__name__, strtargets)
88
89
89 @property
90 @property
90 def targets(self):
91 def targets(self):
91 return self._targets
92 return self._targets
92
93
93 @targets.setter
94 @targets.setter
94 def targets(self, value):
95 def targets(self, value):
95 raise AttributeError("Cannot set my targets argument after construction!")
96 self._targets = value
97 # raise AttributeError("Cannot set my targets argument after construction!")
96
98
97 @sync_results
99 @sync_results
98 def spin(self):
100 def spin(self):
99 """spin the client, and sync"""
101 """spin the client, and sync"""
100 self.client.spin()
102 self.client.spin()
101
103
102 @sync_results
104 @sync_results
103 @save_ids
105 @save_ids
104 def apply(self, f, *args, **kwargs):
106 def apply(self, f, *args, **kwargs):
105 """calls f(*args, **kwargs) on remote engines, returning the result.
107 """calls f(*args, **kwargs) on remote engines, returning the result.
106
108
107 This method does not involve the engine's namespace.
109 This method does not involve the engine's namespace.
108
110
109 if self.block is False:
111 if self.block is False:
110 returns msg_id
112 returns msg_id
111 else:
113 else:
112 returns actual result of f(*args, **kwargs)
114 returns actual result of f(*args, **kwargs)
113 """
115 """
114 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
116 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
115
117
116 @save_ids
118 @save_ids
117 def apply_async(self, f, *args, **kwargs):
119 def apply_async(self, f, *args, **kwargs):
118 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
120 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
119
121
120 This method does not involve the engine's namespace.
122 This method does not involve the engine's namespace.
121
123
122 returns msg_id
124 returns msg_id
123 """
125 """
124 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
126 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
125
127
126 @spin_after
128 @spin_after
127 @save_ids
129 @save_ids
128 def apply_sync(self, f, *args, **kwargs):
130 def apply_sync(self, f, *args, **kwargs):
129 """calls f(*args, **kwargs) on remote engines in a blocking manner,
131 """calls f(*args, **kwargs) on remote engines in a blocking manner,
130 returning the result.
132 returning the result.
131
133
132 This method does not involve the engine's namespace.
134 This method does not involve the engine's namespace.
133
135
134 returns: actual result of f(*args, **kwargs)
136 returns: actual result of f(*args, **kwargs)
135 """
137 """
136 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
138 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
137
139
138 @sync_results
140 @sync_results
139 @save_ids
141 @save_ids
140 def apply_bound(self, f, *args, **kwargs):
142 def apply_bound(self, f, *args, **kwargs):
141 """calls f(*args, **kwargs) bound to engine namespace(s).
143 """calls f(*args, **kwargs) bound to engine namespace(s).
142
144
143 if self.block is False:
145 if self.block is False:
144 returns msg_id
146 returns msg_id
145 else:
147 else:
146 returns actual result of f(*args, **kwargs)
148 returns actual result of f(*args, **kwargs)
147
149
148 This method has access to the targets' globals
150 This method has access to the targets' globals
149
151
150 """
152 """
151 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
153 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
152
154
153 @sync_results
155 @sync_results
154 @save_ids
156 @save_ids
155 def apply_async_bound(self, f, *args, **kwargs):
157 def apply_async_bound(self, f, *args, **kwargs):
156 """calls f(*args, **kwargs) bound to engine namespace(s)
158 """calls f(*args, **kwargs) bound to engine namespace(s)
157 in a nonblocking manner.
159 in a nonblocking manner.
158
160
159 returns: msg_id
161 returns: msg_id
160
162
161 This method has access to the targets' globals
163 This method has access to the targets' globals
162
164
163 """
165 """
164 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
166 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
165
167
166 @spin_after
168 @spin_after
167 @save_ids
169 @save_ids
168 def apply_sync_bound(self, f, *args, **kwargs):
170 def apply_sync_bound(self, f, *args, **kwargs):
169 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
171 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
170
172
171 returns: actual result of f(*args, **kwargs)
173 returns: actual result of f(*args, **kwargs)
172
174
173 This method has access to the targets' globals
175 This method has access to the targets' globals
174
176
175 """
177 """
176 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
178 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
177
179
178 @spin_after
180 @spin_after
179 @save_ids
181 @save_ids
180 def map(self, f, *sequences):
182 def map(self, f, *sequences):
181 """Parallel version of builtin `map`, using this view's engines."""
183 """Parallel version of builtin `map`, using this view's engines."""
182 if isinstance(self.targets, int):
184 if isinstance(self.targets, int):
183 targets = [self.targets]
185 targets = [self.targets]
184 pf = ParallelFunction(self.client, f, block=self.block,
186 pf = ParallelFunction(self.client, f, block=self.block,
185 bound=True, targets=targets)
187 bound=True, targets=targets)
186 return pf.map(*sequences)
188 return pf.map(*sequences)
187
189
190 def parallel(self, bound=True, block=True):
191 """Decorator for making a ParallelFunction"""
192 return parallel(self.client, bound=bound, targets=self.targets, block=block)
193
188 def abort(self, msg_ids=None, block=None):
194 def abort(self, msg_ids=None, block=None):
189 """Abort jobs on my engines.
195 """Abort jobs on my engines.
190
196
191 Parameters
197 Parameters
192 ----------
198 ----------
193
199
194 msg_ids : None, str, list of strs, optional
200 msg_ids : None, str, list of strs, optional
195 if None: abort all jobs.
201 if None: abort all jobs.
196 else: abort specific msg_id(s).
202 else: abort specific msg_id(s).
197 """
203 """
198 block = block if block is not None else self.block
204 block = block if block is not None else self.block
199 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
205 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
200
206
201 def queue_status(self, verbose=False):
207 def queue_status(self, verbose=False):
202 """Fetch the Queue status of my engines"""
208 """Fetch the Queue status of my engines"""
203 return self.client.queue_status(targets=self.targets, verbose=verbose)
209 return self.client.queue_status(targets=self.targets, verbose=verbose)
204
210
205 def purge_results(self, msg_ids=[],targets=[]):
211 def purge_results(self, msg_ids=[], targets=[]):
206 """Instruct the controller to forget specific results."""
212 """Instruct the controller to forget specific results."""
207 if targets is None or targets == 'all':
213 if targets is None or targets == 'all':
208 targets = self.targets
214 targets = self.targets
209 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
215 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
216
210
217
211
218
212 class DirectView(View):
219 class DirectView(View):
213 """Direct Multiplexer View of one or more engines.
220 """Direct Multiplexer View of one or more engines.
214
221
215 These are created via indexed access to a client:
222 These are created via indexed access to a client:
216
223
217 >>> dv_1 = client[1]
224 >>> dv_1 = client[1]
218 >>> dv_all = client[:]
225 >>> dv_all = client[:]
219 >>> dv_even = client[::2]
226 >>> dv_even = client[::2]
220 >>> dv_some = client[1:3]
227 >>> dv_some = client[1:3]
221
228
229 This object provides dictionary access
230
222 """
231 """
223
232
233 @sync_results
234 @save_ids
235 def execute(self, code, block=True):
236 """execute some code on my targets."""
237 return self.client.execute(code, block=self.block, targets=self.targets)
238
224 def update(self, ns):
239 def update(self, ns):
225 """update remote namespace with dict `ns`"""
240 """update remote namespace with dict `ns`"""
226 return self.client.push(ns, targets=self.targets, block=self.block)
241 return self.client.push(ns, targets=self.targets, block=self.block)
227
242
228 push = update
243 push = update
229
244
230 def get(self, key_s):
245 def get(self, key_s):
231 """get object(s) by `key_s` from remote namespace
246 """get object(s) by `key_s` from remote namespace
232 will return one object if it is a key.
247 will return one object if it is a key.
233 It also takes a list of keys, and will return a list of objects."""
248 It also takes a list of keys, and will return a list of objects."""
234 # block = block if block is not None else self.block
249 # block = block if block is not None else self.block
235 return self.client.pull(key_s, block=True, targets=self.targets)
250 return self.client.pull(key_s, block=True, targets=self.targets)
236
251
252 @sync_results
253 @save_ids
237 def pull(self, key_s, block=True):
254 def pull(self, key_s, block=True):
238 """get object(s) by `key_s` from remote namespace
255 """get object(s) by `key_s` from remote namespace
239 will return one object if it is a key.
256 will return one object if it is a key.
240 It also takes a list of keys, and will return a list of objects."""
257 It also takes a list of keys, and will return a list of objects."""
241 block = block if block is not None else self.block
258 block = block if block is not None else self.block
242 return self.client.pull(key_s, block=block, targets=self.targets)
259 return self.client.pull(key_s, block=block, targets=self.targets)
243
260
244 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
261 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
245 """
262 """
246 Partition a Python sequence and send the partitions to a set of engines.
263 Partition a Python sequence and send the partitions to a set of engines.
247 """
264 """
248 block = block if block is not None else self.block
265 block = block if block is not None else self.block
249 if targets is None:
266 if targets is None:
250 targets = self.targets
267 targets = self.targets
251
268
252 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
269 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
253 targets=targets, block=block)
270 targets=targets, block=block)
254
271
272 @sync_results
273 @save_ids
255 def gather(self, key, dist='b', targets=None, block=True):
274 def gather(self, key, dist='b', targets=None, block=True):
256 """
275 """
257 Gather a partitioned sequence on a set of engines as a single local seq.
276 Gather a partitioned sequence on a set of engines as a single local seq.
258 """
277 """
259 block = block if block is not None else self.block
278 block = block if block is not None else self.block
260 if targets is None:
279 if targets is None:
261 targets = self.targets
280 targets = self.targets
262
281
263 return self.client.gather(key, dist=dist, targets=targets, block=block)
282 return self.client.gather(key, dist=dist, targets=targets, block=block)
264
283
265 def __getitem__(self, key):
284 def __getitem__(self, key):
266 return self.get(key)
285 return self.get(key)
267
286
268 def __setitem__(self,key, value):
287 def __setitem__(self,key, value):
269 self.update({key:value})
288 self.update({key:value})
270
289
271 def clear(self, block=False):
290 def clear(self, block=False):
272 """Clear the remote namespaces on my engines."""
291 """Clear the remote namespaces on my engines."""
273 block = block if block is not None else self.block
292 block = block if block is not None else self.block
274 return self.client.clear(targets=self.targets, block=block)
293 return self.client.clear(targets=self.targets, block=block)
275
294
276 def kill(self, block=True):
295 def kill(self, block=True):
277 """Kill my engines."""
296 """Kill my engines."""
278 block = block if block is not None else self.block
297 block = block if block is not None else self.block
279 return self.client.kill(targets=self.targets, block=block)
298 return self.client.kill(targets=self.targets, block=block)
280
299
300 #----------------------------------------
301 # activate for %px,%autopx magics
302 #----------------------------------------
303 def activate(self):
304 """Make this `View` active for parallel magic commands.
305
306 IPython has a magic command syntax to work with `MultiEngineClient` objects.
307 In a given IPython session there is a single active one. While
308 there can be many `Views` created and used by the user,
309 there is only one active one. The active `View` is used whenever
310 the magic commands %px and %autopx are used.
311
312 The activate() method is called on a given `View` to make it
313 active. Once this has been done, the magic commands can be used.
314 """
315
316 try:
317 # This is injected into __builtins__.
318 ip = get_ipython()
319 except NameError:
320 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
321 else:
322 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
323 if pmagic is not None:
324 pmagic.active_multiengine_client = self
325 else:
326 print "You must first load the parallelmagic extension " \
327 "by doing '%load_ext parallelmagic'"
328
329
281 class LoadBalancedView(View):
330 class LoadBalancedView(View):
282 """An engine-agnostic View that only executes via the Task queue.
331 """An engine-agnostic View that only executes via the Task queue.
283
332
284 Typically created via:
333 Typically created via:
285
334
286 >>> lbv = client[None]
335 >>> lbv = client[None]
287 <LoadBalancedView tcp://127.0.0.1:12345>
336 <LoadBalancedView tcp://127.0.0.1:12345>
288
337
289 but can also be created with:
338 but can also be created with:
290
339
291 >>> lbc = LoadBalancedView(client)
340 >>> lbc = LoadBalancedView(client)
292
341
293 TODO: allow subset of engines across which to balance.
342 TODO: allow subset of engines across which to balance.
294 """
343 """
295 def __repr__(self):
344 def __repr__(self):
296 return "<%s %s>"%(self.__class__.__name__, self.client._addr)
345 return "<%s %s>"%(self.__class__.__name__, self.client._addr)
297
346
298 No newline at end of file
347
General Comments 0
You need to be logged in to leave comments. Login now