##// END OF EJS Templates
Fix small bug when no key given to client
Fernando Perez -
Show More
@@ -1,924 +1,924 b''
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from __future__ import print_function
13 from __future__ import print_function
14
14
15 import os
15 import os
16 import time
16 import time
17 from pprint import pprint
17 from pprint import pprint
18
18
19 import zmq
19 import zmq
20 from zmq.eventloop import ioloop, zmqstream
20 from zmq.eventloop import ioloop, zmqstream
21
21
22 from IPython.external.decorator import decorator
22 from IPython.external.decorator import decorator
23 from IPython.zmq import tunnel
23 from IPython.zmq import tunnel
24
24
25 import streamsession as ss
25 import streamsession as ss
26 # from remotenamespace import RemoteNamespace
26 # from remotenamespace import RemoteNamespace
27 from view import DirectView, LoadBalancedView
27 from view import DirectView, LoadBalancedView
28 from dependency import Dependency, depend, require
28 from dependency import Dependency, depend, require
29
29
30 def _push(ns):
30 def _push(ns):
31 globals().update(ns)
31 globals().update(ns)
32
32
33 def _pull(keys):
33 def _pull(keys):
34 g = globals()
34 g = globals()
35 if isinstance(keys, (list,tuple, set)):
35 if isinstance(keys, (list,tuple, set)):
36 for key in keys:
36 for key in keys:
37 if not g.has_key(key):
37 if not g.has_key(key):
38 raise NameError("name '%s' is not defined"%key)
38 raise NameError("name '%s' is not defined"%key)
39 return map(g.get, keys)
39 return map(g.get, keys)
40 else:
40 else:
41 if not g.has_key(keys):
41 if not g.has_key(keys):
42 raise NameError("name '%s' is not defined"%keys)
42 raise NameError("name '%s' is not defined"%keys)
43 return g.get(keys)
43 return g.get(keys)
44
44
45 def _clear():
45 def _clear():
46 globals().clear()
46 globals().clear()
47
47
48 def execute(code):
48 def execute(code):
49 exec code in globals()
49 exec code in globals()
50
50
51 #--------------------------------------------------------------------------
51 #--------------------------------------------------------------------------
52 # Decorators for Client methods
52 # Decorators for Client methods
53 #--------------------------------------------------------------------------
53 #--------------------------------------------------------------------------
54
54
55 @decorator
55 @decorator
56 def spinfirst(f, self, *args, **kwargs):
56 def spinfirst(f, self, *args, **kwargs):
57 """Call spin() to sync state prior to calling the method."""
57 """Call spin() to sync state prior to calling the method."""
58 self.spin()
58 self.spin()
59 return f(self, *args, **kwargs)
59 return f(self, *args, **kwargs)
60
60
61 @decorator
61 @decorator
62 def defaultblock(f, self, *args, **kwargs):
62 def defaultblock(f, self, *args, **kwargs):
63 """Default to self.block; preserve self.block."""
63 """Default to self.block; preserve self.block."""
64 block = kwargs.get('block',None)
64 block = kwargs.get('block',None)
65 block = self.block if block is None else block
65 block = self.block if block is None else block
66 saveblock = self.block
66 saveblock = self.block
67 self.block = block
67 self.block = block
68 ret = f(self, *args, **kwargs)
68 ret = f(self, *args, **kwargs)
69 self.block = saveblock
69 self.block = saveblock
70 return ret
70 return ret
71
71
72 def remote(client, bound=False, block=None, targets=None):
72 def remote(client, bound=False, block=None, targets=None):
73 """Turn a function into a remote function.
73 """Turn a function into a remote function.
74
74
75 This method can be used for map:
75 This method can be used for map:
76
76
77 >>> @remote(client,block=True)
77 >>> @remote(client,block=True)
78 def func(a)
78 def func(a)
79 """
79 """
80 def remote_function(f):
80 def remote_function(f):
81 return RemoteFunction(client, f, bound, block, targets)
81 return RemoteFunction(client, f, bound, block, targets)
82 return remote_function
82 return remote_function
83
83
84 #--------------------------------------------------------------------------
84 #--------------------------------------------------------------------------
85 # Classes
85 # Classes
86 #--------------------------------------------------------------------------
86 #--------------------------------------------------------------------------
87
87
88 class RemoteFunction(object):
88 class RemoteFunction(object):
89 """Turn an existing function into a remote function"""
89 """Turn an existing function into a remote function"""
90
90
91 def __init__(self, client, f, bound=False, block=None, targets=None):
91 def __init__(self, client, f, bound=False, block=None, targets=None):
92 self.client = client
92 self.client = client
93 self.func = f
93 self.func = f
94 self.block=block
94 self.block=block
95 self.bound=bound
95 self.bound=bound
96 self.targets=targets
96 self.targets=targets
97
97
98 def __call__(self, *args, **kwargs):
98 def __call__(self, *args, **kwargs):
99 return self.client.apply(self.func, args=args, kwargs=kwargs,
99 return self.client.apply(self.func, args=args, kwargs=kwargs,
100 block=self.block, targets=self.targets, bound=self.bound)
100 block=self.block, targets=self.targets, bound=self.bound)
101
101
102
102
103 class AbortedTask(object):
103 class AbortedTask(object):
104 """A basic wrapper object describing an aborted task."""
104 """A basic wrapper object describing an aborted task."""
105 def __init__(self, msg_id):
105 def __init__(self, msg_id):
106 self.msg_id = msg_id
106 self.msg_id = msg_id
107
107
108 class ControllerError(Exception):
108 class ControllerError(Exception):
109 def __init__(self, etype, evalue, tb):
109 def __init__(self, etype, evalue, tb):
110 self.etype = etype
110 self.etype = etype
111 self.evalue = evalue
111 self.evalue = evalue
112 self.traceback=tb
112 self.traceback=tb
113
113
114 class Client(object):
114 class Client(object):
115 """A semi-synchronous client to the IPython ZMQ controller
115 """A semi-synchronous client to the IPython ZMQ controller
116
116
117 Parameters
117 Parameters
118 ----------
118 ----------
119
119
120 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
120 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
121 The address of the controller's registration socket.
121 The address of the controller's registration socket.
122 [Default: 'tcp://127.0.0.1:10101']
122 [Default: 'tcp://127.0.0.1:10101']
123 context : zmq.Context
123 context : zmq.Context
124 Pass an existing zmq.Context instance, otherwise the client will create its own
124 Pass an existing zmq.Context instance, otherwise the client will create its own
125 username : bytes
125 username : bytes
126 set username to be passed to the Session object
126 set username to be passed to the Session object
127 debug : bool
127 debug : bool
128 flag for lots of message printing for debug purposes
128 flag for lots of message printing for debug purposes
129
129
130 #-------------- ssh related args ----------------
130 #-------------- ssh related args ----------------
131 # These are args for configuring the ssh tunnel to be used
131 # These are args for configuring the ssh tunnel to be used
132 # credentials are used to forward connections over ssh to the Controller
132 # credentials are used to forward connections over ssh to the Controller
133 # Note that the ip given in `addr` needs to be relative to sshserver
133 # Note that the ip given in `addr` needs to be relative to sshserver
134 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
134 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
135 # and set sshserver as the same machine the Controller is on. However,
135 # and set sshserver as the same machine the Controller is on. However,
136 # the only requirement is that sshserver is able to see the Controller
136 # the only requirement is that sshserver is able to see the Controller
137 # (i.e. is within the same trusted network).
137 # (i.e. is within the same trusted network).
138
138
139 sshserver : str
139 sshserver : str
140 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
140 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
141 If keyfile or password is specified, and this is not, it will default to
141 If keyfile or password is specified, and this is not, it will default to
142 the ip given in addr.
142 the ip given in addr.
143 sshkey : str; path to public ssh key file
143 sshkey : str; path to public ssh key file
144 This specifies a key to be used in ssh login, default None.
144 This specifies a key to be used in ssh login, default None.
145 Regular default ssh keys will be used without specifying this argument.
145 Regular default ssh keys will be used without specifying this argument.
146 password : str;
146 password : str;
147 Your ssh password to sshserver. Note that if this is left None,
147 Your ssh password to sshserver. Note that if this is left None,
148 you will be prompted for it if passwordless key based login is unavailable.
148 you will be prompted for it if passwordless key based login is unavailable.
149
149
150 #------- exec authentication args -------
150 #------- exec authentication args -------
151 # If even localhost is untrusted, you can have some protection against
151 # If even localhost is untrusted, you can have some protection against
152 # unauthorized execution by using a key. Messages are still sent
152 # unauthorized execution by using a key. Messages are still sent
153 # as cleartext, so if someone can snoop your loopback traffic this will
153 # as cleartext, so if someone can snoop your loopback traffic this will
154 # not help anything.
154 # not help anything.
155
155
156 exec_key : str
156 exec_key : str
157 an authentication key or file containing a key
157 an authentication key or file containing a key
158 default: None
158 default: None
159
159
160
160
161 Attributes
161 Attributes
162 ----------
162 ----------
163 ids : set of int engine IDs
163 ids : set of int engine IDs
164 requesting the ids attribute always synchronizes
164 requesting the ids attribute always synchronizes
165 the registration state. To request ids without synchronization,
165 the registration state. To request ids without synchronization,
166 use semi-private _ids attributes.
166 use semi-private _ids attributes.
167
167
168 history : list of msg_ids
168 history : list of msg_ids
169 a list of msg_ids, keeping track of all the execution
169 a list of msg_ids, keeping track of all the execution
170 messages you have submitted in order.
170 messages you have submitted in order.
171
171
172 outstanding : set of msg_ids
172 outstanding : set of msg_ids
173 a set of msg_ids that have been submitted, but whose
173 a set of msg_ids that have been submitted, but whose
174 results have not yet been received.
174 results have not yet been received.
175
175
176 results : dict
176 results : dict
177 a dict of all our results, keyed by msg_id
177 a dict of all our results, keyed by msg_id
178
178
179 block : bool
179 block : bool
180 determines default behavior when block not specified
180 determines default behavior when block not specified
181 in execution methods
181 in execution methods
182
182
183 Methods
183 Methods
184 -------
184 -------
185 spin : flushes incoming results and registration state changes
185 spin : flushes incoming results and registration state changes
186 control methods spin, and requesting `ids` also ensures up to date
186 control methods spin, and requesting `ids` also ensures up to date
187
187
188 barrier : wait on one or more msg_ids
188 barrier : wait on one or more msg_ids
189
189
190 execution methods: apply/apply_bound/apply_to/apply_bound
190 execution methods: apply/apply_bound/apply_to/apply_bound
191 legacy: execute, run
191 legacy: execute, run
192
192
193 query methods: queue_status, get_result, purge
193 query methods: queue_status, get_result, purge
194
194
195 control methods: abort, kill
195 control methods: abort, kill
196
196
197 """
197 """
198
198
199
199
200 _connected=False
200 _connected=False
201 _ssh=False
201 _ssh=False
202 _engines=None
202 _engines=None
203 _addr='tcp://127.0.0.1:10101'
203 _addr='tcp://127.0.0.1:10101'
204 _registration_socket=None
204 _registration_socket=None
205 _query_socket=None
205 _query_socket=None
206 _control_socket=None
206 _control_socket=None
207 _notification_socket=None
207 _notification_socket=None
208 _mux_socket=None
208 _mux_socket=None
209 _task_socket=None
209 _task_socket=None
210 block = False
210 block = False
211 outstanding=None
211 outstanding=None
212 results = None
212 results = None
213 history = None
213 history = None
214 debug = False
214 debug = False
215
215
216 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
216 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
217 sshserver=None, sshkey=None, password=None, paramiko=None,
217 sshserver=None, sshkey=None, password=None, paramiko=None,
218 exec_key=None,):
218 exec_key=None,):
219 if context is None:
219 if context is None:
220 context = zmq.Context()
220 context = zmq.Context()
221 self.context = context
221 self.context = context
222 self._addr = addr
222 self._addr = addr
223 self._ssh = bool(sshserver or sshkey or password)
223 self._ssh = bool(sshserver or sshkey or password)
224 if self._ssh and sshserver is None:
224 if self._ssh and sshserver is None:
225 # default to the same
225 # default to the same
226 sshserver = addr.split('://')[1].split(':')[0]
226 sshserver = addr.split('://')[1].split(':')[0]
227 if self._ssh and password is None:
227 if self._ssh and password is None:
228 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
228 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
229 password=False
229 password=False
230 else:
230 else:
231 password = getpass("SSH Password for %s: "%sshserver)
231 password = getpass("SSH Password for %s: "%sshserver)
232 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
232 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
233
233
234 if os.path.isfile(exec_key):
234 if exec_key is not None and os.path.isfile(exec_key):
235 arg = 'keyfile'
235 arg = 'keyfile'
236 else:
236 else:
237 arg = 'key'
237 arg = 'key'
238 key_arg = {arg:exec_key}
238 key_arg = {arg:exec_key}
239 if username is None:
239 if username is None:
240 self.session = ss.StreamSession(**key_arg)
240 self.session = ss.StreamSession(**key_arg)
241 else:
241 else:
242 self.session = ss.StreamSession(username, **key_arg)
242 self.session = ss.StreamSession(username, **key_arg)
243 self._registration_socket = self.context.socket(zmq.XREQ)
243 self._registration_socket = self.context.socket(zmq.XREQ)
244 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
244 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
245 if self._ssh:
245 if self._ssh:
246 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
246 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
247 else:
247 else:
248 self._registration_socket.connect(addr)
248 self._registration_socket.connect(addr)
249 self._engines = {}
249 self._engines = {}
250 self._ids = set()
250 self._ids = set()
251 self.outstanding=set()
251 self.outstanding=set()
252 self.results = {}
252 self.results = {}
253 self.history = []
253 self.history = []
254 self.debug = debug
254 self.debug = debug
255 self.session.debug = debug
255 self.session.debug = debug
256
256
257 self._notification_handlers = {'registration_notification' : self._register_engine,
257 self._notification_handlers = {'registration_notification' : self._register_engine,
258 'unregistration_notification' : self._unregister_engine,
258 'unregistration_notification' : self._unregister_engine,
259 }
259 }
260 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
260 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
261 'apply_reply' : self._handle_apply_reply}
261 'apply_reply' : self._handle_apply_reply}
262 self._connect(sshserver, ssh_kwargs)
262 self._connect(sshserver, ssh_kwargs)
263
263
264
264
265 @property
265 @property
266 def ids(self):
266 def ids(self):
267 """Always up to date ids property."""
267 """Always up to date ids property."""
268 self._flush_notifications()
268 self._flush_notifications()
269 return self._ids
269 return self._ids
270
270
271 def _update_engines(self, engines):
271 def _update_engines(self, engines):
272 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
272 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
273 for k,v in engines.iteritems():
273 for k,v in engines.iteritems():
274 eid = int(k)
274 eid = int(k)
275 self._engines[eid] = bytes(v) # force not unicode
275 self._engines[eid] = bytes(v) # force not unicode
276 self._ids.add(eid)
276 self._ids.add(eid)
277
277
278 def _build_targets(self, targets):
278 def _build_targets(self, targets):
279 """Turn valid target IDs or 'all' into two lists:
279 """Turn valid target IDs or 'all' into two lists:
280 (int_ids, uuids).
280 (int_ids, uuids).
281 """
281 """
282 if targets is None:
282 if targets is None:
283 targets = self._ids
283 targets = self._ids
284 elif isinstance(targets, str):
284 elif isinstance(targets, str):
285 if targets.lower() == 'all':
285 if targets.lower() == 'all':
286 targets = self._ids
286 targets = self._ids
287 else:
287 else:
288 raise TypeError("%r not valid str target, must be 'all'"%(targets))
288 raise TypeError("%r not valid str target, must be 'all'"%(targets))
289 elif isinstance(targets, int):
289 elif isinstance(targets, int):
290 targets = [targets]
290 targets = [targets]
291 return [self._engines[t] for t in targets], list(targets)
291 return [self._engines[t] for t in targets], list(targets)
292
292
293 def _connect(self, sshserver, ssh_kwargs):
293 def _connect(self, sshserver, ssh_kwargs):
294 """setup all our socket connections to the controller. This is called from
294 """setup all our socket connections to the controller. This is called from
295 __init__."""
295 __init__."""
296 if self._connected:
296 if self._connected:
297 return
297 return
298 self._connected=True
298 self._connected=True
299
299
300 def connect_socket(s, addr):
300 def connect_socket(s, addr):
301 if self._ssh:
301 if self._ssh:
302 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
302 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
303 else:
303 else:
304 return s.connect(addr)
304 return s.connect(addr)
305
305
306 self.session.send(self._registration_socket, 'connection_request')
306 self.session.send(self._registration_socket, 'connection_request')
307 idents,msg = self.session.recv(self._registration_socket,mode=0)
307 idents,msg = self.session.recv(self._registration_socket,mode=0)
308 if self.debug:
308 if self.debug:
309 pprint(msg)
309 pprint(msg)
310 msg = ss.Message(msg)
310 msg = ss.Message(msg)
311 content = msg.content
311 content = msg.content
312 if content.status == 'ok':
312 if content.status == 'ok':
313 if content.queue:
313 if content.queue:
314 self._mux_socket = self.context.socket(zmq.PAIR)
314 self._mux_socket = self.context.socket(zmq.PAIR)
315 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
315 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
316 connect_socket(self._mux_socket, content.queue)
316 connect_socket(self._mux_socket, content.queue)
317 if content.task:
317 if content.task:
318 self._task_socket = self.context.socket(zmq.PAIR)
318 self._task_socket = self.context.socket(zmq.PAIR)
319 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
319 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
320 connect_socket(self._task_socket, content.task)
320 connect_socket(self._task_socket, content.task)
321 if content.notification:
321 if content.notification:
322 self._notification_socket = self.context.socket(zmq.SUB)
322 self._notification_socket = self.context.socket(zmq.SUB)
323 connect_socket(self._notification_socket, content.notification)
323 connect_socket(self._notification_socket, content.notification)
324 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
324 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
325 if content.query:
325 if content.query:
326 self._query_socket = self.context.socket(zmq.PAIR)
326 self._query_socket = self.context.socket(zmq.PAIR)
327 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
327 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
328 connect_socket(self._query_socket, content.query)
328 connect_socket(self._query_socket, content.query)
329 if content.control:
329 if content.control:
330 self._control_socket = self.context.socket(zmq.PAIR)
330 self._control_socket = self.context.socket(zmq.PAIR)
331 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
331 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
332 connect_socket(self._control_socket, content.control)
332 connect_socket(self._control_socket, content.control)
333 self._update_engines(dict(content.engines))
333 self._update_engines(dict(content.engines))
334
334
335 else:
335 else:
336 self._connected = False
336 self._connected = False
337 raise Exception("Failed to connect!")
337 raise Exception("Failed to connect!")
338
338
339 #--------------------------------------------------------------------------
339 #--------------------------------------------------------------------------
340 # handlers and callbacks for incoming messages
340 # handlers and callbacks for incoming messages
341 #--------------------------------------------------------------------------
341 #--------------------------------------------------------------------------
342
342
343 def _register_engine(self, msg):
343 def _register_engine(self, msg):
344 """Register a new engine, and update our connection info."""
344 """Register a new engine, and update our connection info."""
345 content = msg['content']
345 content = msg['content']
346 eid = content['id']
346 eid = content['id']
347 d = {eid : content['queue']}
347 d = {eid : content['queue']}
348 self._update_engines(d)
348 self._update_engines(d)
349 self._ids.add(int(eid))
349 self._ids.add(int(eid))
350
350
351 def _unregister_engine(self, msg):
351 def _unregister_engine(self, msg):
352 """Unregister an engine that has died."""
352 """Unregister an engine that has died."""
353 content = msg['content']
353 content = msg['content']
354 eid = int(content['id'])
354 eid = int(content['id'])
355 if eid in self._ids:
355 if eid in self._ids:
356 self._ids.remove(eid)
356 self._ids.remove(eid)
357 self._engines.pop(eid)
357 self._engines.pop(eid)
358
358
359 def _handle_execute_reply(self, msg):
359 def _handle_execute_reply(self, msg):
360 """Save the reply to an execute_request into our results."""
360 """Save the reply to an execute_request into our results."""
361 parent = msg['parent_header']
361 parent = msg['parent_header']
362 msg_id = parent['msg_id']
362 msg_id = parent['msg_id']
363 if msg_id not in self.outstanding:
363 if msg_id not in self.outstanding:
364 print("got unknown result: %s"%msg_id)
364 print("got unknown result: %s"%msg_id)
365 else:
365 else:
366 self.outstanding.remove(msg_id)
366 self.outstanding.remove(msg_id)
367 self.results[msg_id] = ss.unwrap_exception(msg['content'])
367 self.results[msg_id] = ss.unwrap_exception(msg['content'])
368
368
369 def _handle_apply_reply(self, msg):
369 def _handle_apply_reply(self, msg):
370 """Save the reply to an apply_request into our results."""
370 """Save the reply to an apply_request into our results."""
371 parent = msg['parent_header']
371 parent = msg['parent_header']
372 msg_id = parent['msg_id']
372 msg_id = parent['msg_id']
373 if msg_id not in self.outstanding:
373 if msg_id not in self.outstanding:
374 print ("got unknown result: %s"%msg_id)
374 print ("got unknown result: %s"%msg_id)
375 else:
375 else:
376 self.outstanding.remove(msg_id)
376 self.outstanding.remove(msg_id)
377 content = msg['content']
377 content = msg['content']
378 if content['status'] == 'ok':
378 if content['status'] == 'ok':
379 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
379 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
380 elif content['status'] == 'aborted':
380 elif content['status'] == 'aborted':
381 self.results[msg_id] = AbortedTask(msg_id)
381 self.results[msg_id] = AbortedTask(msg_id)
382 elif content['status'] == 'resubmitted':
382 elif content['status'] == 'resubmitted':
383 # TODO: handle resubmission
383 # TODO: handle resubmission
384 pass
384 pass
385 else:
385 else:
386 self.results[msg_id] = ss.unwrap_exception(content)
386 self.results[msg_id] = ss.unwrap_exception(content)
387
387
388 def _flush_notifications(self):
388 def _flush_notifications(self):
389 """Flush notifications of engine registrations waiting
389 """Flush notifications of engine registrations waiting
390 in ZMQ queue."""
390 in ZMQ queue."""
391 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
391 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
392 while msg is not None:
392 while msg is not None:
393 if self.debug:
393 if self.debug:
394 pprint(msg)
394 pprint(msg)
395 msg = msg[-1]
395 msg = msg[-1]
396 msg_type = msg['msg_type']
396 msg_type = msg['msg_type']
397 handler = self._notification_handlers.get(msg_type, None)
397 handler = self._notification_handlers.get(msg_type, None)
398 if handler is None:
398 if handler is None:
399 raise Exception("Unhandled message type: %s"%msg.msg_type)
399 raise Exception("Unhandled message type: %s"%msg.msg_type)
400 else:
400 else:
401 handler(msg)
401 handler(msg)
402 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
402 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
403
403
404 def _flush_results(self, sock):
404 def _flush_results(self, sock):
405 """Flush task or queue results waiting in ZMQ queue."""
405 """Flush task or queue results waiting in ZMQ queue."""
406 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
406 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
407 while msg is not None:
407 while msg is not None:
408 if self.debug:
408 if self.debug:
409 pprint(msg)
409 pprint(msg)
410 msg = msg[-1]
410 msg = msg[-1]
411 msg_type = msg['msg_type']
411 msg_type = msg['msg_type']
412 handler = self._queue_handlers.get(msg_type, None)
412 handler = self._queue_handlers.get(msg_type, None)
413 if handler is None:
413 if handler is None:
414 raise Exception("Unhandled message type: %s"%msg.msg_type)
414 raise Exception("Unhandled message type: %s"%msg.msg_type)
415 else:
415 else:
416 handler(msg)
416 handler(msg)
417 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
417 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
418
418
419 def _flush_control(self, sock):
419 def _flush_control(self, sock):
420 """Flush replies from the control channel waiting
420 """Flush replies from the control channel waiting
421 in the ZMQ queue.
421 in the ZMQ queue.
422
422
423 Currently: ignore them."""
423 Currently: ignore them."""
424 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
424 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
425 while msg is not None:
425 while msg is not None:
426 if self.debug:
426 if self.debug:
427 pprint(msg)
427 pprint(msg)
428 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
428 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
429
429
430 #--------------------------------------------------------------------------
430 #--------------------------------------------------------------------------
431 # getitem
431 # getitem
432 #--------------------------------------------------------------------------
432 #--------------------------------------------------------------------------
433
433
434 def __getitem__(self, key):
434 def __getitem__(self, key):
435 """Dict access returns DirectView multiplexer objects or,
435 """Dict access returns DirectView multiplexer objects or,
436 if key is None, a LoadBalancedView."""
436 if key is None, a LoadBalancedView."""
437 if key is None:
437 if key is None:
438 return LoadBalancedView(self)
438 return LoadBalancedView(self)
439 if isinstance(key, int):
439 if isinstance(key, int):
440 if key not in self.ids:
440 if key not in self.ids:
441 raise IndexError("No such engine: %i"%key)
441 raise IndexError("No such engine: %i"%key)
442 return DirectView(self, key)
442 return DirectView(self, key)
443
443
444 if isinstance(key, slice):
444 if isinstance(key, slice):
445 indices = range(len(self.ids))[key]
445 indices = range(len(self.ids))[key]
446 ids = sorted(self._ids)
446 ids = sorted(self._ids)
447 key = [ ids[i] for i in indices ]
447 key = [ ids[i] for i in indices ]
448 # newkeys = sorted(self._ids)[thekeys[k]]
448 # newkeys = sorted(self._ids)[thekeys[k]]
449
449
450 if isinstance(key, (tuple, list, xrange)):
450 if isinstance(key, (tuple, list, xrange)):
451 _,targets = self._build_targets(list(key))
451 _,targets = self._build_targets(list(key))
452 return DirectView(self, targets)
452 return DirectView(self, targets)
453 else:
453 else:
454 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
454 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
455
455
456 #--------------------------------------------------------------------------
456 #--------------------------------------------------------------------------
457 # Begin public methods
457 # Begin public methods
458 #--------------------------------------------------------------------------
458 #--------------------------------------------------------------------------
459
459
460 def spin(self):
460 def spin(self):
461 """Flush any registration notifications and execution results
461 """Flush any registration notifications and execution results
462 waiting in the ZMQ queue.
462 waiting in the ZMQ queue.
463 """
463 """
464 if self._notification_socket:
464 if self._notification_socket:
465 self._flush_notifications()
465 self._flush_notifications()
466 if self._mux_socket:
466 if self._mux_socket:
467 self._flush_results(self._mux_socket)
467 self._flush_results(self._mux_socket)
468 if self._task_socket:
468 if self._task_socket:
469 self._flush_results(self._task_socket)
469 self._flush_results(self._task_socket)
470 if self._control_socket:
470 if self._control_socket:
471 self._flush_control(self._control_socket)
471 self._flush_control(self._control_socket)
472
472
473 def barrier(self, msg_ids=None, timeout=-1):
473 def barrier(self, msg_ids=None, timeout=-1):
474 """waits on one or more `msg_ids`, for up to `timeout` seconds.
474 """waits on one or more `msg_ids`, for up to `timeout` seconds.
475
475
476 Parameters
476 Parameters
477 ----------
477 ----------
478 msg_ids : int, str, or list of ints and/or strs
478 msg_ids : int, str, or list of ints and/or strs
479 ints are indices to self.history
479 ints are indices to self.history
480 strs are msg_ids
480 strs are msg_ids
481 default: wait on all outstanding messages
481 default: wait on all outstanding messages
482 timeout : float
482 timeout : float
483 a time in seconds, after which to give up.
483 a time in seconds, after which to give up.
484 default is -1, which means no timeout
484 default is -1, which means no timeout
485
485
486 Returns
486 Returns
487 -------
487 -------
488 True : when all msg_ids are done
488 True : when all msg_ids are done
489 False : timeout reached, some msg_ids still outstanding
489 False : timeout reached, some msg_ids still outstanding
490 """
490 """
491 tic = time.time()
491 tic = time.time()
492 if msg_ids is None:
492 if msg_ids is None:
493 theids = self.outstanding
493 theids = self.outstanding
494 else:
494 else:
495 if isinstance(msg_ids, (int, str)):
495 if isinstance(msg_ids, (int, str)):
496 msg_ids = [msg_ids]
496 msg_ids = [msg_ids]
497 theids = set()
497 theids = set()
498 for msg_id in msg_ids:
498 for msg_id in msg_ids:
499 if isinstance(msg_id, int):
499 if isinstance(msg_id, int):
500 msg_id = self.history[msg_id]
500 msg_id = self.history[msg_id]
501 theids.add(msg_id)
501 theids.add(msg_id)
502 self.spin()
502 self.spin()
503 while theids.intersection(self.outstanding):
503 while theids.intersection(self.outstanding):
504 if timeout >= 0 and ( time.time()-tic ) > timeout:
504 if timeout >= 0 and ( time.time()-tic ) > timeout:
505 break
505 break
506 time.sleep(1e-3)
506 time.sleep(1e-3)
507 self.spin()
507 self.spin()
508 return len(theids.intersection(self.outstanding)) == 0
508 return len(theids.intersection(self.outstanding)) == 0
509
509
510 #--------------------------------------------------------------------------
510 #--------------------------------------------------------------------------
511 # Control methods
511 # Control methods
512 #--------------------------------------------------------------------------
512 #--------------------------------------------------------------------------
513
513
514 @spinfirst
514 @spinfirst
515 @defaultblock
515 @defaultblock
516 def clear(self, targets=None, block=None):
516 def clear(self, targets=None, block=None):
517 """Clear the namespace in target(s)."""
517 """Clear the namespace in target(s)."""
518 targets = self._build_targets(targets)[0]
518 targets = self._build_targets(targets)[0]
519 for t in targets:
519 for t in targets:
520 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
520 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
521 error = False
521 error = False
522 if self.block:
522 if self.block:
523 for i in range(len(targets)):
523 for i in range(len(targets)):
524 idents,msg = self.session.recv(self._control_socket,0)
524 idents,msg = self.session.recv(self._control_socket,0)
525 if self.debug:
525 if self.debug:
526 pprint(msg)
526 pprint(msg)
527 if msg['content']['status'] != 'ok':
527 if msg['content']['status'] != 'ok':
528 error = ss.unwrap_exception(msg['content'])
528 error = ss.unwrap_exception(msg['content'])
529 if error:
529 if error:
530 return error
530 return error
531
531
532
532
533 @spinfirst
533 @spinfirst
534 @defaultblock
534 @defaultblock
535 def abort(self, msg_ids = None, targets=None, block=None):
535 def abort(self, msg_ids = None, targets=None, block=None):
536 """Abort the execution queues of target(s)."""
536 """Abort the execution queues of target(s)."""
537 targets = self._build_targets(targets)[0]
537 targets = self._build_targets(targets)[0]
538 if isinstance(msg_ids, basestring):
538 if isinstance(msg_ids, basestring):
539 msg_ids = [msg_ids]
539 msg_ids = [msg_ids]
540 content = dict(msg_ids=msg_ids)
540 content = dict(msg_ids=msg_ids)
541 for t in targets:
541 for t in targets:
542 self.session.send(self._control_socket, 'abort_request',
542 self.session.send(self._control_socket, 'abort_request',
543 content=content, ident=t)
543 content=content, ident=t)
544 error = False
544 error = False
545 if self.block:
545 if self.block:
546 for i in range(len(targets)):
546 for i in range(len(targets)):
547 idents,msg = self.session.recv(self._control_socket,0)
547 idents,msg = self.session.recv(self._control_socket,0)
548 if self.debug:
548 if self.debug:
549 pprint(msg)
549 pprint(msg)
550 if msg['content']['status'] != 'ok':
550 if msg['content']['status'] != 'ok':
551 error = ss.unwrap_exception(msg['content'])
551 error = ss.unwrap_exception(msg['content'])
552 if error:
552 if error:
553 return error
553 return error
554
554
555 @spinfirst
555 @spinfirst
556 @defaultblock
556 @defaultblock
557 def shutdown(self, targets=None, restart=False, block=None):
557 def shutdown(self, targets=None, restart=False, block=None):
558 """Terminates one or more engine processes."""
558 """Terminates one or more engine processes."""
559 targets = self._build_targets(targets)[0]
559 targets = self._build_targets(targets)[0]
560 for t in targets:
560 for t in targets:
561 self.session.send(self._control_socket, 'shutdown_request',
561 self.session.send(self._control_socket, 'shutdown_request',
562 content={'restart':restart},ident=t)
562 content={'restart':restart},ident=t)
563 error = False
563 error = False
564 if self.block:
564 if self.block:
565 for i in range(len(targets)):
565 for i in range(len(targets)):
566 idents,msg = self.session.recv(self._control_socket,0)
566 idents,msg = self.session.recv(self._control_socket,0)
567 if self.debug:
567 if self.debug:
568 pprint(msg)
568 pprint(msg)
569 if msg['content']['status'] != 'ok':
569 if msg['content']['status'] != 'ok':
570 error = ss.unwrap_exception(msg['content'])
570 error = ss.unwrap_exception(msg['content'])
571 if error:
571 if error:
572 return error
572 return error
573
573
574 #--------------------------------------------------------------------------
574 #--------------------------------------------------------------------------
575 # Execution methods
575 # Execution methods
576 #--------------------------------------------------------------------------
576 #--------------------------------------------------------------------------
577
577
578 @defaultblock
578 @defaultblock
579 def execute(self, code, targets='all', block=None):
579 def execute(self, code, targets='all', block=None):
580 """Executes `code` on `targets` in blocking or nonblocking manner.
580 """Executes `code` on `targets` in blocking or nonblocking manner.
581
581
582 Parameters
582 Parameters
583 ----------
583 ----------
584 code : str
584 code : str
585 the code string to be executed
585 the code string to be executed
586 targets : int/str/list of ints/strs
586 targets : int/str/list of ints/strs
587 the engines on which to execute
587 the engines on which to execute
588 default : all
588 default : all
589 block : bool
589 block : bool
590 whether or not to wait until done to return
590 whether or not to wait until done to return
591 default: self.block
591 default: self.block
592 """
592 """
593 # block = self.block if block is None else block
593 # block = self.block if block is None else block
594 # saveblock = self.block
594 # saveblock = self.block
595 # self.block = block
595 # self.block = block
596 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
596 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
597 # self.block = saveblock
597 # self.block = saveblock
598 return result
598 return result
599
599
600 def run(self, code, block=None):
600 def run(self, code, block=None):
601 """Runs `code` on an engine.
601 """Runs `code` on an engine.
602
602
603 Calls to this are load-balanced.
603 Calls to this are load-balanced.
604
604
605 Parameters
605 Parameters
606 ----------
606 ----------
607 code : str
607 code : str
608 the code string to be executed
608 the code string to be executed
609 block : bool
609 block : bool
610 whether or not to wait until done
610 whether or not to wait until done
611
611
612 """
612 """
613 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
613 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
614 return result
614 return result
615
615
616 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
616 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
617 after=None, follow=None):
617 after=None, follow=None):
618 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
618 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
619
619
620 This is the central execution command for the client.
620 This is the central execution command for the client.
621
621
622 Parameters
622 Parameters
623 ----------
623 ----------
624
624
625 f : function
625 f : function
626 The fuction to be called remotely
626 The fuction to be called remotely
627 args : tuple/list
627 args : tuple/list
628 The positional arguments passed to `f`
628 The positional arguments passed to `f`
629 kwargs : dict
629 kwargs : dict
630 The keyword arguments passed to `f`
630 The keyword arguments passed to `f`
631 bound : bool (default: True)
631 bound : bool (default: True)
632 Whether to execute in the Engine(s) namespace, or in a clean
632 Whether to execute in the Engine(s) namespace, or in a clean
633 namespace not affecting the engine.
633 namespace not affecting the engine.
634 block : bool (default: self.block)
634 block : bool (default: self.block)
635 Whether to wait for the result, or return immediately.
635 Whether to wait for the result, or return immediately.
636 False:
636 False:
637 returns msg_id(s)
637 returns msg_id(s)
638 if multiple targets:
638 if multiple targets:
639 list of ids
639 list of ids
640 True:
640 True:
641 returns actual result(s) of f(*args, **kwargs)
641 returns actual result(s) of f(*args, **kwargs)
642 if multiple targets:
642 if multiple targets:
643 dict of results, by engine ID
643 dict of results, by engine ID
644 targets : int,list of ints, 'all', None
644 targets : int,list of ints, 'all', None
645 Specify the destination of the job.
645 Specify the destination of the job.
646 if None:
646 if None:
647 Submit via Task queue for load-balancing.
647 Submit via Task queue for load-balancing.
648 if 'all':
648 if 'all':
649 Run on all active engines
649 Run on all active engines
650 if list:
650 if list:
651 Run on each specified engine
651 Run on each specified engine
652 if int:
652 if int:
653 Run on single engine
653 Run on single engine
654
654
655 after : Dependency or collection of msg_ids
655 after : Dependency or collection of msg_ids
656 Only for load-balanced execution (targets=None)
656 Only for load-balanced execution (targets=None)
657 Specify a list of msg_ids as a time-based dependency.
657 Specify a list of msg_ids as a time-based dependency.
658 This job will only be run *after* the dependencies
658 This job will only be run *after* the dependencies
659 have been met.
659 have been met.
660
660
661 follow : Dependency or collection of msg_ids
661 follow : Dependency or collection of msg_ids
662 Only for load-balanced execution (targets=None)
662 Only for load-balanced execution (targets=None)
663 Specify a list of msg_ids as a location-based dependency.
663 Specify a list of msg_ids as a location-based dependency.
664 This job will only be run on an engine where this dependency
664 This job will only be run on an engine where this dependency
665 is met.
665 is met.
666
666
667 Returns
667 Returns
668 -------
668 -------
669 if block is False:
669 if block is False:
670 if single target:
670 if single target:
671 return msg_id
671 return msg_id
672 else:
672 else:
673 return list of msg_ids
673 return list of msg_ids
674 ? (should this be dict like block=True) ?
674 ? (should this be dict like block=True) ?
675 else:
675 else:
676 if single target:
676 if single target:
677 return result of f(*args, **kwargs)
677 return result of f(*args, **kwargs)
678 else:
678 else:
679 return dict of results, keyed by engine
679 return dict of results, keyed by engine
680 """
680 """
681
681
682 # defaults:
682 # defaults:
683 block = block if block is not None else self.block
683 block = block if block is not None else self.block
684 args = args if args is not None else []
684 args = args if args is not None else []
685 kwargs = kwargs if kwargs is not None else {}
685 kwargs = kwargs if kwargs is not None else {}
686
686
687 # enforce types of f,args,kwrags
687 # enforce types of f,args,kwrags
688 if not callable(f):
688 if not callable(f):
689 raise TypeError("f must be callable, not %s"%type(f))
689 raise TypeError("f must be callable, not %s"%type(f))
690 if not isinstance(args, (tuple, list)):
690 if not isinstance(args, (tuple, list)):
691 raise TypeError("args must be tuple or list, not %s"%type(args))
691 raise TypeError("args must be tuple or list, not %s"%type(args))
692 if not isinstance(kwargs, dict):
692 if not isinstance(kwargs, dict):
693 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
693 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
694
694
695 options = dict(bound=bound, block=block, after=after, follow=follow)
695 options = dict(bound=bound, block=block, after=after, follow=follow)
696
696
697 if targets is None:
697 if targets is None:
698 return self._apply_balanced(f, args, kwargs, **options)
698 return self._apply_balanced(f, args, kwargs, **options)
699 else:
699 else:
700 return self._apply_direct(f, args, kwargs, targets=targets, **options)
700 return self._apply_direct(f, args, kwargs, targets=targets, **options)
701
701
702 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
702 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
703 after=None, follow=None):
703 after=None, follow=None):
704 """The underlying method for applying functions in a load balanced
704 """The underlying method for applying functions in a load balanced
705 manner, via the task queue."""
705 manner, via the task queue."""
706 if isinstance(after, Dependency):
706 if isinstance(after, Dependency):
707 after = after.as_dict()
707 after = after.as_dict()
708 elif after is None:
708 elif after is None:
709 after = []
709 after = []
710 if isinstance(follow, Dependency):
710 if isinstance(follow, Dependency):
711 follow = follow.as_dict()
711 follow = follow.as_dict()
712 elif follow is None:
712 elif follow is None:
713 follow = []
713 follow = []
714 subheader = dict(after=after, follow=follow)
714 subheader = dict(after=after, follow=follow)
715
715
716 bufs = ss.pack_apply_message(f,args,kwargs)
716 bufs = ss.pack_apply_message(f,args,kwargs)
717 content = dict(bound=bound)
717 content = dict(bound=bound)
718 msg = self.session.send(self._task_socket, "apply_request",
718 msg = self.session.send(self._task_socket, "apply_request",
719 content=content, buffers=bufs, subheader=subheader)
719 content=content, buffers=bufs, subheader=subheader)
720 msg_id = msg['msg_id']
720 msg_id = msg['msg_id']
721 self.outstanding.add(msg_id)
721 self.outstanding.add(msg_id)
722 self.history.append(msg_id)
722 self.history.append(msg_id)
723 if block:
723 if block:
724 self.barrier(msg_id)
724 self.barrier(msg_id)
725 return self.results[msg_id]
725 return self.results[msg_id]
726 else:
726 else:
727 return msg_id
727 return msg_id
728
728
729 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
729 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
730 after=None, follow=None):
730 after=None, follow=None):
731 """Then underlying method for applying functions to specific engines
731 """Then underlying method for applying functions to specific engines
732 via the MUX queue."""
732 via the MUX queue."""
733
733
734 queues,targets = self._build_targets(targets)
734 queues,targets = self._build_targets(targets)
735 bufs = ss.pack_apply_message(f,args,kwargs)
735 bufs = ss.pack_apply_message(f,args,kwargs)
736 if isinstance(after, Dependency):
736 if isinstance(after, Dependency):
737 after = after.as_dict()
737 after = after.as_dict()
738 elif after is None:
738 elif after is None:
739 after = []
739 after = []
740 if isinstance(follow, Dependency):
740 if isinstance(follow, Dependency):
741 follow = follow.as_dict()
741 follow = follow.as_dict()
742 elif follow is None:
742 elif follow is None:
743 follow = []
743 follow = []
744 subheader = dict(after=after, follow=follow)
744 subheader = dict(after=after, follow=follow)
745 content = dict(bound=bound)
745 content = dict(bound=bound)
746 msg_ids = []
746 msg_ids = []
747 for queue in queues:
747 for queue in queues:
748 msg = self.session.send(self._mux_socket, "apply_request",
748 msg = self.session.send(self._mux_socket, "apply_request",
749 content=content, buffers=bufs,ident=queue, subheader=subheader)
749 content=content, buffers=bufs,ident=queue, subheader=subheader)
750 msg_id = msg['msg_id']
750 msg_id = msg['msg_id']
751 self.outstanding.add(msg_id)
751 self.outstanding.add(msg_id)
752 self.history.append(msg_id)
752 self.history.append(msg_id)
753 msg_ids.append(msg_id)
753 msg_ids.append(msg_id)
754 if block:
754 if block:
755 self.barrier(msg_ids)
755 self.barrier(msg_ids)
756 else:
756 else:
757 if len(msg_ids) == 1:
757 if len(msg_ids) == 1:
758 return msg_ids[0]
758 return msg_ids[0]
759 else:
759 else:
760 return msg_ids
760 return msg_ids
761 if len(msg_ids) == 1:
761 if len(msg_ids) == 1:
762 return self.results[msg_ids[0]]
762 return self.results[msg_ids[0]]
763 else:
763 else:
764 result = {}
764 result = {}
765 for target,mid in zip(targets, msg_ids):
765 for target,mid in zip(targets, msg_ids):
766 result[target] = self.results[mid]
766 result[target] = self.results[mid]
767 return result
767 return result
768
768
769 #--------------------------------------------------------------------------
769 #--------------------------------------------------------------------------
770 # Data movement
770 # Data movement
771 #--------------------------------------------------------------------------
771 #--------------------------------------------------------------------------
772
772
773 @defaultblock
773 @defaultblock
774 def push(self, ns, targets=None, block=None):
774 def push(self, ns, targets=None, block=None):
775 """Push the contents of `ns` into the namespace on `target`"""
775 """Push the contents of `ns` into the namespace on `target`"""
776 if not isinstance(ns, dict):
776 if not isinstance(ns, dict):
777 raise TypeError("Must be a dict, not %s"%type(ns))
777 raise TypeError("Must be a dict, not %s"%type(ns))
778 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
778 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
779 return result
779 return result
780
780
781 @defaultblock
781 @defaultblock
782 def pull(self, keys, targets=None, block=True):
782 def pull(self, keys, targets=None, block=True):
783 """Pull objects from `target`'s namespace by `keys`"""
783 """Pull objects from `target`'s namespace by `keys`"""
784 if isinstance(keys, str):
784 if isinstance(keys, str):
785 pass
785 pass
786 elif isistance(keys, (list,tuple,set)):
786 elif isistance(keys, (list,tuple,set)):
787 for key in keys:
787 for key in keys:
788 if not isinstance(key, str):
788 if not isinstance(key, str):
789 raise TypeError
789 raise TypeError
790 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
790 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
791 return result
791 return result
792
792
793 #--------------------------------------------------------------------------
793 #--------------------------------------------------------------------------
794 # Query methods
794 # Query methods
795 #--------------------------------------------------------------------------
795 #--------------------------------------------------------------------------
796
796
797 @spinfirst
797 @spinfirst
798 def get_results(self, msg_ids, status_only=False):
798 def get_results(self, msg_ids, status_only=False):
799 """Returns the result of the execute or task request with `msg_ids`.
799 """Returns the result of the execute or task request with `msg_ids`.
800
800
801 Parameters
801 Parameters
802 ----------
802 ----------
803 msg_ids : list of ints or msg_ids
803 msg_ids : list of ints or msg_ids
804 if int:
804 if int:
805 Passed as index to self.history for convenience.
805 Passed as index to self.history for convenience.
806 status_only : bool (default: False)
806 status_only : bool (default: False)
807 if False:
807 if False:
808 return the actual results
808 return the actual results
809 """
809 """
810 if not isinstance(msg_ids, (list,tuple)):
810 if not isinstance(msg_ids, (list,tuple)):
811 msg_ids = [msg_ids]
811 msg_ids = [msg_ids]
812 theids = []
812 theids = []
813 for msg_id in msg_ids:
813 for msg_id in msg_ids:
814 if isinstance(msg_id, int):
814 if isinstance(msg_id, int):
815 msg_id = self.history[msg_id]
815 msg_id = self.history[msg_id]
816 if not isinstance(msg_id, str):
816 if not isinstance(msg_id, str):
817 raise TypeError("msg_ids must be str, not %r"%msg_id)
817 raise TypeError("msg_ids must be str, not %r"%msg_id)
818 theids.append(msg_id)
818 theids.append(msg_id)
819
819
820 completed = []
820 completed = []
821 local_results = {}
821 local_results = {}
822 for msg_id in list(theids):
822 for msg_id in list(theids):
823 if msg_id in self.results:
823 if msg_id in self.results:
824 completed.append(msg_id)
824 completed.append(msg_id)
825 local_results[msg_id] = self.results[msg_id]
825 local_results[msg_id] = self.results[msg_id]
826 theids.remove(msg_id)
826 theids.remove(msg_id)
827
827
828 if theids: # some not locally cached
828 if theids: # some not locally cached
829 content = dict(msg_ids=theids, status_only=status_only)
829 content = dict(msg_ids=theids, status_only=status_only)
830 msg = self.session.send(self._query_socket, "result_request", content=content)
830 msg = self.session.send(self._query_socket, "result_request", content=content)
831 zmq.select([self._query_socket], [], [])
831 zmq.select([self._query_socket], [], [])
832 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
832 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
833 if self.debug:
833 if self.debug:
834 pprint(msg)
834 pprint(msg)
835 content = msg['content']
835 content = msg['content']
836 if content['status'] != 'ok':
836 if content['status'] != 'ok':
837 raise ss.unwrap_exception(content)
837 raise ss.unwrap_exception(content)
838 else:
838 else:
839 content = dict(completed=[],pending=[])
839 content = dict(completed=[],pending=[])
840 if not status_only:
840 if not status_only:
841 # load cached results into result:
841 # load cached results into result:
842 content['completed'].extend(completed)
842 content['completed'].extend(completed)
843 content.update(local_results)
843 content.update(local_results)
844 # update cache with results:
844 # update cache with results:
845 for msg_id in msg_ids:
845 for msg_id in msg_ids:
846 if msg_id in content['completed']:
846 if msg_id in content['completed']:
847 self.results[msg_id] = content[msg_id]
847 self.results[msg_id] = content[msg_id]
848 return content
848 return content
849
849
850 @spinfirst
850 @spinfirst
851 def queue_status(self, targets=None, verbose=False):
851 def queue_status(self, targets=None, verbose=False):
852 """Fetch the status of engine queues.
852 """Fetch the status of engine queues.
853
853
854 Parameters
854 Parameters
855 ----------
855 ----------
856 targets : int/str/list of ints/strs
856 targets : int/str/list of ints/strs
857 the engines on which to execute
857 the engines on which to execute
858 default : all
858 default : all
859 verbose : bool
859 verbose : bool
860 whether to return lengths only, or lists of ids for each element
860 whether to return lengths only, or lists of ids for each element
861 """
861 """
862 targets = self._build_targets(targets)[1]
862 targets = self._build_targets(targets)[1]
863 content = dict(targets=targets, verbose=verbose)
863 content = dict(targets=targets, verbose=verbose)
864 self.session.send(self._query_socket, "queue_request", content=content)
864 self.session.send(self._query_socket, "queue_request", content=content)
865 idents,msg = self.session.recv(self._query_socket, 0)
865 idents,msg = self.session.recv(self._query_socket, 0)
866 if self.debug:
866 if self.debug:
867 pprint(msg)
867 pprint(msg)
868 content = msg['content']
868 content = msg['content']
869 status = content.pop('status')
869 status = content.pop('status')
870 if status != 'ok':
870 if status != 'ok':
871 raise ss.unwrap_exception(content)
871 raise ss.unwrap_exception(content)
872 return content
872 return content
873
873
874 @spinfirst
874 @spinfirst
875 def purge_results(self, msg_ids=[], targets=[]):
875 def purge_results(self, msg_ids=[], targets=[]):
876 """Tell the controller to forget results.
876 """Tell the controller to forget results.
877
877
878 Individual results can be purged by msg_id, or the entire
878 Individual results can be purged by msg_id, or the entire
879 history of specific targets can
879 history of specific targets can
880
880
881 Parameters
881 Parameters
882 ----------
882 ----------
883 targets : int/str/list of ints/strs
883 targets : int/str/list of ints/strs
884 the targets
884 the targets
885 default : None
885 default : None
886 """
886 """
887 if not targets and not msg_ids:
887 if not targets and not msg_ids:
888 raise ValueError
888 raise ValueError
889 if targets:
889 if targets:
890 targets = self._build_targets(targets)[1]
890 targets = self._build_targets(targets)[1]
891 content = dict(targets=targets, msg_ids=msg_ids)
891 content = dict(targets=targets, msg_ids=msg_ids)
892 self.session.send(self._query_socket, "purge_request", content=content)
892 self.session.send(self._query_socket, "purge_request", content=content)
893 idents, msg = self.session.recv(self._query_socket, 0)
893 idents, msg = self.session.recv(self._query_socket, 0)
894 if self.debug:
894 if self.debug:
895 pprint(msg)
895 pprint(msg)
896 content = msg['content']
896 content = msg['content']
897 if content['status'] != 'ok':
897 if content['status'] != 'ok':
898 raise ss.unwrap_exception(content)
898 raise ss.unwrap_exception(content)
899
899
900 class AsynClient(Client):
900 class AsynClient(Client):
901 """An Asynchronous client, using the Tornado Event Loop.
901 """An Asynchronous client, using the Tornado Event Loop.
902 !!!unfinished!!!"""
902 !!!unfinished!!!"""
903 io_loop = None
903 io_loop = None
904 _queue_stream = None
904 _queue_stream = None
905 _notifier_stream = None
905 _notifier_stream = None
906 _task_stream = None
906 _task_stream = None
907 _control_stream = None
907 _control_stream = None
908
908
909 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
909 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
910 Client.__init__(self, addr, context, username, debug)
910 Client.__init__(self, addr, context, username, debug)
911 if io_loop is None:
911 if io_loop is None:
912 io_loop = ioloop.IOLoop.instance()
912 io_loop = ioloop.IOLoop.instance()
913 self.io_loop = io_loop
913 self.io_loop = io_loop
914
914
915 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
915 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
916 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
916 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
917 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
917 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
918 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
918 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
919
919
920 def spin(self):
920 def spin(self):
921 for stream in (self.queue_stream, self.notifier_stream,
921 for stream in (self.queue_stream, self.notifier_stream,
922 self.task_stream, self.control_stream):
922 self.task_stream, self.control_stream):
923 stream.flush()
923 stream.flush()
924
924
General Comments 0
You need to be logged in to leave comments. Login now