##// END OF EJS Templates
added bound arg to RemoteFunction
MinRK -
Show More
@@ -1,854 +1,855 b''
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from __future__ import print_function
13 from __future__ import print_function
14
14
15 import time
15 import time
16 from pprint import pprint
16 from pprint import pprint
17
17
18 import zmq
18 import zmq
19 from zmq.eventloop import ioloop, zmqstream
19 from zmq.eventloop import ioloop, zmqstream
20
20
21 from IPython.external.decorator import decorator
21 from IPython.external.decorator import decorator
22
22
23 import streamsession as ss
23 import streamsession as ss
24 # from remotenamespace import RemoteNamespace
24 # from remotenamespace import RemoteNamespace
25 from view import DirectView, LoadBalancedView
25 from view import DirectView, LoadBalancedView
26 from dependency import Dependency, depend, require
26 from dependency import Dependency, depend, require
27
27
28 def _push(ns):
28 def _push(ns):
29 globals().update(ns)
29 globals().update(ns)
30
30
31 def _pull(keys):
31 def _pull(keys):
32 g = globals()
32 g = globals()
33 if isinstance(keys, (list,tuple, set)):
33 if isinstance(keys, (list,tuple, set)):
34 for key in keys:
34 for key in keys:
35 if not g.has_key(key):
35 if not g.has_key(key):
36 raise NameError("name '%s' is not defined"%key)
36 raise NameError("name '%s' is not defined"%key)
37 return map(g.get, keys)
37 return map(g.get, keys)
38 else:
38 else:
39 if not g.has_key(keys):
39 if not g.has_key(keys):
40 raise NameError("name '%s' is not defined"%keys)
40 raise NameError("name '%s' is not defined"%keys)
41 return g.get(keys)
41 return g.get(keys)
42
42
43 def _clear():
43 def _clear():
44 globals().clear()
44 globals().clear()
45
45
46 def execute(code):
46 def execute(code):
47 exec code in globals()
47 exec code in globals()
48
48
49 #--------------------------------------------------------------------------
49 #--------------------------------------------------------------------------
50 # Decorators for Client methods
50 # Decorators for Client methods
51 #--------------------------------------------------------------------------
51 #--------------------------------------------------------------------------
52
52
53 @decorator
53 @decorator
54 def spinfirst(f, self, *args, **kwargs):
54 def spinfirst(f, self, *args, **kwargs):
55 """Call spin() to sync state prior to calling the method."""
55 """Call spin() to sync state prior to calling the method."""
56 self.spin()
56 self.spin()
57 return f(self, *args, **kwargs)
57 return f(self, *args, **kwargs)
58
58
59 @decorator
59 @decorator
60 def defaultblock(f, self, *args, **kwargs):
60 def defaultblock(f, self, *args, **kwargs):
61 """Default to self.block; preserve self.block."""
61 """Default to self.block; preserve self.block."""
62 block = kwargs.get('block',None)
62 block = kwargs.get('block',None)
63 block = self.block if block is None else block
63 block = self.block if block is None else block
64 saveblock = self.block
64 saveblock = self.block
65 self.block = block
65 self.block = block
66 ret = f(self, *args, **kwargs)
66 ret = f(self, *args, **kwargs)
67 self.block = saveblock
67 self.block = saveblock
68 return ret
68 return ret
69
69
70 def remote(client, block=None, targets=None):
70 def remote(client, bound=False, block=None, targets=None):
71 """Turn a function into a remote function.
71 """Turn a function into a remote function.
72
72
73 This method can be used for map:
73 This method can be used for map:
74
74
75 >>> @remote(client,block=True)
75 >>> @remote(client,block=True)
76 def func(a)
76 def func(a)
77 """
77 """
78 def remote_function(f):
78 def remote_function(f):
79 return RemoteFunction(client, f, block, targets)
79 return RemoteFunction(client, f, bound, block, targets)
80 return remote_function
80 return remote_function
81
81
82 #--------------------------------------------------------------------------
82 #--------------------------------------------------------------------------
83 # Classes
83 # Classes
84 #--------------------------------------------------------------------------
84 #--------------------------------------------------------------------------
85
85
86 class RemoteFunction(object):
86 class RemoteFunction(object):
87 """Turn an existing function into a remote function"""
87 """Turn an existing function into a remote function"""
88
88
89 def __init__(self, client, f, block=None, targets=None):
89 def __init__(self, client, f, bound=False, block=None, targets=None):
90 self.client = client
90 self.client = client
91 self.func = f
91 self.func = f
92 self.block=block
92 self.block=block
93 self.bound=bound
93 self.targets=targets
94 self.targets=targets
94
95
95 def __call__(self, *args, **kwargs):
96 def __call__(self, *args, **kwargs):
96 return self.client.apply(self.func, args=args, kwargs=kwargs,
97 return self.client.apply(self.func, args=args, kwargs=kwargs,
97 block=self.block, targets=self.targets)
98 block=self.block, targets=self.targets, bound=self.bound)
98
99
99
100
100 class AbortedTask(object):
101 class AbortedTask(object):
101 """A basic wrapper object describing an aborted task."""
102 """A basic wrapper object describing an aborted task."""
102 def __init__(self, msg_id):
103 def __init__(self, msg_id):
103 self.msg_id = msg_id
104 self.msg_id = msg_id
104
105
105 class ControllerError(Exception):
106 class ControllerError(Exception):
106 def __init__(self, etype, evalue, tb):
107 def __init__(self, etype, evalue, tb):
107 self.etype = etype
108 self.etype = etype
108 self.evalue = evalue
109 self.evalue = evalue
109 self.traceback=tb
110 self.traceback=tb
110
111
111 class Client(object):
112 class Client(object):
112 """A semi-synchronous client to the IPython ZMQ controller
113 """A semi-synchronous client to the IPython ZMQ controller
113
114
114 Parameters
115 Parameters
115 ----------
116 ----------
116
117
117 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
118 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
118 The address of the controller's registration socket.
119 The address of the controller's registration socket.
119
120
120
121
121 Attributes
122 Attributes
122 ----------
123 ----------
123 ids : set of int engine IDs
124 ids : set of int engine IDs
124 requesting the ids attribute always synchronizes
125 requesting the ids attribute always synchronizes
125 the registration state. To request ids without synchronization,
126 the registration state. To request ids without synchronization,
126 use semi-private _ids.
127 use semi-private _ids.
127
128
128 history : list of msg_ids
129 history : list of msg_ids
129 a list of msg_ids, keeping track of all the execution
130 a list of msg_ids, keeping track of all the execution
130 messages you have submitted in order.
131 messages you have submitted in order.
131
132
132 outstanding : set of msg_ids
133 outstanding : set of msg_ids
133 a set of msg_ids that have been submitted, but whose
134 a set of msg_ids that have been submitted, but whose
134 results have not yet been received.
135 results have not yet been received.
135
136
136 results : dict
137 results : dict
137 a dict of all our results, keyed by msg_id
138 a dict of all our results, keyed by msg_id
138
139
139 block : bool
140 block : bool
140 determines default behavior when block not specified
141 determines default behavior when block not specified
141 in execution methods
142 in execution methods
142
143
143 Methods
144 Methods
144 -------
145 -------
145 spin : flushes incoming results and registration state changes
146 spin : flushes incoming results and registration state changes
146 control methods spin, and requesting `ids` also ensures up to date
147 control methods spin, and requesting `ids` also ensures up to date
147
148
148 barrier : wait on one or more msg_ids
149 barrier : wait on one or more msg_ids
149
150
150 execution methods: apply/apply_bound/apply_to/applu_bount
151 execution methods: apply/apply_bound/apply_to/applu_bount
151 legacy: execute, run
152 legacy: execute, run
152
153
153 query methods: queue_status, get_result, purge
154 query methods: queue_status, get_result, purge
154
155
155 control methods: abort, kill
156 control methods: abort, kill
156
157
157 """
158 """
158
159
159
160
160 _connected=False
161 _connected=False
161 _engines=None
162 _engines=None
162 _addr='tcp://127.0.0.1:10101'
163 _addr='tcp://127.0.0.1:10101'
163 _registration_socket=None
164 _registration_socket=None
164 _query_socket=None
165 _query_socket=None
165 _control_socket=None
166 _control_socket=None
166 _notification_socket=None
167 _notification_socket=None
167 _mux_socket=None
168 _mux_socket=None
168 _task_socket=None
169 _task_socket=None
169 block = False
170 block = False
170 outstanding=None
171 outstanding=None
171 results = None
172 results = None
172 history = None
173 history = None
173 debug = False
174 debug = False
174
175
175 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False):
176 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False):
176 if context is None:
177 if context is None:
177 context = zmq.Context()
178 context = zmq.Context()
178 self.context = context
179 self.context = context
179 self._addr = addr
180 self._addr = addr
180 if username is None:
181 if username is None:
181 self.session = ss.StreamSession()
182 self.session = ss.StreamSession()
182 else:
183 else:
183 self.session = ss.StreamSession(username)
184 self.session = ss.StreamSession(username)
184 self._registration_socket = self.context.socket(zmq.PAIR)
185 self._registration_socket = self.context.socket(zmq.PAIR)
185 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
186 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
186 self._registration_socket.connect(addr)
187 self._registration_socket.connect(addr)
187 self._engines = {}
188 self._engines = {}
188 self._ids = set()
189 self._ids = set()
189 self.outstanding=set()
190 self.outstanding=set()
190 self.results = {}
191 self.results = {}
191 self.history = []
192 self.history = []
192 self.debug = debug
193 self.debug = debug
193 self.session.debug = debug
194 self.session.debug = debug
194
195
195 self._notification_handlers = {'registration_notification' : self._register_engine,
196 self._notification_handlers = {'registration_notification' : self._register_engine,
196 'unregistration_notification' : self._unregister_engine,
197 'unregistration_notification' : self._unregister_engine,
197 }
198 }
198 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
199 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
199 'apply_reply' : self._handle_apply_reply}
200 'apply_reply' : self._handle_apply_reply}
200 self._connect()
201 self._connect()
201
202
202
203
203 @property
204 @property
204 def ids(self):
205 def ids(self):
205 """Always up to date ids property."""
206 """Always up to date ids property."""
206 self._flush_notifications()
207 self._flush_notifications()
207 return self._ids
208 return self._ids
208
209
209 def _update_engines(self, engines):
210 def _update_engines(self, engines):
210 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
211 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
211 for k,v in engines.iteritems():
212 for k,v in engines.iteritems():
212 eid = int(k)
213 eid = int(k)
213 self._engines[eid] = bytes(v) # force not unicode
214 self._engines[eid] = bytes(v) # force not unicode
214 self._ids.add(eid)
215 self._ids.add(eid)
215
216
216 def _build_targets(self, targets):
217 def _build_targets(self, targets):
217 """Turn valid target IDs or 'all' into two lists:
218 """Turn valid target IDs or 'all' into two lists:
218 (int_ids, uuids).
219 (int_ids, uuids).
219 """
220 """
220 if targets is None:
221 if targets is None:
221 targets = self._ids
222 targets = self._ids
222 elif isinstance(targets, str):
223 elif isinstance(targets, str):
223 if targets.lower() == 'all':
224 if targets.lower() == 'all':
224 targets = self._ids
225 targets = self._ids
225 else:
226 else:
226 raise TypeError("%r not valid str target, must be 'all'"%(targets))
227 raise TypeError("%r not valid str target, must be 'all'"%(targets))
227 elif isinstance(targets, int):
228 elif isinstance(targets, int):
228 targets = [targets]
229 targets = [targets]
229 return [self._engines[t] for t in targets], list(targets)
230 return [self._engines[t] for t in targets], list(targets)
230
231
231 def _connect(self):
232 def _connect(self):
232 """setup all our socket connections to the controller. This is called from
233 """setup all our socket connections to the controller. This is called from
233 __init__."""
234 __init__."""
234 if self._connected:
235 if self._connected:
235 return
236 return
236 self._connected=True
237 self._connected=True
237 self.session.send(self._registration_socket, 'connection_request')
238 self.session.send(self._registration_socket, 'connection_request')
238 idents,msg = self.session.recv(self._registration_socket,mode=0)
239 idents,msg = self.session.recv(self._registration_socket,mode=0)
239 if self.debug:
240 if self.debug:
240 pprint(msg)
241 pprint(msg)
241 msg = ss.Message(msg)
242 msg = ss.Message(msg)
242 content = msg.content
243 content = msg.content
243 if content.status == 'ok':
244 if content.status == 'ok':
244 if content.queue:
245 if content.queue:
245 self._mux_socket = self.context.socket(zmq.PAIR)
246 self._mux_socket = self.context.socket(zmq.PAIR)
246 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
247 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
247 self._mux_socket.connect(content.queue)
248 self._mux_socket.connect(content.queue)
248 if content.task:
249 if content.task:
249 self._task_socket = self.context.socket(zmq.PAIR)
250 self._task_socket = self.context.socket(zmq.PAIR)
250 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
251 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
251 self._task_socket.connect(content.task)
252 self._task_socket.connect(content.task)
252 if content.notification:
253 if content.notification:
253 self._notification_socket = self.context.socket(zmq.SUB)
254 self._notification_socket = self.context.socket(zmq.SUB)
254 self._notification_socket.connect(content.notification)
255 self._notification_socket.connect(content.notification)
255 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
256 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
256 if content.query:
257 if content.query:
257 self._query_socket = self.context.socket(zmq.PAIR)
258 self._query_socket = self.context.socket(zmq.PAIR)
258 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
259 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
259 self._query_socket.connect(content.query)
260 self._query_socket.connect(content.query)
260 if content.control:
261 if content.control:
261 self._control_socket = self.context.socket(zmq.PAIR)
262 self._control_socket = self.context.socket(zmq.PAIR)
262 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
263 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
263 self._control_socket.connect(content.control)
264 self._control_socket.connect(content.control)
264 self._update_engines(dict(content.engines))
265 self._update_engines(dict(content.engines))
265
266
266 else:
267 else:
267 self._connected = False
268 self._connected = False
268 raise Exception("Failed to connect!")
269 raise Exception("Failed to connect!")
269
270
270 #--------------------------------------------------------------------------
271 #--------------------------------------------------------------------------
271 # handlers and callbacks for incoming messages
272 # handlers and callbacks for incoming messages
272 #--------------------------------------------------------------------------
273 #--------------------------------------------------------------------------
273
274
274 def _register_engine(self, msg):
275 def _register_engine(self, msg):
275 """Register a new engine, and update our connection info."""
276 """Register a new engine, and update our connection info."""
276 content = msg['content']
277 content = msg['content']
277 eid = content['id']
278 eid = content['id']
278 d = {eid : content['queue']}
279 d = {eid : content['queue']}
279 self._update_engines(d)
280 self._update_engines(d)
280 self._ids.add(int(eid))
281 self._ids.add(int(eid))
281
282
282 def _unregister_engine(self, msg):
283 def _unregister_engine(self, msg):
283 """Unregister an engine that has died."""
284 """Unregister an engine that has died."""
284 content = msg['content']
285 content = msg['content']
285 eid = int(content['id'])
286 eid = int(content['id'])
286 if eid in self._ids:
287 if eid in self._ids:
287 self._ids.remove(eid)
288 self._ids.remove(eid)
288 self._engines.pop(eid)
289 self._engines.pop(eid)
289
290
290 def _handle_execute_reply(self, msg):
291 def _handle_execute_reply(self, msg):
291 """Save the reply to an execute_request into our results."""
292 """Save the reply to an execute_request into our results."""
292 parent = msg['parent_header']
293 parent = msg['parent_header']
293 msg_id = parent['msg_id']
294 msg_id = parent['msg_id']
294 if msg_id not in self.outstanding:
295 if msg_id not in self.outstanding:
295 print("got unknown result: %s"%msg_id)
296 print("got unknown result: %s"%msg_id)
296 else:
297 else:
297 self.outstanding.remove(msg_id)
298 self.outstanding.remove(msg_id)
298 self.results[msg_id] = ss.unwrap_exception(msg['content'])
299 self.results[msg_id] = ss.unwrap_exception(msg['content'])
299
300
300 def _handle_apply_reply(self, msg):
301 def _handle_apply_reply(self, msg):
301 """Save the reply to an apply_request into our results."""
302 """Save the reply to an apply_request into our results."""
302 parent = msg['parent_header']
303 parent = msg['parent_header']
303 msg_id = parent['msg_id']
304 msg_id = parent['msg_id']
304 if msg_id not in self.outstanding:
305 if msg_id not in self.outstanding:
305 print ("got unknown result: %s"%msg_id)
306 print ("got unknown result: %s"%msg_id)
306 else:
307 else:
307 self.outstanding.remove(msg_id)
308 self.outstanding.remove(msg_id)
308 content = msg['content']
309 content = msg['content']
309 if content['status'] == 'ok':
310 if content['status'] == 'ok':
310 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
311 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
311 elif content['status'] == 'aborted':
312 elif content['status'] == 'aborted':
312 self.results[msg_id] = AbortedTask(msg_id)
313 self.results[msg_id] = AbortedTask(msg_id)
313 elif content['status'] == 'resubmitted':
314 elif content['status'] == 'resubmitted':
314 # TODO: handle resubmission
315 # TODO: handle resubmission
315 pass
316 pass
316 else:
317 else:
317 self.results[msg_id] = ss.unwrap_exception(content)
318 self.results[msg_id] = ss.unwrap_exception(content)
318
319
319 def _flush_notifications(self):
320 def _flush_notifications(self):
320 """Flush notifications of engine registrations waiting
321 """Flush notifications of engine registrations waiting
321 in ZMQ queue."""
322 in ZMQ queue."""
322 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
323 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
323 while msg is not None:
324 while msg is not None:
324 if self.debug:
325 if self.debug:
325 pprint(msg)
326 pprint(msg)
326 msg = msg[-1]
327 msg = msg[-1]
327 msg_type = msg['msg_type']
328 msg_type = msg['msg_type']
328 handler = self._notification_handlers.get(msg_type, None)
329 handler = self._notification_handlers.get(msg_type, None)
329 if handler is None:
330 if handler is None:
330 raise Exception("Unhandled message type: %s"%msg.msg_type)
331 raise Exception("Unhandled message type: %s"%msg.msg_type)
331 else:
332 else:
332 handler(msg)
333 handler(msg)
333 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
334 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
334
335
335 def _flush_results(self, sock):
336 def _flush_results(self, sock):
336 """Flush task or queue results waiting in ZMQ queue."""
337 """Flush task or queue results waiting in ZMQ queue."""
337 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
338 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
338 while msg is not None:
339 while msg is not None:
339 if self.debug:
340 if self.debug:
340 pprint(msg)
341 pprint(msg)
341 msg = msg[-1]
342 msg = msg[-1]
342 msg_type = msg['msg_type']
343 msg_type = msg['msg_type']
343 handler = self._queue_handlers.get(msg_type, None)
344 handler = self._queue_handlers.get(msg_type, None)
344 if handler is None:
345 if handler is None:
345 raise Exception("Unhandled message type: %s"%msg.msg_type)
346 raise Exception("Unhandled message type: %s"%msg.msg_type)
346 else:
347 else:
347 handler(msg)
348 handler(msg)
348 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
349 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
349
350
350 def _flush_control(self, sock):
351 def _flush_control(self, sock):
351 """Flush replies from the control channel waiting
352 """Flush replies from the control channel waiting
352 in the ZMQ queue.
353 in the ZMQ queue.
353
354
354 Currently: ignore them."""
355 Currently: ignore them."""
355 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
356 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
356 while msg is not None:
357 while msg is not None:
357 if self.debug:
358 if self.debug:
358 pprint(msg)
359 pprint(msg)
359 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
360 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
360
361
361 #--------------------------------------------------------------------------
362 #--------------------------------------------------------------------------
362 # getitem
363 # getitem
363 #--------------------------------------------------------------------------
364 #--------------------------------------------------------------------------
364
365
365 def __getitem__(self, key):
366 def __getitem__(self, key):
366 """Dict access returns DirectView multiplexer objects or,
367 """Dict access returns DirectView multiplexer objects or,
367 if key is None, a LoadBalancedView."""
368 if key is None, a LoadBalancedView."""
368 if key is None:
369 if key is None:
369 return LoadBalancedView(self)
370 return LoadBalancedView(self)
370 if isinstance(key, int):
371 if isinstance(key, int):
371 if key not in self.ids:
372 if key not in self.ids:
372 raise IndexError("No such engine: %i"%key)
373 raise IndexError("No such engine: %i"%key)
373 return DirectView(self, key)
374 return DirectView(self, key)
374
375
375 if isinstance(key, slice):
376 if isinstance(key, slice):
376 indices = range(len(self.ids))[key]
377 indices = range(len(self.ids))[key]
377 ids = sorted(self._ids)
378 ids = sorted(self._ids)
378 key = [ ids[i] for i in indices ]
379 key = [ ids[i] for i in indices ]
379 # newkeys = sorted(self._ids)[thekeys[k]]
380 # newkeys = sorted(self._ids)[thekeys[k]]
380
381
381 if isinstance(key, (tuple, list, xrange)):
382 if isinstance(key, (tuple, list, xrange)):
382 _,targets = self._build_targets(list(key))
383 _,targets = self._build_targets(list(key))
383 return DirectView(self, targets)
384 return DirectView(self, targets)
384 else:
385 else:
385 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
386 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
386
387
387 #--------------------------------------------------------------------------
388 #--------------------------------------------------------------------------
388 # Begin public methods
389 # Begin public methods
389 #--------------------------------------------------------------------------
390 #--------------------------------------------------------------------------
390
391
391 def spin(self):
392 def spin(self):
392 """Flush any registration notifications and execution results
393 """Flush any registration notifications and execution results
393 waiting in the ZMQ queue.
394 waiting in the ZMQ queue.
394 """
395 """
395 if self._notification_socket:
396 if self._notification_socket:
396 self._flush_notifications()
397 self._flush_notifications()
397 if self._mux_socket:
398 if self._mux_socket:
398 self._flush_results(self._mux_socket)
399 self._flush_results(self._mux_socket)
399 if self._task_socket:
400 if self._task_socket:
400 self._flush_results(self._task_socket)
401 self._flush_results(self._task_socket)
401 if self._control_socket:
402 if self._control_socket:
402 self._flush_control(self._control_socket)
403 self._flush_control(self._control_socket)
403
404
404 def barrier(self, msg_ids=None, timeout=-1):
405 def barrier(self, msg_ids=None, timeout=-1):
405 """waits on one or more `msg_ids`, for up to `timeout` seconds.
406 """waits on one or more `msg_ids`, for up to `timeout` seconds.
406
407
407 Parameters
408 Parameters
408 ----------
409 ----------
409 msg_ids : int, str, or list of ints and/or strs
410 msg_ids : int, str, or list of ints and/or strs
410 ints are indices to self.history
411 ints are indices to self.history
411 strs are msg_ids
412 strs are msg_ids
412 default: wait on all outstanding messages
413 default: wait on all outstanding messages
413 timeout : float
414 timeout : float
414 a time in seconds, after which to give up.
415 a time in seconds, after which to give up.
415 default is -1, which means no timeout
416 default is -1, which means no timeout
416
417
417 Returns
418 Returns
418 -------
419 -------
419 True : when all msg_ids are done
420 True : when all msg_ids are done
420 False : timeout reached, some msg_ids still outstanding
421 False : timeout reached, some msg_ids still outstanding
421 """
422 """
422 tic = time.time()
423 tic = time.time()
423 if msg_ids is None:
424 if msg_ids is None:
424 theids = self.outstanding
425 theids = self.outstanding
425 else:
426 else:
426 if isinstance(msg_ids, (int, str)):
427 if isinstance(msg_ids, (int, str)):
427 msg_ids = [msg_ids]
428 msg_ids = [msg_ids]
428 theids = set()
429 theids = set()
429 for msg_id in msg_ids:
430 for msg_id in msg_ids:
430 if isinstance(msg_id, int):
431 if isinstance(msg_id, int):
431 msg_id = self.history[msg_id]
432 msg_id = self.history[msg_id]
432 theids.add(msg_id)
433 theids.add(msg_id)
433 self.spin()
434 self.spin()
434 while theids.intersection(self.outstanding):
435 while theids.intersection(self.outstanding):
435 if timeout >= 0 and ( time.time()-tic ) > timeout:
436 if timeout >= 0 and ( time.time()-tic ) > timeout:
436 break
437 break
437 time.sleep(1e-3)
438 time.sleep(1e-3)
438 self.spin()
439 self.spin()
439 return len(theids.intersection(self.outstanding)) == 0
440 return len(theids.intersection(self.outstanding)) == 0
440
441
441 #--------------------------------------------------------------------------
442 #--------------------------------------------------------------------------
442 # Control methods
443 # Control methods
443 #--------------------------------------------------------------------------
444 #--------------------------------------------------------------------------
444
445
445 @spinfirst
446 @spinfirst
446 @defaultblock
447 @defaultblock
447 def clear(self, targets=None, block=None):
448 def clear(self, targets=None, block=None):
448 """Clear the namespace in target(s)."""
449 """Clear the namespace in target(s)."""
449 targets = self._build_targets(targets)[0]
450 targets = self._build_targets(targets)[0]
450 for t in targets:
451 for t in targets:
451 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
452 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
452 error = False
453 error = False
453 if self.block:
454 if self.block:
454 for i in range(len(targets)):
455 for i in range(len(targets)):
455 idents,msg = self.session.recv(self._control_socket,0)
456 idents,msg = self.session.recv(self._control_socket,0)
456 if self.debug:
457 if self.debug:
457 pprint(msg)
458 pprint(msg)
458 if msg['content']['status'] != 'ok':
459 if msg['content']['status'] != 'ok':
459 error = ss.unwrap_exception(msg['content'])
460 error = ss.unwrap_exception(msg['content'])
460 if error:
461 if error:
461 return error
462 return error
462
463
463
464
464 @spinfirst
465 @spinfirst
465 @defaultblock
466 @defaultblock
466 def abort(self, msg_ids = None, targets=None, block=None):
467 def abort(self, msg_ids = None, targets=None, block=None):
467 """Abort the execution queues of target(s)."""
468 """Abort the execution queues of target(s)."""
468 targets = self._build_targets(targets)[0]
469 targets = self._build_targets(targets)[0]
469 if isinstance(msg_ids, basestring):
470 if isinstance(msg_ids, basestring):
470 msg_ids = [msg_ids]
471 msg_ids = [msg_ids]
471 content = dict(msg_ids=msg_ids)
472 content = dict(msg_ids=msg_ids)
472 for t in targets:
473 for t in targets:
473 self.session.send(self._control_socket, 'abort_request',
474 self.session.send(self._control_socket, 'abort_request',
474 content=content, ident=t)
475 content=content, ident=t)
475 error = False
476 error = False
476 if self.block:
477 if self.block:
477 for i in range(len(targets)):
478 for i in range(len(targets)):
478 idents,msg = self.session.recv(self._control_socket,0)
479 idents,msg = self.session.recv(self._control_socket,0)
479 if self.debug:
480 if self.debug:
480 pprint(msg)
481 pprint(msg)
481 if msg['content']['status'] != 'ok':
482 if msg['content']['status'] != 'ok':
482 error = ss.unwrap_exception(msg['content'])
483 error = ss.unwrap_exception(msg['content'])
483 if error:
484 if error:
484 return error
485 return error
485
486
486 @spinfirst
487 @spinfirst
487 @defaultblock
488 @defaultblock
488 def kill(self, targets=None, block=None):
489 def kill(self, targets=None, block=None):
489 """Terminates one or more engine processes."""
490 """Terminates one or more engine processes."""
490 targets = self._build_targets(targets)[0]
491 targets = self._build_targets(targets)[0]
491 for t in targets:
492 for t in targets:
492 self.session.send(self._control_socket, 'kill_request', content={},ident=t)
493 self.session.send(self._control_socket, 'kill_request', content={},ident=t)
493 error = False
494 error = False
494 if self.block:
495 if self.block:
495 for i in range(len(targets)):
496 for i in range(len(targets)):
496 idents,msg = self.session.recv(self._control_socket,0)
497 idents,msg = self.session.recv(self._control_socket,0)
497 if self.debug:
498 if self.debug:
498 pprint(msg)
499 pprint(msg)
499 if msg['content']['status'] != 'ok':
500 if msg['content']['status'] != 'ok':
500 error = ss.unwrap_exception(msg['content'])
501 error = ss.unwrap_exception(msg['content'])
501 if error:
502 if error:
502 return error
503 return error
503
504
504 #--------------------------------------------------------------------------
505 #--------------------------------------------------------------------------
505 # Execution methods
506 # Execution methods
506 #--------------------------------------------------------------------------
507 #--------------------------------------------------------------------------
507
508
508 @defaultblock
509 @defaultblock
509 def execute(self, code, targets='all', block=None):
510 def execute(self, code, targets='all', block=None):
510 """Executes `code` on `targets` in blocking or nonblocking manner.
511 """Executes `code` on `targets` in blocking or nonblocking manner.
511
512
512 Parameters
513 Parameters
513 ----------
514 ----------
514 code : str
515 code : str
515 the code string to be executed
516 the code string to be executed
516 targets : int/str/list of ints/strs
517 targets : int/str/list of ints/strs
517 the engines on which to execute
518 the engines on which to execute
518 default : all
519 default : all
519 block : bool
520 block : bool
520 whether or not to wait until done to return
521 whether or not to wait until done to return
521 default: self.block
522 default: self.block
522 """
523 """
523 # block = self.block if block is None else block
524 # block = self.block if block is None else block
524 # saveblock = self.block
525 # saveblock = self.block
525 # self.block = block
526 # self.block = block
526 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
527 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
527 # self.block = saveblock
528 # self.block = saveblock
528 return result
529 return result
529
530
530 def run(self, code, block=None):
531 def run(self, code, block=None):
531 """Runs `code` on an engine.
532 """Runs `code` on an engine.
532
533
533 Calls to this are load-balanced.
534 Calls to this are load-balanced.
534
535
535 Parameters
536 Parameters
536 ----------
537 ----------
537 code : str
538 code : str
538 the code string to be executed
539 the code string to be executed
539 block : bool
540 block : bool
540 whether or not to wait until done
541 whether or not to wait until done
541
542
542 """
543 """
543 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
544 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
544 return result
545 return result
545
546
546 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
547 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
547 after=None, follow=None):
548 after=None, follow=None):
548 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
549 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
549
550
550 This is the central execution command for the client.
551 This is the central execution command for the client.
551
552
552 Parameters
553 Parameters
553 ----------
554 ----------
554
555
555 f : function
556 f : function
556 The fuction to be called remotely
557 The fuction to be called remotely
557 args : tuple/list
558 args : tuple/list
558 The positional arguments passed to `f`
559 The positional arguments passed to `f`
559 kwargs : dict
560 kwargs : dict
560 The keyword arguments passed to `f`
561 The keyword arguments passed to `f`
561 bound : bool (default: True)
562 bound : bool (default: True)
562 Whether to execute in the Engine(s) namespace, or in a clean
563 Whether to execute in the Engine(s) namespace, or in a clean
563 namespace not affecting the engine.
564 namespace not affecting the engine.
564 block : bool (default: self.block)
565 block : bool (default: self.block)
565 Whether to wait for the result, or return immediately.
566 Whether to wait for the result, or return immediately.
566 False:
567 False:
567 returns msg_id(s)
568 returns msg_id(s)
568 if multiple targets:
569 if multiple targets:
569 list of ids
570 list of ids
570 True:
571 True:
571 returns actual result(s) of f(*args, **kwargs)
572 returns actual result(s) of f(*args, **kwargs)
572 if multiple targets:
573 if multiple targets:
573 dict of results, by engine ID
574 dict of results, by engine ID
574 targets : int,list of ints, 'all', None
575 targets : int,list of ints, 'all', None
575 Specify the destination of the job.
576 Specify the destination of the job.
576 if None:
577 if None:
577 Submit via Task queue for load-balancing.
578 Submit via Task queue for load-balancing.
578 if 'all':
579 if 'all':
579 Run on all active engines
580 Run on all active engines
580 if list:
581 if list:
581 Run on each specified engine
582 Run on each specified engine
582 if int:
583 if int:
583 Run on single engine
584 Run on single engine
584
585
585 after : Dependency or collection of msg_ids
586 after : Dependency or collection of msg_ids
586 Only for load-balanced execution (targets=None)
587 Only for load-balanced execution (targets=None)
587 Specify a list of msg_ids as a time-based dependency.
588 Specify a list of msg_ids as a time-based dependency.
588 This job will only be run *after* the dependencies
589 This job will only be run *after* the dependencies
589 have been met.
590 have been met.
590
591
591 follow : Dependency or collection of msg_ids
592 follow : Dependency or collection of msg_ids
592 Only for load-balanced execution (targets=None)
593 Only for load-balanced execution (targets=None)
593 Specify a list of msg_ids as a location-based dependency.
594 Specify a list of msg_ids as a location-based dependency.
594 This job will only be run on an engine where this dependency
595 This job will only be run on an engine where this dependency
595 is met.
596 is met.
596
597
597 Returns
598 Returns
598 -------
599 -------
599 if block is False:
600 if block is False:
600 if single target:
601 if single target:
601 return msg_id
602 return msg_id
602 else:
603 else:
603 return list of msg_ids
604 return list of msg_ids
604 ? (should this be dict like block=True) ?
605 ? (should this be dict like block=True) ?
605 else:
606 else:
606 if single target:
607 if single target:
607 return result of f(*args, **kwargs)
608 return result of f(*args, **kwargs)
608 else:
609 else:
609 return dict of results, keyed by engine
610 return dict of results, keyed by engine
610 """
611 """
611
612
612 # defaults:
613 # defaults:
613 block = block if block is not None else self.block
614 block = block if block is not None else self.block
614 args = args if args is not None else []
615 args = args if args is not None else []
615 kwargs = kwargs if kwargs is not None else {}
616 kwargs = kwargs if kwargs is not None else {}
616
617
617 # enforce types of f,args,kwrags
618 # enforce types of f,args,kwrags
618 if not callable(f):
619 if not callable(f):
619 raise TypeError("f must be callable, not %s"%type(f))
620 raise TypeError("f must be callable, not %s"%type(f))
620 if not isinstance(args, (tuple, list)):
621 if not isinstance(args, (tuple, list)):
621 raise TypeError("args must be tuple or list, not %s"%type(args))
622 raise TypeError("args must be tuple or list, not %s"%type(args))
622 if not isinstance(kwargs, dict):
623 if not isinstance(kwargs, dict):
623 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
624 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
624
625
625 options = dict(bound=bound, block=block, after=after, follow=follow)
626 options = dict(bound=bound, block=block, after=after, follow=follow)
626
627
627 if targets is None:
628 if targets is None:
628 return self._apply_balanced(f, args, kwargs, **options)
629 return self._apply_balanced(f, args, kwargs, **options)
629 else:
630 else:
630 return self._apply_direct(f, args, kwargs, targets=targets, **options)
631 return self._apply_direct(f, args, kwargs, targets=targets, **options)
631
632
632 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
633 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
633 after=None, follow=None):
634 after=None, follow=None):
634 """The underlying method for applying functions in a load balanced
635 """The underlying method for applying functions in a load balanced
635 manner, via the task queue."""
636 manner, via the task queue."""
636 if isinstance(after, Dependency):
637 if isinstance(after, Dependency):
637 after = after.as_dict()
638 after = after.as_dict()
638 elif after is None:
639 elif after is None:
639 after = []
640 after = []
640 if isinstance(follow, Dependency):
641 if isinstance(follow, Dependency):
641 follow = follow.as_dict()
642 follow = follow.as_dict()
642 elif follow is None:
643 elif follow is None:
643 follow = []
644 follow = []
644 subheader = dict(after=after, follow=follow)
645 subheader = dict(after=after, follow=follow)
645
646
646 bufs = ss.pack_apply_message(f,args,kwargs)
647 bufs = ss.pack_apply_message(f,args,kwargs)
647 content = dict(bound=bound)
648 content = dict(bound=bound)
648 msg = self.session.send(self._task_socket, "apply_request",
649 msg = self.session.send(self._task_socket, "apply_request",
649 content=content, buffers=bufs, subheader=subheader)
650 content=content, buffers=bufs, subheader=subheader)
650 msg_id = msg['msg_id']
651 msg_id = msg['msg_id']
651 self.outstanding.add(msg_id)
652 self.outstanding.add(msg_id)
652 self.history.append(msg_id)
653 self.history.append(msg_id)
653 if block:
654 if block:
654 self.barrier(msg_id)
655 self.barrier(msg_id)
655 return self.results[msg_id]
656 return self.results[msg_id]
656 else:
657 else:
657 return msg_id
658 return msg_id
658
659
659 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
660 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
660 after=None, follow=None):
661 after=None, follow=None):
661 """Then underlying method for applying functions to specific engines
662 """Then underlying method for applying functions to specific engines
662 via the MUX queue."""
663 via the MUX queue."""
663
664
664 queues,targets = self._build_targets(targets)
665 queues,targets = self._build_targets(targets)
665 bufs = ss.pack_apply_message(f,args,kwargs)
666 bufs = ss.pack_apply_message(f,args,kwargs)
666 if isinstance(after, Dependency):
667 if isinstance(after, Dependency):
667 after = after.as_dict()
668 after = after.as_dict()
668 elif after is None:
669 elif after is None:
669 after = []
670 after = []
670 if isinstance(follow, Dependency):
671 if isinstance(follow, Dependency):
671 follow = follow.as_dict()
672 follow = follow.as_dict()
672 elif follow is None:
673 elif follow is None:
673 follow = []
674 follow = []
674 subheader = dict(after=after, follow=follow)
675 subheader = dict(after=after, follow=follow)
675 content = dict(bound=bound)
676 content = dict(bound=bound)
676 msg_ids = []
677 msg_ids = []
677 for queue in queues:
678 for queue in queues:
678 msg = self.session.send(self._mux_socket, "apply_request",
679 msg = self.session.send(self._mux_socket, "apply_request",
679 content=content, buffers=bufs,ident=queue, subheader=subheader)
680 content=content, buffers=bufs,ident=queue, subheader=subheader)
680 msg_id = msg['msg_id']
681 msg_id = msg['msg_id']
681 self.outstanding.add(msg_id)
682 self.outstanding.add(msg_id)
682 self.history.append(msg_id)
683 self.history.append(msg_id)
683 msg_ids.append(msg_id)
684 msg_ids.append(msg_id)
684 if block:
685 if block:
685 self.barrier(msg_ids)
686 self.barrier(msg_ids)
686 else:
687 else:
687 if len(msg_ids) == 1:
688 if len(msg_ids) == 1:
688 return msg_ids[0]
689 return msg_ids[0]
689 else:
690 else:
690 return msg_ids
691 return msg_ids
691 if len(msg_ids) == 1:
692 if len(msg_ids) == 1:
692 return self.results[msg_ids[0]]
693 return self.results[msg_ids[0]]
693 else:
694 else:
694 result = {}
695 result = {}
695 for target,mid in zip(targets, msg_ids):
696 for target,mid in zip(targets, msg_ids):
696 result[target] = self.results[mid]
697 result[target] = self.results[mid]
697 return result
698 return result
698
699
699 #--------------------------------------------------------------------------
700 #--------------------------------------------------------------------------
700 # Data movement
701 # Data movement
701 #--------------------------------------------------------------------------
702 #--------------------------------------------------------------------------
702
703
703 @defaultblock
704 @defaultblock
704 def push(self, ns, targets=None, block=None):
705 def push(self, ns, targets=None, block=None):
705 """Push the contents of `ns` into the namespace on `target`"""
706 """Push the contents of `ns` into the namespace on `target`"""
706 if not isinstance(ns, dict):
707 if not isinstance(ns, dict):
707 raise TypeError("Must be a dict, not %s"%type(ns))
708 raise TypeError("Must be a dict, not %s"%type(ns))
708 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
709 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
709 return result
710 return result
710
711
711 @defaultblock
712 @defaultblock
712 def pull(self, keys, targets=None, block=True):
713 def pull(self, keys, targets=None, block=True):
713 """Pull objects from `target`'s namespace by `keys`"""
714 """Pull objects from `target`'s namespace by `keys`"""
714 if isinstance(keys, str):
715 if isinstance(keys, str):
715 pass
716 pass
716 elif isistance(keys, (list,tuple,set)):
717 elif isistance(keys, (list,tuple,set)):
717 for key in keys:
718 for key in keys:
718 if not isinstance(key, str):
719 if not isinstance(key, str):
719 raise TypeError
720 raise TypeError
720 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
721 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
721 return result
722 return result
722
723
723 #--------------------------------------------------------------------------
724 #--------------------------------------------------------------------------
724 # Query methods
725 # Query methods
725 #--------------------------------------------------------------------------
726 #--------------------------------------------------------------------------
726
727
727 @spinfirst
728 @spinfirst
728 def get_results(self, msg_ids, status_only=False):
729 def get_results(self, msg_ids, status_only=False):
729 """Returns the result of the execute or task request with `msg_ids`.
730 """Returns the result of the execute or task request with `msg_ids`.
730
731
731 Parameters
732 Parameters
732 ----------
733 ----------
733 msg_ids : list of ints or msg_ids
734 msg_ids : list of ints or msg_ids
734 if int:
735 if int:
735 Passed as index to self.history for convenience.
736 Passed as index to self.history for convenience.
736 status_only : bool (default: False)
737 status_only : bool (default: False)
737 if False:
738 if False:
738 return the actual results
739 return the actual results
739 """
740 """
740 if not isinstance(msg_ids, (list,tuple)):
741 if not isinstance(msg_ids, (list,tuple)):
741 msg_ids = [msg_ids]
742 msg_ids = [msg_ids]
742 theids = []
743 theids = []
743 for msg_id in msg_ids:
744 for msg_id in msg_ids:
744 if isinstance(msg_id, int):
745 if isinstance(msg_id, int):
745 msg_id = self.history[msg_id]
746 msg_id = self.history[msg_id]
746 if not isinstance(msg_id, str):
747 if not isinstance(msg_id, str):
747 raise TypeError("msg_ids must be str, not %r"%msg_id)
748 raise TypeError("msg_ids must be str, not %r"%msg_id)
748 theids.append(msg_id)
749 theids.append(msg_id)
749
750
750 completed = []
751 completed = []
751 local_results = {}
752 local_results = {}
752 for msg_id in list(theids):
753 for msg_id in list(theids):
753 if msg_id in self.results:
754 if msg_id in self.results:
754 completed.append(msg_id)
755 completed.append(msg_id)
755 local_results[msg_id] = self.results[msg_id]
756 local_results[msg_id] = self.results[msg_id]
756 theids.remove(msg_id)
757 theids.remove(msg_id)
757
758
758 if theids: # some not locally cached
759 if theids: # some not locally cached
759 content = dict(msg_ids=theids, status_only=status_only)
760 content = dict(msg_ids=theids, status_only=status_only)
760 msg = self.session.send(self._query_socket, "result_request", content=content)
761 msg = self.session.send(self._query_socket, "result_request", content=content)
761 zmq.select([self._query_socket], [], [])
762 zmq.select([self._query_socket], [], [])
762 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
763 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
763 if self.debug:
764 if self.debug:
764 pprint(msg)
765 pprint(msg)
765 content = msg['content']
766 content = msg['content']
766 if content['status'] != 'ok':
767 if content['status'] != 'ok':
767 raise ss.unwrap_exception(content)
768 raise ss.unwrap_exception(content)
768 else:
769 else:
769 content = dict(completed=[],pending=[])
770 content = dict(completed=[],pending=[])
770 if not status_only:
771 if not status_only:
771 # load cached results into result:
772 # load cached results into result:
772 content['completed'].extend(completed)
773 content['completed'].extend(completed)
773 content.update(local_results)
774 content.update(local_results)
774 # update cache with results:
775 # update cache with results:
775 for msg_id in msg_ids:
776 for msg_id in msg_ids:
776 if msg_id in content['completed']:
777 if msg_id in content['completed']:
777 self.results[msg_id] = content[msg_id]
778 self.results[msg_id] = content[msg_id]
778 return content
779 return content
779
780
780 @spinfirst
781 @spinfirst
781 def queue_status(self, targets=None, verbose=False):
782 def queue_status(self, targets=None, verbose=False):
782 """Fetch the status of engine queues.
783 """Fetch the status of engine queues.
783
784
784 Parameters
785 Parameters
785 ----------
786 ----------
786 targets : int/str/list of ints/strs
787 targets : int/str/list of ints/strs
787 the engines on which to execute
788 the engines on which to execute
788 default : all
789 default : all
789 verbose : bool
790 verbose : bool
790 whether to return lengths only, or lists of ids for each element
791 whether to return lengths only, or lists of ids for each element
791 """
792 """
792 targets = self._build_targets(targets)[1]
793 targets = self._build_targets(targets)[1]
793 content = dict(targets=targets, verbose=verbose)
794 content = dict(targets=targets, verbose=verbose)
794 self.session.send(self._query_socket, "queue_request", content=content)
795 self.session.send(self._query_socket, "queue_request", content=content)
795 idents,msg = self.session.recv(self._query_socket, 0)
796 idents,msg = self.session.recv(self._query_socket, 0)
796 if self.debug:
797 if self.debug:
797 pprint(msg)
798 pprint(msg)
798 content = msg['content']
799 content = msg['content']
799 status = content.pop('status')
800 status = content.pop('status')
800 if status != 'ok':
801 if status != 'ok':
801 raise ss.unwrap_exception(content)
802 raise ss.unwrap_exception(content)
802 return content
803 return content
803
804
804 @spinfirst
805 @spinfirst
805 def purge_results(self, msg_ids=[], targets=[]):
806 def purge_results(self, msg_ids=[], targets=[]):
806 """Tell the controller to forget results.
807 """Tell the controller to forget results.
807
808
808 Individual results can be purged by msg_id, or the entire
809 Individual results can be purged by msg_id, or the entire
809 history of specific targets can
810 history of specific targets can
810
811
811 Parameters
812 Parameters
812 ----------
813 ----------
813 targets : int/str/list of ints/strs
814 targets : int/str/list of ints/strs
814 the targets
815 the targets
815 default : None
816 default : None
816 """
817 """
817 if not targets and not msg_ids:
818 if not targets and not msg_ids:
818 raise ValueError
819 raise ValueError
819 if targets:
820 if targets:
820 targets = self._build_targets(targets)[1]
821 targets = self._build_targets(targets)[1]
821 content = dict(targets=targets, msg_ids=msg_ids)
822 content = dict(targets=targets, msg_ids=msg_ids)
822 self.session.send(self._query_socket, "purge_request", content=content)
823 self.session.send(self._query_socket, "purge_request", content=content)
823 idents, msg = self.session.recv(self._query_socket, 0)
824 idents, msg = self.session.recv(self._query_socket, 0)
824 if self.debug:
825 if self.debug:
825 pprint(msg)
826 pprint(msg)
826 content = msg['content']
827 content = msg['content']
827 if content['status'] != 'ok':
828 if content['status'] != 'ok':
828 raise ss.unwrap_exception(content)
829 raise ss.unwrap_exception(content)
829
830
830 class AsynClient(Client):
831 class AsynClient(Client):
831 """An Asynchronous client, using the Tornado Event Loop.
832 """An Asynchronous client, using the Tornado Event Loop.
832 !!!unfinished!!!"""
833 !!!unfinished!!!"""
833 io_loop = None
834 io_loop = None
834 _queue_stream = None
835 _queue_stream = None
835 _notifier_stream = None
836 _notifier_stream = None
836 _task_stream = None
837 _task_stream = None
837 _control_stream = None
838 _control_stream = None
838
839
839 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
840 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
840 Client.__init__(self, addr, context, username, debug)
841 Client.__init__(self, addr, context, username, debug)
841 if io_loop is None:
842 if io_loop is None:
842 io_loop = ioloop.IOLoop.instance()
843 io_loop = ioloop.IOLoop.instance()
843 self.io_loop = io_loop
844 self.io_loop = io_loop
844
845
845 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
846 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
846 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
847 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
847 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
848 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
848 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
849 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
849
850
850 def spin(self):
851 def spin(self):
851 for stream in (self.queue_stream, self.notifier_stream,
852 for stream in (self.queue_stream, self.notifier_stream,
852 self.task_stream, self.control_stream):
853 self.task_stream, self.control_stream):
853 stream.flush()
854 stream.flush()
854 No newline at end of file
855
General Comments 0
You need to be logged in to leave comments. Login now