##// END OF EJS Templates
control channel progress
MinRK -
Show More
@@ -4,6 +4,8 b''
4 4 import time
5 5 import threading
6 6
7 from pprint import pprint
8
7 9 from functools import wraps
8 10
9 11 from IPython.external.decorator import decorator
@@ -46,7 +48,9 b' def defaultblock(f, self, *args, **kwargs):'
46 48 self.block = saveblock
47 49 return ret
48 50
49
51 class AbortedTask(object):
52 def __init__(self, msg_id):
53 self.msg_id = msg_id
50 54 # @decorator
51 55 # def checktargets(f):
52 56 # @wraps(f)
@@ -101,7 +105,11 b' class Client(object):'
101 105 execution methods: apply/apply_bound/apply_to
102 106 legacy: execute, run
103 107
104 control methods: queue_status, get_result
108 query methods: queue_status, get_result
109
110 control methods: abort, kill
111
112
105 113
106 114 """
107 115
@@ -109,7 +117,8 b' class Client(object):'
109 117 _connected=False
110 118 _engines=None
111 119 registration_socket=None
112 controller_socket=None
120 query_socket=None
121 control_socket=None
113 122 notification_socket=None
114 123 queue_socket=None
115 124 task_socket=None
@@ -117,8 +126,9 b' class Client(object):'
117 126 outstanding=None
118 127 results = None
119 128 history = None
129 debug = False
120 130
121 def __init__(self, addr, context=None, username=None):
131 def __init__(self, addr, context=None, username=None, debug=False):
122 132 if context is None:
123 133 context = zmq.Context()
124 134 self.context = context
@@ -135,6 +145,8 b' class Client(object):'
135 145 self.outstanding=set()
136 146 self.results = {}
137 147 self.history = []
148 self.debug = debug
149 self.session.debug = debug
138 150 self._connect()
139 151
140 152 self._notification_handlers = {'registration_notification' : self._register_engine,
@@ -152,7 +164,7 b' class Client(object):'
152 164 def _update_engines(self, engines):
153 165 for k,v in engines.iteritems():
154 166 eid = int(k)
155 self._engines[eid] = v
167 self._engines[eid] = bytes(v) # force not unicode
156 168 self._ids.add(eid)
157 169
158 170 def _build_targets(self, targets):
@@ -173,7 +185,9 b' class Client(object):'
173 185 return
174 186 self._connected=True
175 187 self.session.send(self.registration_socket, 'connection_request')
176 msg = self.session.recv(self.registration_socket,mode=0)[-1]
188 idents,msg = self.session.recv(self.registration_socket,mode=0)
189 if self.debug:
190 pprint(msg)
177 191 msg = ss.Message(msg)
178 192 content = msg.content
179 193 if content.status == 'ok':
@@ -189,10 +203,14 b' class Client(object):'
189 203 self.notification_socket = self.context.socket(zmq.SUB)
190 204 self.notification_socket.connect(content.notification)
191 205 self.notification_socket.setsockopt(zmq.SUBSCRIBE, "")
192 if content.controller:
193 self.controller_socket = self.context.socket(zmq.PAIR)
194 self.controller_socket.setsockopt(zmq.IDENTITY, self.session.session)
195 self.controller_socket.connect(content.controller)
206 if content.query:
207 self.query_socket = self.context.socket(zmq.PAIR)
208 self.query_socket.setsockopt(zmq.IDENTITY, self.session.session)
209 self.query_socket.connect(content.query)
210 if content.control:
211 self.control_socket = self.context.socket(zmq.PAIR)
212 self.control_socket.setsockopt(zmq.IDENTITY, self.session.session)
213 self.control_socket.connect(content.control)
196 214 self._update_engines(dict(content.engines))
197 215
198 216 else:
@@ -226,7 +244,7 b' class Client(object):'
226 244 self.results[msg_id] = ss.unwrap_exception(msg['content'])
227 245
228 246 def _handle_apply_reply(self, msg):
229 # print msg
247 # pprint(msg)
230 248 # msg_id = msg['msg_id']
231 249 parent = msg['parent_header']
232 250 msg_id = parent['msg_id']
@@ -237,14 +255,19 b' class Client(object):'
237 255 content = msg['content']
238 256 if content['status'] == 'ok':
239 257 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
258 elif content['status'] == 'aborted':
259 self.results[msg_id] = AbortedTask(msg_id)
260 elif content['status'] == 'resubmitted':
261 pass # handle resubmission
240 262 else:
241
242 263 self.results[msg_id] = ss.unwrap_exception(content)
243 264
244 265 def _flush_notifications(self):
245 266 "flush incoming notifications of engine registrations"
246 267 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
247 268 while msg is not None:
269 if self.debug:
270 pprint(msg)
248 271 msg = msg[-1]
249 272 msg_type = msg['msg_type']
250 273 handler = self._notification_handlers.get(msg_type, None)
@@ -258,6 +281,8 b' class Client(object):'
258 281 "flush incoming task or queue results"
259 282 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
260 283 while msg is not None:
284 if self.debug:
285 pprint(msg)
261 286 msg = msg[-1]
262 287 msg_type = msg['msg_type']
263 288 handler = self._queue_handlers.get(msg_type, None)
@@ -267,6 +292,14 b' class Client(object):'
267 292 handler(msg)
268 293 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
269 294
295 def _flush_control(self, sock):
296 "flush incoming control replies"
297 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
298 while msg is not None:
299 if self.debug:
300 pprint(msg)
301 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
302
270 303 ###### get/setitem ########
271 304
272 305 def __getitem__(self, key):
@@ -297,6 +330,8 b' class Client(object):'
297 330 self._flush_results(self.queue_socket)
298 331 if self.task_socket:
299 332 self._flush_results(self.task_socket)
333 if self.control_socket:
334 self._flush_control(self.control_socket)
300 335
301 336 @spinfirst
302 337 def queue_status(self, targets=None, verbose=False):
@@ -308,25 +343,79 b' class Client(object):'
308 343 the engines on which to execute
309 344 default : all
310 345 verbose : bool
311 whether to return
346 whether to return lengths only, or lists of ids for each element
312 347
313 348 """
314 349 targets = self._build_targets(targets)[1]
315 350 content = dict(targets=targets)
316 self.session.send(self.controller_socket, "queue_request", content=content)
317 idents,msg = self.session.recv(self.controller_socket, 0)
351 self.session.send(self.query_socket, "queue_request", content=content)
352 idents,msg = self.session.recv(self.query_socket, 0)
353 if self.debug:
354 pprint(msg)
318 355 return msg['content']
319 356
320 357 @spinfirst
321 def clear(self, targets=None):
358 @defaultblock
359 def clear(self, targets=None, block=None):
322 360 """clear the namespace in target(s)"""
323 pass
361 targets = self._build_targets(targets)[0]
362 print targets
363 for t in targets:
364 self.session.send(self.control_socket, 'clear_request', content={},ident=t)
365 error = False
366 if self.block:
367 for i in range(len(targets)):
368 idents,msg = self.session.recv(self.control_socket,0)
369 if self.debug:
370 pprint(msg)
371 if msg['content']['status'] != 'ok':
372 error = msg['content']
373 if error:
374 return error
375
324 376
325 377 @spinfirst
326 def abort(self, targets=None):
378 @defaultblock
379 def abort(self, msg_ids = None, targets=None, block=None):
327 380 """abort the Queues of target(s)"""
328 pass
381 targets = self._build_targets(targets)[0]
382 print targets
383 if isinstance(msg_ids, basestring):
384 msg_ids = [msg_ids]
385 content = dict(msg_ids=msg_ids)
386 for t in targets:
387 self.session.send(self.control_socket, 'abort_request',
388 content=content, ident=t)
389 error = False
390 if self.block:
391 for i in range(len(targets)):
392 idents,msg = self.session.recv(self.control_socket,0)
393 if self.debug:
394 pprint(msg)
395 if msg['content']['status'] != 'ok':
396 error = msg['content']
397 if error:
398 return error
329 399
400 @spinfirst
401 @defaultblock
402 def kill(self, targets=None, block=None):
403 """Terminates one or more engine processes."""
404 targets = self._build_targets(targets)[0]
405 print targets
406 for t in targets:
407 self.session.send(self.control_socket, 'kill_request', content={},ident=t)
408 error = False
409 if self.block:
410 for i in range(len(targets)):
411 idents,msg = self.session.recv(self.control_socket,0)
412 if self.debug:
413 pprint(msg)
414 if msg['content']['status'] != 'ok':
415 error = msg['content']
416 if error:
417 return error
418
330 419 @defaultblock
331 420 def execute(self, code, targets='all', block=None):
332 421 """executes `code` on `targets` in blocking or nonblocking manner.
@@ -363,22 +452,6 b' class Client(object):'
363 452 """
364 453 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
365 454 return result
366
367 # a = time.time()
368 # content = dict(code=code)
369 # b = time.time()
370 # msg = self.session.send(self.task_socket, 'execute_request',
371 # content=content)
372 # c = time.time()
373 # msg_id = msg['msg_id']
374 # self.outstanding.add(msg_id)
375 # self.history.append(msg_id)
376 # d = time.time()
377 # if block:
378 # self.barrier(msg_id)
379 # return self.results[msg_id]
380 # else:
381 # return msg_id
382 455
383 456 def _apply_balanced(self, f, args, kwargs, bound=True, block=None):
384 457 """the underlying method for applying functions in a load balanced
@@ -402,7 +475,7 b' class Client(object):'
402 475 """Then underlying method for applying functions to specific engines."""
403 476 block = block if block is not None else self.block
404 477 queues,targets = self._build_targets(targets)
405
478 print queues
406 479 bufs = ss.pack_apply_message(f,args,kwargs)
407 480 content = dict(bound=bound)
408 481 msg_ids = []
@@ -438,51 +511,16 b' class Client(object):'
438 511 """
439 512 args = args if args is not None else []
440 513 kwargs = kwargs if kwargs is not None else {}
514 if not isinstance(args, (tuple, list)):
515 raise TypeError("args must be tuple or list, not %s"%type(args))
516 if not isinstance(kwargs, dict):
517 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
441 518 if targets is None:
442 519 return self._apply_balanced(f,args,kwargs,bound=bound, block=block)
443 520 else:
444 521 return self._apply_direct(f, args, kwargs,
445 522 bound=bound,block=block, targets=targets)
446 523
447 # def apply_bound(self, f, *args, **kwargs):
448 # """calls f(*args, **kwargs) on a remote engine. This does get
449 # executed in an engine's namespace. The controller selects the
450 # target engine via 0MQ XREQ load balancing.
451 #
452 # if self.block is False:
453 # returns msg_id
454 # else:
455 # returns actual result of f(*args, **kwargs)
456 # """
457 # return self._apply(f, args, kwargs, bound=True)
458 #
459 #
460 # def apply_to(self, targets, f, *args, **kwargs):
461 # """calls f(*args, **kwargs) on a specific engine.
462 #
463 # if self.block is False:
464 # returns msg_id
465 # else:
466 # returns actual result of f(*args, **kwargs)
467 #
468 # The target's namespace is not used here.
469 # Use apply_bound_to() to access target's globals.
470 # """
471 # return self._apply_to(False, targets, f, args, kwargs)
472 #
473 # def apply_bound_to(self, targets, f, *args, **kwargs):
474 # """calls f(*args, **kwargs) on a specific engine.
475 #
476 # if self.block is False:
477 # returns msg_id
478 # else:
479 # returns actual result of f(*args, **kwargs)
480 #
481 # This method has access to the target's globals
482 #
483 # """
484 # return self._apply_to(f, args, kwargs)
485 #
486 524 def push(self, ns, targets=None, block=None):
487 525 """push the contents of `ns` into the namespace on `target`"""
488 526 if not isinstance(ns, dict):
@@ -546,9 +584,11 b' class Client(object):'
546 584 theids.append(msg_id)
547 585
548 586 content = dict(msg_ids=theids, status_only=status_only)
549 msg = self.session.send(self.controller_socket, "result_request", content=content)
550 zmq.select([self.controller_socket], [], [])
551 idents,msg = self.session.recv(self.controller_socket, zmq.NOBLOCK)
587 msg = self.session.send(self.query_socket, "result_request", content=content)
588 zmq.select([self.query_socket], [], [])
589 idents,msg = self.session.recv(self.query_socket, zmq.NOBLOCK)
590 if self.debug:
591 pprint(msg)
552 592
553 593 # while True:
554 594 # try:
@@ -297,6 +297,8 b' class Controller(object):'
297 297 self.save_task_result(idents, msg)
298 298 elif switch == 'tracktask':
299 299 self.save_task_destination(idents, msg)
300 elif switch in ('incontrol', 'outcontrol'):
301 pass
300 302 else:
301 303 logger.error("Invalid message topic: %s"%switch)
302 304
@@ -7,6 +7,7 b' import sys'
7 7 import time
8 8 import traceback
9 9 import uuid
10 from pprint import pprint
10 11
11 12 import zmq
12 13 from zmq.eventloop import ioloop, zmqstream
@@ -20,7 +21,7 b' import heartmonitor'
20 21
21 22
22 23 def printer(*msg):
23 print msg
24 pprint(msg)
24 25
25 26 class Engine(object):
26 27 """IPython engine"""
@@ -29,26 +30,23 b' class Engine(object):'
29 30 context=None
30 31 loop=None
31 32 session=None
32 queue_id=None
33 control_id=None
34 heart_id=None
33 ident=None
35 34 registrar=None
36 35 heart=None
37 36 kernel=None
38 37
39 def __init__(self, context, loop, session, registrar, client, queue_id=None, heart_id=None):
38 def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None):
40 39 self.context = context
41 40 self.loop = loop
42 41 self.session = session
43 42 self.registrar = registrar
44 43 self.client = client
45 self.queue_id = queue_id or str(uuid.uuid4())
46 self.heart_id = heart_id or self.queue_id
44 self.ident = ident if ident else str(uuid.uuid4())
47 45 self.registrar.on_send(printer)
48 46
49 47 def register(self):
50 48
51 content = dict(queue=self.queue_id, heartbeat=self.heart_id)
49 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
52 50 self.registrar.on_recv(self.complete_registration)
53 51 self.session.send(self.registrar, "registration_request",content=content)
54 52
@@ -61,14 +59,14 b' class Engine(object):'
61 59 queue_addr = msg.content.queue
62 60 if queue_addr:
63 61 queue = self.context.socket(zmq.PAIR)
64 queue.setsockopt(zmq.IDENTITY, self.queue_id)
62 queue.setsockopt(zmq.IDENTITY, self.ident)
65 63 queue.connect(str(queue_addr))
66 64 self.queue = zmqstream.ZMQStream(queue, self.loop)
67 65
68 66 control_addr = msg.content.control
69 67 if control_addr:
70 68 control = self.context.socket(zmq.PAIR)
71 control.setsockopt(zmq.IDENTITY, self.queue_id)
69 control.setsockopt(zmq.IDENTITY, self.ident)
72 70 control.connect(str(control_addr))
73 71 self.control = zmqstream.ZMQStream(control, self.loop)
74 72
@@ -81,14 +79,14 b' class Engine(object):'
81 79 self.task_stream = zmqstream.ZMQStream(task, self.loop)
82 80 # TaskThread:
83 81 # mon_addr = msg.content.monitor
84 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.queue_id)
82 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident)
85 83 # task.connect_in(str(task_addr))
86 84 # task.connect_out(str(mon_addr))
87 85 # self.task_stream = taskthread.QueueStream(*task.queues)
88 86 # task.start()
89 87
90 88 hbs = msg.content.heartbeat
91 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.heart_id)
89 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident)
92 90 self.heart.start()
93 91 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
94 92 # placeholder for now:
@@ -4,10 +4,12 b' Kernel adapted from kernel.py to use ZMQ Streams'
4 4 """
5 5
6 6 import __builtin__
7 import os
7 8 import sys
8 9 import time
9 10 import traceback
10 11 from signal import SIGTERM, SIGKILL
12 from pprint import pprint
11 13
12 14 from code import CommandCompiler
13 15
@@ -18,6 +20,9 b' from streamsession import StreamSession, Message, extract_header, serialize_obje'
18 20 unpack_apply_message
19 21 from IPython.zmq.completer import KernelCompleter
20 22
23 def printer(*args):
24 pprint(args)
25
21 26 class OutStream(object):
22 27 """A file like object that publishes the stream to a 0MQ PUB socket."""
23 28
@@ -133,6 +138,7 b' class Kernel(object):'
133 138 task_stream=None, client=None):
134 139 self.session = session
135 140 self.control_stream = control_stream
141 self.control_socket = control_stream.socket
136 142 self.reply_stream = reply_stream
137 143 self.task_stream = task_stream
138 144 self.pub_stream = pub_stream
@@ -153,6 +159,10 b' class Kernel(object):'
153 159 self.control_handlers[msg_type] = getattr(self, msg_type)
154 160
155 161 #-------------------- control handlers -----------------------------
162 def abort_queues(self):
163 for stream in (self.task_stream, self.reply_stream):
164 if stream:
165 self.abort_queue(stream)
156 166
157 167 def abort_queue(self, stream):
158 168 while True:
@@ -186,28 +196,30 b' class Kernel(object):'
186 196 time.sleep(0.05)
187 197
188 198 def abort_request(self, stream, ident, parent):
199 """abort a specifig msg by id"""
189 200 msg_ids = parent['content'].get('msg_ids', None)
201 if isinstance(msg_ids, basestring):
202 msg_ids = [msg_ids]
190 203 if not msg_ids:
191 self.abort_queue(self.task_stream)
192 self.abort_queue(self.reply_stream)
204 self.abort_queues()
193 205 for mid in msg_ids:
194 self.aborted.add(mid)
206 self.aborted.add(str(mid))
195 207
196 208 content = dict(status='ok')
197 self.session.send(stream, 'abort_reply', content=content, parent=parent,
209 reply_msg = self.session.send(stream, 'abort_reply', content=content, parent=parent,
198 210 ident=ident)
211 print>>sys.__stdout__, Message(reply_msg)
199 212
200 213 def kill_request(self, stream, idents, parent):
201 self.abort_queue(self.reply_stream)
202 if self.task_stream:
203 self.abort_queue(self.task_stream)
214 """kill ourselves. This should really be handled in an external process"""
215 self.abort_queues()
204 216 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
205 217 content = dict(status='ok'))
206 218 # we can know that a message is done if we *don't* use streams, but
207 219 # use a socket directly with MessageTracker
208 time.sleep(1)
220 time.sleep(.5)
209 221 os.kill(os.getpid(), SIGTERM)
210 time.sleep(.25)
222 time.sleep(1)
211 223 os.kill(os.getpid(), SIGKILL)
212 224
213 225 def dispatch_control(self, msg):
@@ -221,7 +233,7 b' class Kernel(object):'
221 233 if handler is None:
222 234 print >> sys.__stderr__, "UNKNOWN CONTROL MESSAGE TYPE:", msg
223 235 else:
224 handler(stream, idents, msg)
236 handler(self.control_stream, idents, msg)
225 237
226 238 def flush_control(self):
227 239 while any(zmq.select([self.control_socket],[],[],1e-4)):
@@ -258,6 +270,16 b' class Kernel(object):'
258 270
259 271 return True
260 272
273 def check_aborted(self, msg_id):
274 return msg_id in self.aborted
275
276 def unmet_dependencies(self, stream, idents, msg):
277 reply_type = msg['msg_type'].split('_')[0] + '_reply'
278 content = dict(status='resubmitted', reason='unmet dependencies')
279 reply_msg = self.session.send(stream, reply_type,
280 content=content, parent=msg, ident=idents)
281 ### TODO: actually resubmit it ###
282
261 283 #-------------------- queue handlers -----------------------------
262 284
263 285 def execute_request(self, stream, ident, parent):
@@ -297,7 +319,7 b' class Kernel(object):'
297 319 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
298 320 # print>>sys.__stdout__, Message(reply_msg)
299 321 if reply_msg['content']['status'] == u'error':
300 self.abort_queue()
322 self.abort_queues()
301 323
302 324 def complete_request(self, stream, ident, parent):
303 325 matches = {'matches' : self.complete(parent),
@@ -334,7 +356,7 b' class Kernel(object):'
334 356
335 357 else:
336 358 working = dict()
337 suffix = prefix = ""
359 suffix = prefix = "_" # prevent keyword collisions with lambda
338 360 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
339 361 # if f.fun
340 362 fname = prefix+f.func_name.strip('<>')+suffix
@@ -379,7 +401,7 b' class Kernel(object):'
379 401 reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf)
380 402 # print>>sys.__stdout__, Message(reply_msg)
381 403 if reply_msg['content']['status'] == u'error':
382 self.abort_queue()
404 self.abort_queues()
383 405
384 406 def dispatch_queue(self, stream, msg):
385 407 self.flush_control()
@@ -389,12 +411,15 b' class Kernel(object):'
389 411 header = msg['header']
390 412 msg_id = header['msg_id']
391 413 dependencies = header.get('dependencies', [])
392
393 414 if self.check_aborted(msg_id):
394 return self.abort_reply(stream, msg)
415 self.aborted.remove(msg_id)
416 # is it safe to assume a msg_id will not be resubmitted?
417 reply_type = msg['msg_type'].split('_')[0] + '_reply'
418 reply_msg = self.session.send(stream, reply_type,
419 content={'status' : 'aborted'}, parent=msg, ident=idents)
420 return
395 421 if not self.check_dependencies(dependencies):
396 return self.unmet_dependencies(stream, msg)
397
422 return self.unmet_dependencies(stream, idents, msg)
398 423 handler = self.queue_handlers.get(msg['msg_type'], None)
399 424 if handler is None:
400 425 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg
@@ -405,12 +430,15 b' class Kernel(object):'
405 430 #### stream mode:
406 431 if self.control_stream:
407 432 self.control_stream.on_recv(self.dispatch_control, copy=False)
433 self.control_stream.on_err(printer)
408 434 if self.reply_stream:
409 435 self.reply_stream.on_recv(lambda msg:
410 436 self.dispatch_queue(self.reply_stream, msg), copy=False)
437 self.reply_stream.on_err(printer)
411 438 if self.task_stream:
412 439 self.task_stream.on_recv(lambda msg:
413 440 self.dispatch_queue(self.task_stream, msg), copy=False)
441 self.task_stream.on_err(printer)
414 442
415 443 #### while True mode:
416 444 # while True:
@@ -257,7 +257,7 b' def unpack_apply_message(bufs, g=None, copy=True):'
257 257
258 258 class StreamSession(object):
259 259 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
260
260 debug=False
261 261 def __init__(self, username=None, session=None, packer=None, unpacker=None):
262 262 if username is None:
263 263 username = os.environ.get('USER','username')
@@ -335,6 +335,10 b' class StreamSession(object):'
335 335 if buffers:
336 336 stream.send(buffers[-1], copy=False)
337 337 omsg = Message(msg)
338 if self.debug:
339 pprint.pprint(omsg)
340 pprint.pprint(to_send)
341 pprint.pprint(buffers)
338 342 return omsg
339 343
340 344 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
@@ -103,7 +103,7 b' class View(object):'
103 103 This method has access to the targets' globals
104 104
105 105 """
106 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
106 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
107 107
108 108
109 109 class DirectView(View):
@@ -129,12 +129,28 b' class DirectView(View):'
129 129 def __setitem__(self,key,value):
130 130 self.update({key:value})
131 131
132 def clear(self):
133 """clear the remote namespace"""
134 return self.client.clear(targets=self.targets,block=self.block)
132 def clear(self, block=False):
133 """Clear the remote namespaces on my engines."""
134 block = block if block is not None else self.block
135 return self.client.clear(targets=self.targets,block=block)
136
137 def kill(self, block=True):
138 """Kill my engines."""
139 block = block if block is not None else self.block
140 return self.client.kill(targets=self.targets,block=block)
135 141
136 def abort(self):
137 return self.client.abort(targets=self.targets,block=self.block)
142 def abort(self, msg_ids=None, block=None):
143 """Abort jobs on my engines.
144
145 Parameters
146 ----------
147
148 msg_ids : None, str, list of strs, optional
149 if None: abort all jobs.
150 else: abort specific msg_id(s).
151 """
152 block = block if block is not None else self.block
153 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
138 154
139 155 class LoadBalancedView(View):
140 156 _targets=None
@@ -124,7 +124,7 b' def setup():'
124 124
125 125 client_addrs = {
126 126 'control' : "%s:%i"%(iface, ccport),
127 'controller': "%s:%i"%(iface, cport),
127 'query': "%s:%i"%(iface, cport),
128 128 'queue': "%s:%i"%(iface, cqport),
129 129 'task' : "%s:%i"%(iface, ctport),
130 130 'notification': "%s:%i"%(iface, nport)
General Comments 0
You need to be logged in to leave comments. Login now