Show More
@@ -4,6 +4,8 b'' | |||||
4 | import time |
|
4 | import time | |
5 | import threading |
|
5 | import threading | |
6 |
|
6 | |||
|
7 | from pprint import pprint | |||
|
8 | ||||
7 | from functools import wraps |
|
9 | from functools import wraps | |
8 |
|
10 | |||
9 | from IPython.external.decorator import decorator |
|
11 | from IPython.external.decorator import decorator | |
@@ -46,7 +48,9 b' def defaultblock(f, self, *args, **kwargs):' | |||||
46 | self.block = saveblock |
|
48 | self.block = saveblock | |
47 | return ret |
|
49 | return ret | |
48 |
|
50 | |||
49 |
|
51 | class AbortedTask(object): | ||
|
52 | def __init__(self, msg_id): | |||
|
53 | self.msg_id = msg_id | |||
50 | # @decorator |
|
54 | # @decorator | |
51 | # def checktargets(f): |
|
55 | # def checktargets(f): | |
52 | # @wraps(f) |
|
56 | # @wraps(f) | |
@@ -101,7 +105,11 b' class Client(object):' | |||||
101 | execution methods: apply/apply_bound/apply_to |
|
105 | execution methods: apply/apply_bound/apply_to | |
102 | legacy: execute, run |
|
106 | legacy: execute, run | |
103 |
|
107 | |||
104 |
|
|
108 | query methods: queue_status, get_result | |
|
109 | ||||
|
110 | control methods: abort, kill | |||
|
111 | ||||
|
112 | ||||
105 |
|
113 | |||
106 | """ |
|
114 | """ | |
107 |
|
115 | |||
@@ -109,7 +117,8 b' class Client(object):' | |||||
109 | _connected=False |
|
117 | _connected=False | |
110 | _engines=None |
|
118 | _engines=None | |
111 | registration_socket=None |
|
119 | registration_socket=None | |
112 |
|
|
120 | query_socket=None | |
|
121 | control_socket=None | |||
113 | notification_socket=None |
|
122 | notification_socket=None | |
114 | queue_socket=None |
|
123 | queue_socket=None | |
115 | task_socket=None |
|
124 | task_socket=None | |
@@ -117,8 +126,9 b' class Client(object):' | |||||
117 | outstanding=None |
|
126 | outstanding=None | |
118 | results = None |
|
127 | results = None | |
119 | history = None |
|
128 | history = None | |
|
129 | debug = False | |||
120 |
|
130 | |||
121 | def __init__(self, addr, context=None, username=None): |
|
131 | def __init__(self, addr, context=None, username=None, debug=False): | |
122 | if context is None: |
|
132 | if context is None: | |
123 | context = zmq.Context() |
|
133 | context = zmq.Context() | |
124 | self.context = context |
|
134 | self.context = context | |
@@ -135,6 +145,8 b' class Client(object):' | |||||
135 | self.outstanding=set() |
|
145 | self.outstanding=set() | |
136 | self.results = {} |
|
146 | self.results = {} | |
137 | self.history = [] |
|
147 | self.history = [] | |
|
148 | self.debug = debug | |||
|
149 | self.session.debug = debug | |||
138 | self._connect() |
|
150 | self._connect() | |
139 |
|
151 | |||
140 | self._notification_handlers = {'registration_notification' : self._register_engine, |
|
152 | self._notification_handlers = {'registration_notification' : self._register_engine, | |
@@ -152,7 +164,7 b' class Client(object):' | |||||
152 | def _update_engines(self, engines): |
|
164 | def _update_engines(self, engines): | |
153 | for k,v in engines.iteritems(): |
|
165 | for k,v in engines.iteritems(): | |
154 | eid = int(k) |
|
166 | eid = int(k) | |
155 | self._engines[eid] = v |
|
167 | self._engines[eid] = bytes(v) # force not unicode | |
156 | self._ids.add(eid) |
|
168 | self._ids.add(eid) | |
157 |
|
169 | |||
158 | def _build_targets(self, targets): |
|
170 | def _build_targets(self, targets): | |
@@ -173,7 +185,9 b' class Client(object):' | |||||
173 | return |
|
185 | return | |
174 | self._connected=True |
|
186 | self._connected=True | |
175 | self.session.send(self.registration_socket, 'connection_request') |
|
187 | self.session.send(self.registration_socket, 'connection_request') | |
176 |
msg = self.session.recv(self.registration_socket,mode=0) |
|
188 | idents,msg = self.session.recv(self.registration_socket,mode=0) | |
|
189 | if self.debug: | |||
|
190 | pprint(msg) | |||
177 | msg = ss.Message(msg) |
|
191 | msg = ss.Message(msg) | |
178 | content = msg.content |
|
192 | content = msg.content | |
179 | if content.status == 'ok': |
|
193 | if content.status == 'ok': | |
@@ -189,10 +203,14 b' class Client(object):' | |||||
189 | self.notification_socket = self.context.socket(zmq.SUB) |
|
203 | self.notification_socket = self.context.socket(zmq.SUB) | |
190 | self.notification_socket.connect(content.notification) |
|
204 | self.notification_socket.connect(content.notification) | |
191 | self.notification_socket.setsockopt(zmq.SUBSCRIBE, "") |
|
205 | self.notification_socket.setsockopt(zmq.SUBSCRIBE, "") | |
192 |
if content. |
|
206 | if content.query: | |
193 |
self. |
|
207 | self.query_socket = self.context.socket(zmq.PAIR) | |
194 |
self. |
|
208 | self.query_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
195 |
self. |
|
209 | self.query_socket.connect(content.query) | |
|
210 | if content.control: | |||
|
211 | self.control_socket = self.context.socket(zmq.PAIR) | |||
|
212 | self.control_socket.setsockopt(zmq.IDENTITY, self.session.session) | |||
|
213 | self.control_socket.connect(content.control) | |||
196 | self._update_engines(dict(content.engines)) |
|
214 | self._update_engines(dict(content.engines)) | |
197 |
|
215 | |||
198 | else: |
|
216 | else: | |
@@ -226,7 +244,7 b' class Client(object):' | |||||
226 | self.results[msg_id] = ss.unwrap_exception(msg['content']) |
|
244 | self.results[msg_id] = ss.unwrap_exception(msg['content']) | |
227 |
|
245 | |||
228 | def _handle_apply_reply(self, msg): |
|
246 | def _handle_apply_reply(self, msg): | |
229 |
# p |
|
247 | # pprint(msg) | |
230 | # msg_id = msg['msg_id'] |
|
248 | # msg_id = msg['msg_id'] | |
231 | parent = msg['parent_header'] |
|
249 | parent = msg['parent_header'] | |
232 | msg_id = parent['msg_id'] |
|
250 | msg_id = parent['msg_id'] | |
@@ -237,14 +255,19 b' class Client(object):' | |||||
237 | content = msg['content'] |
|
255 | content = msg['content'] | |
238 | if content['status'] == 'ok': |
|
256 | if content['status'] == 'ok': | |
239 | self.results[msg_id] = ss.unserialize_object(msg['buffers']) |
|
257 | self.results[msg_id] = ss.unserialize_object(msg['buffers']) | |
|
258 | elif content['status'] == 'aborted': | |||
|
259 | self.results[msg_id] = AbortedTask(msg_id) | |||
|
260 | elif content['status'] == 'resubmitted': | |||
|
261 | pass # handle resubmission | |||
240 | else: |
|
262 | else: | |
241 |
|
||||
242 | self.results[msg_id] = ss.unwrap_exception(content) |
|
263 | self.results[msg_id] = ss.unwrap_exception(content) | |
243 |
|
264 | |||
244 | def _flush_notifications(self): |
|
265 | def _flush_notifications(self): | |
245 | "flush incoming notifications of engine registrations" |
|
266 | "flush incoming notifications of engine registrations" | |
246 | msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK) |
|
267 | msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK) | |
247 | while msg is not None: |
|
268 | while msg is not None: | |
|
269 | if self.debug: | |||
|
270 | pprint(msg) | |||
248 | msg = msg[-1] |
|
271 | msg = msg[-1] | |
249 | msg_type = msg['msg_type'] |
|
272 | msg_type = msg['msg_type'] | |
250 | handler = self._notification_handlers.get(msg_type, None) |
|
273 | handler = self._notification_handlers.get(msg_type, None) | |
@@ -258,6 +281,8 b' class Client(object):' | |||||
258 | "flush incoming task or queue results" |
|
281 | "flush incoming task or queue results" | |
259 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) |
|
282 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |
260 | while msg is not None: |
|
283 | while msg is not None: | |
|
284 | if self.debug: | |||
|
285 | pprint(msg) | |||
261 | msg = msg[-1] |
|
286 | msg = msg[-1] | |
262 | msg_type = msg['msg_type'] |
|
287 | msg_type = msg['msg_type'] | |
263 | handler = self._queue_handlers.get(msg_type, None) |
|
288 | handler = self._queue_handlers.get(msg_type, None) | |
@@ -267,6 +292,14 b' class Client(object):' | |||||
267 | handler(msg) |
|
292 | handler(msg) | |
268 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) |
|
293 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |
269 |
|
294 | |||
|
295 | def _flush_control(self, sock): | |||
|
296 | "flush incoming control replies" | |||
|
297 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |||
|
298 | while msg is not None: | |||
|
299 | if self.debug: | |||
|
300 | pprint(msg) | |||
|
301 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |||
|
302 | ||||
270 | ###### get/setitem ######## |
|
303 | ###### get/setitem ######## | |
271 |
|
304 | |||
272 | def __getitem__(self, key): |
|
305 | def __getitem__(self, key): | |
@@ -297,6 +330,8 b' class Client(object):' | |||||
297 | self._flush_results(self.queue_socket) |
|
330 | self._flush_results(self.queue_socket) | |
298 | if self.task_socket: |
|
331 | if self.task_socket: | |
299 | self._flush_results(self.task_socket) |
|
332 | self._flush_results(self.task_socket) | |
|
333 | if self.control_socket: | |||
|
334 | self._flush_control(self.control_socket) | |||
300 |
|
335 | |||
301 | @spinfirst |
|
336 | @spinfirst | |
302 | def queue_status(self, targets=None, verbose=False): |
|
337 | def queue_status(self, targets=None, verbose=False): | |
@@ -308,24 +343,78 b' class Client(object):' | |||||
308 | the engines on which to execute |
|
343 | the engines on which to execute | |
309 | default : all |
|
344 | default : all | |
310 | verbose : bool |
|
345 | verbose : bool | |
311 | whether to return |
|
346 | whether to return lengths only, or lists of ids for each element | |
312 |
|
347 | |||
313 | """ |
|
348 | """ | |
314 | targets = self._build_targets(targets)[1] |
|
349 | targets = self._build_targets(targets)[1] | |
315 | content = dict(targets=targets) |
|
350 | content = dict(targets=targets) | |
316 |
self.session.send(self. |
|
351 | self.session.send(self.query_socket, "queue_request", content=content) | |
317 |
idents,msg = self.session.recv(self. |
|
352 | idents,msg = self.session.recv(self.query_socket, 0) | |
|
353 | if self.debug: | |||
|
354 | pprint(msg) | |||
318 | return msg['content'] |
|
355 | return msg['content'] | |
319 |
|
356 | |||
320 | @spinfirst |
|
357 | @spinfirst | |
321 | def clear(self, targets=None): |
|
358 | @defaultblock | |
|
359 | def clear(self, targets=None, block=None): | |||
322 | """clear the namespace in target(s)""" |
|
360 | """clear the namespace in target(s)""" | |
323 | pass |
|
361 | targets = self._build_targets(targets)[0] | |
|
362 | print targets | |||
|
363 | for t in targets: | |||
|
364 | self.session.send(self.control_socket, 'clear_request', content={},ident=t) | |||
|
365 | error = False | |||
|
366 | if self.block: | |||
|
367 | for i in range(len(targets)): | |||
|
368 | idents,msg = self.session.recv(self.control_socket,0) | |||
|
369 | if self.debug: | |||
|
370 | pprint(msg) | |||
|
371 | if msg['content']['status'] != 'ok': | |||
|
372 | error = msg['content'] | |||
|
373 | if error: | |||
|
374 | return error | |||
|
375 | ||||
324 |
|
376 | |||
325 | @spinfirst |
|
377 | @spinfirst | |
326 | def abort(self, targets=None): |
|
378 | @defaultblock | |
|
379 | def abort(self, msg_ids = None, targets=None, block=None): | |||
327 | """abort the Queues of target(s)""" |
|
380 | """abort the Queues of target(s)""" | |
328 | pass |
|
381 | targets = self._build_targets(targets)[0] | |
|
382 | print targets | |||
|
383 | if isinstance(msg_ids, basestring): | |||
|
384 | msg_ids = [msg_ids] | |||
|
385 | content = dict(msg_ids=msg_ids) | |||
|
386 | for t in targets: | |||
|
387 | self.session.send(self.control_socket, 'abort_request', | |||
|
388 | content=content, ident=t) | |||
|
389 | error = False | |||
|
390 | if self.block: | |||
|
391 | for i in range(len(targets)): | |||
|
392 | idents,msg = self.session.recv(self.control_socket,0) | |||
|
393 | if self.debug: | |||
|
394 | pprint(msg) | |||
|
395 | if msg['content']['status'] != 'ok': | |||
|
396 | error = msg['content'] | |||
|
397 | if error: | |||
|
398 | return error | |||
|
399 | ||||
|
400 | @spinfirst | |||
|
401 | @defaultblock | |||
|
402 | def kill(self, targets=None, block=None): | |||
|
403 | """Terminates one or more engine processes.""" | |||
|
404 | targets = self._build_targets(targets)[0] | |||
|
405 | print targets | |||
|
406 | for t in targets: | |||
|
407 | self.session.send(self.control_socket, 'kill_request', content={},ident=t) | |||
|
408 | error = False | |||
|
409 | if self.block: | |||
|
410 | for i in range(len(targets)): | |||
|
411 | idents,msg = self.session.recv(self.control_socket,0) | |||
|
412 | if self.debug: | |||
|
413 | pprint(msg) | |||
|
414 | if msg['content']['status'] != 'ok': | |||
|
415 | error = msg['content'] | |||
|
416 | if error: | |||
|
417 | return error | |||
329 |
|
418 | |||
330 | @defaultblock |
|
419 | @defaultblock | |
331 | def execute(self, code, targets='all', block=None): |
|
420 | def execute(self, code, targets='all', block=None): | |
@@ -364,22 +453,6 b' class Client(object):' | |||||
364 | result = self.apply(execute, (code,), targets=None, block=block, bound=False) |
|
453 | result = self.apply(execute, (code,), targets=None, block=block, bound=False) | |
365 | return result |
|
454 | return result | |
366 |
|
|
455 | ||
367 | # a = time.time() |
|
|||
368 | # content = dict(code=code) |
|
|||
369 | # b = time.time() |
|
|||
370 | # msg = self.session.send(self.task_socket, 'execute_request', |
|
|||
371 | # content=content) |
|
|||
372 | # c = time.time() |
|
|||
373 | # msg_id = msg['msg_id'] |
|
|||
374 | # self.outstanding.add(msg_id) |
|
|||
375 | # self.history.append(msg_id) |
|
|||
376 | # d = time.time() |
|
|||
377 | # if block: |
|
|||
378 | # self.barrier(msg_id) |
|
|||
379 | # return self.results[msg_id] |
|
|||
380 | # else: |
|
|||
381 | # return msg_id |
|
|||
382 |
|
||||
383 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None): |
|
456 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None): | |
384 | """the underlying method for applying functions in a load balanced |
|
457 | """the underlying method for applying functions in a load balanced | |
385 | manner.""" |
|
458 | manner.""" | |
@@ -402,7 +475,7 b' class Client(object):' | |||||
402 | """Then underlying method for applying functions to specific engines.""" |
|
475 | """Then underlying method for applying functions to specific engines.""" | |
403 | block = block if block is not None else self.block |
|
476 | block = block if block is not None else self.block | |
404 | queues,targets = self._build_targets(targets) |
|
477 | queues,targets = self._build_targets(targets) | |
405 |
|
478 | print queues | ||
406 | bufs = ss.pack_apply_message(f,args,kwargs) |
|
479 | bufs = ss.pack_apply_message(f,args,kwargs) | |
407 | content = dict(bound=bound) |
|
480 | content = dict(bound=bound) | |
408 | msg_ids = [] |
|
481 | msg_ids = [] | |
@@ -438,51 +511,16 b' class Client(object):' | |||||
438 | """ |
|
511 | """ | |
439 | args = args if args is not None else [] |
|
512 | args = args if args is not None else [] | |
440 | kwargs = kwargs if kwargs is not None else {} |
|
513 | kwargs = kwargs if kwargs is not None else {} | |
|
514 | if not isinstance(args, (tuple, list)): | |||
|
515 | raise TypeError("args must be tuple or list, not %s"%type(args)) | |||
|
516 | if not isinstance(kwargs, dict): | |||
|
517 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) | |||
441 | if targets is None: |
|
518 | if targets is None: | |
442 | return self._apply_balanced(f,args,kwargs,bound=bound, block=block) |
|
519 | return self._apply_balanced(f,args,kwargs,bound=bound, block=block) | |
443 | else: |
|
520 | else: | |
444 | return self._apply_direct(f, args, kwargs, |
|
521 | return self._apply_direct(f, args, kwargs, | |
445 | bound=bound,block=block, targets=targets) |
|
522 | bound=bound,block=block, targets=targets) | |
446 |
|
523 | |||
447 | # def apply_bound(self, f, *args, **kwargs): |
|
|||
448 | # """calls f(*args, **kwargs) on a remote engine. This does get |
|
|||
449 | # executed in an engine's namespace. The controller selects the |
|
|||
450 | # target engine via 0MQ XREQ load balancing. |
|
|||
451 | # |
|
|||
452 | # if self.block is False: |
|
|||
453 | # returns msg_id |
|
|||
454 | # else: |
|
|||
455 | # returns actual result of f(*args, **kwargs) |
|
|||
456 | # """ |
|
|||
457 | # return self._apply(f, args, kwargs, bound=True) |
|
|||
458 | # |
|
|||
459 | # |
|
|||
460 | # def apply_to(self, targets, f, *args, **kwargs): |
|
|||
461 | # """calls f(*args, **kwargs) on a specific engine. |
|
|||
462 | # |
|
|||
463 | # if self.block is False: |
|
|||
464 | # returns msg_id |
|
|||
465 | # else: |
|
|||
466 | # returns actual result of f(*args, **kwargs) |
|
|||
467 | # |
|
|||
468 | # The target's namespace is not used here. |
|
|||
469 | # Use apply_bound_to() to access target's globals. |
|
|||
470 | # """ |
|
|||
471 | # return self._apply_to(False, targets, f, args, kwargs) |
|
|||
472 | # |
|
|||
473 | # def apply_bound_to(self, targets, f, *args, **kwargs): |
|
|||
474 | # """calls f(*args, **kwargs) on a specific engine. |
|
|||
475 | # |
|
|||
476 | # if self.block is False: |
|
|||
477 | # returns msg_id |
|
|||
478 | # else: |
|
|||
479 | # returns actual result of f(*args, **kwargs) |
|
|||
480 | # |
|
|||
481 | # This method has access to the target's globals |
|
|||
482 | # |
|
|||
483 | # """ |
|
|||
484 | # return self._apply_to(f, args, kwargs) |
|
|||
485 | # |
|
|||
486 | def push(self, ns, targets=None, block=None): |
|
524 | def push(self, ns, targets=None, block=None): | |
487 | """push the contents of `ns` into the namespace on `target`""" |
|
525 | """push the contents of `ns` into the namespace on `target`""" | |
488 | if not isinstance(ns, dict): |
|
526 | if not isinstance(ns, dict): | |
@@ -546,9 +584,11 b' class Client(object):' | |||||
546 | theids.append(msg_id) |
|
584 | theids.append(msg_id) | |
547 |
|
585 | |||
548 | content = dict(msg_ids=theids, status_only=status_only) |
|
586 | content = dict(msg_ids=theids, status_only=status_only) | |
549 |
msg = self.session.send(self. |
|
587 | msg = self.session.send(self.query_socket, "result_request", content=content) | |
550 |
zmq.select([self. |
|
588 | zmq.select([self.query_socket], [], []) | |
551 |
idents,msg = self.session.recv(self. |
|
589 | idents,msg = self.session.recv(self.query_socket, zmq.NOBLOCK) | |
|
590 | if self.debug: | |||
|
591 | pprint(msg) | |||
552 |
|
592 | |||
553 | # while True: |
|
593 | # while True: | |
554 | # try: |
|
594 | # try: |
@@ -297,6 +297,8 b' class Controller(object):' | |||||
297 | self.save_task_result(idents, msg) |
|
297 | self.save_task_result(idents, msg) | |
298 | elif switch == 'tracktask': |
|
298 | elif switch == 'tracktask': | |
299 | self.save_task_destination(idents, msg) |
|
299 | self.save_task_destination(idents, msg) | |
|
300 | elif switch in ('incontrol', 'outcontrol'): | |||
|
301 | pass | |||
300 | else: |
|
302 | else: | |
301 | logger.error("Invalid message topic: %s"%switch) |
|
303 | logger.error("Invalid message topic: %s"%switch) | |
302 |
|
304 |
@@ -7,6 +7,7 b' import sys' | |||||
7 | import time |
|
7 | import time | |
8 | import traceback |
|
8 | import traceback | |
9 | import uuid |
|
9 | import uuid | |
|
10 | from pprint import pprint | |||
10 |
|
11 | |||
11 | import zmq |
|
12 | import zmq | |
12 | from zmq.eventloop import ioloop, zmqstream |
|
13 | from zmq.eventloop import ioloop, zmqstream | |
@@ -20,7 +21,7 b' import heartmonitor' | |||||
20 |
|
21 | |||
21 |
|
22 | |||
22 | def printer(*msg): |
|
23 | def printer(*msg): | |
23 |
p |
|
24 | pprint(msg) | |
24 |
|
25 | |||
25 | class Engine(object): |
|
26 | class Engine(object): | |
26 | """IPython engine""" |
|
27 | """IPython engine""" | |
@@ -29,26 +30,23 b' class Engine(object):' | |||||
29 | context=None |
|
30 | context=None | |
30 | loop=None |
|
31 | loop=None | |
31 | session=None |
|
32 | session=None | |
32 |
|
|
33 | ident=None | |
33 | control_id=None |
|
|||
34 | heart_id=None |
|
|||
35 | registrar=None |
|
34 | registrar=None | |
36 | heart=None |
|
35 | heart=None | |
37 | kernel=None |
|
36 | kernel=None | |
38 |
|
37 | |||
39 |
def __init__(self, context, loop, session, registrar, client, |
|
38 | def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None): | |
40 | self.context = context |
|
39 | self.context = context | |
41 | self.loop = loop |
|
40 | self.loop = loop | |
42 | self.session = session |
|
41 | self.session = session | |
43 | self.registrar = registrar |
|
42 | self.registrar = registrar | |
44 | self.client = client |
|
43 | self.client = client | |
45 |
self. |
|
44 | self.ident = ident if ident else str(uuid.uuid4()) | |
46 | self.heart_id = heart_id or self.queue_id |
|
|||
47 | self.registrar.on_send(printer) |
|
45 | self.registrar.on_send(printer) | |
48 |
|
46 | |||
49 | def register(self): |
|
47 | def register(self): | |
50 |
|
48 | |||
51 |
content = dict(queue=self. |
|
49 | content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) | |
52 | self.registrar.on_recv(self.complete_registration) |
|
50 | self.registrar.on_recv(self.complete_registration) | |
53 | self.session.send(self.registrar, "registration_request",content=content) |
|
51 | self.session.send(self.registrar, "registration_request",content=content) | |
54 |
|
52 | |||
@@ -61,14 +59,14 b' class Engine(object):' | |||||
61 | queue_addr = msg.content.queue |
|
59 | queue_addr = msg.content.queue | |
62 | if queue_addr: |
|
60 | if queue_addr: | |
63 | queue = self.context.socket(zmq.PAIR) |
|
61 | queue = self.context.socket(zmq.PAIR) | |
64 |
queue.setsockopt(zmq.IDENTITY, self. |
|
62 | queue.setsockopt(zmq.IDENTITY, self.ident) | |
65 | queue.connect(str(queue_addr)) |
|
63 | queue.connect(str(queue_addr)) | |
66 | self.queue = zmqstream.ZMQStream(queue, self.loop) |
|
64 | self.queue = zmqstream.ZMQStream(queue, self.loop) | |
67 |
|
65 | |||
68 | control_addr = msg.content.control |
|
66 | control_addr = msg.content.control | |
69 | if control_addr: |
|
67 | if control_addr: | |
70 | control = self.context.socket(zmq.PAIR) |
|
68 | control = self.context.socket(zmq.PAIR) | |
71 |
control.setsockopt(zmq.IDENTITY, self. |
|
69 | control.setsockopt(zmq.IDENTITY, self.ident) | |
72 | control.connect(str(control_addr)) |
|
70 | control.connect(str(control_addr)) | |
73 | self.control = zmqstream.ZMQStream(control, self.loop) |
|
71 | self.control = zmqstream.ZMQStream(control, self.loop) | |
74 |
|
72 | |||
@@ -81,14 +79,14 b' class Engine(object):' | |||||
81 | self.task_stream = zmqstream.ZMQStream(task, self.loop) |
|
79 | self.task_stream = zmqstream.ZMQStream(task, self.loop) | |
82 | # TaskThread: |
|
80 | # TaskThread: | |
83 | # mon_addr = msg.content.monitor |
|
81 | # mon_addr = msg.content.monitor | |
84 |
# task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self. |
|
82 | # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident) | |
85 | # task.connect_in(str(task_addr)) |
|
83 | # task.connect_in(str(task_addr)) | |
86 | # task.connect_out(str(mon_addr)) |
|
84 | # task.connect_out(str(mon_addr)) | |
87 | # self.task_stream = taskthread.QueueStream(*task.queues) |
|
85 | # self.task_stream = taskthread.QueueStream(*task.queues) | |
88 | # task.start() |
|
86 | # task.start() | |
89 |
|
87 | |||
90 | hbs = msg.content.heartbeat |
|
88 | hbs = msg.content.heartbeat | |
91 |
self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self. |
|
89 | self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident) | |
92 | self.heart.start() |
|
90 | self.heart.start() | |
93 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() |
|
91 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() | |
94 | # placeholder for now: |
|
92 | # placeholder for now: |
@@ -4,10 +4,12 b' Kernel adapted from kernel.py to use ZMQ Streams' | |||||
4 | """ |
|
4 | """ | |
5 |
|
5 | |||
6 | import __builtin__ |
|
6 | import __builtin__ | |
|
7 | import os | |||
7 | import sys |
|
8 | import sys | |
8 | import time |
|
9 | import time | |
9 | import traceback |
|
10 | import traceback | |
10 | from signal import SIGTERM, SIGKILL |
|
11 | from signal import SIGTERM, SIGKILL | |
|
12 | from pprint import pprint | |||
11 |
|
13 | |||
12 | from code import CommandCompiler |
|
14 | from code import CommandCompiler | |
13 |
|
15 | |||
@@ -18,6 +20,9 b' from streamsession import StreamSession, Message, extract_header, serialize_obje' | |||||
18 | unpack_apply_message |
|
20 | unpack_apply_message | |
19 | from IPython.zmq.completer import KernelCompleter |
|
21 | from IPython.zmq.completer import KernelCompleter | |
20 |
|
22 | |||
|
23 | def printer(*args): | |||
|
24 | pprint(args) | |||
|
25 | ||||
21 | class OutStream(object): |
|
26 | class OutStream(object): | |
22 | """A file like object that publishes the stream to a 0MQ PUB socket.""" |
|
27 | """A file like object that publishes the stream to a 0MQ PUB socket.""" | |
23 |
|
28 | |||
@@ -133,6 +138,7 b' class Kernel(object):' | |||||
133 | task_stream=None, client=None): |
|
138 | task_stream=None, client=None): | |
134 | self.session = session |
|
139 | self.session = session | |
135 | self.control_stream = control_stream |
|
140 | self.control_stream = control_stream | |
|
141 | self.control_socket = control_stream.socket | |||
136 | self.reply_stream = reply_stream |
|
142 | self.reply_stream = reply_stream | |
137 | self.task_stream = task_stream |
|
143 | self.task_stream = task_stream | |
138 | self.pub_stream = pub_stream |
|
144 | self.pub_stream = pub_stream | |
@@ -153,6 +159,10 b' class Kernel(object):' | |||||
153 | self.control_handlers[msg_type] = getattr(self, msg_type) |
|
159 | self.control_handlers[msg_type] = getattr(self, msg_type) | |
154 |
|
160 | |||
155 | #-------------------- control handlers ----------------------------- |
|
161 | #-------------------- control handlers ----------------------------- | |
|
162 | def abort_queues(self): | |||
|
163 | for stream in (self.task_stream, self.reply_stream): | |||
|
164 | if stream: | |||
|
165 | self.abort_queue(stream) | |||
156 |
|
166 | |||
157 | def abort_queue(self, stream): |
|
167 | def abort_queue(self, stream): | |
158 | while True: |
|
168 | while True: | |
@@ -186,28 +196,30 b' class Kernel(object):' | |||||
186 | time.sleep(0.05) |
|
196 | time.sleep(0.05) | |
187 |
|
197 | |||
188 | def abort_request(self, stream, ident, parent): |
|
198 | def abort_request(self, stream, ident, parent): | |
|
199 | """abort a specifig msg by id""" | |||
189 | msg_ids = parent['content'].get('msg_ids', None) |
|
200 | msg_ids = parent['content'].get('msg_ids', None) | |
|
201 | if isinstance(msg_ids, basestring): | |||
|
202 | msg_ids = [msg_ids] | |||
190 | if not msg_ids: |
|
203 | if not msg_ids: | |
191 |
self.abort_queue( |
|
204 | self.abort_queues() | |
192 | self.abort_queue(self.reply_stream) |
|
|||
193 | for mid in msg_ids: |
|
205 | for mid in msg_ids: | |
194 | self.aborted.add(mid) |
|
206 | self.aborted.add(str(mid)) | |
195 |
|
207 | |||
196 | content = dict(status='ok') |
|
208 | content = dict(status='ok') | |
197 | self.session.send(stream, 'abort_reply', content=content, parent=parent, |
|
209 | reply_msg = self.session.send(stream, 'abort_reply', content=content, parent=parent, | |
198 | ident=ident) |
|
210 | ident=ident) | |
|
211 | print>>sys.__stdout__, Message(reply_msg) | |||
199 |
|
212 | |||
200 | def kill_request(self, stream, idents, parent): |
|
213 | def kill_request(self, stream, idents, parent): | |
201 | self.abort_queue(self.reply_stream) |
|
214 | """kill ourselves. This should really be handled in an external process""" | |
202 | if self.task_stream: |
|
215 | self.abort_queues() | |
203 | self.abort_queue(self.task_stream) |
|
|||
204 | msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, |
|
216 | msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, | |
205 | content = dict(status='ok')) |
|
217 | content = dict(status='ok')) | |
206 | # we can know that a message is done if we *don't* use streams, but |
|
218 | # we can know that a message is done if we *don't* use streams, but | |
207 | # use a socket directly with MessageTracker |
|
219 | # use a socket directly with MessageTracker | |
208 |
time.sleep( |
|
220 | time.sleep(.5) | |
209 | os.kill(os.getpid(), SIGTERM) |
|
221 | os.kill(os.getpid(), SIGTERM) | |
210 |
time.sleep( |
|
222 | time.sleep(1) | |
211 | os.kill(os.getpid(), SIGKILL) |
|
223 | os.kill(os.getpid(), SIGKILL) | |
212 |
|
224 | |||
213 | def dispatch_control(self, msg): |
|
225 | def dispatch_control(self, msg): | |
@@ -221,7 +233,7 b' class Kernel(object):' | |||||
221 | if handler is None: |
|
233 | if handler is None: | |
222 | print >> sys.__stderr__, "UNKNOWN CONTROL MESSAGE TYPE:", msg |
|
234 | print >> sys.__stderr__, "UNKNOWN CONTROL MESSAGE TYPE:", msg | |
223 | else: |
|
235 | else: | |
224 | handler(stream, idents, msg) |
|
236 | handler(self.control_stream, idents, msg) | |
225 |
|
237 | |||
226 | def flush_control(self): |
|
238 | def flush_control(self): | |
227 | while any(zmq.select([self.control_socket],[],[],1e-4)): |
|
239 | while any(zmq.select([self.control_socket],[],[],1e-4)): | |
@@ -258,6 +270,16 b' class Kernel(object):' | |||||
258 |
|
270 | |||
259 | return True |
|
271 | return True | |
260 |
|
272 | |||
|
273 | def check_aborted(self, msg_id): | |||
|
274 | return msg_id in self.aborted | |||
|
275 | ||||
|
276 | def unmet_dependencies(self, stream, idents, msg): | |||
|
277 | reply_type = msg['msg_type'].split('_')[0] + '_reply' | |||
|
278 | content = dict(status='resubmitted', reason='unmet dependencies') | |||
|
279 | reply_msg = self.session.send(stream, reply_type, | |||
|
280 | content=content, parent=msg, ident=idents) | |||
|
281 | ### TODO: actually resubmit it ### | |||
|
282 | ||||
261 | #-------------------- queue handlers ----------------------------- |
|
283 | #-------------------- queue handlers ----------------------------- | |
262 |
|
284 | |||
263 | def execute_request(self, stream, ident, parent): |
|
285 | def execute_request(self, stream, ident, parent): | |
@@ -297,7 +319,7 b' class Kernel(object):' | |||||
297 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident) |
|
319 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident) | |
298 | # print>>sys.__stdout__, Message(reply_msg) |
|
320 | # print>>sys.__stdout__, Message(reply_msg) | |
299 | if reply_msg['content']['status'] == u'error': |
|
321 | if reply_msg['content']['status'] == u'error': | |
300 | self.abort_queue() |
|
322 | self.abort_queues() | |
301 |
|
323 | |||
302 | def complete_request(self, stream, ident, parent): |
|
324 | def complete_request(self, stream, ident, parent): | |
303 | matches = {'matches' : self.complete(parent), |
|
325 | matches = {'matches' : self.complete(parent), | |
@@ -334,7 +356,7 b' class Kernel(object):' | |||||
334 |
|
356 | |||
335 | else: |
|
357 | else: | |
336 | working = dict() |
|
358 | working = dict() | |
337 | suffix = prefix = "" |
|
359 | suffix = prefix = "_" # prevent keyword collisions with lambda | |
338 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) |
|
360 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) | |
339 | # if f.fun |
|
361 | # if f.fun | |
340 | fname = prefix+f.func_name.strip('<>')+suffix |
|
362 | fname = prefix+f.func_name.strip('<>')+suffix | |
@@ -379,7 +401,7 b' class Kernel(object):' | |||||
379 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf) |
|
401 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf) | |
380 | # print>>sys.__stdout__, Message(reply_msg) |
|
402 | # print>>sys.__stdout__, Message(reply_msg) | |
381 | if reply_msg['content']['status'] == u'error': |
|
403 | if reply_msg['content']['status'] == u'error': | |
382 | self.abort_queue() |
|
404 | self.abort_queues() | |
383 |
|
405 | |||
384 | def dispatch_queue(self, stream, msg): |
|
406 | def dispatch_queue(self, stream, msg): | |
385 | self.flush_control() |
|
407 | self.flush_control() | |
@@ -389,12 +411,15 b' class Kernel(object):' | |||||
389 | header = msg['header'] |
|
411 | header = msg['header'] | |
390 | msg_id = header['msg_id'] |
|
412 | msg_id = header['msg_id'] | |
391 | dependencies = header.get('dependencies', []) |
|
413 | dependencies = header.get('dependencies', []) | |
392 |
|
||||
393 | if self.check_aborted(msg_id): |
|
414 | if self.check_aborted(msg_id): | |
394 |
|
|
415 | self.aborted.remove(msg_id) | |
|
416 | # is it safe to assume a msg_id will not be resubmitted? | |||
|
417 | reply_type = msg['msg_type'].split('_')[0] + '_reply' | |||
|
418 | reply_msg = self.session.send(stream, reply_type, | |||
|
419 | content={'status' : 'aborted'}, parent=msg, ident=idents) | |||
|
420 | return | |||
395 | if not self.check_dependencies(dependencies): |
|
421 | if not self.check_dependencies(dependencies): | |
396 | return self.unmet_dependencies(stream, msg) |
|
422 | return self.unmet_dependencies(stream, idents, msg) | |
397 |
|
||||
398 | handler = self.queue_handlers.get(msg['msg_type'], None) |
|
423 | handler = self.queue_handlers.get(msg['msg_type'], None) | |
399 | if handler is None: |
|
424 | if handler is None: | |
400 | print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg |
|
425 | print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg | |
@@ -405,12 +430,15 b' class Kernel(object):' | |||||
405 | #### stream mode: |
|
430 | #### stream mode: | |
406 | if self.control_stream: |
|
431 | if self.control_stream: | |
407 | self.control_stream.on_recv(self.dispatch_control, copy=False) |
|
432 | self.control_stream.on_recv(self.dispatch_control, copy=False) | |
|
433 | self.control_stream.on_err(printer) | |||
408 | if self.reply_stream: |
|
434 | if self.reply_stream: | |
409 | self.reply_stream.on_recv(lambda msg: |
|
435 | self.reply_stream.on_recv(lambda msg: | |
410 | self.dispatch_queue(self.reply_stream, msg), copy=False) |
|
436 | self.dispatch_queue(self.reply_stream, msg), copy=False) | |
|
437 | self.reply_stream.on_err(printer) | |||
411 | if self.task_stream: |
|
438 | if self.task_stream: | |
412 | self.task_stream.on_recv(lambda msg: |
|
439 | self.task_stream.on_recv(lambda msg: | |
413 | self.dispatch_queue(self.task_stream, msg), copy=False) |
|
440 | self.dispatch_queue(self.task_stream, msg), copy=False) | |
|
441 | self.task_stream.on_err(printer) | |||
414 |
|
442 | |||
415 | #### while True mode: |
|
443 | #### while True mode: | |
416 | # while True: |
|
444 | # while True: |
@@ -257,7 +257,7 b' def unpack_apply_message(bufs, g=None, copy=True):' | |||||
257 |
|
257 | |||
258 | class StreamSession(object): |
|
258 | class StreamSession(object): | |
259 | """tweaked version of IPython.zmq.session.Session, for development in Parallel""" |
|
259 | """tweaked version of IPython.zmq.session.Session, for development in Parallel""" | |
260 |
|
260 | debug=False | ||
261 | def __init__(self, username=None, session=None, packer=None, unpacker=None): |
|
261 | def __init__(self, username=None, session=None, packer=None, unpacker=None): | |
262 | if username is None: |
|
262 | if username is None: | |
263 | username = os.environ.get('USER','username') |
|
263 | username = os.environ.get('USER','username') | |
@@ -335,6 +335,10 b' class StreamSession(object):' | |||||
335 | if buffers: |
|
335 | if buffers: | |
336 | stream.send(buffers[-1], copy=False) |
|
336 | stream.send(buffers[-1], copy=False) | |
337 | omsg = Message(msg) |
|
337 | omsg = Message(msg) | |
|
338 | if self.debug: | |||
|
339 | pprint.pprint(omsg) | |||
|
340 | pprint.pprint(to_send) | |||
|
341 | pprint.pprint(buffers) | |||
338 | return omsg |
|
342 | return omsg | |
339 |
|
343 | |||
340 | def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True): |
|
344 | def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True): |
@@ -103,7 +103,7 b' class View(object):' | |||||
103 | This method has access to the targets' globals |
|
103 | This method has access to the targets' globals | |
104 |
|
104 | |||
105 | """ |
|
105 | """ | |
106 |
return self.client.apply(f, args, kwargs, block= |
|
106 | return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True) | |
107 |
|
107 | |||
108 |
|
108 | |||
109 | class DirectView(View): |
|
109 | class DirectView(View): | |
@@ -129,12 +129,28 b' class DirectView(View):' | |||||
129 | def __setitem__(self,key,value): |
|
129 | def __setitem__(self,key,value): | |
130 | self.update({key:value}) |
|
130 | self.update({key:value}) | |
131 |
|
131 | |||
132 | def clear(self): |
|
132 | def clear(self, block=False): | |
133 |
""" |
|
133 | """Clear the remote namespaces on my engines.""" | |
134 | return self.client.clear(targets=self.targets,block=self.block) |
|
134 | block = block if block is not None else self.block | |
|
135 | return self.client.clear(targets=self.targets,block=block) | |||
|
136 | ||||
|
137 | def kill(self, block=True): | |||
|
138 | """Kill my engines.""" | |||
|
139 | block = block if block is not None else self.block | |||
|
140 | return self.client.kill(targets=self.targets,block=block) | |||
|
141 | ||||
|
142 | def abort(self, msg_ids=None, block=None): | |||
|
143 | """Abort jobs on my engines. | |||
135 |
|
144 | |||
136 | def abort(self): |
|
145 | Parameters | |
137 | return self.client.abort(targets=self.targets,block=self.block) |
|
146 | ---------- | |
|
147 | ||||
|
148 | msg_ids : None, str, list of strs, optional | |||
|
149 | if None: abort all jobs. | |||
|
150 | else: abort specific msg_id(s). | |||
|
151 | """ | |||
|
152 | block = block if block is not None else self.block | |||
|
153 | return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block) | |||
138 |
|
154 | |||
139 | class LoadBalancedView(View): |
|
155 | class LoadBalancedView(View): | |
140 | _targets=None |
|
156 | _targets=None |
@@ -124,7 +124,7 b' def setup():' | |||||
124 |
|
124 | |||
125 | client_addrs = { |
|
125 | client_addrs = { | |
126 | 'control' : "%s:%i"%(iface, ccport), |
|
126 | 'control' : "%s:%i"%(iface, ccport), | |
127 |
' |
|
127 | 'query': "%s:%i"%(iface, cport), | |
128 | 'queue': "%s:%i"%(iface, cqport), |
|
128 | 'queue': "%s:%i"%(iface, cqport), | |
129 | 'task' : "%s:%i"%(iface, ctport), |
|
129 | 'task' : "%s:%i"%(iface, ctport), | |
130 | 'notification': "%s:%i"%(iface, nport) |
|
130 | 'notification': "%s:%i"%(iface, nport) |
General Comments 0
You need to be logged in to leave comments.
Login now