Show More
@@ -1,6 +1,7 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """The IPython Controller with 0MQ |
|
2 | """The IPython Controller with 0MQ | |
3 |
This is the master object that handles connections from engines |
|
3 | This is the master object that handles connections from engines and clients, | |
|
4 | and monitors traffic through the various queues. | |||
4 | """ |
|
5 | """ | |
5 | #----------------------------------------------------------------------------- |
|
6 | #----------------------------------------------------------------------------- | |
6 | # Copyright (C) 2010 The IPython Development Team |
|
7 | # Copyright (C) 2010 The IPython Development Team | |
@@ -28,12 +29,14 b' from IPython.zmq.entry_point import bind_port' | |||||
28 | from streamsession import Message, wrap_exception |
|
29 | from streamsession import Message, wrap_exception | |
29 | from entry_point import (make_base_argument_parser, select_random_ports, split_ports, |
|
30 | from entry_point import (make_base_argument_parser, select_random_ports, split_ports, | |
30 | connect_logger, parse_url) |
|
31 | connect_logger, parse_url) | |
31 | # from messages import json # use the same import switches |
|
|||
32 |
|
32 | |||
33 | #----------------------------------------------------------------------------- |
|
33 | #----------------------------------------------------------------------------- | |
34 | # Code |
|
34 | # Code | |
35 | #----------------------------------------------------------------------------- |
|
35 | #----------------------------------------------------------------------------- | |
36 |
|
36 | |||
|
37 | def _passer(*args, **kwargs): | |||
|
38 | return | |||
|
39 | ||||
37 | class ReverseDict(dict): |
|
40 | class ReverseDict(dict): | |
38 | """simple double-keyed subset of dict methods.""" |
|
41 | """simple double-keyed subset of dict methods.""" | |
39 |
|
42 | |||
@@ -93,16 +96,18 b' class Controller(object):' | |||||
93 | loop: zmq IOLoop instance |
|
96 | loop: zmq IOLoop instance | |
94 | session: StreamSession object |
|
97 | session: StreamSession object | |
95 | <removed> context: zmq context for creating new connections (?) |
|
98 | <removed> context: zmq context for creating new connections (?) | |
|
99 | queue: ZMQStream for monitoring the command queue (SUB) | |||
96 | registrar: ZMQStream for engine registration requests (XREP) |
|
100 | registrar: ZMQStream for engine registration requests (XREP) | |
|
101 | heartbeat: HeartMonitor object checking the pulse of the engines | |||
97 | clientele: ZMQStream for client connections (XREP) |
|
102 | clientele: ZMQStream for client connections (XREP) | |
98 | not used for jobs, only query/control commands |
|
103 | not used for jobs, only query/control commands | |
99 | queue: ZMQStream for monitoring the command queue (SUB) |
|
104 | notifier: ZMQStream for broadcasting engine registration changes (PUB) | |
100 | heartbeat: HeartMonitor object checking the pulse of the engines |
|
105 | db: connection to db for out of memory logging of commands | |
101 | db_stream: connection to db for out of memory logging of commands |
|
|||
102 | NotImplemented |
|
106 | NotImplemented | |
103 | queue_addr: zmq connection address of the XREP socket for the queue |
|
107 | engine_addrs: dict of zmq connection information for engines to connect | |
104 | hb_addr: zmq connection address of the PUB socket for heartbeats |
|
108 | to the queues. | |
105 | task_addr: zmq connection address of the XREQ socket for task queue |
|
109 | client_addrs: dict of zmq connection information for engines to connect | |
|
110 | to the queues. | |||
106 | """ |
|
111 | """ | |
107 | # internal data structures: |
|
112 | # internal data structures: | |
108 | ids=None # engine IDs |
|
113 | ids=None # engine IDs | |
@@ -165,14 +170,16 b' class Controller(object):' | |||||
165 | self.notifier = notifier |
|
170 | self.notifier = notifier | |
166 | self.db = db |
|
171 | self.db = db | |
167 |
|
172 | |||
|
173 | # validate connection dicts: | |||
168 | self.client_addrs = client_addrs |
|
174 | self.client_addrs = client_addrs | |
169 | assert isinstance(client_addrs['queue'], str) |
|
175 | assert isinstance(client_addrs['queue'], str) | |
|
176 | assert isinstance(client_addrs['control'], str) | |||
170 | # self.hb_addrs = hb_addrs |
|
177 | # self.hb_addrs = hb_addrs | |
171 | self.engine_addrs = engine_addrs |
|
178 | self.engine_addrs = engine_addrs | |
172 | assert isinstance(engine_addrs['queue'], str) |
|
179 | assert isinstance(engine_addrs['queue'], str) | |
|
180 | assert isinstance(client_addrs['control'], str) | |||
173 | assert len(engine_addrs['heartbeat']) == 2 |
|
181 | assert len(engine_addrs['heartbeat']) == 2 | |
174 |
|
182 | |||
175 |
|
||||
176 | # register our callbacks |
|
183 | # register our callbacks | |
177 | self.registrar.on_recv(self.dispatch_register_request) |
|
184 | self.registrar.on_recv(self.dispatch_register_request) | |
178 | self.clientele.on_recv(self.dispatch_client_msg) |
|
185 | self.clientele.on_recv(self.dispatch_client_msg) | |
@@ -182,19 +189,25 b' class Controller(object):' | |||||
182 | heartbeat.add_heart_failure_handler(self.handle_heart_failure) |
|
189 | heartbeat.add_heart_failure_handler(self.handle_heart_failure) | |
183 | heartbeat.add_new_heart_handler(self.handle_new_heart) |
|
190 | heartbeat.add_new_heart_handler(self.handle_new_heart) | |
184 |
|
191 | |||
185 | if self.db is not None: |
|
192 | self.queue_handlers = { 'in' : self.save_queue_request, | |
186 | self.db.on_recv(self.dispatch_db) |
|
193 | 'out': self.save_queue_result, | |
187 |
|
194 | 'intask': self.save_task_request, | ||
|
195 | 'outtask': self.save_task_result, | |||
|
196 | 'tracktask': self.save_task_destination, | |||
|
197 | 'incontrol': _passer, | |||
|
198 | 'outcontrol': _passer, | |||
|
199 | } | |||
|
200 | ||||
188 | self.client_handlers = {'queue_request': self.queue_status, |
|
201 | self.client_handlers = {'queue_request': self.queue_status, | |
189 | 'result_request': self.get_results, |
|
202 | 'result_request': self.get_results, | |
190 | 'purge_request': self.purge_results, |
|
203 | 'purge_request': self.purge_results, | |
|
204 | 'load_request': self.check_load, | |||
191 | 'resubmit_request': self.resubmit_task, |
|
205 | 'resubmit_request': self.resubmit_task, | |
192 | } |
|
206 | } | |
193 |
|
207 | |||
194 | self.registrar_handlers = {'registration_request' : self.register_engine, |
|
208 | self.registrar_handlers = {'registration_request' : self.register_engine, | |
195 | 'unregistration_request' : self.unregister_engine, |
|
209 | 'unregistration_request' : self.unregister_engine, | |
196 | 'connection_request': self.connection_request, |
|
210 | 'connection_request': self.connection_request, | |
197 |
|
||||
198 | } |
|
211 | } | |
199 | # |
|
212 | # | |
200 | # this is the stuff that will move to DB: |
|
213 | # this is the stuff that will move to DB: | |
@@ -272,7 +285,7 b' class Controller(object):' | |||||
272 | print (idents,msg, len(msg)) |
|
285 | print (idents,msg, len(msg)) | |
273 | try: |
|
286 | try: | |
274 | msg = self.session.unpack_message(msg,content=True) |
|
287 | msg = self.session.unpack_message(msg,content=True) | |
275 |
except Exception |
|
288 | except Exception as e: | |
276 | logger.error("registration::got bad registration message: %s"%msg) |
|
289 | logger.error("registration::got bad registration message: %s"%msg) | |
277 | raise e |
|
290 | raise e | |
278 | return |
|
291 | return | |
@@ -291,18 +304,9 b' class Controller(object):' | |||||
291 | logger.debug("queue traffic: %s"%msg[:2]) |
|
304 | logger.debug("queue traffic: %s"%msg[:2]) | |
292 | switch = msg[0] |
|
305 | switch = msg[0] | |
293 | idents, msg = self.session.feed_identities(msg[1:]) |
|
306 | idents, msg = self.session.feed_identities(msg[1:]) | |
294 | if switch == 'in': |
|
307 | handler = self.queue_handlers.get(switch, None) | |
295 | self.save_queue_request(idents, msg) |
|
308 | if handler is not None: | |
296 | elif switch == 'out': |
|
309 | handler(idents, msg) | |
297 | self.save_queue_result(idents, msg) |
|
|||
298 | elif switch == 'intask': |
|
|||
299 | self.save_task_request(idents, msg) |
|
|||
300 | elif switch == 'outtask': |
|
|||
301 | self.save_task_result(idents, msg) |
|
|||
302 | elif switch == 'tracktask': |
|
|||
303 | self.save_task_destination(idents, msg) |
|
|||
304 | elif switch in ('incontrol', 'outcontrol'): |
|
|||
305 | pass |
|
|||
306 | else: |
|
310 | else: | |
307 | logger.error("Invalid message topic: %s"%switch) |
|
311 | logger.error("Invalid message topic: %s"%switch) | |
308 |
|
312 | |||
@@ -392,7 +396,7 b' class Controller(object):' | |||||
392 | received=None, |
|
396 | received=None, | |
393 | engine=(eid, queue_id)) |
|
397 | engine=(eid, queue_id)) | |
394 | self.pending[msg_id] = ( msg, info ) |
|
398 | self.pending[msg_id] = ( msg, info ) | |
395 |
self.queues[eid] |
|
399 | self.queues[eid].append(msg_id) | |
396 |
|
400 | |||
397 | def save_queue_result(self, idents, msg): |
|
401 | def save_queue_result(self, idents, msg): | |
398 | client_id, queue_id = idents[:2] |
|
402 | client_id, queue_id = idents[:2] | |
@@ -417,7 +421,7 b' class Controller(object):' | |||||
417 | self.results[msg_id] = msg |
|
421 | self.results[msg_id] = msg | |
418 | if msg_id in self.pending: |
|
422 | if msg_id in self.pending: | |
419 | self.pending.pop(msg_id) |
|
423 | self.pending.pop(msg_id) | |
420 |
self.queues[eid] |
|
424 | self.queues[eid].remove(msg_id) | |
421 | self.completed[eid].append(msg_id) |
|
425 | self.completed[eid].append(msg_id) | |
422 | else: |
|
426 | else: | |
423 | logger.debug("queue:: unknown msg finished %s"%msg_id) |
|
427 | logger.debug("queue:: unknown msg finished %s"%msg_id) | |
@@ -425,6 +429,7 b' class Controller(object):' | |||||
425 | #--------------------- Task Queue Traffic ------------------------------ |
|
429 | #--------------------- Task Queue Traffic ------------------------------ | |
426 |
|
430 | |||
427 | def save_task_request(self, idents, msg): |
|
431 | def save_task_request(self, idents, msg): | |
|
432 | """Save the submission of a task.""" | |||
428 | client_id = idents[0] |
|
433 | client_id = idents[0] | |
429 |
|
434 | |||
430 | try: |
|
435 | try: | |
@@ -437,13 +442,17 b' class Controller(object):' | |||||
437 | header = msg['header'] |
|
442 | header = msg['header'] | |
438 | msg_id = header['msg_id'] |
|
443 | msg_id = header['msg_id'] | |
439 | self.mia.add(msg_id) |
|
444 | self.mia.add(msg_id) | |
440 | self.pending[msg_id] = msg |
|
445 | info = dict(submit=datetime.now(), | |
|
446 | received=None, | |||
|
447 | engine=None) | |||
|
448 | self.pending[msg_id] = (msg, info) | |||
441 | if not self.tasks.has_key(client_id): |
|
449 | if not self.tasks.has_key(client_id): | |
442 | self.tasks[client_id] = [] |
|
450 | self.tasks[client_id] = [] | |
443 | self.tasks[client_id].append(msg_id) |
|
451 | self.tasks[client_id].append(msg_id) | |
444 |
|
452 | |||
445 | def save_task_result(self, idents, msg): |
|
453 | def save_task_result(self, idents, msg): | |
446 | client_id = idents[0] |
|
454 | """save the result of a completed task.""" | |
|
455 | client_id, engine_uuid = idents[:2] | |||
447 | try: |
|
456 | try: | |
448 | msg = self.session.unpack_message(msg, content=False) |
|
457 | msg = self.session.unpack_message(msg, content=False) | |
449 | except: |
|
458 | except: | |
@@ -452,16 +461,19 b' class Controller(object):' | |||||
452 | return |
|
461 | return | |
453 |
|
462 | |||
454 | parent = msg['parent_header'] |
|
463 | parent = msg['parent_header'] | |
|
464 | eid = self.by_ident[engine_uuid] | |||
455 | if not parent: |
|
465 | if not parent: | |
456 | # print msg |
|
466 | # print msg | |
457 | # logger.warn("") |
|
467 | # logger.warn("") | |
458 | return |
|
468 | return | |
459 | msg_id = parent['msg_id'] |
|
469 | msg_id = parent['msg_id'] | |
460 | self.results[msg_id] = msg |
|
470 | self.results[msg_id] = msg | |
461 | if msg_id in self.pending: |
|
471 | if msg_id in self.pending and msg_id in self.tasks[eid]: | |
462 | self.pending.pop(msg_id) |
|
472 | self.pending.pop(msg_id) | |
463 | if msg_id in self.mia: |
|
473 | if msg_id in self.mia: | |
464 | self.mia.remove(msg_id) |
|
474 | self.mia.remove(msg_id) | |
|
475 | self.completed[eid].append(msg_id) | |||
|
476 | self.tasks[eid].remove(msg_id) | |||
465 | else: |
|
477 | else: | |
466 | logger.debug("task::unknown task %s finished"%msg_id) |
|
478 | logger.debug("task::unknown task %s finished"%msg_id) | |
467 |
|
479 | |||
@@ -475,16 +487,16 b' class Controller(object):' | |||||
475 | print (content) |
|
487 | print (content) | |
476 | msg_id = content['msg_id'] |
|
488 | msg_id = content['msg_id'] | |
477 | engine_uuid = content['engine_id'] |
|
489 | engine_uuid = content['engine_id'] | |
478 | for eid,queue_id in self.keytable.iteritems(): |
|
490 | eid = self.by_ident[engine_uuid] | |
479 | if queue_id == engine_uuid: |
|
|||
480 | break |
|
|||
481 |
|
491 | |||
482 | logger.info("task::task %s arrived on %s"%(msg_id, eid)) |
|
492 | logger.info("task::task %s arrived on %s"%(msg_id, eid)) | |
483 | if msg_id in self.mia: |
|
493 | if msg_id in self.mia: | |
484 | self.mia.remove(msg_id) |
|
494 | self.mia.remove(msg_id) | |
485 | else: |
|
495 | else: | |
486 | logger.debug("task::task %s not listed as MIA?!"%(msg_id)) |
|
496 | logger.debug("task::task %s not listed as MIA?!"%(msg_id)) | |
487 | self.tasks[engine_uuid].append(msg_id) |
|
497 | ||
|
498 | self.tasks[eid].append(msg_id) | |||
|
499 | self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid)) | |||
488 |
|
500 | |||
489 | def mia_task_request(self, idents, msg): |
|
501 | def mia_task_request(self, idents, msg): | |
490 | client_id = idents[0] |
|
502 | client_id = idents[0] | |
@@ -493,10 +505,12 b' class Controller(object):' | |||||
493 |
|
505 | |||
494 |
|
506 | |||
495 |
|
507 | |||
496 | #-------------------- Registration ----------------------------- |
|
508 | #------------------------------------------------------------------------- | |
|
509 | # Registration requests | |||
|
510 | #------------------------------------------------------------------------- | |||
497 |
|
511 | |||
498 | def connection_request(self, client_id, msg): |
|
512 | def connection_request(self, client_id, msg): | |
499 |
""" |
|
513 | """Reply with connection addresses for clients.""" | |
500 | logger.info("client::client %s connected"%client_id) |
|
514 | logger.info("client::client %s connected"%client_id) | |
501 | content = dict(status='ok') |
|
515 | content = dict(status='ok') | |
502 | content.update(self.client_addrs) |
|
516 | content.update(self.client_addrs) | |
@@ -507,7 +521,7 b' class Controller(object):' | |||||
507 | self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id) |
|
521 | self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id) | |
508 |
|
522 | |||
509 | def register_engine(self, reg, msg): |
|
523 | def register_engine(self, reg, msg): | |
510 |
""" |
|
524 | """Register a new engine.""" | |
511 | content = msg['content'] |
|
525 | content = msg['content'] | |
512 | try: |
|
526 | try: | |
513 | queue = content['queue'] |
|
527 | queue = content['queue'] | |
@@ -556,6 +570,7 b' class Controller(object):' | |||||
556 | return eid |
|
570 | return eid | |
557 |
|
571 | |||
558 | def unregister_engine(self, ident, msg): |
|
572 | def unregister_engine(self, ident, msg): | |
|
573 | """Unregister an engine that explicitly requested to leave.""" | |||
559 | try: |
|
574 | try: | |
560 | eid = msg['content']['id'] |
|
575 | eid = msg['content']['id'] | |
561 | except: |
|
576 | except: | |
@@ -569,7 +584,7 b' class Controller(object):' | |||||
569 | self.hearts.pop(ec.heartbeat) |
|
584 | self.hearts.pop(ec.heartbeat) | |
570 | self.by_ident.pop(ec.queue) |
|
585 | self.by_ident.pop(ec.queue) | |
571 | self.completed.pop(eid) |
|
586 | self.completed.pop(eid) | |
572 |
for msg_id in self.queues.pop(eid) |
|
587 | for msg_id in self.queues.pop(eid): | |
573 | msg = self.pending.pop(msg_id) |
|
588 | msg = self.pending.pop(msg_id) | |
574 | ############## TODO: HANDLE IT ################ |
|
589 | ############## TODO: HANDLE IT ################ | |
575 |
|
590 | |||
@@ -577,6 +592,8 b' class Controller(object):' | |||||
577 | self.session.send(self.notifier, "unregistration_notification", content=content) |
|
592 | self.session.send(self.notifier, "unregistration_notification", content=content) | |
578 |
|
593 | |||
579 | def finish_registration(self, heart): |
|
594 | def finish_registration(self, heart): | |
|
595 | """Second half of engine registration, called after our HeartMonitor | |||
|
596 | has received a beat from the Engine's Heart.""" | |||
580 | try: |
|
597 | try: | |
581 | (eid,queue,reg,purge) = self.incoming_registrations.pop(heart) |
|
598 | (eid,queue,reg,purge) = self.incoming_registrations.pop(heart) | |
582 | except KeyError: |
|
599 | except KeyError: | |
@@ -590,7 +607,8 b' class Controller(object):' | |||||
590 | self.keytable[eid] = queue |
|
607 | self.keytable[eid] = queue | |
591 | self.engines[eid] = EngineConnector(eid, queue, reg, control, heart) |
|
608 | self.engines[eid] = EngineConnector(eid, queue, reg, control, heart) | |
592 | self.by_ident[queue] = eid |
|
609 | self.by_ident[queue] = eid | |
593 |
self.queues[eid] = ( |
|
610 | self.queues[eid] = list() | |
|
611 | self.tasks[eid] = list() | |||
594 | self.completed[eid] = list() |
|
612 | self.completed[eid] = list() | |
595 | self.hearts[heart] = eid |
|
613 | self.hearts[heart] = eid | |
596 | content = dict(id=eid, queue=self.engines[eid].queue) |
|
614 | content = dict(id=eid, queue=self.engines[eid].queue) | |
@@ -604,7 +622,9 b' class Controller(object):' | |||||
604 | else: |
|
622 | else: | |
605 | pass |
|
623 | pass | |
606 |
|
624 | |||
607 | #------------------- Client Requests ------------------------------- |
|
625 | #------------------------------------------------------------------------- | |
|
626 | # Client Requests | |||
|
627 | #------------------------------------------------------------------------- | |||
608 |
|
628 | |||
609 | def check_load(self, client_id, msg): |
|
629 | def check_load(self, client_id, msg): | |
610 | content = msg['content'] |
|
630 | content = msg['content'] | |
@@ -620,12 +640,17 b' class Controller(object):' | |||||
620 | content = dict(status='ok') |
|
640 | content = dict(status='ok') | |
621 | # loads = {} |
|
641 | # loads = {} | |
622 | for t in targets: |
|
642 | for t in targets: | |
623 |
content[ |
|
643 | content[bytes(t)] = len(self.queues[t])+len(self.tasks[t]) | |
624 | self.session.send(self.clientele, "load_reply", content=content, ident=client_id) |
|
644 | self.session.send(self.clientele, "load_reply", content=content, ident=client_id) | |
625 |
|
645 | |||
626 |
|
646 | |||
627 | def queue_status(self, client_id, msg): |
|
647 | def queue_status(self, client_id, msg): | |
628 | """handle queue_status request""" |
|
648 | """Return the Queue status of one or more targets. | |
|
649 | if verbose: return the msg_ids | |||
|
650 | else: return len of each type. | |||
|
651 | keys: queue (pending MUX jobs) | |||
|
652 | tasks (pending Task jobs) | |||
|
653 | completed (finished jobs from both queues)""" | |||
629 | content = msg['content'] |
|
654 | content = msg['content'] | |
630 | targets = content['targets'] |
|
655 | targets = content['targets'] | |
631 | try: |
|
656 | try: | |
@@ -635,19 +660,23 b' class Controller(object):' | |||||
635 | self.session.send(self.clientele, "controller_error", |
|
660 | self.session.send(self.clientele, "controller_error", | |
636 | content=content, ident=client_id) |
|
661 | content=content, ident=client_id) | |
637 | return |
|
662 | return | |
638 |
verbose = |
|
663 | verbose = content.get('verbose', False) | |
639 | content = dict() |
|
664 | content = dict(status='ok') | |
640 | for t in targets: |
|
665 | for t in targets: | |
641 | queue = self.queues[t] |
|
666 | queue = self.queues[t] | |
642 | completed = self.completed[t] |
|
667 | completed = self.completed[t] | |
|
668 | tasks = self.tasks[t] | |||
643 | if not verbose: |
|
669 | if not verbose: | |
644 | queue = len(queue) |
|
670 | queue = len(queue) | |
645 | completed = len(completed) |
|
671 | completed = len(completed) | |
646 | content[str(t)] = {'queue': queue, 'completed': completed } |
|
672 | tasks = len(tasks) | |
|
673 | content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks} | |||
647 | # pending |
|
674 | # pending | |
648 | self.session.send(self.clientele, "queue_reply", content=content, ident=client_id) |
|
675 | self.session.send(self.clientele, "queue_reply", content=content, ident=client_id) | |
649 |
|
676 | |||
650 | def purge_results(self, client_id, msg): |
|
677 | def purge_results(self, client_id, msg): | |
|
678 | """Purge results from memory. This method is more valuable before we move | |||
|
679 | to a DB based message storage mechanism.""" | |||
651 | content = msg['content'] |
|
680 | content = msg['content'] | |
652 | msg_ids = content.get('msg_ids', []) |
|
681 | msg_ids = content.get('msg_ids', []) | |
653 | reply = dict(status='ok') |
|
682 | reply = dict(status='ok') | |
@@ -675,37 +704,11 b' class Controller(object):' | |||||
675 | self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id) |
|
704 | self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id) | |
676 |
|
705 | |||
677 | def resubmit_task(self, client_id, msg, buffers): |
|
706 | def resubmit_task(self, client_id, msg, buffers): | |
678 | content = msg['content'] |
|
707 | """Resubmit a task.""" | |
679 | header = msg['header'] |
|
708 | raise NotImplementedError | |
680 |
|
||||
681 |
|
||||
682 | msg_ids = content.get('msg_ids', []) |
|
|||
683 | reply = dict(status='ok') |
|
|||
684 | if msg_ids == 'all': |
|
|||
685 | self.results = {} |
|
|||
686 | else: |
|
|||
687 | for msg_id in msg_ids: |
|
|||
688 | if msg_id in self.results: |
|
|||
689 | self.results.pop(msg_id) |
|
|||
690 | else: |
|
|||
691 | if msg_id in self.pending: |
|
|||
692 | reply = dict(status='error', reason="msg pending: %r"%msg_id) |
|
|||
693 | else: |
|
|||
694 | reply = dict(status='error', reason="No such msg: %r"%msg_id) |
|
|||
695 | break |
|
|||
696 | eids = content.get('engine_ids', []) |
|
|||
697 | for eid in eids: |
|
|||
698 | if eid not in self.engines: |
|
|||
699 | reply = dict(status='error', reason="No such engine: %i"%eid) |
|
|||
700 | break |
|
|||
701 | msg_ids = self.completed.pop(eid) |
|
|||
702 | for msg_id in msg_ids: |
|
|||
703 | self.results.pop(msg_id) |
|
|||
704 |
|
||||
705 | self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id) |
|
|||
706 |
|
709 | |||
707 | def get_results(self, client_id, msg): |
|
710 | def get_results(self, client_id, msg): | |
708 |
""" |
|
711 | """Get the result of 1 or more messages.""" | |
709 | content = msg['content'] |
|
712 | content = msg['content'] | |
710 | msg_ids = set(content['msg_ids']) |
|
713 | msg_ids = set(content['msg_ids']) | |
711 | statusonly = content.get('status_only', False) |
|
714 | statusonly = content.get('status_only', False) | |
@@ -727,33 +730,12 b' class Controller(object):' | |||||
727 | break |
|
730 | break | |
728 | self.session.send(self.clientele, "result_reply", content=content, |
|
731 | self.session.send(self.clientele, "result_reply", content=content, | |
729 | parent=msg, ident=client_id) |
|
732 | parent=msg, ident=client_id) | |
730 |
|
||||
731 |
|
733 | |||
732 |
|
734 | |||
733 | ############ OLD METHODS for Python Relay Controller ################### |
|
735 | #------------------------------------------------------------------------- | |
734 | def _validate_engine_msg(self, msg): |
|
|||
735 | """validates and unpacks headers of a message. Returns False if invalid, |
|
|||
736 | (ident, message)""" |
|
|||
737 | ident = msg[0] |
|
|||
738 | try: |
|
|||
739 | msg = self.session.unpack_message(msg[1:], content=False) |
|
|||
740 | except: |
|
|||
741 | logger.error("engine.%s::Invalid Message %s"%(ident, msg)) |
|
|||
742 | return False |
|
|||
743 |
|
||||
744 | try: |
|
|||
745 | eid = msg.header.username |
|
|||
746 | assert self.engines.has_key(eid) |
|
|||
747 | except: |
|
|||
748 | logger.error("engine::Invalid Engine ID %s"%(ident)) |
|
|||
749 | return False |
|
|||
750 |
|
||||
751 | return eid, msg |
|
|||
752 |
|
||||
753 |
|
||||
754 | #-------------------- |
|
|||
755 | # Entry Point |
|
736 | # Entry Point | |
756 | #-------------------- |
|
737 | #------------------------------------------------------------------------- | |
|
738 | ||||
757 | def make_argument_parser(): |
|
739 | def make_argument_parser(): | |
758 | """Make an argument parser""" |
|
740 | """Make an argument parser""" | |
759 | parser = make_base_argument_parser() |
|
741 | parser = make_base_argument_parser() |
@@ -130,7 +130,7 b' class HeartMonitor(object):' | |||||
130 | for handler in self._failure_handlers: |
|
130 | for handler in self._failure_handlers: | |
131 | try: |
|
131 | try: | |
132 | handler(heart) |
|
132 | handler(heart) | |
133 |
except Exception |
|
133 | except Exception as e: | |
134 | print (e) |
|
134 | print (e) | |
135 | logger.error("heartbeat::Bad Handler! %s"%handler) |
|
135 | logger.error("heartbeat::Bad Handler! %s"%handler) | |
136 | pass |
|
136 | pass |
@@ -1,3 +1,10 b'' | |||||
|
1 | """The Python scheduler for rich scheduling. | |||
|
2 | ||||
|
3 | The Pure ZMQ scheduler does not allow routing schemes other than LRU, | |||
|
4 | nor does it check msg_id DAG dependencies. For those, a slightly slower | |||
|
5 | Python Scheduler exists. | |||
|
6 | """ | |||
|
7 | ||||
1 | #---------------------------------------------------------------------- |
|
8 | #---------------------------------------------------------------------- | |
2 | # Imports |
|
9 | # Imports | |
3 | #---------------------------------------------------------------------- |
|
10 | #---------------------------------------------------------------------- | |
@@ -40,7 +47,7 b' def plainrandom(loads):' | |||||
40 | def lru(loads): |
|
47 | def lru(loads): | |
41 | """Always pick the front of the line. |
|
48 | """Always pick the front of the line. | |
42 |
|
49 | |||
43 | The content of loads is ignored. |
|
50 | The content of `loads` is ignored. | |
44 |
|
51 | |||
45 | Assumes LRU ordering of loads, with oldest first. |
|
52 | Assumes LRU ordering of loads, with oldest first. | |
46 | """ |
|
53 | """ | |
@@ -151,10 +158,12 b' class TaskScheduler(object):' | |||||
151 | self.notifier_stream.on_recv(self.dispatch_notification) |
|
158 | self.notifier_stream.on_recv(self.dispatch_notification) | |
152 |
|
159 | |||
153 | def resume_receiving(self): |
|
160 | def resume_receiving(self): | |
154 |
""" |
|
161 | """Resume accepting jobs.""" | |
155 | self.client_stream.on_recv(self.dispatch_submission, copy=False) |
|
162 | self.client_stream.on_recv(self.dispatch_submission, copy=False) | |
156 |
|
163 | |||
157 | def stop_receiving(self): |
|
164 | def stop_receiving(self): | |
|
165 | """Stop accepting jobs while there are no engines. | |||
|
166 | Leave them in the ZMQ queue.""" | |||
158 | self.client_stream.on_recv(None) |
|
167 | self.client_stream.on_recv(None) | |
159 |
|
168 | |||
160 | #----------------------------------------------------------------------- |
|
169 | #----------------------------------------------------------------------- | |
@@ -176,7 +185,7 b' class TaskScheduler(object):' | |||||
176 | logger.error("task::Invalid notification msg: %s"%msg) |
|
185 | logger.error("task::Invalid notification msg: %s"%msg) | |
177 | @logged |
|
186 | @logged | |
178 | def _register_engine(self, uid): |
|
187 | def _register_engine(self, uid): | |
179 |
""" |
|
188 | """New engine with ident `uid` became available.""" | |
180 | # head of the line: |
|
189 | # head of the line: | |
181 | self.targets.insert(0,uid) |
|
190 | self.targets.insert(0,uid) | |
182 | self.loads.insert(0,0) |
|
191 | self.loads.insert(0,0) | |
@@ -187,10 +196,12 b' class TaskScheduler(object):' | |||||
187 | self.resume_receiving() |
|
196 | self.resume_receiving() | |
188 |
|
197 | |||
189 | def _unregister_engine(self, uid): |
|
198 | def _unregister_engine(self, uid): | |
190 |
""" |
|
199 | """Existing engine with ident `uid` became unavailable.""" | |
191 | # handle any potentially finished tasks: |
|
|||
192 | if len(self.targets) == 1: |
|
200 | if len(self.targets) == 1: | |
|
201 | # this was our only engine | |||
193 | self.stop_receiving() |
|
202 | self.stop_receiving() | |
|
203 | ||||
|
204 | # handle any potentially finished tasks: | |||
194 | self.engine_stream.flush() |
|
205 | self.engine_stream.flush() | |
195 |
|
206 | |||
196 | self.completed.pop(uid) |
|
207 | self.completed.pop(uid) | |
@@ -203,7 +214,7 b' class TaskScheduler(object):' | |||||
203 | self.handle_stranded_tasks(lost) |
|
214 | self.handle_stranded_tasks(lost) | |
204 |
|
215 | |||
205 | def handle_stranded_tasks(self, lost): |
|
216 | def handle_stranded_tasks(self, lost): | |
206 |
""" |
|
217 | """Deal with jobs resident in an engine that died.""" | |
207 | # TODO: resubmit the tasks? |
|
218 | # TODO: resubmit the tasks? | |
208 | for msg_id in lost: |
|
219 | for msg_id in lost: | |
209 | pass |
|
220 | pass | |
@@ -214,26 +225,29 b' class TaskScheduler(object):' | |||||
214 | #----------------------------------------------------------------------- |
|
225 | #----------------------------------------------------------------------- | |
215 | @logged |
|
226 | @logged | |
216 | def dispatch_submission(self, raw_msg): |
|
227 | def dispatch_submission(self, raw_msg): | |
217 |
""" |
|
228 | """Dispatch job submission to appropriate handlers.""" | |
218 | # ensure targets up to date: |
|
229 | # ensure targets up to date: | |
219 | self.notifier_stream.flush() |
|
230 | self.notifier_stream.flush() | |
220 | try: |
|
231 | try: | |
221 | idents, msg = self.session.feed_identities(raw_msg, copy=False) |
|
232 | idents, msg = self.session.feed_identities(raw_msg, copy=False) | |
222 |
except Exception |
|
233 | except Exception as e: | |
223 | logger.error("task::Invaid msg: %s"%msg) |
|
234 | logger.error("task::Invaid msg: %s"%msg) | |
224 | return |
|
235 | return | |
225 |
|
236 | |||
226 | msg = self.session.unpack_message(msg, content=False, copy=False) |
|
237 | msg = self.session.unpack_message(msg, content=False, copy=False) | |
227 | header = msg['header'] |
|
238 | header = msg['header'] | |
228 | msg_id = header['msg_id'] |
|
239 | msg_id = header['msg_id'] | |
|
240 | ||||
|
241 | # time dependencies | |||
229 | after = Dependency(header.get('after', [])) |
|
242 | after = Dependency(header.get('after', [])) | |
230 | if after.mode == 'all': |
|
243 | if after.mode == 'all': | |
231 | after.difference_update(self.all_done) |
|
244 | after.difference_update(self.all_done) | |
232 | if after.check(self.all_done): |
|
245 | if after.check(self.all_done): | |
233 |
# recast as empty set, if |
|
246 | # recast as empty set, if `after` already met, | |
234 | # to prevent |
|
247 | # to prevent unnecessary set comparisons | |
235 | after = Dependency([]) |
|
248 | after = Dependency([]) | |
236 |
|
249 | |||
|
250 | # location dependencies | |||
237 | follow = Dependency(header.get('follow', [])) |
|
251 | follow = Dependency(header.get('follow', [])) | |
238 | if len(after) == 0: |
|
252 | if len(after) == 0: | |
239 | # time deps already met, try to run |
|
253 | # time deps already met, try to run | |
@@ -244,6 +258,7 b' class TaskScheduler(object):' | |||||
244 | self.save_unmet(msg_id, raw_msg, after, follow) |
|
258 | self.save_unmet(msg_id, raw_msg, after, follow) | |
245 | # send to monitor |
|
259 | # send to monitor | |
246 | self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) |
|
260 | self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) | |
|
261 | ||||
247 | @logged |
|
262 | @logged | |
248 | def maybe_run(self, msg_id, raw_msg, follow=None): |
|
263 | def maybe_run(self, msg_id, raw_msg, follow=None): | |
249 | """check location dependencies, and run if they are met.""" |
|
264 | """check location dependencies, and run if they are met.""" | |
@@ -276,7 +291,7 b' class TaskScheduler(object):' | |||||
276 |
|
291 | |||
277 | @logged |
|
292 | @logged | |
278 | def submit_task(self, msg_id, msg, follow=None, indices=None): |
|
293 | def submit_task(self, msg_id, msg, follow=None, indices=None): | |
279 |
""" |
|
294 | """Submit a task to any of a subset of our targets.""" | |
280 | if indices: |
|
295 | if indices: | |
281 | loads = [self.loads[i] for i in indices] |
|
296 | loads = [self.loads[i] for i in indices] | |
282 | else: |
|
297 | else: | |
@@ -290,6 +305,8 b' class TaskScheduler(object):' | |||||
290 | self.engine_stream.send_multipart(msg, copy=False) |
|
305 | self.engine_stream.send_multipart(msg, copy=False) | |
291 | self.add_job(idx) |
|
306 | self.add_job(idx) | |
292 | self.pending[target][msg_id] = (msg, follow) |
|
307 | self.pending[target][msg_id] = (msg, follow) | |
|
308 | content = dict(msg_id=msg_id, engine_id=target) | |||
|
309 | self.session.send(self.mon_stream, 'task_destination', content=content, ident='tracktask') | |||
293 |
|
310 | |||
294 | #----------------------------------------------------------------------- |
|
311 | #----------------------------------------------------------------------- | |
295 | # Result Handling |
|
312 | # Result Handling | |
@@ -298,7 +315,7 b' class TaskScheduler(object):' | |||||
298 | def dispatch_result(self, raw_msg): |
|
315 | def dispatch_result(self, raw_msg): | |
299 | try: |
|
316 | try: | |
300 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
317 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | |
301 |
except Exception |
|
318 | except Exception as e: | |
302 | logger.error("task::Invaid result: %s"%msg) |
|
319 | logger.error("task::Invaid result: %s"%msg) | |
303 | return |
|
320 | return | |
304 | msg = self.session.unpack_message(msg, content=False, copy=False) |
|
321 | msg = self.session.unpack_message(msg, content=False, copy=False) |
@@ -125,7 +125,7 b' class RawInput(object):' | |||||
125 | while True: |
|
125 | while True: | |
126 | try: |
|
126 | try: | |
127 | reply = self.socket.recv_json(zmq.NOBLOCK) |
|
127 | reply = self.socket.recv_json(zmq.NOBLOCK) | |
128 |
except zmq.ZMQError |
|
128 | except zmq.ZMQError as e: | |
129 | if e.errno == zmq.EAGAIN: |
|
129 | if e.errno == zmq.EAGAIN: | |
130 | pass |
|
130 | pass | |
131 | else: |
|
131 | else: | |
@@ -171,7 +171,7 b' class Kernel(object):' | |||||
171 | while True: |
|
171 | while True: | |
172 | try: |
|
172 | try: | |
173 | msg = self.session.recv(stream, zmq.NOBLOCK,content=True) |
|
173 | msg = self.session.recv(stream, zmq.NOBLOCK,content=True) | |
174 |
except zmq.ZMQError |
|
174 | except zmq.ZMQError as e: | |
175 | if e.errno == zmq.EAGAIN: |
|
175 | if e.errno == zmq.EAGAIN: | |
176 | break |
|
176 | break | |
177 | else: |
|
177 | else: | |
@@ -238,17 +238,6 b' class Kernel(object):' | |||||
238 | else: |
|
238 | else: | |
239 | handler(self.control_stream, idents, msg) |
|
239 | handler(self.control_stream, idents, msg) | |
240 |
|
240 | |||
241 | # def flush_control(self): |
|
|||
242 | # while any(zmq.select([self.control_socket],[],[],1e-4)): |
|
|||
243 | # try: |
|
|||
244 | # msg = self.control_socket.recv_multipart(zmq.NOBLOCK, copy=False) |
|
|||
245 | # except zmq.ZMQError, e: |
|
|||
246 | # if e.errno != zmq.EAGAIN: |
|
|||
247 | # raise e |
|
|||
248 | # return |
|
|||
249 | # else: |
|
|||
250 | # self.dispatch_control(msg) |
|
|||
251 |
|
||||
252 |
|
241 | |||
253 | #-------------------- queue helpers ------------------------------ |
|
242 | #-------------------- queue helpers ------------------------------ | |
254 |
|
243 |
@@ -8,6 +8,7 b' import sys' | |||||
8 | import traceback |
|
8 | import traceback | |
9 | import pprint |
|
9 | import pprint | |
10 | import uuid |
|
10 | import uuid | |
|
11 | from datetime import datetime | |||
11 |
|
12 | |||
12 | import zmq |
|
13 | import zmq | |
13 | from zmq.utils import jsonapi |
|
14 | from zmq.utils import jsonapi | |
@@ -111,6 +112,7 b' class Message(object):' | |||||
111 |
|
112 | |||
112 |
|
113 | |||
113 | def msg_header(msg_id, msg_type, username, session): |
|
114 | def msg_header(msg_id, msg_type, username, session): | |
|
115 | date=datetime.now().isoformat() | |||
114 | return locals() |
|
116 | return locals() | |
115 | # return { |
|
117 | # return { | |
116 | # 'msg_id' : msg_id, |
|
118 | # 'msg_id' : msg_id, | |
@@ -140,7 +142,7 b' def extract_header(msg_or_header):' | |||||
140 | return h |
|
142 | return h | |
141 |
|
143 | |||
142 | def rekey(dikt): |
|
144 | def rekey(dikt): | |
143 |
""" |
|
145 | """Rekey a dict that has been forced to use str keys where there should be | |
144 | ints by json. This belongs in the jsonutil added by fperez.""" |
|
146 | ints by json. This belongs in the jsonutil added by fperez.""" | |
145 | for k in dikt.iterkeys(): |
|
147 | for k in dikt.iterkeys(): | |
146 | if isinstance(k, str): |
|
148 | if isinstance(k, str): | |
@@ -162,11 +164,22 b' def rekey(dikt):' | |||||
162 | return dikt |
|
164 | return dikt | |
163 |
|
165 | |||
164 | def serialize_object(obj, threshold=64e-6): |
|
166 | def serialize_object(obj, threshold=64e-6): | |
165 |
""" |
|
167 | """Serialize an object into a list of sendable buffers. | |
166 |
|
168 | |||
167 | Returns: (pmd, bufs) |
|
169 | Parameters | |
168 | where pmd is the pickled metadata wrapper, and bufs |
|
170 | ---------- | |
169 | is a list of data buffers""" |
|
171 | ||
|
172 | obj : object | |||
|
173 | The object to be serialized | |||
|
174 | threshold : float | |||
|
175 | The threshold for not double-pickling the content. | |||
|
176 | ||||
|
177 | ||||
|
178 | Returns | |||
|
179 | ------- | |||
|
180 | ('pmd', [bufs]) : | |||
|
181 | where pmd is the pickled metadata wrapper, | |||
|
182 | bufs is a list of data buffers""" | |||
170 | # threshold is 100 B |
|
183 | # threshold is 100 B | |
171 | databuffers = [] |
|
184 | databuffers = [] | |
172 | if isinstance(obj, (list, tuple)): |
|
185 | if isinstance(obj, (list, tuple)): | |
@@ -318,6 +331,8 b' class StreamSession(object):' | |||||
318 | Parameters |
|
331 | Parameters | |
319 | ---------- |
|
332 | ---------- | |
320 |
|
333 | |||
|
334 | stream : zmq.Socket or ZMQStream | |||
|
335 | the socket-like object used to send the data | |||
321 | msg_type : str or Message/dict |
|
336 | msg_type : str or Message/dict | |
322 | Normally, msg_type will be |
|
337 | Normally, msg_type will be | |
323 |
|
338 | |||
@@ -347,10 +362,7 b' class StreamSession(object):' | |||||
347 | to_send.append(DELIM) |
|
362 | to_send.append(DELIM) | |
348 | to_send.append(self.pack(msg['header'])) |
|
363 | to_send.append(self.pack(msg['header'])) | |
349 | to_send.append(self.pack(msg['parent_header'])) |
|
364 | to_send.append(self.pack(msg['parent_header'])) | |
350 | # if parent is None: |
|
365 | ||
351 | # to_send.append(self.none) |
|
|||
352 | # else: |
|
|||
353 | # to_send.append(self.pack(dict(parent))) |
|
|||
354 | if content is None: |
|
366 | if content is None: | |
355 | content = self.none |
|
367 | content = self.none | |
356 | elif isinstance(content, dict): |
|
368 | elif isinstance(content, dict): | |
@@ -374,11 +386,10 b' class StreamSession(object):' | |||||
374 | pprint.pprint(omsg) |
|
386 | pprint.pprint(omsg) | |
375 | pprint.pprint(to_send) |
|
387 | pprint.pprint(to_send) | |
376 | pprint.pprint(buffers) |
|
388 | pprint.pprint(buffers) | |
377 | # return both the msg object and the buffers |
|
|||
378 | return omsg |
|
389 | return omsg | |
379 |
|
390 | |||
380 | def send_raw(self, stream, msg, flags=0, copy=True, idents=None): |
|
391 | def send_raw(self, stream, msg, flags=0, copy=True, idents=None): | |
381 |
""" |
|
392 | """Send a raw message via idents. | |
382 |
|
393 | |||
383 | Parameters |
|
394 | Parameters | |
384 | ---------- |
|
395 | ---------- | |
@@ -399,7 +410,7 b' class StreamSession(object):' | |||||
399 | socket = socket.socket |
|
410 | socket = socket.socket | |
400 | try: |
|
411 | try: | |
401 | msg = socket.recv_multipart(mode) |
|
412 | msg = socket.recv_multipart(mode) | |
402 |
except zmq.ZMQError |
|
413 | except zmq.ZMQError as e: | |
403 | if e.errno == zmq.EAGAIN: |
|
414 | if e.errno == zmq.EAGAIN: | |
404 | # We can convert EAGAIN to None as we know in this case |
|
415 | # We can convert EAGAIN to None as we know in this case | |
405 | # recv_json won't return None. |
|
416 | # recv_json won't return None. | |
@@ -412,7 +423,7 b' class StreamSession(object):' | |||||
412 | idents, msg = self.feed_identities(msg, copy) |
|
423 | idents, msg = self.feed_identities(msg, copy) | |
413 | try: |
|
424 | try: | |
414 | return idents, self.unpack_message(msg, content=content, copy=copy) |
|
425 | return idents, self.unpack_message(msg, content=content, copy=copy) | |
415 |
except Exception |
|
426 | except Exception as e: | |
416 | print (idents, msg) |
|
427 | print (idents, msg) | |
417 | # TODO: handle it |
|
428 | # TODO: handle it | |
418 | raise e |
|
429 | raise e |
@@ -4,18 +4,22 b'' | |||||
4 | Connection Diagrams of The IPython ZMQ Cluster |
|
4 | Connection Diagrams of The IPython ZMQ Cluster | |
5 | ============================================== |
|
5 | ============================================== | |
6 |
|
6 | |||
7 |
This is a quick summary and illustration of the connections involved in the ZeroMQ based |
|
7 | This is a quick summary and illustration of the connections involved in the ZeroMQ based | |
|
8 | IPython cluster for parallel computing. | |||
8 |
|
9 | |||
9 | All Connections |
|
10 | All Connections | |
10 | =============== |
|
11 | =============== | |
11 |
|
12 | |||
12 | The Parallel Computing code is currently under development in Min RK's IPython fork_ on GitHub. |
|
13 | The Parallel Computing code is currently under development in Min RK's IPython fork_ on GitHub. | |
13 |
|
14 | |||
14 | .. _fork: http://github.com/minrk/ipython |
|
15 | .. _fork: http://github.com/minrk/ipython/tree/newparallel | |
15 |
|
16 | |||
16 |
The IPython cluster consists of a Controller and one or more clients and engines. The goal |
|
17 | The IPython cluster consists of a Controller and one or more clients and engines. The goal | |
|
18 | of the Controller is to manage and monitor the connections and communications between the | |||
|
19 | clients and the engines. | |||
17 |
|
20 | |||
18 |
It is important for security/practicality reasons that all connections be inbound to the |
|
21 | It is important for security/practicality reasons that all connections be inbound to the | |
|
22 | controller process. The arrows in the figures indicate the direction of the connection. | |||
19 |
|
23 | |||
20 |
|
24 | |||
21 | .. figure:: figs/allconnections.png |
|
25 | .. figure:: figs/allconnections.png | |
@@ -25,8 +29,9 b' It is important for security/practicality reasons that all connections be inboun' | |||||
25 |
|
29 | |||
26 | All the connections involved in connecting one client to one engine. |
|
30 | All the connections involved in connecting one client to one engine. | |
27 |
|
31 | |||
28 | The Controller consists of two ZMQ Devices - both MonitoredQueues, one for Tasks (load balanced, engine agnostic), one for Multiplexing (explicit targets), a Python device for monitoring (the Heartbeat Monitor). |
|
32 | The Controller consists of two ZMQ Devices - both MonitoredQueues, one for Tasks (load | |
29 |
|
33 | balanced, engine agnostic), one for Multiplexing (explicit targets), a Python device for | ||
|
34 | monitoring (the Heartbeat Monitor). | |||
30 |
|
35 | |||
31 |
|
36 | |||
32 | Registration |
|
37 | Registration | |
@@ -39,7 +44,10 b' Registration' | |||||
39 |
|
44 | |||
40 | Engines and Clients only need to know where the Registrar ``XREP`` is located to start connecting. |
|
45 | Engines and Clients only need to know where the Registrar ``XREP`` is located to start connecting. | |
41 |
|
46 | |||
42 | Once a controller is launched, the only information needed for connecting clients and/or engines to the controller is the IP/port of the ``XREP`` socket called the Registrar. This socket handles connections from both clients and engines, and replies with the remaining information necessary to establish the remaining connections. |
|
47 | Once a controller is launched, the only information needed for connecting clients and/or | |
|
48 | engines to the controller is the IP/port of the ``XREP`` socket called the Registrar. This | |||
|
49 | socket handles connections from both clients and engines, and replies with the remaining | |||
|
50 | information necessary to establish the remaining connections. | |||
43 |
|
51 | |||
44 | Heartbeat |
|
52 | Heartbeat | |
45 | --------- |
|
53 | --------- | |
@@ -51,25 +59,43 b' Heartbeat' | |||||
51 |
|
59 | |||
52 | The heartbeat sockets. |
|
60 | The heartbeat sockets. | |
53 |
|
61 | |||
54 | The heartbeat process has been described elsewhere. To summarize: the controller publishes a distinct message periodically via a ``PUB`` socket. Each engine has a ``zmq.FORWARDER`` device with a ``SUB`` socket for input, and ``XREQ`` socket for output. The ``SUB`` socket is connected to the ``PUB`` socket labeled *HB(ping)*, and the ``XREQ`` is connected to the ``XREP`` labeled *HB(pong)*. This results in the same message being relayed back to the Heartbeat Monitor with the addition of the ``XREQ`` prefix. The Heartbeat Monitor receives all the replies via an ``XREP`` socket, and identifies which hearts are still beating by the ``zmq.IDENTITY`` prefix of the ``XREQ`` sockets. |
|
62 | The heartbeat process has been described elsewhere. To summarize: the controller publishes | |
|
63 | a distinct message periodically via a ``PUB`` socket. Each engine has a ``zmq.FORWARDER`` | |||
|
64 | device with a ``SUB`` socket for input, and ``XREQ`` socket for output. The ``SUB`` socket | |||
|
65 | is connected to the ``PUB`` socket labeled *HB(ping)*, and the ``XREQ`` is connected to | |||
|
66 | the ``XREP`` labeled *HB(pong)*. This results in the same message being relayed back to | |||
|
67 | the Heartbeat Monitor with the addition of the ``XREQ`` prefix. The Heartbeat Monitor | |||
|
68 | receives all the replies via an ``XREP`` socket, and identifies which hearts are still | |||
|
69 | beating by the ``zmq.IDENTITY`` prefix of the ``XREQ`` sockets. | |||
55 |
|
70 | |||
56 | Queues |
|
71 | Queues | |
57 | ------ |
|
72 | ------ | |
58 |
|
73 | |||
59 | .. figure:: figs/queuefade.png |
|
74 | .. figure:: figs/queuefade.png | |
60 | :width: 432px |
|
75 | :width: 432px | |
61 | :alt: IPython Queue connections |
|
76 | :alt: IPython Queue connections | |
62 | :align: center |
|
77 | :align: center | |
63 |
|
78 | |||
64 |
|
|
79 | Load balanced Task queue on the left, explicitly multiplexed queue on the right. | |
65 |
|
80 | |||
66 | The controller has two MonitoredQueue devices. These devices are primarily for relaying messages between clients and engines, but the controller needs to see those messages for its own purposes. Since no Python code may exist between the two sockets in a queue, all messages sent through these queues (both directions) are also sent via a ``PUB`` socket to a monitor, which allows the Controller to monitor queue traffic without interfering with it. |
|
81 | The controller has two MonitoredQueue devices. These devices are primarily for relaying | |
67 |
|
82 | messages between clients and engines, but the controller needs to see those messages for | ||
68 | For tasks, the engine need not be specified. Messages sent to the ``XREP`` socket from the client side are assigned to an engine via ZMQ's ``XREQ`` round-robin load balancing. Engine replies are directed to specific clients via the IDENTITY of the client, which is received as a prefix at the Engine. |
|
83 | its own purposes. Since no Python code may exist between the two sockets in a queue, all | |
69 |
|
84 | messages sent through these queues (both directions) are also sent via a ``PUB`` socket to | ||
70 | For Multiplexing, ``XREP`` is used for both in and output sockets in the device. Clients must specify the destination by the ``zmq.IDENTITY`` of the ``PAIR`` socket connected to the downstream end of the device. |
|
85 | a monitor, which allows the Controller to monitor queue traffic without interfering with | |
71 |
|
86 | it. | ||
72 | At the Kernel level, both of these PAIR sockets are treated in the same way as the ``REP`` socket in the serial version (except using ZMQStreams instead of explicit sockets). |
|
87 | ||
|
88 | For tasks, the engine need not be specified. Messages sent to the ``XREP`` socket from the | |||
|
89 | client side are assigned to an engine via ZMQ's ``XREQ`` round-robin load balancing. | |||
|
90 | Engine replies are directed to specific clients via the IDENTITY of the client, which is | |||
|
91 | received as a prefix at the Engine. | |||
|
92 | ||||
|
93 | For Multiplexing, ``XREP`` is used for both in and output sockets in the device. Clients | |||
|
94 | must specify the destination by the ``zmq.IDENTITY`` of the ``PAIR`` socket connected to | |||
|
95 | the downstream end of the device. | |||
|
96 | ||||
|
97 | At the Kernel level, both of these PAIR sockets are treated in the same way as the ``REP`` | |||
|
98 | socket in the serial version (except using ZMQStreams instead of explicit sockets). | |||
73 |
|
99 | |||
74 | Client connections |
|
100 | Client connections | |
75 | ------------------ |
|
101 | ------------------ | |
@@ -81,7 +107,8 b' Client connections' | |||||
81 |
|
107 | |||
82 | Clients connect to an ``XREP`` socket to query the controller |
|
108 | Clients connect to an ``XREP`` socket to query the controller | |
83 |
|
109 | |||
84 |
The controller listens on an ``XREP`` socket for queries from clients as to queue status, |
|
110 | The controller listens on an ``XREP`` socket for queries from clients as to queue status, | |
|
111 | and control instructions. Clients can connect to this via a PAIR socket or ``XREQ``. | |||
85 |
|
112 | |||
86 | .. figure:: figs/notiffade.png |
|
113 | .. figure:: figs/notiffade.png | |
87 | :width: 432px |
|
114 | :width: 432px | |
@@ -90,5 +117,8 b' The controller listens on an ``XREP`` socket for queries from clients as to queu' | |||||
90 |
|
117 | |||
91 | Engine registration events are published via a ``PUB`` socket. |
|
118 | Engine registration events are published via a ``PUB`` socket. | |
92 |
|
119 | |||
93 | The controller publishes all registration/unregistration events via a ``PUB`` socket. This allows clients to stay up to date with what engines are available by subscribing to the feed with a ``SUB`` socket. Other processes could selectively subscribe to just registration or unregistration events. |
|
120 | The controller publishes all registration/unregistration events via a ``PUB`` socket. This | |
|
121 | allows clients to stay up to date with what engines are available by subscribing to the | |||
|
122 | feed with a ``SUB`` socket. Other processes could selectively subscribe to just | |||
|
123 | registration or unregistration events. | |||
94 |
|
124 |
@@ -3,16 +3,20 b'' | |||||
3 | Messaging for Parallel Computing |
|
3 | Messaging for Parallel Computing | |
4 | ================================ |
|
4 | ================================ | |
5 |
|
5 | |||
6 |
This is an extension of the :ref:`messaging <messaging>` doc. Diagrams of the connections |
|
6 | This is an extension of the :ref:`messaging <messaging>` doc. Diagrams of the connections | |
|
7 | can be found in the :ref:`parallel connections <parallel_connections>` doc. | |||
7 |
|
8 | |||
8 |
|
9 | |||
9 |
|
10 | ZMQ messaging is also used in the parallel computing IPython system. All messages to/from | ||
10 | ZMQ messaging is also used in the parallel computing IPython system. All messages to/from kernels remain the same as the single kernel model, and are forwarded through a ZMQ Queue device. The controller receives all messages and replies in these channels, and saves results for future use. |
|
11 | kernels remain the same as the single kernel model, and are forwarded through a ZMQ Queue | |
|
12 | device. The controller receives all messages and replies in these channels, and saves | |||
|
13 | results for future use. | |||
11 |
|
14 | |||
12 | The Controller |
|
15 | The Controller | |
13 | -------------- |
|
16 | -------------- | |
14 |
|
17 | |||
15 |
The controller is the central process of the IPython parallel computing model. |
|
18 | The controller is the central process of the IPython parallel computing model. It has 3 | |
|
19 | Devices: | |||
16 |
|
20 | |||
17 | * Heartbeater |
|
21 | * Heartbeater | |
18 | * Multiplexed Queue |
|
22 | * Multiplexed Queue | |
@@ -29,9 +33,13 b' and 3 sockets:' | |||||
29 | Registration (``XREP``) |
|
33 | Registration (``XREP``) | |
30 | *********************** |
|
34 | *********************** | |
31 |
|
35 | |||
32 | The first function of the Controller is to facilitate and monitor connections of clients and engines. Both client and engine registration are handled by the same socket, so only one ip/port pair is needed to connect any number of connections and clients. |
|
36 | The first function of the Controller is to facilitate and monitor connections of clients | |
|
37 | and engines. Both client and engine registration are handled by the same socket, so only | |||
|
38 | one ip/port pair is needed to connect any number of connections and clients. | |||
33 |
|
39 | |||
34 | Engines register with the ``zmq.IDENTITY`` of their two ``XREQ`` sockets, one for the queue, which receives execute requests, and one for the heartbeat, which is used to monitor the survival of the Engine process. |
|
40 | Engines register with the ``zmq.IDENTITY`` of their two ``XREQ`` sockets, one for the | |
|
41 | queue, which receives execute requests, and one for the heartbeat, which is used to | |||
|
42 | monitor the survival of the Engine process. | |||
35 |
|
43 | |||
36 | Message type: ``registration_request``:: |
|
44 | Message type: ``registration_request``:: | |
37 |
|
45 | |||
@@ -40,7 +48,10 b' Message type: ``registration_request``::' | |||||
40 | 'heartbeat' : '1234-abcd-...' # the heartbeat XREQ id |
|
48 | 'heartbeat' : '1234-abcd-...' # the heartbeat XREQ id | |
41 | } |
|
49 | } | |
42 |
|
50 | |||
43 | The Controller replies to an Engine's registration request with the engine's integer ID, and all the remaining connection information for connecting the heartbeat process, and kernel socket(s). The message status will be an error if the Engine requests IDs that already in use. |
|
51 | The Controller replies to an Engine's registration request with the engine's integer ID, | |
|
52 | and all the remaining connection information for connecting the heartbeat process, and | |||
|
53 | kernel queue socket(s). The message status will be an error if the Engine requests IDs that | |||
|
54 | already in use. | |||
44 |
|
55 | |||
45 | Message type: ``registration_reply``:: |
|
56 | Message type: ``registration_reply``:: | |
46 |
|
57 | |||
@@ -49,39 +60,49 b' Message type: ``registration_reply``::' | |||||
49 | # if ok: |
|
60 | # if ok: | |
50 | 'id' : 0, # int, the engine id |
|
61 | 'id' : 0, # int, the engine id | |
51 | 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue |
|
62 | 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue | |
|
63 | 'control' : 'tcp://...', # addr for control queue | |||
52 | 'heartbeat' : (a,b), # tuple containing two interfaces needed for heartbeat |
|
64 | 'heartbeat' : (a,b), # tuple containing two interfaces needed for heartbeat | |
53 | 'task' : 'tcp...', # addr for task queue, or None if no task queue running |
|
65 | 'task' : 'tcp://...', # addr for task queue, or None if no task queue running | |
54 | # if error: |
|
66 | # if error: | |
55 | 'reason' : 'queue_id already registered' |
|
67 | 'reason' : 'queue_id already registered' | |
56 | } |
|
68 | } | |
57 |
|
69 | |||
58 |
Clients use the same socket to start their connections. Connection requests |
|
70 | Clients use the same socket as engines to start their connections. Connection requests | |
|
71 | from clients need no information: | |||
59 |
|
72 | |||
60 | Message type: ``connection_request``:: |
|
73 | Message type: ``connection_request``:: | |
61 |
|
74 | |||
62 | content = {} |
|
75 | content = {} | |
63 |
|
76 | |||
64 | The reply to a Client registration request contains the connection information for the multiplexer and load balanced queues, as well as the address for direct controller queries. If any of these addresses is `None`, that functionality is not available. |
|
77 | The reply to a Client registration request contains the connection information for the | |
|
78 | multiplexer and load balanced queues, as well as the address for direct controller | |||
|
79 | queries. If any of these addresses is `None`, that functionality is not available. | |||
65 |
|
80 | |||
66 | Message type: ``connection_reply``:: |
|
81 | Message type: ``connection_reply``:: | |
67 |
|
82 | |||
68 | content = { |
|
83 | content = { | |
69 | 'status' : 'ok', # or 'error' |
|
84 | 'status' : 'ok', # or 'error' | |
70 | # if ok: |
|
85 | # if ok: | |
71 | 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the queue |
|
86 | 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue | |
72 | 'task' : 'tcp...', # addr for task queue, or None if no task queue running |
|
87 | 'task' : 'tcp...', # addr for task queue, or None if no task queue running | |
73 |
' |
|
88 | 'query' : 'tcp...' # addr for methods to query the controller, like queue_request, etc. | |
|
89 | 'control' : 'tcp...' # addr for control methods, like abort, etc. | |||
74 | } |
|
90 | } | |
75 |
|
91 | |||
76 | Heartbeat |
|
92 | Heartbeat | |
77 | ********* |
|
93 | ********* | |
78 |
|
94 | |||
79 | The controller uses a heartbeat system to monitor engines, and track when they become unresponsive. As described in :ref:`messages <messages>`, and shown in :ref:`connections <parallel_connections>`. |
|
95 | The controller uses a heartbeat system to monitor engines, and track when they become | |
|
96 | unresponsive. As described in :ref:`messages <messages>`, and shown in :ref:`connections | |||
|
97 | <parallel_connections>`. | |||
80 |
|
98 | |||
81 | Notification (``PUB``) |
|
99 | Notification (``PUB``) | |
82 | ********************** |
|
100 | ********************** | |
83 |
|
101 | |||
84 | The controller published all engine registration/unregistration events on a PUB socket. This allows clients to have up-to-date engine ID sets without polling. Registration notifications contain both the integer engine ID and the queue ID, which is necessary for sending messages via the Multiplexer Queue. |
|
102 | The controller published all engine registration/unregistration events on a PUB socket. | |
|
103 | This allows clients to have up-to-date engine ID sets without polling. Registration | |||
|
104 | notifications contain both the integer engine ID and the queue ID, which is necessary for | |||
|
105 | sending messages via the Multiplexer Queue. | |||
85 |
|
106 | |||
86 | Message type: ``registration_notification``:: |
|
107 | Message type: ``registration_notification``:: | |
87 |
|
108 | |||
@@ -100,9 +121,14 b' Message type : ``unregistration_notification``::' | |||||
100 | Client Queries (``XREP``) |
|
121 | Client Queries (``XREP``) | |
101 | ************************* |
|
122 | ************************* | |
102 |
|
123 | |||
103 | The controller monitors and logs all queue traffic, so that clients can retrieve past results or monitor pending tasks. Currently, this information resides in memory on the Controller, but will ultimately be offloaded to a database over an additional ZMQ connection. The interface should remain the same or at least similar. |
|
124 | The controller monitors and logs all queue traffic, so that clients can retrieve past | |
|
125 | results or monitor pending tasks. Currently, this information resides in memory on the | |||
|
126 | Controller, but will ultimately be offloaded to a database over an additional ZMQ | |||
|
127 | connection. The interface should remain the same or at least similar. | |||
104 |
|
128 | |||
105 | :func:`queue_request` requests can specify multiple engines to query via the `targets` element. A verbose flag can be passed, to determine whether the result should be the list of `msg_ids` in the queue or simply the length of each list. |
|
129 | :func:`queue_request` requests can specify multiple engines to query via the `targets` | |
|
130 | element. A verbose flag can be passed, to determine whether the result should be the list | |||
|
131 | of `msg_ids` in the queue or simply the length of each list. | |||
106 |
|
132 | |||
107 | Message type: ``queue_request``:: |
|
133 | Message type: ``queue_request``:: | |
108 |
|
134 | |||
@@ -111,7 +137,9 b' Message type: ``queue_request``::' | |||||
111 | 'targets' : [0,3,1] # list of ints |
|
137 | 'targets' : [0,3,1] # list of ints | |
112 | } |
|
138 | } | |
113 |
|
139 | |||
114 |
The content of a reply to a :func:queue_request request is a dict, keyed by the engine |
|
140 | The content of a reply to a :func:queue_request request is a dict, keyed by the engine | |
|
141 | IDs. Note that they will be the string representation of the integer keys, since JSON | |||
|
142 | cannot handle number keys. | |||
115 |
|
143 | |||
116 | Message type: ``queue_reply``:: |
|
144 | Message type: ``queue_reply``:: | |
117 |
|
145 | |||
@@ -120,15 +148,19 b' Message type: ``queue_reply``::' | |||||
120 | '1' : {'completed' : 10, 'queue' : 1} |
|
148 | '1' : {'completed' : 10, 'queue' : 1} | |
121 | } |
|
149 | } | |
122 |
|
150 | |||
123 | Clients can request individual results directly from the controller. This is primarily for use gathering results of executions not submitted by the particular client, as the client will have all its own results already. Requests are made by msg_id, and can contain one or more msg_id. |
|
151 | Clients can request individual results directly from the controller. This is primarily for | |
|
152 | use gathering results of executions not submitted by the particular client, as the client | |||
|
153 | will have all its own results already. Requests are made by msg_id, and can contain one or | |||
|
154 | more msg_id. | |||
124 |
|
155 | |||
125 | Message type: ``result_request``:: |
|
156 | Message type: ``result_request``:: | |
126 |
|
157 | |||
127 | content = { |
|
158 | content = { | |
128 | 'msg_ids' : [uuid,'...'] # list of strs |
|
159 | 'msg_ids' : ['uuid','...'] # list of strs | |
129 | } |
|
160 | } | |
130 |
|
161 | |||
131 |
The :func:`result_request` reply contains the content objects of the actual execution |
|
162 | The :func:`result_request` reply contains the content objects of the actual execution | |
|
163 | reply messages | |||
132 |
|
164 | |||
133 |
|
165 | |||
134 | Message type: ``result_reply``:: |
|
166 | Message type: ``result_reply``:: | |
@@ -139,13 +171,18 b' Message type: ``result_reply``::' | |||||
139 | msg_id : msg, # the content dict is keyed by msg_ids, |
|
171 | msg_id : msg, # the content dict is keyed by msg_ids, | |
140 | # values are the result messages |
|
172 | # values are the result messages | |
141 | 'pending' : ['msg_id','...'], # msg_ids still pending |
|
173 | 'pending' : ['msg_id','...'], # msg_ids still pending | |
|
174 | 'completed' : ['msg_id','...'], # list of completed msg_ids | |||
142 | # if error: |
|
175 | # if error: | |
143 | 'reason' : "explanation" |
|
176 | 'reason' : "explanation" | |
144 | } |
|
177 | } | |
145 |
|
178 | |||
146 | Clients can also instruct the controller to forget the results of messages. This can be done by message ID or engine ID. Individual messages are dropped by msg_id, and all messages completed on an engine are dropped by engine ID. |
|
179 | For memory management purposes, Clients can also instruct the controller to forget the | |
|
180 | results of messages. This can be done by message ID or engine ID. Individual messages are | |||
|
181 | dropped by msg_id, and all messages completed on an engine are dropped by engine ID. This will likely no longer | |||
|
182 | be necessary once we move to a DB-based message logging backend. | |||
147 |
|
183 | |||
148 |
If the msg_ids element is the string ``'all'`` instead of a list, then all completed |
|
184 | If the msg_ids element is the string ``'all'`` instead of a list, then all completed | |
|
185 | results are forgotten. | |||
149 |
|
186 | |||
150 | Message type: ``purge_request``:: |
|
187 | Message type: ``purge_request``:: | |
151 |
|
188 | |||
@@ -154,7 +191,9 b' Message type: ``purge_request``::' | |||||
154 | 'engine_ids' : [0,2,4] # list of engine IDs |
|
191 | 'engine_ids' : [0,2,4] # list of engine IDs | |
155 | } |
|
192 | } | |
156 |
|
193 | |||
157 |
The reply to a purge request is simply the status 'ok' if the request succeeded, or an |
|
194 | The reply to a purge request is simply the status 'ok' if the request succeeded, or an | |
|
195 | explanation of why it failed, such as requesting the purge of a nonexistent or pending | |||
|
196 | message. | |||
158 |
|
197 | |||
159 | Message type: ``purge_reply``:: |
|
198 | Message type: ``purge_reply``:: | |
160 |
|
199 | |||
@@ -168,18 +207,29 b' Message type: ``purge_reply``::' | |||||
168 | :func:`apply` and :func:`apply_bound` |
|
207 | :func:`apply` and :func:`apply_bound` | |
169 | ************************************* |
|
208 | ************************************* | |
170 |
|
209 | |||
171 |
The `Namespace <http://gist.github.com/483294>`_ model suggests that execution be able to |
|
210 | The `Namespace <http://gist.github.com/483294>`_ model suggests that execution be able to | |
|
211 | use the model:: | |||
172 |
|
212 | |||
173 | client.apply(f, *args, **kwargs) |
|
213 | client.apply(f, *args, **kwargs) | |
174 |
|
214 | |||
175 | which takes `f`, a function in the user's namespace, and executes ``f(*args, **kwargs)`` on a remote engine, returning the result (or, for non-blocking, information facilitating later retrieval of the result). This model, unlike the execute message which just uses a code string, must be able to send arbitrary (pickleable) Python objects. And ideally, copy as little data as we can. The `buffers` property of a Message was introduced for this purpose. |
|
215 | which takes `f`, a function in the user's namespace, and executes ``f(*args, **kwargs)`` | |
|
216 | on a remote engine, returning the result (or, for non-blocking, information facilitating | |||
|
217 | later retrieval of the result). This model, unlike the execute message which just uses a | |||
|
218 | code string, must be able to send arbitrary (pickleable) Python objects. And ideally, copy | |||
|
219 | as little data as we can. The `buffers` property of a Message was introduced for this | |||
|
220 | purpose. | |||
176 |
|
221 | |||
177 |
Utility method :func:`build_apply_message` in :mod:`IPython.zmq.streamsession` wraps a |
|
222 | Utility method :func:`build_apply_message` in :mod:`IPython.zmq.streamsession` wraps a | |
|
223 | function signature and builds the correct buffer format for minimal data copying (exactly | |||
|
224 | zero copies of numpy array data). | |||
178 |
|
225 | |||
179 | Message type: ``apply_request``:: |
|
226 | Message type: ``apply_request``:: | |
180 |
|
227 | |||
181 | content = { |
|
228 | content = { | |
182 | 'bound' : True # whether to execute in the engine's namespace or unbound |
|
229 | 'bound' : True, # whether to execute in the engine's namespace or unbound | |
|
230 | 'after' : [msg_ids,...], # list of msg_ids or output of Dependency.as_dict() | |||
|
231 | 'follow' : [msg_ids,...], # list of msg_ids or output of Dependency.as_dict() | |||
|
232 | ||||
183 | } |
|
233 | } | |
184 | buffers = ['...'] # at least 3 in length |
|
234 | buffers = ['...'] # at least 3 in length | |
185 | # as built by build_apply_message(f,args,kwargs) |
|
235 | # as built by build_apply_message(f,args,kwargs) | |
@@ -200,10 +250,22 b' Message type: ``apply_reply``::' | |||||
200 | Implementation |
|
250 | Implementation | |
201 | -------------- |
|
251 | -------------- | |
202 |
|
252 | |||
203 | There are a few differences in implementation between the `StreamSession` object used in the parallel computing fork and the `Session` object, the main one being that messages are sent in parts, rather than as a single serialized object. `StreamSession` objects also take pack/unpack functions, which are to be used when serializing/deserializing objects. These can be any functions that translate to/from formats that ZMQ sockets can send (buffers,bytes, etc.). |
|
253 | There are a few differences in implementation between the `StreamSession` object used in | |
|
254 | the parallel computing fork and the `Session` object, the main one being that messages are | |||
|
255 | sent in parts, rather than as a single serialized object. `StreamSession` objects also | |||
|
256 | take pack/unpack functions, which are to be used when serializing/deserializing objects. | |||
|
257 | These can be any functions that translate to/from formats that ZMQ sockets can send | |||
|
258 | (buffers,bytes, etc.). | |||
204 |
|
259 | |||
205 | Split Sends |
|
260 | Split Sends | |
206 | *********** |
|
261 | *********** | |
207 |
|
262 | |||
208 | Previously, messages were bundled as a single json object and one call to :func:`socket.send_json`. Since the controller inspects all messages, and doesn't need to see the content of the messages, which can be large, messages are serialized and sent in pieces. All messages are sent in at least 3 parts: the header, the parent header, and the content. This allows the controller to unpack and inspect the (always small) header, without spending time unpacking the content unless the message is bound for the controller. Buffers are added on to the end of the message, and can be any objects that present the buffer interface. |
|
263 | Previously, messages were bundled as a single json object and one call to | |
|
264 | :func:`socket.send_json`. Since the controller inspects all messages, and doesn't need to | |||
|
265 | see the content of the messages, which can be large, messages are now serialized and sent in | |||
|
266 | pieces. All messages are sent in at least 3 parts: the header, the parent header, and the | |||
|
267 | content. This allows the controller to unpack and inspect the (always small) header, | |||
|
268 | without spending time unpacking the content unless the message is bound for the | |||
|
269 | controller. Buffers are added on to the end of the message, and can be any objects that | |||
|
270 | present the buffer interface. | |||
209 |
|
271 |
@@ -69,11 +69,11 b' New features' | |||||
69 | :mod:`~IPython.external.argparse` to parse command line options for |
|
69 | :mod:`~IPython.external.argparse` to parse command line options for | |
70 | :command:`ipython`. |
|
70 | :command:`ipython`. | |
71 |
|
71 | |||
72 |
* New top level :func:`~IPython. |
|
72 | * New top level :func:`~IPython.frontend.terminal.embed.embed` function that can be called | |
73 | to embed IPython at any place in user's code. One the first call it will |
|
73 | to embed IPython at any place in user's code. One the first call it will | |
74 |
create an :class:`~IPython. |
|
74 | create an :class:`~IPython.frontend.terminal.embed.InteractiveShellEmbed` instance and | |
75 | call it. In later calls, it just calls the previously created |
|
75 | call it. In later calls, it just calls the previously created | |
76 |
:class:`~IPython. |
|
76 | :class:`~IPython.frontend.terminal.embed.InteractiveShellEmbed`. | |
77 |
|
77 | |||
78 | * Created a component system (:mod:`IPython.core.component`) that is based on |
|
78 | * Created a component system (:mod:`IPython.core.component`) that is based on | |
79 | :mod:`IPython.utils.traitlets`. Components are arranged into a runtime |
|
79 | :mod:`IPython.utils.traitlets`. Components are arranged into a runtime |
General Comments 0
You need to be logged in to leave comments.
Login now