Show More
@@ -4,6 +4,8 b'' | |||
|
4 | 4 | import time |
|
5 | 5 | import threading |
|
6 | 6 | |
|
7 | from pprint import pprint | |
|
8 | ||
|
7 | 9 | from functools import wraps |
|
8 | 10 | |
|
9 | 11 | from IPython.external.decorator import decorator |
@@ -46,7 +48,9 b' def defaultblock(f, self, *args, **kwargs):' | |||
|
46 | 48 | self.block = saveblock |
|
47 | 49 | return ret |
|
48 | 50 | |
|
49 | ||
|
51 | class AbortedTask(object): | |
|
52 | def __init__(self, msg_id): | |
|
53 | self.msg_id = msg_id | |
|
50 | 54 | # @decorator |
|
51 | 55 | # def checktargets(f): |
|
52 | 56 | # @wraps(f) |
@@ -101,7 +105,11 b' class Client(object):' | |||
|
101 | 105 | execution methods: apply/apply_bound/apply_to |
|
102 | 106 | legacy: execute, run |
|
103 | 107 | |
|
104 |
|
|
|
108 | query methods: queue_status, get_result | |
|
109 | ||
|
110 | control methods: abort, kill | |
|
111 | ||
|
112 | ||
|
105 | 113 | |
|
106 | 114 | """ |
|
107 | 115 | |
@@ -109,7 +117,8 b' class Client(object):' | |||
|
109 | 117 | _connected=False |
|
110 | 118 | _engines=None |
|
111 | 119 | registration_socket=None |
|
112 |
|
|
|
120 | query_socket=None | |
|
121 | control_socket=None | |
|
113 | 122 | notification_socket=None |
|
114 | 123 | queue_socket=None |
|
115 | 124 | task_socket=None |
@@ -117,8 +126,9 b' class Client(object):' | |||
|
117 | 126 | outstanding=None |
|
118 | 127 | results = None |
|
119 | 128 | history = None |
|
129 | debug = False | |
|
120 | 130 | |
|
121 | def __init__(self, addr, context=None, username=None): | |
|
131 | def __init__(self, addr, context=None, username=None, debug=False): | |
|
122 | 132 | if context is None: |
|
123 | 133 | context = zmq.Context() |
|
124 | 134 | self.context = context |
@@ -135,6 +145,8 b' class Client(object):' | |||
|
135 | 145 | self.outstanding=set() |
|
136 | 146 | self.results = {} |
|
137 | 147 | self.history = [] |
|
148 | self.debug = debug | |
|
149 | self.session.debug = debug | |
|
138 | 150 | self._connect() |
|
139 | 151 | |
|
140 | 152 | self._notification_handlers = {'registration_notification' : self._register_engine, |
@@ -152,7 +164,7 b' class Client(object):' | |||
|
152 | 164 | def _update_engines(self, engines): |
|
153 | 165 | for k,v in engines.iteritems(): |
|
154 | 166 | eid = int(k) |
|
155 | self._engines[eid] = v | |
|
167 | self._engines[eid] = bytes(v) # force not unicode | |
|
156 | 168 | self._ids.add(eid) |
|
157 | 169 | |
|
158 | 170 | def _build_targets(self, targets): |
@@ -173,7 +185,9 b' class Client(object):' | |||
|
173 | 185 | return |
|
174 | 186 | self._connected=True |
|
175 | 187 | self.session.send(self.registration_socket, 'connection_request') |
|
176 |
msg = self.session.recv(self.registration_socket,mode=0) |
|
|
188 | idents,msg = self.session.recv(self.registration_socket,mode=0) | |
|
189 | if self.debug: | |
|
190 | pprint(msg) | |
|
177 | 191 | msg = ss.Message(msg) |
|
178 | 192 | content = msg.content |
|
179 | 193 | if content.status == 'ok': |
@@ -189,10 +203,14 b' class Client(object):' | |||
|
189 | 203 | self.notification_socket = self.context.socket(zmq.SUB) |
|
190 | 204 | self.notification_socket.connect(content.notification) |
|
191 | 205 | self.notification_socket.setsockopt(zmq.SUBSCRIBE, "") |
|
192 |
if content. |
|
|
193 |
self. |
|
|
194 |
self. |
|
|
195 |
self. |
|
|
206 | if content.query: | |
|
207 | self.query_socket = self.context.socket(zmq.PAIR) | |
|
208 | self.query_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
209 | self.query_socket.connect(content.query) | |
|
210 | if content.control: | |
|
211 | self.control_socket = self.context.socket(zmq.PAIR) | |
|
212 | self.control_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
213 | self.control_socket.connect(content.control) | |
|
196 | 214 | self._update_engines(dict(content.engines)) |
|
197 | 215 | |
|
198 | 216 | else: |
@@ -226,7 +244,7 b' class Client(object):' | |||
|
226 | 244 | self.results[msg_id] = ss.unwrap_exception(msg['content']) |
|
227 | 245 | |
|
228 | 246 | def _handle_apply_reply(self, msg): |
|
229 |
# p |
|
|
247 | # pprint(msg) | |
|
230 | 248 | # msg_id = msg['msg_id'] |
|
231 | 249 | parent = msg['parent_header'] |
|
232 | 250 | msg_id = parent['msg_id'] |
@@ -237,14 +255,19 b' class Client(object):' | |||
|
237 | 255 | content = msg['content'] |
|
238 | 256 | if content['status'] == 'ok': |
|
239 | 257 | self.results[msg_id] = ss.unserialize_object(msg['buffers']) |
|
258 | elif content['status'] == 'aborted': | |
|
259 | self.results[msg_id] = AbortedTask(msg_id) | |
|
260 | elif content['status'] == 'resubmitted': | |
|
261 | pass # handle resubmission | |
|
240 | 262 | else: |
|
241 | ||
|
242 | 263 | self.results[msg_id] = ss.unwrap_exception(content) |
|
243 | 264 | |
|
244 | 265 | def _flush_notifications(self): |
|
245 | 266 | "flush incoming notifications of engine registrations" |
|
246 | 267 | msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK) |
|
247 | 268 | while msg is not None: |
|
269 | if self.debug: | |
|
270 | pprint(msg) | |
|
248 | 271 | msg = msg[-1] |
|
249 | 272 | msg_type = msg['msg_type'] |
|
250 | 273 | handler = self._notification_handlers.get(msg_type, None) |
@@ -258,6 +281,8 b' class Client(object):' | |||
|
258 | 281 | "flush incoming task or queue results" |
|
259 | 282 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) |
|
260 | 283 | while msg is not None: |
|
284 | if self.debug: | |
|
285 | pprint(msg) | |
|
261 | 286 | msg = msg[-1] |
|
262 | 287 | msg_type = msg['msg_type'] |
|
263 | 288 | handler = self._queue_handlers.get(msg_type, None) |
@@ -267,6 +292,14 b' class Client(object):' | |||
|
267 | 292 | handler(msg) |
|
268 | 293 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) |
|
269 | 294 | |
|
295 | def _flush_control(self, sock): | |
|
296 | "flush incoming control replies" | |
|
297 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |
|
298 | while msg is not None: | |
|
299 | if self.debug: | |
|
300 | pprint(msg) | |
|
301 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |
|
302 | ||
|
270 | 303 | ###### get/setitem ######## |
|
271 | 304 | |
|
272 | 305 | def __getitem__(self, key): |
@@ -297,6 +330,8 b' class Client(object):' | |||
|
297 | 330 | self._flush_results(self.queue_socket) |
|
298 | 331 | if self.task_socket: |
|
299 | 332 | self._flush_results(self.task_socket) |
|
333 | if self.control_socket: | |
|
334 | self._flush_control(self.control_socket) | |
|
300 | 335 | |
|
301 | 336 | @spinfirst |
|
302 | 337 | def queue_status(self, targets=None, verbose=False): |
@@ -308,24 +343,78 b' class Client(object):' | |||
|
308 | 343 | the engines on which to execute |
|
309 | 344 | default : all |
|
310 | 345 | verbose : bool |
|
311 | whether to return | |
|
346 | whether to return lengths only, or lists of ids for each element | |
|
312 | 347 | |
|
313 | 348 | """ |
|
314 | 349 | targets = self._build_targets(targets)[1] |
|
315 | 350 | content = dict(targets=targets) |
|
316 |
self.session.send(self. |
|
|
317 |
idents,msg = self.session.recv(self. |
|
|
351 | self.session.send(self.query_socket, "queue_request", content=content) | |
|
352 | idents,msg = self.session.recv(self.query_socket, 0) | |
|
353 | if self.debug: | |
|
354 | pprint(msg) | |
|
318 | 355 | return msg['content'] |
|
319 | 356 | |
|
320 | 357 | @spinfirst |
|
321 | def clear(self, targets=None): | |
|
358 | @defaultblock | |
|
359 | def clear(self, targets=None, block=None): | |
|
322 | 360 | """clear the namespace in target(s)""" |
|
323 | pass | |
|
361 | targets = self._build_targets(targets)[0] | |
|
362 | print targets | |
|
363 | for t in targets: | |
|
364 | self.session.send(self.control_socket, 'clear_request', content={},ident=t) | |
|
365 | error = False | |
|
366 | if self.block: | |
|
367 | for i in range(len(targets)): | |
|
368 | idents,msg = self.session.recv(self.control_socket,0) | |
|
369 | if self.debug: | |
|
370 | pprint(msg) | |
|
371 | if msg['content']['status'] != 'ok': | |
|
372 | error = msg['content'] | |
|
373 | if error: | |
|
374 | return error | |
|
375 | ||
|
324 | 376 | |
|
325 | 377 | @spinfirst |
|
326 | def abort(self, targets=None): | |
|
378 | @defaultblock | |
|
379 | def abort(self, msg_ids = None, targets=None, block=None): | |
|
327 | 380 | """abort the Queues of target(s)""" |
|
328 | pass | |
|
381 | targets = self._build_targets(targets)[0] | |
|
382 | print targets | |
|
383 | if isinstance(msg_ids, basestring): | |
|
384 | msg_ids = [msg_ids] | |
|
385 | content = dict(msg_ids=msg_ids) | |
|
386 | for t in targets: | |
|
387 | self.session.send(self.control_socket, 'abort_request', | |
|
388 | content=content, ident=t) | |
|
389 | error = False | |
|
390 | if self.block: | |
|
391 | for i in range(len(targets)): | |
|
392 | idents,msg = self.session.recv(self.control_socket,0) | |
|
393 | if self.debug: | |
|
394 | pprint(msg) | |
|
395 | if msg['content']['status'] != 'ok': | |
|
396 | error = msg['content'] | |
|
397 | if error: | |
|
398 | return error | |
|
399 | ||
|
400 | @spinfirst | |
|
401 | @defaultblock | |
|
402 | def kill(self, targets=None, block=None): | |
|
403 | """Terminates one or more engine processes.""" | |
|
404 | targets = self._build_targets(targets)[0] | |
|
405 | print targets | |
|
406 | for t in targets: | |
|
407 | self.session.send(self.control_socket, 'kill_request', content={},ident=t) | |
|
408 | error = False | |
|
409 | if self.block: | |
|
410 | for i in range(len(targets)): | |
|
411 | idents,msg = self.session.recv(self.control_socket,0) | |
|
412 | if self.debug: | |
|
413 | pprint(msg) | |
|
414 | if msg['content']['status'] != 'ok': | |
|
415 | error = msg['content'] | |
|
416 | if error: | |
|
417 | return error | |
|
329 | 418 | |
|
330 | 419 | @defaultblock |
|
331 | 420 | def execute(self, code, targets='all', block=None): |
@@ -364,22 +453,6 b' class Client(object):' | |||
|
364 | 453 | result = self.apply(execute, (code,), targets=None, block=block, bound=False) |
|
365 | 454 | return result |
|
366 | 455 |
|
|
367 | # a = time.time() | |
|
368 | # content = dict(code=code) | |
|
369 | # b = time.time() | |
|
370 | # msg = self.session.send(self.task_socket, 'execute_request', | |
|
371 | # content=content) | |
|
372 | # c = time.time() | |
|
373 | # msg_id = msg['msg_id'] | |
|
374 | # self.outstanding.add(msg_id) | |
|
375 | # self.history.append(msg_id) | |
|
376 | # d = time.time() | |
|
377 | # if block: | |
|
378 | # self.barrier(msg_id) | |
|
379 | # return self.results[msg_id] | |
|
380 | # else: | |
|
381 | # return msg_id | |
|
382 | ||
|
383 | 456 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None): |
|
384 | 457 | """the underlying method for applying functions in a load balanced |
|
385 | 458 | manner.""" |
@@ -402,7 +475,7 b' class Client(object):' | |||
|
402 | 475 | """Then underlying method for applying functions to specific engines.""" |
|
403 | 476 | block = block if block is not None else self.block |
|
404 | 477 | queues,targets = self._build_targets(targets) |
|
405 | ||
|
478 | print queues | |
|
406 | 479 | bufs = ss.pack_apply_message(f,args,kwargs) |
|
407 | 480 | content = dict(bound=bound) |
|
408 | 481 | msg_ids = [] |
@@ -438,51 +511,16 b' class Client(object):' | |||
|
438 | 511 | """ |
|
439 | 512 | args = args if args is not None else [] |
|
440 | 513 | kwargs = kwargs if kwargs is not None else {} |
|
514 | if not isinstance(args, (tuple, list)): | |
|
515 | raise TypeError("args must be tuple or list, not %s"%type(args)) | |
|
516 | if not isinstance(kwargs, dict): | |
|
517 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) | |
|
441 | 518 | if targets is None: |
|
442 | 519 | return self._apply_balanced(f,args,kwargs,bound=bound, block=block) |
|
443 | 520 | else: |
|
444 | 521 | return self._apply_direct(f, args, kwargs, |
|
445 | 522 | bound=bound,block=block, targets=targets) |
|
446 | 523 | |
|
447 | # def apply_bound(self, f, *args, **kwargs): | |
|
448 | # """calls f(*args, **kwargs) on a remote engine. This does get | |
|
449 | # executed in an engine's namespace. The controller selects the | |
|
450 | # target engine via 0MQ XREQ load balancing. | |
|
451 | # | |
|
452 | # if self.block is False: | |
|
453 | # returns msg_id | |
|
454 | # else: | |
|
455 | # returns actual result of f(*args, **kwargs) | |
|
456 | # """ | |
|
457 | # return self._apply(f, args, kwargs, bound=True) | |
|
458 | # | |
|
459 | # | |
|
460 | # def apply_to(self, targets, f, *args, **kwargs): | |
|
461 | # """calls f(*args, **kwargs) on a specific engine. | |
|
462 | # | |
|
463 | # if self.block is False: | |
|
464 | # returns msg_id | |
|
465 | # else: | |
|
466 | # returns actual result of f(*args, **kwargs) | |
|
467 | # | |
|
468 | # The target's namespace is not used here. | |
|
469 | # Use apply_bound_to() to access target's globals. | |
|
470 | # """ | |
|
471 | # return self._apply_to(False, targets, f, args, kwargs) | |
|
472 | # | |
|
473 | # def apply_bound_to(self, targets, f, *args, **kwargs): | |
|
474 | # """calls f(*args, **kwargs) on a specific engine. | |
|
475 | # | |
|
476 | # if self.block is False: | |
|
477 | # returns msg_id | |
|
478 | # else: | |
|
479 | # returns actual result of f(*args, **kwargs) | |
|
480 | # | |
|
481 | # This method has access to the target's globals | |
|
482 | # | |
|
483 | # """ | |
|
484 | # return self._apply_to(f, args, kwargs) | |
|
485 | # | |
|
486 | 524 | def push(self, ns, targets=None, block=None): |
|
487 | 525 | """push the contents of `ns` into the namespace on `target`""" |
|
488 | 526 | if not isinstance(ns, dict): |
@@ -546,9 +584,11 b' class Client(object):' | |||
|
546 | 584 | theids.append(msg_id) |
|
547 | 585 | |
|
548 | 586 | content = dict(msg_ids=theids, status_only=status_only) |
|
549 |
msg = self.session.send(self. |
|
|
550 |
zmq.select([self. |
|
|
551 |
idents,msg = self.session.recv(self. |
|
|
587 | msg = self.session.send(self.query_socket, "result_request", content=content) | |
|
588 | zmq.select([self.query_socket], [], []) | |
|
589 | idents,msg = self.session.recv(self.query_socket, zmq.NOBLOCK) | |
|
590 | if self.debug: | |
|
591 | pprint(msg) | |
|
552 | 592 | |
|
553 | 593 | # while True: |
|
554 | 594 | # try: |
@@ -297,6 +297,8 b' class Controller(object):' | |||
|
297 | 297 | self.save_task_result(idents, msg) |
|
298 | 298 | elif switch == 'tracktask': |
|
299 | 299 | self.save_task_destination(idents, msg) |
|
300 | elif switch in ('incontrol', 'outcontrol'): | |
|
301 | pass | |
|
300 | 302 | else: |
|
301 | 303 | logger.error("Invalid message topic: %s"%switch) |
|
302 | 304 |
@@ -7,6 +7,7 b' import sys' | |||
|
7 | 7 | import time |
|
8 | 8 | import traceback |
|
9 | 9 | import uuid |
|
10 | from pprint import pprint | |
|
10 | 11 | |
|
11 | 12 | import zmq |
|
12 | 13 | from zmq.eventloop import ioloop, zmqstream |
@@ -20,7 +21,7 b' import heartmonitor' | |||
|
20 | 21 | |
|
21 | 22 | |
|
22 | 23 | def printer(*msg): |
|
23 |
p |
|
|
24 | pprint(msg) | |
|
24 | 25 | |
|
25 | 26 | class Engine(object): |
|
26 | 27 | """IPython engine""" |
@@ -29,26 +30,23 b' class Engine(object):' | |||
|
29 | 30 | context=None |
|
30 | 31 | loop=None |
|
31 | 32 | session=None |
|
32 |
|
|
|
33 | control_id=None | |
|
34 | heart_id=None | |
|
33 | ident=None | |
|
35 | 34 | registrar=None |
|
36 | 35 | heart=None |
|
37 | 36 | kernel=None |
|
38 | 37 | |
|
39 |
def __init__(self, context, loop, session, registrar, client, |
|
|
38 | def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None): | |
|
40 | 39 | self.context = context |
|
41 | 40 | self.loop = loop |
|
42 | 41 | self.session = session |
|
43 | 42 | self.registrar = registrar |
|
44 | 43 | self.client = client |
|
45 |
self. |
|
|
46 | self.heart_id = heart_id or self.queue_id | |
|
44 | self.ident = ident if ident else str(uuid.uuid4()) | |
|
47 | 45 | self.registrar.on_send(printer) |
|
48 | 46 | |
|
49 | 47 | def register(self): |
|
50 | 48 | |
|
51 |
content = dict(queue=self. |
|
|
49 | content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) | |
|
52 | 50 | self.registrar.on_recv(self.complete_registration) |
|
53 | 51 | self.session.send(self.registrar, "registration_request",content=content) |
|
54 | 52 | |
@@ -61,14 +59,14 b' class Engine(object):' | |||
|
61 | 59 | queue_addr = msg.content.queue |
|
62 | 60 | if queue_addr: |
|
63 | 61 | queue = self.context.socket(zmq.PAIR) |
|
64 |
queue.setsockopt(zmq.IDENTITY, self. |
|
|
62 | queue.setsockopt(zmq.IDENTITY, self.ident) | |
|
65 | 63 | queue.connect(str(queue_addr)) |
|
66 | 64 | self.queue = zmqstream.ZMQStream(queue, self.loop) |
|
67 | 65 | |
|
68 | 66 | control_addr = msg.content.control |
|
69 | 67 | if control_addr: |
|
70 | 68 | control = self.context.socket(zmq.PAIR) |
|
71 |
control.setsockopt(zmq.IDENTITY, self. |
|
|
69 | control.setsockopt(zmq.IDENTITY, self.ident) | |
|
72 | 70 | control.connect(str(control_addr)) |
|
73 | 71 | self.control = zmqstream.ZMQStream(control, self.loop) |
|
74 | 72 | |
@@ -81,14 +79,14 b' class Engine(object):' | |||
|
81 | 79 | self.task_stream = zmqstream.ZMQStream(task, self.loop) |
|
82 | 80 | # TaskThread: |
|
83 | 81 | # mon_addr = msg.content.monitor |
|
84 |
# task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self. |
|
|
82 | # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident) | |
|
85 | 83 | # task.connect_in(str(task_addr)) |
|
86 | 84 | # task.connect_out(str(mon_addr)) |
|
87 | 85 | # self.task_stream = taskthread.QueueStream(*task.queues) |
|
88 | 86 | # task.start() |
|
89 | 87 | |
|
90 | 88 | hbs = msg.content.heartbeat |
|
91 |
self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self. |
|
|
89 | self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident) | |
|
92 | 90 | self.heart.start() |
|
93 | 91 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() |
|
94 | 92 | # placeholder for now: |
@@ -4,10 +4,12 b' Kernel adapted from kernel.py to use ZMQ Streams' | |||
|
4 | 4 | """ |
|
5 | 5 | |
|
6 | 6 | import __builtin__ |
|
7 | import os | |
|
7 | 8 | import sys |
|
8 | 9 | import time |
|
9 | 10 | import traceback |
|
10 | 11 | from signal import SIGTERM, SIGKILL |
|
12 | from pprint import pprint | |
|
11 | 13 | |
|
12 | 14 | from code import CommandCompiler |
|
13 | 15 | |
@@ -18,6 +20,9 b' from streamsession import StreamSession, Message, extract_header, serialize_obje' | |||
|
18 | 20 | unpack_apply_message |
|
19 | 21 | from IPython.zmq.completer import KernelCompleter |
|
20 | 22 | |
|
23 | def printer(*args): | |
|
24 | pprint(args) | |
|
25 | ||
|
21 | 26 | class OutStream(object): |
|
22 | 27 | """A file like object that publishes the stream to a 0MQ PUB socket.""" |
|
23 | 28 | |
@@ -133,6 +138,7 b' class Kernel(object):' | |||
|
133 | 138 | task_stream=None, client=None): |
|
134 | 139 | self.session = session |
|
135 | 140 | self.control_stream = control_stream |
|
141 | self.control_socket = control_stream.socket | |
|
136 | 142 | self.reply_stream = reply_stream |
|
137 | 143 | self.task_stream = task_stream |
|
138 | 144 | self.pub_stream = pub_stream |
@@ -153,6 +159,10 b' class Kernel(object):' | |||
|
153 | 159 | self.control_handlers[msg_type] = getattr(self, msg_type) |
|
154 | 160 | |
|
155 | 161 | #-------------------- control handlers ----------------------------- |
|
162 | def abort_queues(self): | |
|
163 | for stream in (self.task_stream, self.reply_stream): | |
|
164 | if stream: | |
|
165 | self.abort_queue(stream) | |
|
156 | 166 | |
|
157 | 167 | def abort_queue(self, stream): |
|
158 | 168 | while True: |
@@ -186,28 +196,30 b' class Kernel(object):' | |||
|
186 | 196 | time.sleep(0.05) |
|
187 | 197 | |
|
188 | 198 | def abort_request(self, stream, ident, parent): |
|
199 | """abort a specifig msg by id""" | |
|
189 | 200 | msg_ids = parent['content'].get('msg_ids', None) |
|
201 | if isinstance(msg_ids, basestring): | |
|
202 | msg_ids = [msg_ids] | |
|
190 | 203 | if not msg_ids: |
|
191 |
self.abort_queue( |
|
|
192 | self.abort_queue(self.reply_stream) | |
|
204 | self.abort_queues() | |
|
193 | 205 | for mid in msg_ids: |
|
194 | self.aborted.add(mid) | |
|
206 | self.aborted.add(str(mid)) | |
|
195 | 207 | |
|
196 | 208 | content = dict(status='ok') |
|
197 | self.session.send(stream, 'abort_reply', content=content, parent=parent, | |
|
209 | reply_msg = self.session.send(stream, 'abort_reply', content=content, parent=parent, | |
|
198 | 210 | ident=ident) |
|
211 | print>>sys.__stdout__, Message(reply_msg) | |
|
199 | 212 | |
|
200 | 213 | def kill_request(self, stream, idents, parent): |
|
201 | self.abort_queue(self.reply_stream) | |
|
202 | if self.task_stream: | |
|
203 | self.abort_queue(self.task_stream) | |
|
214 | """kill ourselves. This should really be handled in an external process""" | |
|
215 | self.abort_queues() | |
|
204 | 216 | msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, |
|
205 | 217 | content = dict(status='ok')) |
|
206 | 218 | # we can know that a message is done if we *don't* use streams, but |
|
207 | 219 | # use a socket directly with MessageTracker |
|
208 |
time.sleep( |
|
|
220 | time.sleep(.5) | |
|
209 | 221 | os.kill(os.getpid(), SIGTERM) |
|
210 |
time.sleep( |
|
|
222 | time.sleep(1) | |
|
211 | 223 | os.kill(os.getpid(), SIGKILL) |
|
212 | 224 | |
|
213 | 225 | def dispatch_control(self, msg): |
@@ -221,7 +233,7 b' class Kernel(object):' | |||
|
221 | 233 | if handler is None: |
|
222 | 234 | print >> sys.__stderr__, "UNKNOWN CONTROL MESSAGE TYPE:", msg |
|
223 | 235 | else: |
|
224 | handler(stream, idents, msg) | |
|
236 | handler(self.control_stream, idents, msg) | |
|
225 | 237 | |
|
226 | 238 | def flush_control(self): |
|
227 | 239 | while any(zmq.select([self.control_socket],[],[],1e-4)): |
@@ -258,6 +270,16 b' class Kernel(object):' | |||
|
258 | 270 | |
|
259 | 271 | return True |
|
260 | 272 | |
|
273 | def check_aborted(self, msg_id): | |
|
274 | return msg_id in self.aborted | |
|
275 | ||
|
276 | def unmet_dependencies(self, stream, idents, msg): | |
|
277 | reply_type = msg['msg_type'].split('_')[0] + '_reply' | |
|
278 | content = dict(status='resubmitted', reason='unmet dependencies') | |
|
279 | reply_msg = self.session.send(stream, reply_type, | |
|
280 | content=content, parent=msg, ident=idents) | |
|
281 | ### TODO: actually resubmit it ### | |
|
282 | ||
|
261 | 283 | #-------------------- queue handlers ----------------------------- |
|
262 | 284 | |
|
263 | 285 | def execute_request(self, stream, ident, parent): |
@@ -297,7 +319,7 b' class Kernel(object):' | |||
|
297 | 319 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident) |
|
298 | 320 | # print>>sys.__stdout__, Message(reply_msg) |
|
299 | 321 | if reply_msg['content']['status'] == u'error': |
|
300 | self.abort_queue() | |
|
322 | self.abort_queues() | |
|
301 | 323 | |
|
302 | 324 | def complete_request(self, stream, ident, parent): |
|
303 | 325 | matches = {'matches' : self.complete(parent), |
@@ -334,7 +356,7 b' class Kernel(object):' | |||
|
334 | 356 | |
|
335 | 357 | else: |
|
336 | 358 | working = dict() |
|
337 | suffix = prefix = "" | |
|
359 | suffix = prefix = "_" # prevent keyword collisions with lambda | |
|
338 | 360 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) |
|
339 | 361 | # if f.fun |
|
340 | 362 | fname = prefix+f.func_name.strip('<>')+suffix |
@@ -379,7 +401,7 b' class Kernel(object):' | |||
|
379 | 401 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf) |
|
380 | 402 | # print>>sys.__stdout__, Message(reply_msg) |
|
381 | 403 | if reply_msg['content']['status'] == u'error': |
|
382 | self.abort_queue() | |
|
404 | self.abort_queues() | |
|
383 | 405 | |
|
384 | 406 | def dispatch_queue(self, stream, msg): |
|
385 | 407 | self.flush_control() |
@@ -389,12 +411,15 b' class Kernel(object):' | |||
|
389 | 411 | header = msg['header'] |
|
390 | 412 | msg_id = header['msg_id'] |
|
391 | 413 | dependencies = header.get('dependencies', []) |
|
392 | ||
|
393 | 414 | if self.check_aborted(msg_id): |
|
394 |
|
|
|
415 | self.aborted.remove(msg_id) | |
|
416 | # is it safe to assume a msg_id will not be resubmitted? | |
|
417 | reply_type = msg['msg_type'].split('_')[0] + '_reply' | |
|
418 | reply_msg = self.session.send(stream, reply_type, | |
|
419 | content={'status' : 'aborted'}, parent=msg, ident=idents) | |
|
420 | return | |
|
395 | 421 | if not self.check_dependencies(dependencies): |
|
396 | return self.unmet_dependencies(stream, msg) | |
|
397 | ||
|
422 | return self.unmet_dependencies(stream, idents, msg) | |
|
398 | 423 | handler = self.queue_handlers.get(msg['msg_type'], None) |
|
399 | 424 | if handler is None: |
|
400 | 425 | print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg |
@@ -405,12 +430,15 b' class Kernel(object):' | |||
|
405 | 430 | #### stream mode: |
|
406 | 431 | if self.control_stream: |
|
407 | 432 | self.control_stream.on_recv(self.dispatch_control, copy=False) |
|
433 | self.control_stream.on_err(printer) | |
|
408 | 434 | if self.reply_stream: |
|
409 | 435 | self.reply_stream.on_recv(lambda msg: |
|
410 | 436 | self.dispatch_queue(self.reply_stream, msg), copy=False) |
|
437 | self.reply_stream.on_err(printer) | |
|
411 | 438 | if self.task_stream: |
|
412 | 439 | self.task_stream.on_recv(lambda msg: |
|
413 | 440 | self.dispatch_queue(self.task_stream, msg), copy=False) |
|
441 | self.task_stream.on_err(printer) | |
|
414 | 442 | |
|
415 | 443 | #### while True mode: |
|
416 | 444 | # while True: |
@@ -257,7 +257,7 b' def unpack_apply_message(bufs, g=None, copy=True):' | |||
|
257 | 257 | |
|
258 | 258 | class StreamSession(object): |
|
259 | 259 | """tweaked version of IPython.zmq.session.Session, for development in Parallel""" |
|
260 | ||
|
260 | debug=False | |
|
261 | 261 | def __init__(self, username=None, session=None, packer=None, unpacker=None): |
|
262 | 262 | if username is None: |
|
263 | 263 | username = os.environ.get('USER','username') |
@@ -335,6 +335,10 b' class StreamSession(object):' | |||
|
335 | 335 | if buffers: |
|
336 | 336 | stream.send(buffers[-1], copy=False) |
|
337 | 337 | omsg = Message(msg) |
|
338 | if self.debug: | |
|
339 | pprint.pprint(omsg) | |
|
340 | pprint.pprint(to_send) | |
|
341 | pprint.pprint(buffers) | |
|
338 | 342 | return omsg |
|
339 | 343 | |
|
340 | 344 | def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True): |
@@ -103,7 +103,7 b' class View(object):' | |||
|
103 | 103 | This method has access to the targets' globals |
|
104 | 104 | |
|
105 | 105 | """ |
|
106 |
return self.client.apply(f, args, kwargs, block= |
|
|
106 | return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True) | |
|
107 | 107 | |
|
108 | 108 | |
|
109 | 109 | class DirectView(View): |
@@ -129,12 +129,28 b' class DirectView(View):' | |||
|
129 | 129 | def __setitem__(self,key,value): |
|
130 | 130 | self.update({key:value}) |
|
131 | 131 | |
|
132 | def clear(self): | |
|
133 |
""" |
|
|
134 | return self.client.clear(targets=self.targets,block=self.block) | |
|
132 | def clear(self, block=False): | |
|
133 | """Clear the remote namespaces on my engines.""" | |
|
134 | block = block if block is not None else self.block | |
|
135 | return self.client.clear(targets=self.targets,block=block) | |
|
136 | ||
|
137 | def kill(self, block=True): | |
|
138 | """Kill my engines.""" | |
|
139 | block = block if block is not None else self.block | |
|
140 | return self.client.kill(targets=self.targets,block=block) | |
|
141 | ||
|
142 | def abort(self, msg_ids=None, block=None): | |
|
143 | """Abort jobs on my engines. | |
|
135 | 144 | |
|
136 | def abort(self): | |
|
137 | return self.client.abort(targets=self.targets,block=self.block) | |
|
145 | Parameters | |
|
146 | ---------- | |
|
147 | ||
|
148 | msg_ids : None, str, list of strs, optional | |
|
149 | if None: abort all jobs. | |
|
150 | else: abort specific msg_id(s). | |
|
151 | """ | |
|
152 | block = block if block is not None else self.block | |
|
153 | return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block) | |
|
138 | 154 | |
|
139 | 155 | class LoadBalancedView(View): |
|
140 | 156 | _targets=None |
@@ -124,7 +124,7 b' def setup():' | |||
|
124 | 124 | |
|
125 | 125 | client_addrs = { |
|
126 | 126 | 'control' : "%s:%i"%(iface, ccport), |
|
127 |
' |
|
|
127 | 'query': "%s:%i"%(iface, cport), | |
|
128 | 128 | 'queue': "%s:%i"%(iface, cqport), |
|
129 | 129 | 'task' : "%s:%i"%(iface, ctport), |
|
130 | 130 | 'notification': "%s:%i"%(iface, nport) |
General Comments 0
You need to be logged in to leave comments.
Login now