general parallel code cleanup
MinRK
@@ -1,6 +1,7 b''
1 1 #!/usr/bin/env python
2 2 """The IPython Controller with 0MQ
3 This is the master object that handles connections from engines, clients, and
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 5 """
5 6 #-----------------------------------------------------------------------------
6 7 # Copyright (C) 2010 The IPython Development Team
@@ -28,12 +29,14 b' from IPython.zmq.entry_point import bind_port'
28 29 from streamsession import Message, wrap_exception
29 30 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
30 31 connect_logger, parse_url)
31 # from messages import json # use the same import switches
32 32
33 33 #-----------------------------------------------------------------------------
34 34 # Code
35 35 #-----------------------------------------------------------------------------
36 36
37 def _passer(*args, **kwargs):
38 return
39
37 40 class ReverseDict(dict):
38 41 """simple double-keyed subset of dict methods."""
39 42
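Only the class header is visible in this hunk; a minimal sketch of the "double-keyed" idea, assuming lookups fall through to a value -> key mapping (the real class may differ)::

    class ReverseDict(dict):
        """simple double-keyed subset of dict methods."""
        def __init__(self, *args, **kwargs):
            dict.__init__(self, *args, **kwargs)
            # maintain a value -> key mapping alongside the usual one
            self._reverse = dict()
            for key, value in self.items():
                self._reverse[value] = key

        def __getitem__(self, key):
            try:
                return dict.__getitem__(self, key)
            except KeyError:
                # fall back to the reverse direction
                return self._reverse[key]

        def __setitem__(self, key, value):
            if key in self._reverse:
                raise KeyError("key %r exists as a value" % key)
            dict.__setitem__(self, key, value)
            self._reverse[value] = key

With that, ``d[eid]`` yields the queue id and ``d[queue_id]`` yields the eid.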
@@ -93,16 +96,18 b' class Controller(object):'
93 96 loop: zmq IOLoop instance
94 97 session: StreamSession object
95 98 context: zmq context for creating new connections (?)
99 queue: ZMQStream for monitoring the command queue (SUB)
96 100 registrar: ZMQStream for engine registration requests (XREP)
101 heartbeat: HeartMonitor object checking the pulse of the engines
97 102 clientele: ZMQStream for client connections (XREP)
98 103 not used for jobs, only query/control commands
99 queue: ZMQStream for monitoring the command queue (SUB)
100 heartbeat: HeartMonitor object checking the pulse of the engines
101 db_stream: connection to db for out of memory logging of commands
104 notifier: ZMQStream for broadcasting engine registration changes (PUB)
105 db: connection to db for out of memory logging of commands
102 106 NotImplemented
103 queue_addr: zmq connection address of the XREP socket for the queue
104 hb_addr: zmq connection address of the PUB socket for heartbeats
105 task_addr: zmq connection address of the XREQ socket for task queue
107 engine_addrs: dict of zmq connection information for engines to connect
108 to the queues.
109 client_addrs: dict of zmq connection information for clients to connect
110 to the queues.
106 111 """
107 112 # internal data structures:
108 113 ids=None # engine IDs
@@ -165,14 +170,16 b' class Controller(object):'
165 170 self.notifier = notifier
166 171 self.db = db
167 172
173 # validate connection dicts:
168 174 self.client_addrs = client_addrs
169 175 assert isinstance(client_addrs['queue'], str)
176 assert isinstance(client_addrs['control'], str)
170 177 # self.hb_addrs = hb_addrs
171 178 self.engine_addrs = engine_addrs
172 179 assert isinstance(engine_addrs['queue'], str)
180 assert isinstance(engine_addrs['control'], str)
173 181 assert len(engine_addrs['heartbeat']) == 2
174 182
175
176 183 # register our callbacks
177 184 self.registrar.on_recv(self.dispatch_register_request)
178 185 self.clientele.on_recv(self.dispatch_client_msg)
@@ -182,19 +189,25 b' class Controller(object):'
182 189 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
183 190 heartbeat.add_new_heart_handler(self.handle_new_heart)
184 191
185 if self.db is not None:
186 self.db.on_recv(self.dispatch_db)
187
192 self.queue_handlers = { 'in' : self.save_queue_request,
193 'out': self.save_queue_result,
194 'intask': self.save_task_request,
195 'outtask': self.save_task_result,
196 'tracktask': self.save_task_destination,
197 'incontrol': _passer,
198 'outcontrol': _passer,
199 }
200
188 201 self.client_handlers = {'queue_request': self.queue_status,
189 202 'result_request': self.get_results,
190 203 'purge_request': self.purge_results,
204 'load_request': self.check_load,
191 205 'resubmit_request': self.resubmit_task,
192 206 }
193 207
194 208 self.registrar_handlers = {'registration_request' : self.register_engine,
195 209 'unregistration_request' : self.unregister_engine,
196 210 'connection_request': self.connection_request,
197
198 211 }
199 212 #
200 213 # this is the stuff that will move to DB:
@@ -272,7 +285,7 b' class Controller(object):'
272 285 print (idents,msg, len(msg))
273 286 try:
274 287 msg = self.session.unpack_message(msg,content=True)
275 except Exception, e:
288 except Exception as e:
276 289 logger.error("registration::got bad registration message: %s"%msg)
277 290 raise e
278 291 return
@@ -291,18 +304,9 b' class Controller(object):'
291 304 logger.debug("queue traffic: %s"%msg[:2])
292 305 switch = msg[0]
293 306 idents, msg = self.session.feed_identities(msg[1:])
294 if switch == 'in':
295 self.save_queue_request(idents, msg)
296 elif switch == 'out':
297 self.save_queue_result(idents, msg)
298 elif switch == 'intask':
299 self.save_task_request(idents, msg)
300 elif switch == 'outtask':
301 self.save_task_result(idents, msg)
302 elif switch == 'tracktask':
303 self.save_task_destination(idents, msg)
304 elif switch in ('incontrol', 'outcontrol'):
305 pass
307 handler = self.queue_handlers.get(switch, None)
308 if handler is not None:
309 handler(idents, msg)
306 310 else:
307 311 logger.error("Invalid message topic: %s"%switch)
308 312
@@ -392,7 +396,7 b' class Controller(object):'
392 396 received=None,
393 397 engine=(eid, queue_id))
394 398 self.pending[msg_id] = ( msg, info )
395 self.queues[eid][0].append(msg_id)
399 self.queues[eid].append(msg_id)
396 400
397 401 def save_queue_result(self, idents, msg):
398 402 client_id, queue_id = idents[:2]
@@ -417,7 +421,7 b' class Controller(object):'
417 421 self.results[msg_id] = msg
418 422 if msg_id in self.pending:
419 423 self.pending.pop(msg_id)
420 self.queues[eid][0].remove(msg_id)
424 self.queues[eid].remove(msg_id)
421 425 self.completed[eid].append(msg_id)
422 426 else:
423 427 logger.debug("queue:: unknown msg finished %s"%msg_id)
@@ -425,6 +429,7 b' class Controller(object):'
425 429 #--------------------- Task Queue Traffic ------------------------------
426 430
427 431 def save_task_request(self, idents, msg):
432 """Save the submission of a task."""
428 433 client_id = idents[0]
429 434
430 435 try:
@@ -437,13 +442,17 b' class Controller(object):'
437 442 header = msg['header']
438 443 msg_id = header['msg_id']
439 444 self.mia.add(msg_id)
440 self.pending[msg_id] = msg
445 info = dict(submit=datetime.now(),
446 received=None,
447 engine=None)
448 self.pending[msg_id] = (msg, info)
441 449 if not self.tasks.has_key(client_id):
442 450 self.tasks[client_id] = []
443 451 self.tasks[client_id].append(msg_id)
444 452
445 453 def save_task_result(self, idents, msg):
446 client_id = idents[0]
454 """save the result of a completed task."""
455 client_id, engine_uuid = idents[:2]
447 456 try:
448 457 msg = self.session.unpack_message(msg, content=False)
449 458 except:
@@ -452,16 +461,19 b' class Controller(object):'
452 461 return
453 462
454 463 parent = msg['parent_header']
464 eid = self.by_ident[engine_uuid]
455 465 if not parent:
456 466 # print msg
457 467 # logger.warn("")
458 468 return
459 469 msg_id = parent['msg_id']
460 470 self.results[msg_id] = msg
461 if msg_id in self.pending:
471 if msg_id in self.pending and msg_id in self.tasks[eid]:
462 472 self.pending.pop(msg_id)
463 473 if msg_id in self.mia:
464 474 self.mia.remove(msg_id)
475 self.completed[eid].append(msg_id)
476 self.tasks[eid].remove(msg_id)
465 477 else:
466 478 logger.debug("task::unknown task %s finished"%msg_id)
467 479
@@ -475,16 +487,16 b' class Controller(object):'
475 487 print (content)
476 488 msg_id = content['msg_id']
477 489 engine_uuid = content['engine_id']
478 for eid,queue_id in self.keytable.iteritems():
479 if queue_id == engine_uuid:
480 break
490 eid = self.by_ident[engine_uuid]
481 491
482 492 logger.info("task::task %s arrived on %s"%(msg_id, eid))
483 493 if msg_id in self.mia:
484 494 self.mia.remove(msg_id)
485 495 else:
486 496 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
487 self.tasks[engine_uuid].append(msg_id)
497
498 self.tasks[eid].append(msg_id)
499 self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
488 500
489 501 def mia_task_request(self, idents, msg):
490 502 client_id = idents[0]
@@ -493,10 +505,12 b' class Controller(object):'
493 505
494 506
495 507
496 #-------------------- Registration -----------------------------
508 #-------------------------------------------------------------------------
509 # Registration requests
510 #-------------------------------------------------------------------------
497 511
498 512 def connection_request(self, client_id, msg):
499 """reply with connection addresses for clients"""
513 """Reply with connection addresses for clients."""
500 514 logger.info("client::client %s connected"%client_id)
501 515 content = dict(status='ok')
502 516 content.update(self.client_addrs)
@@ -507,7 +521,7 b' class Controller(object):'
507 521 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
508 522
509 523 def register_engine(self, reg, msg):
510 """register an engine"""
524 """Register a new engine."""
511 525 content = msg['content']
512 526 try:
513 527 queue = content['queue']
@@ -556,6 +570,7 b' class Controller(object):'
556 570 return eid
557 571
558 572 def unregister_engine(self, ident, msg):
573 """Unregister an engine that explicitly requested to leave."""
559 574 try:
560 575 eid = msg['content']['id']
561 576 except:
@@ -569,7 +584,7 b' class Controller(object):'
569 584 self.hearts.pop(ec.heartbeat)
570 585 self.by_ident.pop(ec.queue)
571 586 self.completed.pop(eid)
572 for msg_id in self.queues.pop(eid)[0]:
587 for msg_id in self.queues.pop(eid):
573 588 msg = self.pending.pop(msg_id)
574 589 ############## TODO: HANDLE IT ################
575 590
@@ -577,6 +592,8 b' class Controller(object):'
577 592 self.session.send(self.notifier, "unregistration_notification", content=content)
578 593
579 594 def finish_registration(self, heart):
595 """Second half of engine registration, called after our HeartMonitor
596 has received a beat from the Engine's Heart."""
580 597 try:
581 598 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
582 599 except KeyError:
@@ -590,7 +607,8 b' class Controller(object):'
590 607 self.keytable[eid] = queue
591 608 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
592 609 self.by_ident[queue] = eid
593 self.queues[eid] = ([],[])
610 self.queues[eid] = list()
611 self.tasks[eid] = list()
594 612 self.completed[eid] = list()
595 613 self.hearts[heart] = eid
596 614 content = dict(id=eid, queue=self.engines[eid].queue)
@@ -604,7 +622,9 b' class Controller(object):'
604 622 else:
605 623 pass
606 624
607 #------------------- Client Requests -------------------------------
625 #-------------------------------------------------------------------------
626 # Client Requests
627 #-------------------------------------------------------------------------
608 628
609 629 def check_load(self, client_id, msg):
610 630 content = msg['content']
@@ -620,12 +640,17 b' class Controller(object):'
620 640 content = dict(status='ok')
621 641 # loads = {}
622 642 for t in targets:
623 content[str(t)] = len(self.queues[t])
643 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
624 644 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
625 645
626 646
627 647 def queue_status(self, client_id, msg):
628 """handle queue_status request"""
648 """Return the Queue status of one or more targets.
649 if verbose: return the msg_ids
650 else: return len of each type.
651 keys: queue (pending MUX jobs)
652 tasks (pending Task jobs)
653 completed (finished jobs from both queues)"""
629 654 content = msg['content']
630 655 targets = content['targets']
631 656 try:
@@ -635,19 +660,23 b' class Controller(object):'
635 660 self.session.send(self.clientele, "controller_error",
636 661 content=content, ident=client_id)
637 662 return
638 verbose = msg.get('verbose', False)
639 content = dict()
663 verbose = content.get('verbose', False)
664 content = dict(status='ok')
640 665 for t in targets:
641 666 queue = self.queues[t]
642 667 completed = self.completed[t]
668 tasks = self.tasks[t]
643 669 if not verbose:
644 670 queue = len(queue)
645 671 completed = len(completed)
646 content[str(t)] = {'queue': queue, 'completed': completed }
672 tasks = len(tasks)
673 content[bytes(t)] = {'queue': queue, 'completed': completed, 'tasks': tasks}
647 674 # pending
648 675 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
649 676
650 677 def purge_results(self, client_id, msg):
678 """Purge results from memory. This method is more valuable before we move
679 to a DB based message storage mechanism."""
651 680 content = msg['content']
652 681 msg_ids = content.get('msg_ids', [])
653 682 reply = dict(status='ok')
@@ -675,37 +704,11 b' class Controller(object):'
675 704 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
676 705
677 706 def resubmit_task(self, client_id, msg, buffers):
678 content = msg['content']
679 header = msg['header']
680
681
682 msg_ids = content.get('msg_ids', [])
683 reply = dict(status='ok')
684 if msg_ids == 'all':
685 self.results = {}
686 else:
687 for msg_id in msg_ids:
688 if msg_id in self.results:
689 self.results.pop(msg_id)
690 else:
691 if msg_id in self.pending:
692 reply = dict(status='error', reason="msg pending: %r"%msg_id)
693 else:
694 reply = dict(status='error', reason="No such msg: %r"%msg_id)
695 break
696 eids = content.get('engine_ids', [])
697 for eid in eids:
698 if eid not in self.engines:
699 reply = dict(status='error', reason="No such engine: %i"%eid)
700 break
701 msg_ids = self.completed.pop(eid)
702 for msg_id in msg_ids:
703 self.results.pop(msg_id)
704
705 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
707 """Resubmit a task."""
708 raise NotImplementedError
706 709
707 710 def get_results(self, client_id, msg):
708 """get the result of 1 or more messages"""
711 """Get the result of 1 or more messages."""
709 712 content = msg['content']
710 713 msg_ids = set(content['msg_ids'])
711 714 statusonly = content.get('status_only', False)
@@ -727,33 +730,12 b' class Controller(object):'
727 730 break
728 731 self.session.send(self.clientele, "result_reply", content=content,
729 732 parent=msg, ident=client_id)
730
731 733
732 734
733 ############ OLD METHODS for Python Relay Controller ###################
734 def _validate_engine_msg(self, msg):
735 """validates and unpacks headers of a message. Returns False if invalid,
736 (ident, message)"""
737 ident = msg[0]
738 try:
739 msg = self.session.unpack_message(msg[1:], content=False)
740 except:
741 logger.error("engine.%s::Invalid Message %s"%(ident, msg))
742 return False
743
744 try:
745 eid = msg.header.username
746 assert self.engines.has_key(eid)
747 except:
748 logger.error("engine::Invalid Engine ID %s"%(ident))
749 return False
750
751 return eid, msg
752
753
754 #--------------------
735 #-------------------------------------------------------------------------
755 736 # Entry Point
756 #--------------------
737 #-------------------------------------------------------------------------
738
757 739 def make_argument_parser():
758 740 """Make an argument parser"""
759 741 parser = make_base_argument_parser()
@@ -130,7 +130,7 b' class HeartMonitor(object):'
130 130 for handler in self._failure_handlers:
131 131 try:
132 132 handler(heart)
133 except Exception, e:
133 except Exception as e:
134 134 print (e)
135 135 logger.error("heartbeat::Bad Handler! %s"%handler)
136 136 pass
@@ -1,3 +1,10 b''
1 """The Python scheduler for rich scheduling.
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
6 """
7
1 8 #----------------------------------------------------------------------
2 9 # Imports
3 10 #----------------------------------------------------------------------
@@ -40,7 +47,7 b' def plainrandom(loads):'
40 47 def lru(loads):
41 48 """Always pick the front of the line.
42 49
43 The content of loads is ignored.
50 The content of `loads` is ignored.
44 51
45 52 Assumes LRU ordering of loads, with oldest first.
46 53 """
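Given that docstring, both pure selection schemes presumably reduce to sketches this small (the actual module may differ)::

    import random

    def plainrandom(loads):
        """Pick a random target index, ignoring loads."""
        return random.randint(0, len(loads) - 1)

    def lru(loads):
        """Always pick the front of the line; `loads` is ignored."""
        return 0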
@@ -151,10 +158,12 b' class TaskScheduler(object):'
151 158 self.notifier_stream.on_recv(self.dispatch_notification)
152 159
153 160 def resume_receiving(self):
154 """resume accepting jobs"""
161 """Resume accepting jobs."""
155 162 self.client_stream.on_recv(self.dispatch_submission, copy=False)
156 163
157 164 def stop_receiving(self):
165 """Stop accepting jobs while there are no engines.
166 Leave them in the ZMQ queue."""
158 167 self.client_stream.on_recv(None)
159 168
160 169 #-----------------------------------------------------------------------
@@ -176,7 +185,7 b' class TaskScheduler(object):'
176 185 logger.error("task::Invalid notification msg: %s"%msg)
177 186 @logged
178 187 def _register_engine(self, uid):
179 """new engine became available"""
188 """New engine with ident `uid` became available."""
180 189 # head of the line:
181 190 self.targets.insert(0,uid)
182 191 self.loads.insert(0,0)
@@ -187,10 +196,12 b' class TaskScheduler(object):'
187 196 self.resume_receiving()
188 197
189 198 def _unregister_engine(self, uid):
190 """existing engine became unavailable"""
191 # handle any potentially finished tasks:
199 """Existing engine with ident `uid` became unavailable."""
192 200 if len(self.targets) == 1:
201 # this was our only engine
193 202 self.stop_receiving()
203
204 # handle any potentially finished tasks:
194 205 self.engine_stream.flush()
195 206
196 207 self.completed.pop(uid)
@@ -203,7 +214,7 b' class TaskScheduler(object):'
203 214 self.handle_stranded_tasks(lost)
204 215
205 216 def handle_stranded_tasks(self, lost):
206 """deal with jobs resident in an engine that died."""
217 """Deal with jobs resident in an engine that died."""
207 218 # TODO: resubmit the tasks?
208 219 for msg_id in lost:
209 220 pass
@@ -214,26 +225,29 b' class TaskScheduler(object):'
214 225 #-----------------------------------------------------------------------
215 226 @logged
216 227 def dispatch_submission(self, raw_msg):
217 """dispatch job submission"""
228 """Dispatch job submission to appropriate handlers."""
218 229 # ensure targets up to date:
219 230 self.notifier_stream.flush()
220 231 try:
221 232 idents, msg = self.session.feed_identities(raw_msg, copy=False)
222 except Exception, e:
233 except Exception as e:
223 234 logger.error("task::Invalid msg: %s"%msg)
224 235 return
225 236
226 237 msg = self.session.unpack_message(msg, content=False, copy=False)
227 238 header = msg['header']
228 239 msg_id = header['msg_id']
240
241 # time dependencies
229 242 after = Dependency(header.get('after', []))
230 243 if after.mode == 'all':
231 244 after.difference_update(self.all_done)
232 245 if after.check(self.all_done):
233 # recast as empty set, if we are already met,
234 # to prevent
246 # recast as empty set, if `after` already met,
247 # to prevent unnecessary set comparisons
235 248 after = Dependency([])
236 249
250 # location dependencies
237 251 follow = Dependency(header.get('follow', []))
238 252 if len(after) == 0:
239 253 # time deps already met, try to run
@@ -244,6 +258,7 b' class TaskScheduler(object):'
244 258 self.save_unmet(msg_id, raw_msg, after, follow)
245 259 # send to monitor
246 260 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
261
247 262 @logged
248 263 def maybe_run(self, msg_id, raw_msg, follow=None):
249 264 """check location dependencies, and run if they are met."""
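In isolation, the `after` gate in ``dispatch_submission`` follows this pattern; the ``Dependency`` below is a hypothetical minimum inferred from the calls above (a set subclass with a ``mode`` attribute and a ``check`` method)::

    class Dependency(set):
        def __init__(self, ids, mode='all'):
            set.__init__(self, ids)
            self.mode = mode

        def check(self, completed):
            # an empty dependency is always satisfied
            if len(self) == 0:
                return True
            if self.mode == 'all':
                return self.issubset(completed)
            # 'any' mode: a single finished msg_id suffices
            return bool(self.intersection(completed))

    all_done = set(['msg-1', 'msg-2'])      # msg_ids known to be finished
    after = Dependency(['msg-1'])
    if after.mode == 'all':
        after.difference_update(all_done)   # drop dependencies already met
    if after.check(all_done):
        after = Dependency([])              # fully met: recast as empty gate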
@@ -276,7 +291,7 b' class TaskScheduler(object):'
276 291
277 292 @logged
278 293 def submit_task(self, msg_id, msg, follow=None, indices=None):
279 """submit a task to any of a subset of our targets"""
294 """Submit a task to any of a subset of our targets."""
280 295 if indices:
281 296 loads = [self.loads[i] for i in indices]
282 297 else:
@@ -290,6 +305,8 b' class TaskScheduler(object):'
290 305 self.engine_stream.send_multipart(msg, copy=False)
291 306 self.add_job(idx)
292 307 self.pending[target][msg_id] = (msg, follow)
308 content = dict(msg_id=msg_id, engine_id=target)
309 self.session.send(self.mon_stream, 'task_destination', content=content, ident='tracktask')
293 310
294 311 #-----------------------------------------------------------------------
295 312 # Result Handling
@@ -298,7 +315,7 b' class TaskScheduler(object):'
298 315 def dispatch_result(self, raw_msg):
299 316 try:
300 317 idents,msg = self.session.feed_identities(raw_msg, copy=False)
301 except Exception, e:
318 except Exception as e:
302 319 logger.error("task::Invalid result: %s"%msg)
303 320 return
304 321 msg = self.session.unpack_message(msg, content=False, copy=False)
@@ -125,7 +125,7 b' class RawInput(object):'
125 125 while True:
126 126 try:
127 127 reply = self.socket.recv_json(zmq.NOBLOCK)
128 except zmq.ZMQError, e:
128 except zmq.ZMQError as e:
129 129 if e.errno == zmq.EAGAIN:
130 130 pass
131 131 else:
@@ -171,7 +171,7 b' class Kernel(object):'
171 171 while True:
172 172 try:
173 173 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
174 except zmq.ZMQError, e:
174 except zmq.ZMQError as e:
175 175 if e.errno == zmq.EAGAIN:
176 176 break
177 177 else:
@@ -238,17 +238,6 b' class Kernel(object):'
238 238 else:
239 239 handler(self.control_stream, idents, msg)
240 240
241 # def flush_control(self):
242 # while any(zmq.select([self.control_socket],[],[],1e-4)):
243 # try:
244 # msg = self.control_socket.recv_multipart(zmq.NOBLOCK, copy=False)
245 # except zmq.ZMQError, e:
246 # if e.errno != zmq.EAGAIN:
247 # raise e
248 # return
249 # else:
250 # self.dispatch_control(msg)
251
252 241
253 242 #-------------------- queue helpers ------------------------------
254 243
@@ -8,6 +8,7 b' import sys'
8 8 import traceback
9 9 import pprint
10 10 import uuid
11 from datetime import datetime
11 12
12 13 import zmq
13 14 from zmq.utils import jsonapi
@@ -111,6 +112,7 b' class Message(object):'
111 112
112 113
113 114 def msg_header(msg_id, msg_type, username, session):
115 date=datetime.now().isoformat()
114 116 return locals()
115 117 # return {
116 118 # 'msg_id' : msg_id,
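The ``return locals()`` trick packs the function's arguments, plus the newly added `date` field, into the header dict; illustratively (values made up)::

    >>> msg_header('1234-abcd', 'apply_request', 'minrk', 'session-uuid')
    {'msg_id': '1234-abcd', 'msg_type': 'apply_request', 'username': 'minrk',
     'session': 'session-uuid', 'date': '2010-10-27T12:34:56.789012'}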
@@ -140,7 +142,7 b' def extract_header(msg_or_header):'
140 142 return h
141 143
142 144 def rekey(dikt):
143 """rekey a dict that has been forced to use str keys where there should be
145 """Rekey a dict that has been forced to use str keys where there should be
144 146 ints by json. This belongs in the jsonutil added by fperez."""
145 147 for k in dikt.iterkeys():
146 148 if isinstance(k, str):
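Illustratively, after a JSON round-trip has stringified the integer keys::

    >>> rekey({'0': 'engine zero', '1': 'engine one'})
    {0: 'engine zero', 1: 'engine one'}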
@@ -162,11 +164,22 b' def rekey(dikt):'
162 164 return dikt
163 165
164 166 def serialize_object(obj, threshold=64e-6):
165 """serialize an object into a list of sendable buffers.
167 """Serialize an object into a list of sendable buffers.
166 168
167 Returns: (pmd, bufs)
168 where pmd is the pickled metadata wrapper, and bufs
169 is a list of data buffers"""
169 Parameters
170 ----------
171
172 obj : object
173 The object to be serialized
174 threshold : float
175 The threshold for not double-pickling the content.
176
177
178 Returns
179 -------
180 (pmd, bufs) : tuple
181 where pmd is the pickled metadata wrapper,
182 bufs is a list of data buffers"""
170 183 # threshold is 100 B
171 184 databuffers = []
172 185 if isinstance(obj, (list, tuple)):
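A hedged usage sketch, following the docstring above (the exact buffer layout depends on the pack functions in use)::

    import numpy
    # a list holding a large array: the array data should travel as a raw
    # buffer rather than be double-pickled, since it exceeds the threshold
    pmd, bufs = serialize_object(['some state', numpy.zeros(1024)])
    # pmd: the pickled metadata wrapper; bufs: list of sendable data buffers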
@@ -318,6 +331,8 b' class StreamSession(object):'
318 331 Parameters
319 332 ----------
320 333
334 stream : zmq.Socket or ZMQStream
335 the socket-like object used to send the data
321 336 msg_type : str or Message/dict
322 337 Normally, msg_type will be
323 338
@@ -347,10 +362,7 b' class StreamSession(object):'
347 362 to_send.append(DELIM)
348 363 to_send.append(self.pack(msg['header']))
349 364 to_send.append(self.pack(msg['parent_header']))
350 # if parent is None:
351 # to_send.append(self.none)
352 # else:
353 # to_send.append(self.pack(dict(parent)))
365
354 366 if content is None:
355 367 content = self.none
356 368 elif isinstance(content, dict):
@@ -374,11 +386,10 b' class StreamSession(object):'
374 386 pprint.pprint(omsg)
375 387 pprint.pprint(to_send)
376 388 pprint.pprint(buffers)
377 # return both the msg object and the buffers
378 389 return omsg
379 390
380 391 def send_raw(self, stream, msg, flags=0, copy=True, idents=None):
381 """send a raw message via idents.
392 """Send a raw message via idents.
382 393
383 394 Parameters
384 395 ----------
@@ -399,7 +410,7 b' class StreamSession(object):'
399 410 socket = socket.socket
400 411 try:
401 412 msg = socket.recv_multipart(mode)
402 except zmq.ZMQError, e:
413 except zmq.ZMQError as e:
403 414 if e.errno == zmq.EAGAIN:
404 415 # We can convert EAGAIN to None as we know in this case
405 416 # recv_json won't return None.
@@ -412,7 +423,7 b' class StreamSession(object):'
412 423 idents, msg = self.feed_identities(msg, copy)
413 424 try:
414 425 return idents, self.unpack_message(msg, content=content, copy=copy)
415 except Exception, e:
426 except Exception as e:
416 427 print (idents, msg)
417 428 # TODO: handle it
418 429 raise e
@@ -4,18 +4,22 b''
4 4 Connection Diagrams of The IPython ZMQ Cluster
5 5 ==============================================
6 6
7 This is a quick summary and illustration of the connections involved in the ZeroMQ based IPython cluster for parallel computing.
7 This is a quick summary and illustration of the connections involved in the ZeroMQ based
8 IPython cluster for parallel computing.
8 9
9 10 All Connections
10 11 ===============
11 12
12 13 The Parallel Computing code is currently under development in Min RK's IPython fork_ on GitHub.
13 14
14 .. _fork: http://github.com/minrk/ipython
15 .. _fork: http://github.com/minrk/ipython/tree/newparallel
15 16
16 The IPython cluster consists of a Controller and one or more clients and engines. The goal of the Controller is to manage and monitor the connections and communications between the clients and the engines.
17 The IPython cluster consists of a Controller and one or more clients and engines. The goal
18 of the Controller is to manage and monitor the connections and communications between the
19 clients and the engines.
17 20
18 It is important for security/practicality reasons that all connections be inbound to the controller process. The arrows in the figures indicate the direction of the connection.
21 It is important for security/practicality reasons that all connections be inbound to the
22 controller process. The arrows in the figures indicate the direction of the connection.
19 23
20 24
21 25 .. figure:: figs/allconnections.png
@@ -25,8 +29,9 b' It is important for security/practicality reasons that all connections be inboun'
25 29
26 30 All the connections involved in connecting one client to one engine.
27 31
28 The Controller consists of two ZMQ Devices - both MonitoredQueues, one for Tasks (load balanced, engine agnostic), one for Multiplexing (explicit targets), a Python device for monitoring (the Heartbeat Monitor).
29
32 The Controller consists of two ZMQ Devices - both MonitoredQueues, one for Tasks (load
33 balanced, engine agnostic), one for Multiplexing (explicit targets), plus a Python device
34 for monitoring (the Heartbeat Monitor).
30 35
31 36
32 37 Registration
@@ -39,7 +44,10 b' Registration'
39 44
40 45 Engines and Clients only need to know where the Registrar ``XREP`` is located to start connecting.
41 46
42 Once a controller is launched, the only information needed for connecting clients and/or engines to the controller is the IP/port of the ``XREP`` socket called the Registrar. This socket handles connections from both clients and engines, and replies with the remaining information necessary to establish the remaining connections.
47 Once a controller is launched, the only information needed for connecting clients and/or
48 engines to the controller is the IP/port of the ``XREP`` socket called the Registrar. This
49 socket handles connections from both clients and engines, and replies with the remaining
50 information necessary to establish the remaining connections.
43 51
44 52 Heartbeat
45 53 ---------
@@ -51,25 +59,43 b' Heartbeat'
51 59
52 60 The heartbeat sockets.
53 61
54 The heartbeat process has been described elsewhere. To summarize: the controller publishes a distinct message periodically via a ``PUB`` socket. Each engine has a ``zmq.FORWARDER`` device with a ``SUB`` socket for input, and ``XREQ`` socket for output. The ``SUB`` socket is connected to the ``PUB`` socket labeled *HB(ping)*, and the ``XREQ`` is connected to the ``XREP`` labeled *HB(pong)*. This results in the same message being relayed back to the Heartbeat Monitor with the addition of the ``XREQ`` prefix. The Heartbeat Monitor receives all the replies via an ``XREP`` socket, and identifies which hearts are still beating by the ``zmq.IDENTITY`` prefix of the ``XREQ`` sockets.
62 The heartbeat process has been described elsewhere. To summarize: the controller publishes
63 a distinct message periodically via a ``PUB`` socket. Each engine has a ``zmq.FORWARDER``
64 device with a ``SUB`` socket for input, and ``XREQ`` socket for output. The ``SUB`` socket
65 is connected to the ``PUB`` socket labeled *HB(ping)*, and the ``XREQ`` is connected to
66 the ``XREP`` labeled *HB(pong)*. This results in the same message being relayed back to
67 the Heartbeat Monitor with the addition of the ``XREQ`` prefix. The Heartbeat Monitor
68 receives all the replies via an ``XREP`` socket, and identifies which hearts are still
69 beating by the ``zmq.IDENTITY`` prefix of the ``XREQ`` sockets.
55 70
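A concrete sketch of the engine-side relay just described, with socket types taken from the text (addresses and the heart identity are placeholders)::

    import zmq

    ctx = zmq.Context()
    # input: subscribe to the controller's HB(ping) PUB socket
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, '')
    sub.connect('tcp://127.0.0.1:5555')           # placeholder ping address
    # output: each ping is echoed back, tagged with this heart's identity
    xreq = ctx.socket(zmq.XREQ)
    xreq.setsockopt(zmq.IDENTITY, 'heart-uuid')   # placeholder identity
    xreq.connect('tcp://127.0.0.1:5556')          # placeholder pong address
    # relay everything from sub to xreq; blocks forever
    zmq.device(zmq.FORWARDER, sub, xreq)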
56 71 Queues
57 72 ------
58 73
59 74 .. figure:: figs/queuefade.png
60 :width: 432px
61 :alt: IPython Queue connections
62 :align: center
63
64 Load balanced Task queue on the left, explicitly multiplexed queue on the right.
65
66 The controller has two MonitoredQueue devices. These devices are primarily for relaying messages between clients and engines, but the controller needs to see those messages for its own purposes. Since no Python code may exist between the two sockets in a queue, all messages sent through these queues (both directions) are also sent via a ``PUB`` socket to a monitor, which allows the Controller to monitor queue traffic without interfering with it.
67
68 For tasks, the engine need not be specified. Messages sent to the ``XREP`` socket from the client side are assigned to an engine via ZMQ's ``XREQ`` round-robin load balancing. Engine replies are directed to specific clients via the IDENTITY of the client, which is received as a prefix at the Engine.
69
70 For Multiplexing, ``XREP`` is used for both in and output sockets in the device. Clients must specify the destination by the ``zmq.IDENTITY`` of the ``PAIR`` socket connected to the downstream end of the device.
71
72 At the Kernel level, both of these PAIR sockets are treated in the same way as the ``REP`` socket in the serial version (except using ZMQStreams instead of explicit sockets).
75 :width: 432px
76 :alt: IPython Queue connections
77 :align: center
78
79 Load balanced Task queue on the left, explicitly multiplexed queue on the right.
80
81 The controller has two MonitoredQueue devices. These devices are primarily for relaying
82 messages between clients and engines, but the controller needs to see those messages for
83 its own purposes. Since no Python code may exist between the two sockets in a queue, all
84 messages sent through these queues (both directions) are also sent via a ``PUB`` socket to
85 a monitor, which allows the Controller to monitor queue traffic without interfering with
86 it.
87
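pyzmq exposes a monitored-queue primitive matching this description; a hedged sketch for the Task queue, with placeholder addresses and assuming ``monitored_queue`` is available in your pyzmq build::

    import zmq
    from zmq.devices import monitored_queue

    ctx = zmq.Context()
    client_side = ctx.socket(zmq.XREP)  # clients connect here
    engine_side = ctx.socket(zmq.XREQ)  # engines connect here (round-robin)
    mon = ctx.socket(zmq.PUB)           # a copy of all traffic goes here
    client_side.bind('tcp://127.0.0.1:5570')    # placeholder
    engine_side.bind('tcp://127.0.0.1:5571')    # placeholder
    mon.connect('tcp://127.0.0.1:5572')         # controller's monitor SUB
    # the prefixes become the topic switch ('intask'/'outtask') used by the
    # Controller's dispatch_queue_traffic to route monitor messages
    monitored_queue(client_side, engine_side, mon, 'intask', 'outtask')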
88 For tasks, the engine need not be specified. Messages sent to the ``XREP`` socket from the
89 client side are assigned to an engine via ZMQ's ``XREQ`` round-robin load balancing.
90 Engine replies are directed to specific clients via the IDENTITY of the client, which is
91 received as a prefix at the Engine.
92
93 For Multiplexing, ``XREP`` is used for both in and output sockets in the device. Clients
94 must specify the destination by the ``zmq.IDENTITY`` of the ``PAIR`` socket connected to
95 the downstream end of the device.
96
97 At the Kernel level, both of these PAIR sockets are treated in the same way as the ``REP``
98 socket in the serial version (except using ZMQStreams instead of explicit sockets).
73 99
74 100 Client connections
75 101 ------------------
@@ -81,7 +107,8 b' Client connections'
81 107
82 108 Clients connect to an ``XREP`` socket to query the controller
83 109
84 The controller listens on an ``XREP`` socket for queries from clients as to queue status, and control instructions. Clients can connect to this via a PAIR socket or ``XREQ``.
110 The controller listens on an ``XREP`` socket for queries from clients about queue status,
111 and control instructions. Clients can connect to this via a PAIR socket or ``XREQ``.
85 112
86 113 .. figure:: figs/notiffade.png
87 114 :width: 432px
@@ -90,5 +117,8 b' The controller listens on an ``XREP`` socket for queries from clients as to queu'
90 117
91 118 Engine registration events are published via a ``PUB`` socket.
92 119
93 The controller publishes all registration/unregistration events via a ``PUB`` socket. This allows clients to stay up to date with what engines are available by subscribing to the feed with a ``SUB`` socket. Other processes could selectively subscribe to just registration or unregistration events.
120 The controller publishes all registration/unregistration events via a ``PUB`` socket. This
121 allows clients to stay up to date with what engines are available by subscribing to the
122 feed with a ``SUB`` socket. Other processes could selectively subscribe to just
123 registration or unregistration events.
94 124
@@ -3,16 +3,20 b''
3 3 Messaging for Parallel Computing
4 4 ================================
5 5
6 This is an extension of the :ref:`messaging <messaging>` doc. Diagrams of the connections can be found in the :ref:`parallel connections <parallel_connections>` doc.
6 This is an extension of the :ref:`messaging <messaging>` doc. Diagrams of the connections
7 can be found in the :ref:`parallel connections <parallel_connections>` doc.
7 8
8 9
9
10 ZMQ messaging is also used in the parallel computing IPython system. All messages to/from kernels remain the same as the single kernel model, and are forwarded through a ZMQ Queue device. The controller receives all messages and replies in these channels, and saves results for future use.
10 ZMQ messaging is also used in the parallel computing IPython system. All messages to/from
11 kernels remain the same as in the single-kernel model, and are forwarded through a ZMQ Queue
12 device. The controller receives all messages and replies in these channels, and saves
13 results for future use.
11 14
12 15 The Controller
13 16 --------------
14 17
15 The controller is the central process of the IPython parallel computing model. It has 3 Devices:
18 The controller is the central process of the IPython parallel computing model. It has 3
19 Devices:
16 20
17 21 * Heartbeater
18 22 * Multiplexed Queue
@@ -29,9 +33,13 b' and 3 sockets:'
29 33 Registration (``XREP``)
30 34 ***********************
31 35
32 The first function of the Controller is to facilitate and monitor connections of clients and engines. Both client and engine registration are handled by the same socket, so only one ip/port pair is needed to connect any number of connections and clients.
36 The first function of the Controller is to facilitate and monitor connections of clients
37 and engines. Both client and engine registration are handled by the same socket, so only
38 one ip/port pair is needed to connect any number of engines and clients.
33 39
34 Engines register with the ``zmq.IDENTITY`` of their two ``XREQ`` sockets, one for the queue, which receives execute requests, and one for the heartbeat, which is used to monitor the survival of the Engine process.
40 Engines register with the ``zmq.IDENTITY`` of their two ``XREQ`` sockets, one for the
41 queue, which receives execute requests, and one for the heartbeat, which is used to
42 monitor the survival of the Engine process.
35 43
36 44 Message type: ``registration_request``::
37 45
@@ -40,7 +48,10 b' Message type: ``registration_request``::'
40 48 'heartbeat' : '1234-abcd-...' # the heartbeat XREQ id
41 49 }
42 50
43 The Controller replies to an Engine's registration request with the engine's integer ID, and all the remaining connection information for connecting the heartbeat process, and kernel socket(s). The message status will be an error if the Engine requests IDs that already in use.
51 The Controller replies to an Engine's registration request with the engine's integer ID,
52 and all the remaining connection information for connecting the heartbeat process, and
53 kernel queue socket(s). The message status will be an error if the Engine requests IDs that
54 are already in use.
44 55
45 56 Message type: ``registration_reply``::
46 57
@@ -49,39 +60,49 b' Message type: ``registration_reply``::'
49 60 # if ok:
50 61 'id' : 0, # int, the engine id
51 62 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue
63 'control' : 'tcp://...', # addr for control queue
52 64 'heartbeat' : (a,b), # tuple containing two interfaces needed for heartbeat
53 'task' : 'tcp...', # addr for task queue, or None if no task queue running
65 'task' : 'tcp://...', # addr for task queue, or None if no task queue running
54 66 # if error:
55 67 'reason' : 'queue_id already registered'
56 68 }
57 69
58 Clients use the same socket to start their connections. Connection requests from clients need no information:
70 Clients use the same socket as engines to start their connections. Connection requests
71 from clients need no information:
59 72
60 73 Message type: ``connection_request``::
61 74
62 75 content = {}
63 76
64 The reply to a Client registration request contains the connection information for the multiplexer and load balanced queues, as well as the address for direct controller queries. If any of these addresses is `None`, that functionality is not available.
77 The reply to a Client registration request contains the connection information for the
78 multiplexer and load balanced queues, as well as the address for direct controller
79 queries. If any of these addresses is `None`, that functionality is not available.
65 80
66 81 Message type: ``connection_reply``::
67 82
68 83 content = {
69 84 'status' : 'ok', # or 'error'
70 85 # if ok:
71 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the queue
86 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue
72 87 'task' : 'tcp...', # addr for task queue, or None if no task queue running
73 'controller' : 'tcp...' # addr for controller methods, like queue_request, etc.
88 'query' : 'tcp://...', # addr for methods to query the controller, like queue_request, etc.
89 'control' : 'tcp://...', # addr for control methods, like abort, etc.
74 90 }
75 91
76 92 Heartbeat
77 93 *********
78 94
79 The controller uses a heartbeat system to monitor engines, and track when they become unresponsive. As described in :ref:`messages <messages>`, and shown in :ref:`connections <parallel_connections>`.
95 The controller uses a heartbeat system to monitor engines, and track when they become
96 unresponsive, as described in :ref:`messages <messages>` and shown in :ref:`connections
97 <parallel_connections>`.
80 98
81 99 Notification (``PUB``)
82 100 **********************
83 101
84 The controller published all engine registration/unregistration events on a PUB socket. This allows clients to have up-to-date engine ID sets without polling. Registration notifications contain both the integer engine ID and the queue ID, which is necessary for sending messages via the Multiplexer Queue.
102 The controller publishes all engine registration/unregistration events on a PUB socket.
103 This allows clients to have up-to-date engine ID sets without polling. Registration
104 notifications contain both the integer engine ID and the queue ID, which is necessary for
105 sending messages via the Multiplexer Queue.
85 106
86 107 Message type: ``registration_notification``::
87 108
@@ -100,9 +121,14 b' Message type : ``unregistration_notification``::'
100 121 Client Queries (``XREP``)
101 122 *************************
102 123
103 The controller monitors and logs all queue traffic, so that clients can retrieve past results or monitor pending tasks. Currently, this information resides in memory on the Controller, but will ultimately be offloaded to a database over an additional ZMQ connection. The interface should remain the same or at least similar.
124 The controller monitors and logs all queue traffic, so that clients can retrieve past
125 results or monitor pending tasks. Currently, this information resides in memory on the
126 Controller, but will ultimately be offloaded to a database over an additional ZMQ
127 connection. The interface should remain the same or at least similar.
104 128
105 :func:`queue_request` requests can specify multiple engines to query via the `targets` element. A verbose flag can be passed, to determine whether the result should be the list of `msg_ids` in the queue or simply the length of each list.
129 :func:`queue_request` requests can specify multiple engines to query via the `targets`
130 element. A verbose flag can be passed to determine whether the result should be the list
131 of `msg_ids` in the queue or simply the length of each list.
106 132
107 133 Message type: ``queue_request``::
108 134
@@ -111,7 +137,9 b' Message type: ``queue_request``::'
111 137 'targets' : [0,3,1] # list of ints
112 138 }
113 139
114 The content of a reply to a :func:queue_request request is a dict, keyed by the engine IDs. Note that they will be the string representation of the integer keys, since JSON cannot handle number keys.
140 The content of a reply to a :func:`queue_request` request is a dict, keyed by the engine
141 IDs. Note that they will be the string representation of the integer keys, since JSON
142 cannot handle number keys.
115 143
116 144 Message type: ``queue_reply``::
117 145
@@ -120,15 +148,19 b' Message type: ``queue_reply``::'
120 148 '1' : {'completed' : 10, 'queue' : 1}
121 149 }
122 150
123 Clients can request individual results directly from the controller. This is primarily for use gathering results of executions not submitted by the particular client, as the client will have all its own results already. Requests are made by msg_id, and can contain one or more msg_id.
151 Clients can request individual results directly from the controller. This is primarily
152 for gathering results of executions not submitted by the requesting client, as the client
153 will have all its own results already. Requests are made by msg_id, and can contain one or
154 more msg_id.
124 155
125 156 Message type: ``result_request``::
126 157
127 158 content = {
128 'msg_ids' : [uuid,'...'] # list of strs
159 'msg_ids' : ['uuid','...'] # list of strs
129 160 }
130 161
131 The :func:`result_request` reply contains the content objects of the actual execution reply messages
162 The :func:`result_request` reply contains the content objects of the actual execution
163 reply messages.
132 164
133 165
134 166 Message type: ``result_reply``::
@@ -139,13 +171,18 b' Message type: ``result_reply``::'
139 171 msg_id : msg, # the content dict is keyed by msg_ids,
140 172 # values are the result messages
141 173 'pending' : ['msg_id','...'], # msg_ids still pending
174 'completed' : ['msg_id','...'], # list of completed msg_ids
142 175 # if error:
143 176 'reason' : "explanation"
144 177 }
145 178
146 Clients can also instruct the controller to forget the results of messages. This can be done by message ID or engine ID. Individual messages are dropped by msg_id, and all messages completed on an engine are dropped by engine ID.
179 For memory management purposes, Clients can also instruct the controller to forget the
180 results of messages. This can be done by message ID or engine ID. Individual messages are
181 dropped by msg_id, and all messages completed on an engine are dropped by engine ID. This
182 will likely no longer be necessary once we move to a DB-based message logging backend.
147 183
148 If the msg_ids element is the string ``'all'`` instead of a list, then all completed results are forgotten.
184 If the msg_ids element is the string ``'all'`` instead of a list, then all completed
185 results are forgotten.
149 186
150 187 Message type: ``purge_request``::
151 188
@@ -154,7 +191,9 b' Message type: ``purge_request``::'
154 191 'engine_ids' : [0,2,4] # list of engine IDs
155 192 }
156 193
157 The reply to a purge request is simply the status 'ok' if the request succeeded, or an explanation of why it failed, such as requesting the purge of a nonexistent or pending message.
194 The reply to a purge request is simply the status 'ok' if the request succeeded, or an
195 explanation of why it failed, such as requesting the purge of a nonexistent or pending
196 message.
158 197
159 198 Message type: ``purge_reply``::
160 199
@@ -168,18 +207,29 b' Message type: ``purge_reply``::'
168 207 :func:`apply` and :func:`apply_bound`
169 208 *************************************
170 209
171 The `Namespace <http://gist.github.com/483294>`_ model suggests that execution be able to use the model::
210 The `Namespace <http://gist.github.com/483294>`_ model suggests that execution be able to
211 use the model::
172 212
173 213 client.apply(f, *args, **kwargs)
174 214
175 which takes `f`, a function in the user's namespace, and executes ``f(*args, **kwargs)`` on a remote engine, returning the result (or, for non-blocking, information facilitating later retrieval of the result). This model, unlike the execute message which just uses a code string, must be able to send arbitrary (pickleable) Python objects. And ideally, copy as little data as we can. The `buffers` property of a Message was introduced for this purpose.
215 which takes `f`, a function in the user's namespace, and executes ``f(*args, **kwargs)``
216 on a remote engine, returning the result (or, for non-blocking, information facilitating
217 later retrieval of the result). This model, unlike the execute message which just uses a
218 code string, must be able to send arbitrary (pickleable) Python objects, and ideally
219 copy as little data as possible. The `buffers` property of a Message was introduced for this
220 purpose.
176 221
177 Utility method :func:`build_apply_message` in :mod:`IPython.zmq.streamsession` wraps a function signature and builds the correct buffer format.
222 Utility method :func:`build_apply_message` in :mod:`IPython.zmq.streamsession` wraps a
223 function signature and builds the correct buffer format for minimal data copying (exactly
224 zero copies of numpy array data).
178 225
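A hedged sketch of building those buffers on the client side; whether ``build_apply_message`` is a module-level function or a ``StreamSession`` method is assumed here::

    from IPython.zmq import streamsession

    def mul(a, b=2):
        return a * b

    # the buffer list for mul(5, b=3): at least 3 parts (serialized f,
    # args info, kwargs info), plus raw buffers for any large array data
    buffers = streamsession.build_apply_message(mul, (5,), {'b': 3})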
179 226 Message type: ``apply_request``::
180 227
181 228 content = {
182 'bound' : True # whether to execute in the engine's namespace or unbound
229 'bound' : True, # whether to execute in the engine's namespace or unbound
230 'after' : [msg_ids,...], # list of msg_ids or output of Dependency.as_dict()
231 'follow' : [msg_ids,...], # list of msg_ids or output of Dependency.as_dict()
232
183 233 }
184 234 buffers = ['...'] # at least 3 in length
185 235 # as built by build_apply_message(f,args,kwargs)
@@ -200,10 +250,22 b' Message type: ``apply_reply``::'
200 250 Implementation
201 251 --------------
202 252
203 There are a few differences in implementation between the `StreamSession` object used in the parallel computing fork and the `Session` object, the main one being that messages are sent in parts, rather than as a single serialized object. `StreamSession` objects also take pack/unpack functions, which are to be used when serializing/deserializing objects. These can be any functions that translate to/from formats that ZMQ sockets can send (buffers,bytes, etc.).
253 There are a few differences in implementation between the `StreamSession` object used in
254 the parallel computing fork and the `Session` object, the main one being that messages are
255 sent in parts, rather than as a single serialized object. `StreamSession` objects also
256 take pack/unpack functions, which are to be used when serializing/deserializing objects.
257 These can be any functions that translate to/from formats that ZMQ sockets can send
258 (buffers,bytes, etc.).
204 259
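For instance, a session wired to use JSON for the small message parts might be constructed like this; the ``pack``/``unpack`` keyword names are assumed::

    from zmq.utils import jsonapi
    from IPython.zmq.streamsession import StreamSession

    session = StreamSession(username='client',
                            pack=jsonapi.dumps,    # serialize small parts
                            unpack=jsonapi.loads)  # deserialize on receipt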
205 260 Split Sends
206 261 ***********
207 262
208 Previously, messages were bundled as a single json object and one call to :func:`socket.send_json`. Since the controller inspects all messages, and doesn't need to see the content of the messages, which can be large, messages are serialized and sent in pieces. All messages are sent in at least 3 parts: the header, the parent header, and the content. This allows the controller to unpack and inspect the (always small) header, without spending time unpacking the content unless the message is bound for the controller. Buffers are added on to the end of the message, and can be any objects that present the buffer interface.
263 Previously, messages were bundled as a single json object and one call to
264 :func:`socket.send_json`. Since the controller inspects all messages, and doesn't need to
265 see the content of the messages, which can be large, messages are now serialized and sent in
266 pieces. All messages are sent in at least 3 parts: the header, the parent header, and the
267 content. This allows the controller to unpack and inspect the (always small) header,
268 without spending time unpacking the content unless the message is bound for the
269 controller. Buffers are added on to the end of the message, and can be any objects that
270 present the buffer interface.
209 271
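Concretely, a split send reduces to one multipart call along these lines; the delimiter value and exact part order are assumed from the ``send`` code earlier in this changeset::

    from zmq.utils import jsonapi
    pack = jsonapi.dumps

    DELIM = 'Delimiter'   # placeholder sentinel; the real one lives in streamsession
    header = {'msg_id': '1234', 'msg_type': 'apply_request',
              'username': 'client', 'session': 'abcd'}
    parent = {}
    content = {'bound': True}
    buffers = ['raw array bytes...']   # optional trailing data buffers

    parts = [DELIM, pack(header), pack(parent), pack(content)] + buffers
    # stream.send_multipart(parts, copy=False)  # stream: a ZMQStream (not built here)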
@@ -69,11 +69,11 b' New features'
69 69 :mod:`~IPython.external.argparse` to parse command line options for
70 70 :command:`ipython`.
71 71
72 * New top level :func:`~IPython.core.embed.embed` function that can be called
72 * New top level :func:`~IPython.frontend.terminal.embed.embed` function that can be called
73 73 to embed IPython at any place in a user's code. On the first call it will
74 create an :class:`~IPython.core.embed.InteractiveShellEmbed` instance and
74 create an :class:`~IPython.frontend.terminal.embed.InteractiveShellEmbed` instance and
75 75 call it. In later calls, it just calls the previously created
76 :class:`~IPython.core.embed.InteractiveShellEmbed`.
76 :class:`~IPython.frontend.terminal.embed.InteractiveShellEmbed`.
77 77
78 78 * Created a component system (:mod:`IPython.core.component`) that is based on
79 79 :mod:`IPython.utils.traitlets`. Components are arranged into a runtime