##// END OF EJS Templates
general parallel code cleanup
MinRK -
Show More
@@ -1,6 +1,7 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller with 0MQ
2 """The IPython Controller with 0MQ
3 This is the master object that handles connections from engines, clients, and
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 """
5 """
5 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
@@ -28,12 +29,14 b' from IPython.zmq.entry_point import bind_port'
28 from streamsession import Message, wrap_exception
29 from streamsession import Message, wrap_exception
29 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
30 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
30 connect_logger, parse_url)
31 connect_logger, parse_url)
31 # from messages import json # use the same import switches
32
32
33 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
34 # Code
34 # Code
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36
36
37 def _passer(*args, **kwargs):
38 return
39
37 class ReverseDict(dict):
40 class ReverseDict(dict):
38 """simple double-keyed subset of dict methods."""
41 """simple double-keyed subset of dict methods."""
39
42
@@ -93,16 +96,18 b' class Controller(object):'
93 loop: zmq IOLoop instance
96 loop: zmq IOLoop instance
94 session: StreamSession object
97 session: StreamSession object
95 <removed> context: zmq context for creating new connections (?)
98 <removed> context: zmq context for creating new connections (?)
99 queue: ZMQStream for monitoring the command queue (SUB)
96 registrar: ZMQStream for engine registration requests (XREP)
100 registrar: ZMQStream for engine registration requests (XREP)
101 heartbeat: HeartMonitor object checking the pulse of the engines
97 clientele: ZMQStream for client connections (XREP)
102 clientele: ZMQStream for client connections (XREP)
98 not used for jobs, only query/control commands
103 not used for jobs, only query/control commands
99 queue: ZMQStream for monitoring the command queue (SUB)
104 notifier: ZMQStream for broadcasting engine registration changes (PUB)
100 heartbeat: HeartMonitor object checking the pulse of the engines
105 db: connection to db for out of memory logging of commands
101 db_stream: connection to db for out of memory logging of commands
102 NotImplemented
106 NotImplemented
103 queue_addr: zmq connection address of the XREP socket for the queue
107 engine_addrs: dict of zmq connection information for engines to connect
104 hb_addr: zmq connection address of the PUB socket for heartbeats
108 to the queues.
105 task_addr: zmq connection address of the XREQ socket for task queue
109 client_addrs: dict of zmq connection information for engines to connect
110 to the queues.
106 """
111 """
107 # internal data structures:
112 # internal data structures:
108 ids=None # engine IDs
113 ids=None # engine IDs
@@ -165,14 +170,16 b' class Controller(object):'
165 self.notifier = notifier
170 self.notifier = notifier
166 self.db = db
171 self.db = db
167
172
173 # validate connection dicts:
168 self.client_addrs = client_addrs
174 self.client_addrs = client_addrs
169 assert isinstance(client_addrs['queue'], str)
175 assert isinstance(client_addrs['queue'], str)
176 assert isinstance(client_addrs['control'], str)
170 # self.hb_addrs = hb_addrs
177 # self.hb_addrs = hb_addrs
171 self.engine_addrs = engine_addrs
178 self.engine_addrs = engine_addrs
172 assert isinstance(engine_addrs['queue'], str)
179 assert isinstance(engine_addrs['queue'], str)
180 assert isinstance(client_addrs['control'], str)
173 assert len(engine_addrs['heartbeat']) == 2
181 assert len(engine_addrs['heartbeat']) == 2
174
182
175
176 # register our callbacks
183 # register our callbacks
177 self.registrar.on_recv(self.dispatch_register_request)
184 self.registrar.on_recv(self.dispatch_register_request)
178 self.clientele.on_recv(self.dispatch_client_msg)
185 self.clientele.on_recv(self.dispatch_client_msg)
@@ -182,19 +189,25 b' class Controller(object):'
182 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
189 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
183 heartbeat.add_new_heart_handler(self.handle_new_heart)
190 heartbeat.add_new_heart_handler(self.handle_new_heart)
184
191
185 if self.db is not None:
192 self.queue_handlers = { 'in' : self.save_queue_request,
186 self.db.on_recv(self.dispatch_db)
193 'out': self.save_queue_result,
187
194 'intask': self.save_task_request,
195 'outtask': self.save_task_result,
196 'tracktask': self.save_task_destination,
197 'incontrol': _passer,
198 'outcontrol': _passer,
199 }
200
188 self.client_handlers = {'queue_request': self.queue_status,
201 self.client_handlers = {'queue_request': self.queue_status,
189 'result_request': self.get_results,
202 'result_request': self.get_results,
190 'purge_request': self.purge_results,
203 'purge_request': self.purge_results,
204 'load_request': self.check_load,
191 'resubmit_request': self.resubmit_task,
205 'resubmit_request': self.resubmit_task,
192 }
206 }
193
207
194 self.registrar_handlers = {'registration_request' : self.register_engine,
208 self.registrar_handlers = {'registration_request' : self.register_engine,
195 'unregistration_request' : self.unregister_engine,
209 'unregistration_request' : self.unregister_engine,
196 'connection_request': self.connection_request,
210 'connection_request': self.connection_request,
197
198 }
211 }
199 #
212 #
200 # this is the stuff that will move to DB:
213 # this is the stuff that will move to DB:
@@ -272,7 +285,7 b' class Controller(object):'
272 print (idents,msg, len(msg))
285 print (idents,msg, len(msg))
273 try:
286 try:
274 msg = self.session.unpack_message(msg,content=True)
287 msg = self.session.unpack_message(msg,content=True)
275 except Exception, e:
288 except Exception as e:
276 logger.error("registration::got bad registration message: %s"%msg)
289 logger.error("registration::got bad registration message: %s"%msg)
277 raise e
290 raise e
278 return
291 return
@@ -291,18 +304,9 b' class Controller(object):'
291 logger.debug("queue traffic: %s"%msg[:2])
304 logger.debug("queue traffic: %s"%msg[:2])
292 switch = msg[0]
305 switch = msg[0]
293 idents, msg = self.session.feed_identities(msg[1:])
306 idents, msg = self.session.feed_identities(msg[1:])
294 if switch == 'in':
307 handler = self.queue_handlers.get(switch, None)
295 self.save_queue_request(idents, msg)
308 if handler is not None:
296 elif switch == 'out':
309 handler(idents, msg)
297 self.save_queue_result(idents, msg)
298 elif switch == 'intask':
299 self.save_task_request(idents, msg)
300 elif switch == 'outtask':
301 self.save_task_result(idents, msg)
302 elif switch == 'tracktask':
303 self.save_task_destination(idents, msg)
304 elif switch in ('incontrol', 'outcontrol'):
305 pass
306 else:
310 else:
307 logger.error("Invalid message topic: %s"%switch)
311 logger.error("Invalid message topic: %s"%switch)
308
312
@@ -392,7 +396,7 b' class Controller(object):'
392 received=None,
396 received=None,
393 engine=(eid, queue_id))
397 engine=(eid, queue_id))
394 self.pending[msg_id] = ( msg, info )
398 self.pending[msg_id] = ( msg, info )
395 self.queues[eid][0].append(msg_id)
399 self.queues[eid].append(msg_id)
396
400
397 def save_queue_result(self, idents, msg):
401 def save_queue_result(self, idents, msg):
398 client_id, queue_id = idents[:2]
402 client_id, queue_id = idents[:2]
@@ -417,7 +421,7 b' class Controller(object):'
417 self.results[msg_id] = msg
421 self.results[msg_id] = msg
418 if msg_id in self.pending:
422 if msg_id in self.pending:
419 self.pending.pop(msg_id)
423 self.pending.pop(msg_id)
420 self.queues[eid][0].remove(msg_id)
424 self.queues[eid].remove(msg_id)
421 self.completed[eid].append(msg_id)
425 self.completed[eid].append(msg_id)
422 else:
426 else:
423 logger.debug("queue:: unknown msg finished %s"%msg_id)
427 logger.debug("queue:: unknown msg finished %s"%msg_id)
@@ -425,6 +429,7 b' class Controller(object):'
425 #--------------------- Task Queue Traffic ------------------------------
429 #--------------------- Task Queue Traffic ------------------------------
426
430
427 def save_task_request(self, idents, msg):
431 def save_task_request(self, idents, msg):
432 """Save the submission of a task."""
428 client_id = idents[0]
433 client_id = idents[0]
429
434
430 try:
435 try:
@@ -437,13 +442,17 b' class Controller(object):'
437 header = msg['header']
442 header = msg['header']
438 msg_id = header['msg_id']
443 msg_id = header['msg_id']
439 self.mia.add(msg_id)
444 self.mia.add(msg_id)
440 self.pending[msg_id] = msg
445 info = dict(submit=datetime.now(),
446 received=None,
447 engine=None)
448 self.pending[msg_id] = (msg, info)
441 if not self.tasks.has_key(client_id):
449 if not self.tasks.has_key(client_id):
442 self.tasks[client_id] = []
450 self.tasks[client_id] = []
443 self.tasks[client_id].append(msg_id)
451 self.tasks[client_id].append(msg_id)
444
452
445 def save_task_result(self, idents, msg):
453 def save_task_result(self, idents, msg):
446 client_id = idents[0]
454 """save the result of a completed task."""
455 client_id, engine_uuid = idents[:2]
447 try:
456 try:
448 msg = self.session.unpack_message(msg, content=False)
457 msg = self.session.unpack_message(msg, content=False)
449 except:
458 except:
@@ -452,16 +461,19 b' class Controller(object):'
452 return
461 return
453
462
454 parent = msg['parent_header']
463 parent = msg['parent_header']
464 eid = self.by_ident[engine_uuid]
455 if not parent:
465 if not parent:
456 # print msg
466 # print msg
457 # logger.warn("")
467 # logger.warn("")
458 return
468 return
459 msg_id = parent['msg_id']
469 msg_id = parent['msg_id']
460 self.results[msg_id] = msg
470 self.results[msg_id] = msg
461 if msg_id in self.pending:
471 if msg_id in self.pending and msg_id in self.tasks[eid]:
462 self.pending.pop(msg_id)
472 self.pending.pop(msg_id)
463 if msg_id in self.mia:
473 if msg_id in self.mia:
464 self.mia.remove(msg_id)
474 self.mia.remove(msg_id)
475 self.completed[eid].append(msg_id)
476 self.tasks[eid].remove(msg_id)
465 else:
477 else:
466 logger.debug("task::unknown task %s finished"%msg_id)
478 logger.debug("task::unknown task %s finished"%msg_id)
467
479
@@ -475,16 +487,16 b' class Controller(object):'
475 print (content)
487 print (content)
476 msg_id = content['msg_id']
488 msg_id = content['msg_id']
477 engine_uuid = content['engine_id']
489 engine_uuid = content['engine_id']
478 for eid,queue_id in self.keytable.iteritems():
490 eid = self.by_ident[engine_uuid]
479 if queue_id == engine_uuid:
480 break
481
491
482 logger.info("task::task %s arrived on %s"%(msg_id, eid))
492 logger.info("task::task %s arrived on %s"%(msg_id, eid))
483 if msg_id in self.mia:
493 if msg_id in self.mia:
484 self.mia.remove(msg_id)
494 self.mia.remove(msg_id)
485 else:
495 else:
486 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
496 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
487 self.tasks[engine_uuid].append(msg_id)
497
498 self.tasks[eid].append(msg_id)
499 self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
488
500
489 def mia_task_request(self, idents, msg):
501 def mia_task_request(self, idents, msg):
490 client_id = idents[0]
502 client_id = idents[0]
@@ -493,10 +505,12 b' class Controller(object):'
493
505
494
506
495
507
496 #-------------------- Registration -----------------------------
508 #-------------------------------------------------------------------------
509 # Registration requests
510 #-------------------------------------------------------------------------
497
511
498 def connection_request(self, client_id, msg):
512 def connection_request(self, client_id, msg):
499 """reply with connection addresses for clients"""
513 """Reply with connection addresses for clients."""
500 logger.info("client::client %s connected"%client_id)
514 logger.info("client::client %s connected"%client_id)
501 content = dict(status='ok')
515 content = dict(status='ok')
502 content.update(self.client_addrs)
516 content.update(self.client_addrs)
@@ -507,7 +521,7 b' class Controller(object):'
507 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
521 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
508
522
509 def register_engine(self, reg, msg):
523 def register_engine(self, reg, msg):
510 """register an engine"""
524 """Register a new engine."""
511 content = msg['content']
525 content = msg['content']
512 try:
526 try:
513 queue = content['queue']
527 queue = content['queue']
@@ -556,6 +570,7 b' class Controller(object):'
556 return eid
570 return eid
557
571
558 def unregister_engine(self, ident, msg):
572 def unregister_engine(self, ident, msg):
573 """Unregister an engine that explicitly requested to leave."""
559 try:
574 try:
560 eid = msg['content']['id']
575 eid = msg['content']['id']
561 except:
576 except:
@@ -569,7 +584,7 b' class Controller(object):'
569 self.hearts.pop(ec.heartbeat)
584 self.hearts.pop(ec.heartbeat)
570 self.by_ident.pop(ec.queue)
585 self.by_ident.pop(ec.queue)
571 self.completed.pop(eid)
586 self.completed.pop(eid)
572 for msg_id in self.queues.pop(eid)[0]:
587 for msg_id in self.queues.pop(eid):
573 msg = self.pending.pop(msg_id)
588 msg = self.pending.pop(msg_id)
574 ############## TODO: HANDLE IT ################
589 ############## TODO: HANDLE IT ################
575
590
@@ -577,6 +592,8 b' class Controller(object):'
577 self.session.send(self.notifier, "unregistration_notification", content=content)
592 self.session.send(self.notifier, "unregistration_notification", content=content)
578
593
579 def finish_registration(self, heart):
594 def finish_registration(self, heart):
595 """Second half of engine registration, called after our HeartMonitor
596 has received a beat from the Engine's Heart."""
580 try:
597 try:
581 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
598 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
582 except KeyError:
599 except KeyError:
@@ -590,7 +607,8 b' class Controller(object):'
590 self.keytable[eid] = queue
607 self.keytable[eid] = queue
591 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
608 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
592 self.by_ident[queue] = eid
609 self.by_ident[queue] = eid
593 self.queues[eid] = ([],[])
610 self.queues[eid] = list()
611 self.tasks[eid] = list()
594 self.completed[eid] = list()
612 self.completed[eid] = list()
595 self.hearts[heart] = eid
613 self.hearts[heart] = eid
596 content = dict(id=eid, queue=self.engines[eid].queue)
614 content = dict(id=eid, queue=self.engines[eid].queue)
@@ -604,7 +622,9 b' class Controller(object):'
604 else:
622 else:
605 pass
623 pass
606
624
607 #------------------- Client Requests -------------------------------
625 #-------------------------------------------------------------------------
626 # Client Requests
627 #-------------------------------------------------------------------------
608
628
609 def check_load(self, client_id, msg):
629 def check_load(self, client_id, msg):
610 content = msg['content']
630 content = msg['content']
@@ -620,12 +640,17 b' class Controller(object):'
620 content = dict(status='ok')
640 content = dict(status='ok')
621 # loads = {}
641 # loads = {}
622 for t in targets:
642 for t in targets:
623 content[str(t)] = len(self.queues[t])
643 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
624 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
644 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
625
645
626
646
627 def queue_status(self, client_id, msg):
647 def queue_status(self, client_id, msg):
628 """handle queue_status request"""
648 """Return the Queue status of one or more targets.
649 if verbose: return the msg_ids
650 else: return len of each type.
651 keys: queue (pending MUX jobs)
652 tasks (pending Task jobs)
653 completed (finished jobs from both queues)"""
629 content = msg['content']
654 content = msg['content']
630 targets = content['targets']
655 targets = content['targets']
631 try:
656 try:
@@ -635,19 +660,23 b' class Controller(object):'
635 self.session.send(self.clientele, "controller_error",
660 self.session.send(self.clientele, "controller_error",
636 content=content, ident=client_id)
661 content=content, ident=client_id)
637 return
662 return
638 verbose = msg.get('verbose', False)
663 verbose = content.get('verbose', False)
639 content = dict()
664 content = dict(status='ok')
640 for t in targets:
665 for t in targets:
641 queue = self.queues[t]
666 queue = self.queues[t]
642 completed = self.completed[t]
667 completed = self.completed[t]
668 tasks = self.tasks[t]
643 if not verbose:
669 if not verbose:
644 queue = len(queue)
670 queue = len(queue)
645 completed = len(completed)
671 completed = len(completed)
646 content[str(t)] = {'queue': queue, 'completed': completed }
672 tasks = len(tasks)
673 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
647 # pending
674 # pending
648 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
675 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
649
676
650 def purge_results(self, client_id, msg):
677 def purge_results(self, client_id, msg):
678 """Purge results from memory. This method is more valuable before we move
679 to a DB based message storage mechanism."""
651 content = msg['content']
680 content = msg['content']
652 msg_ids = content.get('msg_ids', [])
681 msg_ids = content.get('msg_ids', [])
653 reply = dict(status='ok')
682 reply = dict(status='ok')
@@ -675,37 +704,11 b' class Controller(object):'
675 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
704 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
676
705
677 def resubmit_task(self, client_id, msg, buffers):
706 def resubmit_task(self, client_id, msg, buffers):
678 content = msg['content']
707 """Resubmit a task."""
679 header = msg['header']
708 raise NotImplementedError
680
681
682 msg_ids = content.get('msg_ids', [])
683 reply = dict(status='ok')
684 if msg_ids == 'all':
685 self.results = {}
686 else:
687 for msg_id in msg_ids:
688 if msg_id in self.results:
689 self.results.pop(msg_id)
690 else:
691 if msg_id in self.pending:
692 reply = dict(status='error', reason="msg pending: %r"%msg_id)
693 else:
694 reply = dict(status='error', reason="No such msg: %r"%msg_id)
695 break
696 eids = content.get('engine_ids', [])
697 for eid in eids:
698 if eid not in self.engines:
699 reply = dict(status='error', reason="No such engine: %i"%eid)
700 break
701 msg_ids = self.completed.pop(eid)
702 for msg_id in msg_ids:
703 self.results.pop(msg_id)
704
705 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
706
709
707 def get_results(self, client_id, msg):
710 def get_results(self, client_id, msg):
708 """get the result of 1 or more messages"""
711 """Get the result of 1 or more messages."""
709 content = msg['content']
712 content = msg['content']
710 msg_ids = set(content['msg_ids'])
713 msg_ids = set(content['msg_ids'])
711 statusonly = content.get('status_only', False)
714 statusonly = content.get('status_only', False)
@@ -727,33 +730,12 b' class Controller(object):'
727 break
730 break
728 self.session.send(self.clientele, "result_reply", content=content,
731 self.session.send(self.clientele, "result_reply", content=content,
729 parent=msg, ident=client_id)
732 parent=msg, ident=client_id)
730
731
733
732
734
733 ############ OLD METHODS for Python Relay Controller ###################
735 #-------------------------------------------------------------------------
734 def _validate_engine_msg(self, msg):
735 """validates and unpacks headers of a message. Returns False if invalid,
736 (ident, message)"""
737 ident = msg[0]
738 try:
739 msg = self.session.unpack_message(msg[1:], content=False)
740 except:
741 logger.error("engine.%s::Invalid Message %s"%(ident, msg))
742 return False
743
744 try:
745 eid = msg.header.username
746 assert self.engines.has_key(eid)
747 except:
748 logger.error("engine::Invalid Engine ID %s"%(ident))
749 return False
750
751 return eid, msg
752
753
754 #--------------------
755 # Entry Point
736 # Entry Point
756 #--------------------
737 #-------------------------------------------------------------------------
738
757 def make_argument_parser():
739 def make_argument_parser():
758 """Make an argument parser"""
740 """Make an argument parser"""
759 parser = make_base_argument_parser()
741 parser = make_base_argument_parser()
@@ -130,7 +130,7 b' class HeartMonitor(object):'
130 for handler in self._failure_handlers:
130 for handler in self._failure_handlers:
131 try:
131 try:
132 handler(heart)
132 handler(heart)
133 except Exception, e:
133 except Exception as e:
134 print (e)
134 print (e)
135 logger.error("heartbeat::Bad Handler! %s"%handler)
135 logger.error("heartbeat::Bad Handler! %s"%handler)
136 pass
136 pass
@@ -1,3 +1,10 b''
1 """The Python scheduler for rich scheduling.
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
6 """
7
1 #----------------------------------------------------------------------
8 #----------------------------------------------------------------------
2 # Imports
9 # Imports
3 #----------------------------------------------------------------------
10 #----------------------------------------------------------------------
@@ -40,7 +47,7 b' def plainrandom(loads):'
40 def lru(loads):
47 def lru(loads):
41 """Always pick the front of the line.
48 """Always pick the front of the line.
42
49
43 The content of loads is ignored.
50 The content of `loads` is ignored.
44
51
45 Assumes LRU ordering of loads, with oldest first.
52 Assumes LRU ordering of loads, with oldest first.
46 """
53 """
@@ -151,10 +158,12 b' class TaskScheduler(object):'
151 self.notifier_stream.on_recv(self.dispatch_notification)
158 self.notifier_stream.on_recv(self.dispatch_notification)
152
159
153 def resume_receiving(self):
160 def resume_receiving(self):
154 """resume accepting jobs"""
161 """Resume accepting jobs."""
155 self.client_stream.on_recv(self.dispatch_submission, copy=False)
162 self.client_stream.on_recv(self.dispatch_submission, copy=False)
156
163
157 def stop_receiving(self):
164 def stop_receiving(self):
165 """Stop accepting jobs while there are no engines.
166 Leave them in the ZMQ queue."""
158 self.client_stream.on_recv(None)
167 self.client_stream.on_recv(None)
159
168
160 #-----------------------------------------------------------------------
169 #-----------------------------------------------------------------------
@@ -176,7 +185,7 b' class TaskScheduler(object):'
176 logger.error("task::Invalid notification msg: %s"%msg)
185 logger.error("task::Invalid notification msg: %s"%msg)
177 @logged
186 @logged
178 def _register_engine(self, uid):
187 def _register_engine(self, uid):
179 """new engine became available"""
188 """New engine with ident `uid` became available."""
180 # head of the line:
189 # head of the line:
181 self.targets.insert(0,uid)
190 self.targets.insert(0,uid)
182 self.loads.insert(0,0)
191 self.loads.insert(0,0)
@@ -187,10 +196,12 b' class TaskScheduler(object):'
187 self.resume_receiving()
196 self.resume_receiving()
188
197
189 def _unregister_engine(self, uid):
198 def _unregister_engine(self, uid):
190 """existing engine became unavailable"""
199 """Existing engine with ident `uid` became unavailable."""
191 # handle any potentially finished tasks:
192 if len(self.targets) == 1:
200 if len(self.targets) == 1:
201 # this was our only engine
193 self.stop_receiving()
202 self.stop_receiving()
203
204 # handle any potentially finished tasks:
194 self.engine_stream.flush()
205 self.engine_stream.flush()
195
206
196 self.completed.pop(uid)
207 self.completed.pop(uid)
@@ -203,7 +214,7 b' class TaskScheduler(object):'
203 self.handle_stranded_tasks(lost)
214 self.handle_stranded_tasks(lost)
204
215
205 def handle_stranded_tasks(self, lost):
216 def handle_stranded_tasks(self, lost):
206 """deal with jobs resident in an engine that died."""
217 """Deal with jobs resident in an engine that died."""
207 # TODO: resubmit the tasks?
218 # TODO: resubmit the tasks?
208 for msg_id in lost:
219 for msg_id in lost:
209 pass
220 pass
@@ -214,26 +225,29 b' class TaskScheduler(object):'
214 #-----------------------------------------------------------------------
225 #-----------------------------------------------------------------------
215 @logged
226 @logged
216 def dispatch_submission(self, raw_msg):
227 def dispatch_submission(self, raw_msg):
217 """dispatch job submission"""
228 """Dispatch job submission to appropriate handlers."""
218 # ensure targets up to date:
229 # ensure targets up to date:
219 self.notifier_stream.flush()
230 self.notifier_stream.flush()
220 try:
231 try:
221 idents, msg = self.session.feed_identities(raw_msg, copy=False)
232 idents, msg = self.session.feed_identities(raw_msg, copy=False)
222 except Exception, e:
233 except Exception as e:
223 logger.error("task::Invaid msg: %s"%msg)
234 logger.error("task::Invaid msg: %s"%msg)
224 return
235 return
225
236
226 msg = self.session.unpack_message(msg, content=False, copy=False)
237 msg = self.session.unpack_message(msg, content=False, copy=False)
227 header = msg['header']
238 header = msg['header']
228 msg_id = header['msg_id']
239 msg_id = header['msg_id']
240
241 # time dependencies
229 after = Dependency(header.get('after', []))
242 after = Dependency(header.get('after', []))
230 if after.mode == 'all':
243 if after.mode == 'all':
231 after.difference_update(self.all_done)
244 after.difference_update(self.all_done)
232 if after.check(self.all_done):
245 if after.check(self.all_done):
233 # recast as empty set, if we are already met,
246 # recast as empty set, if `after` already met,
234 # to prevent
247 # to prevent unnecessary set comparisons
235 after = Dependency([])
248 after = Dependency([])
236
249
250 # location dependencies
237 follow = Dependency(header.get('follow', []))
251 follow = Dependency(header.get('follow', []))
238 if len(after) == 0:
252 if len(after) == 0:
239 # time deps already met, try to run
253 # time deps already met, try to run
@@ -244,6 +258,7 b' class TaskScheduler(object):'
244 self.save_unmet(msg_id, raw_msg, after, follow)
258 self.save_unmet(msg_id, raw_msg, after, follow)
245 # send to monitor
259 # send to monitor
246 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
260 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
261
247 @logged
262 @logged
248 def maybe_run(self, msg_id, raw_msg, follow=None):
263 def maybe_run(self, msg_id, raw_msg, follow=None):
249 """check location dependencies, and run if they are met."""
264 """check location dependencies, and run if they are met."""
@@ -276,7 +291,7 b' class TaskScheduler(object):'
276
291
277 @logged
292 @logged
278 def submit_task(self, msg_id, msg, follow=None, indices=None):
293 def submit_task(self, msg_id, msg, follow=None, indices=None):
279 """submit a task to any of a subset of our targets"""
294 """Submit a task to any of a subset of our targets."""
280 if indices:
295 if indices:
281 loads = [self.loads[i] for i in indices]
296 loads = [self.loads[i] for i in indices]
282 else:
297 else:
@@ -290,6 +305,8 b' class TaskScheduler(object):'
290 self.engine_stream.send_multipart(msg, copy=False)
305 self.engine_stream.send_multipart(msg, copy=False)
291 self.add_job(idx)
306 self.add_job(idx)
292 self.pending[target][msg_id] = (msg, follow)
307 self.pending[target][msg_id] = (msg, follow)
308 content = dict(msg_id=msg_id, engine_id=target)
309 self.session.send(self.mon_stream, 'task_destination', content=content, ident='tracktask')
293
310
294 #-----------------------------------------------------------------------
311 #-----------------------------------------------------------------------
295 # Result Handling
312 # Result Handling
@@ -298,7 +315,7 b' class TaskScheduler(object):'
298 def dispatch_result(self, raw_msg):
315 def dispatch_result(self, raw_msg):
299 try:
316 try:
300 idents,msg = self.session.feed_identities(raw_msg, copy=False)
317 idents,msg = self.session.feed_identities(raw_msg, copy=False)
301 except Exception, e:
318 except Exception as e:
302 logger.error("task::Invaid result: %s"%msg)
319 logger.error("task::Invaid result: %s"%msg)
303 return
320 return
304 msg = self.session.unpack_message(msg, content=False, copy=False)
321 msg = self.session.unpack_message(msg, content=False, copy=False)
@@ -125,7 +125,7 b' class RawInput(object):'
125 while True:
125 while True:
126 try:
126 try:
127 reply = self.socket.recv_json(zmq.NOBLOCK)
127 reply = self.socket.recv_json(zmq.NOBLOCK)
128 except zmq.ZMQError, e:
128 except zmq.ZMQError as e:
129 if e.errno == zmq.EAGAIN:
129 if e.errno == zmq.EAGAIN:
130 pass
130 pass
131 else:
131 else:
@@ -171,7 +171,7 b' class Kernel(object):'
171 while True:
171 while True:
172 try:
172 try:
173 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
173 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
174 except zmq.ZMQError, e:
174 except zmq.ZMQError as e:
175 if e.errno == zmq.EAGAIN:
175 if e.errno == zmq.EAGAIN:
176 break
176 break
177 else:
177 else:
@@ -238,17 +238,6 b' class Kernel(object):'
238 else:
238 else:
239 handler(self.control_stream, idents, msg)
239 handler(self.control_stream, idents, msg)
240
240
241 # def flush_control(self):
242 # while any(zmq.select([self.control_socket],[],[],1e-4)):
243 # try:
244 # msg = self.control_socket.recv_multipart(zmq.NOBLOCK, copy=False)
245 # except zmq.ZMQError, e:
246 # if e.errno != zmq.EAGAIN:
247 # raise e
248 # return
249 # else:
250 # self.dispatch_control(msg)
251
252
241
253 #-------------------- queue helpers ------------------------------
242 #-------------------- queue helpers ------------------------------
254
243
@@ -8,6 +8,7 b' import sys'
8 import traceback
8 import traceback
9 import pprint
9 import pprint
10 import uuid
10 import uuid
11 from datetime import datetime
11
12
12 import zmq
13 import zmq
13 from zmq.utils import jsonapi
14 from zmq.utils import jsonapi
@@ -111,6 +112,7 b' class Message(object):'
111
112
112
113
113 def msg_header(msg_id, msg_type, username, session):
114 def msg_header(msg_id, msg_type, username, session):
115 date=datetime.now().isoformat()
114 return locals()
116 return locals()
115 # return {
117 # return {
116 # 'msg_id' : msg_id,
118 # 'msg_id' : msg_id,
@@ -140,7 +142,7 b' def extract_header(msg_or_header):'
140 return h
142 return h
141
143
142 def rekey(dikt):
144 def rekey(dikt):
143 """rekey a dict that has been forced to use str keys where there should be
145 """Rekey a dict that has been forced to use str keys where there should be
144 ints by json. This belongs in the jsonutil added by fperez."""
146 ints by json. This belongs in the jsonutil added by fperez."""
145 for k in dikt.iterkeys():
147 for k in dikt.iterkeys():
146 if isinstance(k, str):
148 if isinstance(k, str):
@@ -162,11 +164,22 b' def rekey(dikt):'
162 return dikt
164 return dikt
163
165
164 def serialize_object(obj, threshold=64e-6):
166 def serialize_object(obj, threshold=64e-6):
165 """serialize an object into a list of sendable buffers.
167 """Serialize an object into a list of sendable buffers.
166
168
167 Returns: (pmd, bufs)
169 Parameters
168 where pmd is the pickled metadata wrapper, and bufs
170 ----------
169 is a list of data buffers"""
171
172 obj : object
173 The object to be serialized
174 threshold : float
175 The threshold for not double-pickling the content.
176
177
178 Returns
179 -------
180 ('pmd', [bufs]) :
181 where pmd is the pickled metadata wrapper,
182 bufs is a list of data buffers"""
170 # threshold is 100 B
183 # threshold is 100 B
171 databuffers = []
184 databuffers = []
172 if isinstance(obj, (list, tuple)):
185 if isinstance(obj, (list, tuple)):
@@ -318,6 +331,8 b' class StreamSession(object):'
318 Parameters
331 Parameters
319 ----------
332 ----------
320
333
334 stream : zmq.Socket or ZMQStream
335 the socket-like object used to send the data
321 msg_type : str or Message/dict
336 msg_type : str or Message/dict
322 Normally, msg_type will be
337 Normally, msg_type will be
323
338
@@ -347,10 +362,7 b' class StreamSession(object):'
347 to_send.append(DELIM)
362 to_send.append(DELIM)
348 to_send.append(self.pack(msg['header']))
363 to_send.append(self.pack(msg['header']))
349 to_send.append(self.pack(msg['parent_header']))
364 to_send.append(self.pack(msg['parent_header']))
350 # if parent is None:
365
351 # to_send.append(self.none)
352 # else:
353 # to_send.append(self.pack(dict(parent)))
354 if content is None:
366 if content is None:
355 content = self.none
367 content = self.none
356 elif isinstance(content, dict):
368 elif isinstance(content, dict):
@@ -374,11 +386,10 b' class StreamSession(object):'
374 pprint.pprint(omsg)
386 pprint.pprint(omsg)
375 pprint.pprint(to_send)
387 pprint.pprint(to_send)
376 pprint.pprint(buffers)
388 pprint.pprint(buffers)
377 # return both the msg object and the buffers
378 return omsg
389 return omsg
379
390
380 def send_raw(self, stream, msg, flags=0, copy=True, idents=None):
391 def send_raw(self, stream, msg, flags=0, copy=True, idents=None):
381 """send a raw message via idents.
392 """Send a raw message via idents.
382
393
383 Parameters
394 Parameters
384 ----------
395 ----------
@@ -399,7 +410,7 b' class StreamSession(object):'
399 socket = socket.socket
410 socket = socket.socket
400 try:
411 try:
401 msg = socket.recv_multipart(mode)
412 msg = socket.recv_multipart(mode)
402 except zmq.ZMQError, e:
413 except zmq.ZMQError as e:
403 if e.errno == zmq.EAGAIN:
414 if e.errno == zmq.EAGAIN:
404 # We can convert EAGAIN to None as we know in this case
415 # We can convert EAGAIN to None as we know in this case
405 # recv_json won't return None.
416 # recv_json won't return None.
@@ -412,7 +423,7 b' class StreamSession(object):'
412 idents, msg = self.feed_identities(msg, copy)
423 idents, msg = self.feed_identities(msg, copy)
413 try:
424 try:
414 return idents, self.unpack_message(msg, content=content, copy=copy)
425 return idents, self.unpack_message(msg, content=content, copy=copy)
415 except Exception, e:
426 except Exception as e:
416 print (idents, msg)
427 print (idents, msg)
417 # TODO: handle it
428 # TODO: handle it
418 raise e
429 raise e
@@ -4,18 +4,22 b''
4 Connection Diagrams of The IPython ZMQ Cluster
4 Connection Diagrams of The IPython ZMQ Cluster
5 ==============================================
5 ==============================================
6
6
7 This is a quick summary and illustration of the connections involved in the ZeroMQ based IPython cluster for parallel computing.
7 This is a quick summary and illustration of the connections involved in the ZeroMQ based
8 IPython cluster for parallel computing.
8
9
9 All Connections
10 All Connections
10 ===============
11 ===============
11
12
12 The Parallel Computing code is currently under development in Min RK's IPython fork_ on GitHub.
13 The Parallel Computing code is currently under development in Min RK's IPython fork_ on GitHub.
13
14
14 .. _fork: http://github.com/minrk/ipython
15 .. _fork: http://github.com/minrk/ipython/tree/newparallel
15
16
16 The IPython cluster consists of a Controller and one or more clients and engines. The goal of the Controller is to manage and monitor the connections and communications between the clients and the engines.
17 The IPython cluster consists of a Controller and one or more clients and engines. The goal
18 of the Controller is to manage and monitor the connections and communications between the
19 clients and the engines.
17
20
18 It is important for security/practicality reasons that all connections be inbound to the controller process. The arrows in the figures indicate the direction of the connection.
21 It is important for security/practicality reasons that all connections be inbound to the
22 controller process. The arrows in the figures indicate the direction of the connection.
19
23
20
24
21 .. figure:: figs/allconnections.png
25 .. figure:: figs/allconnections.png
@@ -25,8 +29,9 b' It is important for security/practicality reasons that all connections be inboun'
25
29
26 All the connections involved in connecting one client to one engine.
30 All the connections involved in connecting one client to one engine.
27
31
28 The Controller consists of two ZMQ Devices - both MonitoredQueues, one for Tasks (load balanced, engine agnostic), one for Multiplexing (explicit targets), and a Python device for monitoring (the Heartbeat Monitor).
32 The Controller consists of two ZMQ Devices - both MonitoredQueues, one for Tasks (load
29
33 balanced, engine agnostic), one for Multiplexing (explicit targets), and a Python device for
34 monitoring (the Heartbeat Monitor).
30
35
31
36
32 Registration
37 Registration
@@ -39,7 +44,10 b' Registration'
39
44
40 Engines and Clients only need to know where the Registrar ``XREP`` is located to start connecting.
45 Engines and Clients only need to know where the Registrar ``XREP`` is located to start connecting.
41
46
42 Once a controller is launched, the only information needed for connecting clients and/or engines to the controller is the IP/port of the ``XREP`` socket called the Registrar. This socket handles connections from both clients and engines, and replies with the remaining information necessary to establish the remaining connections.
47 Once a controller is launched, the only information needed for connecting clients and/or
48 engines to the controller is the IP/port of the ``XREP`` socket called the Registrar. This
49 socket handles connections from both clients and engines, and replies with the remaining
50 information necessary to establish the remaining connections.
43
51
44 Heartbeat
52 Heartbeat
45 ---------
53 ---------
@@ -51,25 +59,43 b' Heartbeat'
51
59
52 The heartbeat sockets.
60 The heartbeat sockets.
53
61
54 The heartbeat process has been described elsewhere. To summarize: the controller publishes a distinct message periodically via a ``PUB`` socket. Each engine has a ``zmq.FORWARDER`` device with a ``SUB`` socket for input, and ``XREQ`` socket for output. The ``SUB`` socket is connected to the ``PUB`` socket labeled *HB(ping)*, and the ``XREQ`` is connected to the ``XREP`` labeled *HB(pong)*. This results in the same message being relayed back to the Heartbeat Monitor with the addition of the ``XREQ`` prefix. The Heartbeat Monitor receives all the replies via an ``XREP`` socket, and identifies which hearts are still beating by the ``zmq.IDENTITY`` prefix of the ``XREQ`` sockets.
62 The heartbeat process has been described elsewhere. To summarize: the controller publishes
63 a distinct message periodically via a ``PUB`` socket. Each engine has a ``zmq.FORWARDER``
64 device with a ``SUB`` socket for input, and ``XREQ`` socket for output. The ``SUB`` socket
65 is connected to the ``PUB`` socket labeled *HB(ping)*, and the ``XREQ`` is connected to
66 the ``XREP`` labeled *HB(pong)*. This results in the same message being relayed back to
67 the Heartbeat Monitor with the addition of the ``XREQ`` prefix. The Heartbeat Monitor
68 receives all the replies via an ``XREP`` socket, and identifies which hearts are still
69 beating by the ``zmq.IDENTITY`` prefix of the ``XREQ`` sockets.
55
70
56 Queues
71 Queues
57 ------
72 ------
58
73
59 .. figure:: figs/queuefade.png
74 .. figure:: figs/queuefade.png
60 :width: 432px
75 :width: 432px
61 :alt: IPython Queue connections
76 :alt: IPython Queue connections
62 :align: center
77 :align: center
63
78
64 Load balanced Task queue on the left, explicitly multiplexed queue on the right.
79 Load balanced Task queue on the left, explicitly multiplexed queue on the right.
65
80
66 The controller has two MonitoredQueue devices. These devices are primarily for relaying messages between clients and engines, but the controller needs to see those messages for its own purposes. Since no Python code may exist between the two sockets in a queue, all messages sent through these queues (both directions) are also sent via a ``PUB`` socket to a monitor, which allows the Controller to monitor queue traffic without interfering with it.
81 The controller has two MonitoredQueue devices. These devices are primarily for relaying
67
82 messages between clients and engines, but the controller needs to see those messages for
68 For tasks, the engine need not be specified. Messages sent to the ``XREP`` socket from the client side are assigned to an engine via ZMQ's ``XREQ`` round-robin load balancing. Engine replies are directed to specific clients via the IDENTITY of the client, which is received as a prefix at the Engine.
83 its own purposes. Since no Python code may exist between the two sockets in a queue, all
69
84 messages sent through these queues (both directions) are also sent via a ``PUB`` socket to
70 For Multiplexing, ``XREP`` is used for both in and output sockets in the device. Clients must specify the destination by the ``zmq.IDENTITY`` of the ``PAIR`` socket connected to the downstream end of the device.
85 a monitor, which allows the Controller to monitor queue traffic without interfering with
71
86 it.
72 At the Kernel level, both of these PAIR sockets are treated in the same way as the ``REP`` socket in the serial version (except using ZMQStreams instead of explicit sockets).
87
88 For tasks, the engine need not be specified. Messages sent to the ``XREP`` socket from the
89 client side are assigned to an engine via ZMQ's ``XREQ`` round-robin load balancing.
90 Engine replies are directed to specific clients via the IDENTITY of the client, which is
91 received as a prefix at the Engine.
92
93 For Multiplexing, ``XREP`` is used for both in and output sockets in the device. Clients
94 must specify the destination by the ``zmq.IDENTITY`` of the ``PAIR`` socket connected to
95 the downstream end of the device.
96
97 At the Kernel level, both of these PAIR sockets are treated in the same way as the ``REP``
98 socket in the serial version (except using ZMQStreams instead of explicit sockets).
73
99
74 Client connections
100 Client connections
75 ------------------
101 ------------------
@@ -81,7 +107,8 b' Client connections'
81
107
82 Clients connect to an ``XREP`` socket to query the controller
108 Clients connect to an ``XREP`` socket to query the controller
83
109
84 The controller listens on an ``XREP`` socket for queries from clients as to queue status, and control instructions. Clients can connect to this via a PAIR socket or ``XREQ``.
110 The controller listens on an ``XREP`` socket for queries from clients as to queue status,
111 and control instructions. Clients can connect to this via a PAIR socket or ``XREQ``.
85
112
86 .. figure:: figs/notiffade.png
113 .. figure:: figs/notiffade.png
87 :width: 432px
114 :width: 432px
@@ -90,5 +117,8 b' The controller listens on an ``XREP`` socket for queries from clients as to queu'
90
117
91 Engine registration events are published via a ``PUB`` socket.
118 Engine registration events are published via a ``PUB`` socket.
92
119
93 The controller publishes all registration/unregistration events via a ``PUB`` socket. This allows clients to stay up to date with what engines are available by subscribing to the feed with a ``SUB`` socket. Other processes could selectively subscribe to just registration or unregistration events.
120 The controller publishes all registration/unregistration events via a ``PUB`` socket. This
121 allows clients to stay up to date with what engines are available by subscribing to the
122 feed with a ``SUB`` socket. Other processes could selectively subscribe to just
123 registration or unregistration events.
94
124
@@ -3,16 +3,20 b''
3 Messaging for Parallel Computing
3 Messaging for Parallel Computing
4 ================================
4 ================================
5
5
6 This is an extension of the :ref:`messaging <messaging>` doc. Diagrams of the connections can be found in the :ref:`parallel connections <parallel_connections>` doc.
6 This is an extension of the :ref:`messaging <messaging>` doc. Diagrams of the connections
7 can be found in the :ref:`parallel connections <parallel_connections>` doc.
7
8
8
9
9
10 ZMQ messaging is also used in the parallel computing IPython system. All messages to/from
10 ZMQ messaging is also used in the parallel computing IPython system. All messages to/from kernels remain the same as the single kernel model, and are forwarded through a ZMQ Queue device. The controller receives all messages and replies in these channels, and saves results for future use.
11 kernels remain the same as the single kernel model, and are forwarded through a ZMQ Queue
12 device. The controller receives all messages and replies in these channels, and saves
13 results for future use.
11
14
12 The Controller
15 The Controller
13 --------------
16 --------------
14
17
15 The controller is the central process of the IPython parallel computing model. It has 3 Devices:
18 The controller is the central process of the IPython parallel computing model. It has 3
19 Devices:
16
20
17 * Heartbeater
21 * Heartbeater
18 * Multiplexed Queue
22 * Multiplexed Queue
@@ -29,9 +33,13 b' and 3 sockets:'
29 Registration (``XREP``)
33 Registration (``XREP``)
30 ***********************
34 ***********************
31
35
32 The first function of the Controller is to facilitate and monitor connections of clients and engines. Both client and engine registration are handled by the same socket, so only one ip/port pair is needed to connect any number of connections and clients.
36 The first function of the Controller is to facilitate and monitor connections of clients
37 and engines. Both client and engine registration are handled by the same socket, so only
38 one ip/port pair is needed to connect any number of connections and clients.
33
39
34 Engines register with the ``zmq.IDENTITY`` of their two ``XREQ`` sockets, one for the queue, which receives execute requests, and one for the heartbeat, which is used to monitor the survival of the Engine process.
40 Engines register with the ``zmq.IDENTITY`` of their two ``XREQ`` sockets, one for the
41 queue, which receives execute requests, and one for the heartbeat, which is used to
42 monitor the survival of the Engine process.
35
43
36 Message type: ``registration_request``::
44 Message type: ``registration_request``::
37
45
@@ -40,7 +48,10 b' Message type: ``registration_request``::'
40 'heartbeat' : '1234-abcd-...' # the heartbeat XREQ id
48 'heartbeat' : '1234-abcd-...' # the heartbeat XREQ id
41 }
49 }
42
50
43 The Controller replies to an Engine's registration request with the engine's integer ID, and all the remaining connection information for connecting the heartbeat process, and kernel socket(s). The message status will be an error if the Engine requests IDs that are already in use.
51 The Controller replies to an Engine's registration request with the engine's integer ID,
52 and all the remaining connection information for connecting the heartbeat process, and
53 kernel queue socket(s). The message status will be an error if the Engine requests IDs that
54 are already in use.
44
55
45 Message type: ``registration_reply``::
56 Message type: ``registration_reply``::
46
57
@@ -49,39 +60,49 b' Message type: ``registration_reply``::'
49 # if ok:
60 # if ok:
50 'id' : 0, # int, the engine id
61 'id' : 0, # int, the engine id
51 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue
62 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue
63 'control' : 'tcp://...', # addr for control queue
52 'heartbeat' : (a,b), # tuple containing two interfaces needed for heartbeat
64 'heartbeat' : (a,b), # tuple containing two interfaces needed for heartbeat
53 'task' : 'tcp...', # addr for task queue, or None if no task queue running
65 'task' : 'tcp://...', # addr for task queue, or None if no task queue running
54 # if error:
66 # if error:
55 'reason' : 'queue_id already registered'
67 'reason' : 'queue_id already registered'
56 }
68 }
57
69
58 Clients use the same socket to start their connections. Connection requests from clients need no information:
70 Clients use the same socket as engines to start their connections. Connection requests
71 from clients need no information:
59
72
60 Message type: ``connection_request``::
73 Message type: ``connection_request``::
61
74
62 content = {}
75 content = {}
63
76
64 The reply to a Client registration request contains the connection information for the multiplexer and load balanced queues, as well as the address for direct controller queries. If any of these addresses is `None`, that functionality is not available.
77 The reply to a Client registration request contains the connection information for the
78 multiplexer and load balanced queues, as well as the address for direct controller
79 queries. If any of these addresses is `None`, that functionality is not available.
65
80
66 Message type: ``connection_reply``::
81 Message type: ``connection_reply``::
67
82
68 content = {
83 content = {
69 'status' : 'ok', # or 'error'
84 'status' : 'ok', # or 'error'
70 # if ok:
85 # if ok:
71 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the queue
86 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue
72 'task' : 'tcp...', # addr for task queue, or None if no task queue running
87 'task' : 'tcp...', # addr for task queue, or None if no task queue running
73 'controller' : 'tcp...' # addr for controller methods, like queue_request, etc.
88 'query' : 'tcp...' # addr for methods to query the controller, like queue_request, etc.
89 'control' : 'tcp...' # addr for control methods, like abort, etc.
74 }
90 }
75
91
76 Heartbeat
92 Heartbeat
77 *********
93 *********
78
94
79 The controller uses a heartbeat system to monitor engines, and track when they become unresponsive, as described in :ref:`messages <messages>` and shown in :ref:`connections <parallel_connections>`.
95 The controller uses a heartbeat system to monitor engines, and track when they become
96 unresponsive, as described in :ref:`messages <messages>` and shown in :ref:`connections
97 <parallel_connections>`.
80
98
81 Notification (``PUB``)
99 Notification (``PUB``)
82 **********************
100 **********************
83
101
84 The controller publishes all engine registration/unregistration events on a PUB socket. This allows clients to have up-to-date engine ID sets without polling. Registration notifications contain both the integer engine ID and the queue ID, which is necessary for sending messages via the Multiplexer Queue.
102 The controller publishes all engine registration/unregistration events on a PUB socket.
103 This allows clients to have up-to-date engine ID sets without polling. Registration
104 notifications contain both the integer engine ID and the queue ID, which is necessary for
105 sending messages via the Multiplexer Queue.
85
106
86 Message type: ``registration_notification``::
107 Message type: ``registration_notification``::
87
108
@@ -100,9 +121,14 b' Message type : ``unregistration_notification``::'
100 Client Queries (``XREP``)
121 Client Queries (``XREP``)
101 *************************
122 *************************
102
123
103 The controller monitors and logs all queue traffic, so that clients can retrieve past results or monitor pending tasks. Currently, this information resides in memory on the Controller, but will ultimately be offloaded to a database over an additional ZMQ connection. The interface should remain the same or at least similar.
124 The controller monitors and logs all queue traffic, so that clients can retrieve past
125 results or monitor pending tasks. Currently, this information resides in memory on the
126 Controller, but will ultimately be offloaded to a database over an additional ZMQ
127 connection. The interface should remain the same or at least similar.
104
128
105 :func:`queue_request` requests can specify multiple engines to query via the `targets` element. A verbose flag can be passed, to determine whether the result should be the list of `msg_ids` in the queue or simply the length of each list.
129 :func:`queue_request` requests can specify multiple engines to query via the `targets`
130 element. A verbose flag can be passed, to determine whether the result should be the list
131 of `msg_ids` in the queue or simply the length of each list.
106
132
107 Message type: ``queue_request``::
133 Message type: ``queue_request``::
108
134
@@ -111,7 +137,9 b' Message type: ``queue_request``::'
111 'targets' : [0,3,1] # list of ints
137 'targets' : [0,3,1] # list of ints
112 }
138 }
113
139
114 The content of a reply to a :func:queue_request request is a dict, keyed by the engine IDs. Note that they will be the string representation of the integer keys, since JSON cannot handle number keys.
140 The content of a reply to a :func:queue_request request is a dict, keyed by the engine
141 IDs. Note that they will be the string representation of the integer keys, since JSON
142 cannot handle number keys.
115
143
116 Message type: ``queue_reply``::
144 Message type: ``queue_reply``::
117
145
@@ -120,15 +148,19 b' Message type: ``queue_reply``::'
120 '1' : {'completed' : 10, 'queue' : 1}
148 '1' : {'completed' : 10, 'queue' : 1}
121 }
149 }
122
150
123 Clients can request individual results directly from the controller. This is primarily for use gathering results of executions not submitted by the particular client, as the client will have all its own results already. Requests are made by msg_id, and can contain one or more msg_id.
151 Clients can request individual results directly from the controller. This is primarily for
152 use gathering results of executions not submitted by the particular client, as the client
153 will have all its own results already. Requests are made by msg_id, and can contain one or
154 more msg_id.
124
155
125 Message type: ``result_request``::
156 Message type: ``result_request``::
126
157
127 content = {
158 content = {
128 'msg_ids' : [uuid,'...'] # list of strs
159 'msg_ids' : ['uuid','...'] # list of strs
129 }
160 }
130
161
131 The :func:`result_request` reply contains the content objects of the actual execution reply messages.
162 The :func:`result_request` reply contains the content objects of the actual execution
163 reply messages.
132
164
133
165
134 Message type: ``result_reply``::
166 Message type: ``result_reply``::
@@ -139,13 +171,18 b' Message type: ``result_reply``::'
139 msg_id : msg, # the content dict is keyed by msg_ids,
171 msg_id : msg, # the content dict is keyed by msg_ids,
140 # values are the result messages
172 # values are the result messages
141 'pending' : ['msg_id','...'], # msg_ids still pending
173 'pending' : ['msg_id','...'], # msg_ids still pending
174 'completed' : ['msg_id','...'], # list of completed msg_ids
142 # if error:
175 # if error:
143 'reason' : "explanation"
176 'reason' : "explanation"
144 }
177 }
145
178
146 Clients can also instruct the controller to forget the results of messages. This can be done by message ID or engine ID. Individual messages are dropped by msg_id, and all messages completed on an engine are dropped by engine ID.
179 For memory management purposes, Clients can also instruct the controller to forget the
180 results of messages. This can be done by message ID or engine ID. Individual messages are
181 dropped by msg_id, and all messages completed on an engine are dropped by engine ID. This will likely no longer
182 be necessary once we move to a DB-based message logging backend.
147
183
148 If the msg_ids element is the string ``'all'`` instead of a list, then all completed results are forgotten.
184 If the msg_ids element is the string ``'all'`` instead of a list, then all completed
185 results are forgotten.
149
186
150 Message type: ``purge_request``::
187 Message type: ``purge_request``::
151
188
@@ -154,7 +191,9 b' Message type: ``purge_request``::'
154 'engine_ids' : [0,2,4] # list of engine IDs
191 'engine_ids' : [0,2,4] # list of engine IDs
155 }
192 }
156
193
157 The reply to a purge request is simply the status 'ok' if the request succeeded, or an explanation of why it failed, such as requesting the purge of a nonexistent or pending message.
194 The reply to a purge request is simply the status 'ok' if the request succeeded, or an
195 explanation of why it failed, such as requesting the purge of a nonexistent or pending
196 message.
158
197
159 Message type: ``purge_reply``::
198 Message type: ``purge_reply``::
160
199
@@ -168,18 +207,29 b' Message type: ``purge_reply``::'
168 :func:`apply` and :func:`apply_bound`
207 :func:`apply` and :func:`apply_bound`
169 *************************************
208 *************************************
170
209
171 The `Namespace <http://gist.github.com/483294>`_ model suggests that execution be able to use the model::
210 The `Namespace <http://gist.github.com/483294>`_ model suggests that execution be able to
211 use the model::
172
212
173 client.apply(f, *args, **kwargs)
213 client.apply(f, *args, **kwargs)
174
214
175 which takes `f`, a function in the user's namespace, and executes ``f(*args, **kwargs)`` on a remote engine, returning the result (or, for non-blocking, information facilitating later retrieval of the result). This model, unlike the execute message which just uses a code string, must be able to send arbitrary (pickleable) Python objects. And ideally, copy as little data as we can. The `buffers` property of a Message was introduced for this purpose.
215 which takes `f`, a function in the user's namespace, and executes ``f(*args, **kwargs)``
216 on a remote engine, returning the result (or, for non-blocking, information facilitating
217 later retrieval of the result). This model, unlike the execute message which just uses a
218 code string, must be able to send arbitrary (pickleable) Python objects. And ideally, copy
219 as little data as we can. The `buffers` property of a Message was introduced for this
220 purpose.
176
221
177 Utility method :func:`build_apply_message` in :mod:`IPython.zmq.streamsession` wraps a function signature and builds the correct buffer format.
222 Utility method :func:`build_apply_message` in :mod:`IPython.zmq.streamsession` wraps a
223 function signature and builds the correct buffer format for minimal data copying (exactly
224 zero copies of numpy array data).
178
225
179 Message type: ``apply_request``::
226 Message type: ``apply_request``::
180
227
181 content = {
228 content = {
182 'bound' : True # whether to execute in the engine's namespace or unbound
229 'bound' : True, # whether to execute in the engine's namespace or unbound
230 'after' : [msg_ids,...], # list of msg_ids or output of Dependency.as_dict()
231 'follow' : [msg_ids,...], # list of msg_ids or output of Dependency.as_dict()
232
183 }
233 }
184 buffers = ['...'] # at least 3 in length
234 buffers = ['...'] # at least 3 in length
185 # as built by build_apply_message(f,args,kwargs)
235 # as built by build_apply_message(f,args,kwargs)
@@ -200,10 +250,22 b' Message type: ``apply_reply``::'
200 Implementation
250 Implementation
201 --------------
251 --------------
202
252
203 There are a few differences in implementation between the `StreamSession` object used in the parallel computing fork and the `Session` object, the main one being that messages are sent in parts, rather than as a single serialized object. `StreamSession` objects also take pack/unpack functions, which are to be used when serializing/deserializing objects. These can be any functions that translate to/from formats that ZMQ sockets can send (buffers,bytes, etc.).
253 There are a few differences in implementation between the `StreamSession` object used in
254 the parallel computing fork and the `Session` object, the main one being that messages are
255 sent in parts, rather than as a single serialized object. `StreamSession` objects also
256 take pack/unpack functions, which are to be used when serializing/deserializing objects.
257 These can be any functions that translate to/from formats that ZMQ sockets can send
258 (buffers,bytes, etc.).
204
259
205 Split Sends
260 Split Sends
206 ***********
261 ***********
207
262
208 Previously, messages were bundled as a single json object and one call to :func:`socket.send_json`. Since the controller inspects all messages, and doesn't need to see the content of the messages, which can be large, messages are serialized and sent in pieces. All messages are sent in at least 3 parts: the header, the parent header, and the content. This allows the controller to unpack and inspect the (always small) header, without spending time unpacking the content unless the message is bound for the controller. Buffers are added on to the end of the message, and can be any objects that present the buffer interface.
263 Previously, messages were bundled as a single json object and one call to
264 :func:`socket.send_json`. Since the controller inspects all messages, and doesn't need to
265 see the content of the messages, which can be large, messages are now serialized and sent in
266 pieces. All messages are sent in at least 3 parts: the header, the parent header, and the
267 content. This allows the controller to unpack and inspect the (always small) header,
268 without spending time unpacking the content unless the message is bound for the
269 controller. Buffers are added on to the end of the message, and can be any objects that
270 present the buffer interface.
209
271
@@ -69,11 +69,11 b' New features'
69 :mod:`~IPython.external.argparse` to parse command line options for
69 :mod:`~IPython.external.argparse` to parse command line options for
70 :command:`ipython`.
70 :command:`ipython`.
71
71
72 * New top level :func:`~IPython.core.embed.embed` function that can be called
72 * New top level :func:`~IPython.frontend.terminal.embed.embed` function that can be called
73 to embed IPython at any place in user's code. On the first call it will
73 to embed IPython at any place in user's code. On the first call it will
74 create an :class:`~IPython.core.embed.InteractiveShellEmbed` instance and
74 create an :class:`~IPython.frontend.terminal.embed.InteractiveShellEmbed` instance and
75 call it. In later calls, it just calls the previously created
75 call it. In later calls, it just calls the previously created
76 :class:`~IPython.core.embed.InteractiveShellEmbed`.
76 :class:`~IPython.frontend.terminal.embed.InteractiveShellEmbed`.
77
77
78 * Created a component system (:mod:`IPython.core.component`) that is based on
78 * Created a component system (:mod:`IPython.core.component`) that is based on
79 :mod:`IPython.utils.traitlets`. Components are arranged into a runtime
79 :mod:`IPython.utils.traitlets`. Components are arranged into a runtime
General Comments 0
You need to be logged in to leave comments. Login now