##// END OF EJS Templates
Controller renamed to Hub (keeping ipcontrollerz)
MinRK -
Show More
This diff has been collapsed as it changes many lines, (837 lines changed) Show them Hide them
@@ -0,0 +1,837 b''
1 #!/usr/bin/env python
2 """The IPython Controller Hub with 0MQ
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
5 """
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
8 #
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
12
13 #-----------------------------------------------------------------------------
14 # Imports
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
17
18 import sys
19 from datetime import datetime
20 import time
21
22 import zmq
23 from zmq.eventloop import ioloop
24
25 # internal:
26 from IPython.zmq.log import logger # a Logger object
27
28 from streamsession import Message, wrap_exception, ISO8601
29
30 try:
31 from pymongo.binary import Binary
32 except ImportError:
33 MongoDB=None
34 else:
35 from mongodb import MongoDB
36
37 #-----------------------------------------------------------------------------
38 # Code
39 #-----------------------------------------------------------------------------
40
41 def _passer(*args, **kwargs):
42 return
43
44 def init_record(msg):
45 """return an empty TaskRecord dict, with all keys initialized with None."""
46 header = msg['header']
47 return {
48 'msg_id' : header['msg_id'],
49 'header' : header,
50 'content': msg['content'],
51 'buffers': msg['buffers'],
52 'submitted': datetime.strptime(header['date'], ISO8601),
53 'client_uuid' : None,
54 'engine_uuid' : None,
55 'started': None,
56 'completed': None,
57 'resubmitted': None,
58 'result_header' : None,
59 'result_content' : None,
60 'result_buffers' : None,
61 'queue' : None
62 }
63
64
65 class EngineConnector(object):
66 """A simple object for accessing the various zmq connections of an object.
67 Attributes are:
68 id (int): engine ID
69 uuid (str): uuid (unused?)
70 queue (str): identity of queue's XREQ socket
71 registration (str): identity of registration XREQ socket
72 heartbeat (str): identity of heartbeat XREQ socket
73 """
74 id=0
75 queue=None
76 control=None
77 registration=None
78 heartbeat=None
79 pending=None
80
81 def __init__(self, id, queue, registration, control, heartbeat=None):
82 logger.info("engine::Engine Connected: %i"%id)
83 self.id = id
84 self.queue = queue
85 self.registration = registration
86 self.control = control
87 self.heartbeat = heartbeat
88
89 class Hub(object):
90 """The IPython Controller Hub with 0MQ connections
91
92 Parameters
93 ==========
94 loop: zmq IOLoop instance
95 session: StreamSession object
96 <removed> context: zmq context for creating new connections (?)
97 queue: ZMQStream for monitoring the command queue (SUB)
98 registrar: ZMQStream for engine registration requests (XREP)
99 heartbeat: HeartMonitor object checking the pulse of the engines
100 clientele: ZMQStream for client connections (XREP)
101 not used for jobs, only query/control commands
102 notifier: ZMQStream for broadcasting engine registration changes (PUB)
103 db: connection to db for out of memory logging of commands
104 NotImplemented
105 engine_addrs: dict of zmq connection information for engines to connect
106 to the queues.
107 client_addrs: dict of zmq connection information for engines to connect
108 to the queues.
109 """
110 # internal data structures:
111 ids=None # engine IDs
112 keytable=None
113 engines=None
114 clients=None
115 hearts=None
116 pending=None
117 results=None
118 tasks=None
119 completed=None
120 mia=None
121 incoming_registrations=None
122 registration_timeout=None
123
124 #objects from constructor:
125 loop=None
126 registrar=None
127 clientelle=None
128 queue=None
129 heartbeat=None
130 notifier=None
131 db=None
132 client_addr=None
133 engine_addrs=None
134
135
136 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
137 """
138 # universal:
139 loop: IOLoop for creating future connections
140 session: streamsession for sending serialized data
141 # engine:
142 queue: ZMQStream for monitoring queue messages
143 registrar: ZMQStream for engine registration
144 heartbeat: HeartMonitor object for tracking engines
145 # client:
146 clientele: ZMQStream for client connections
147 # extra:
148 db: ZMQStream for db connection (NotImplemented)
149 engine_addrs: zmq address/protocol dict for engine connections
150 client_addrs: zmq address/protocol dict for client connections
151 """
152 self.ids = set()
153 self.keytable={}
154 self.incoming_registrations={}
155 self.engines = {}
156 self.by_ident = {}
157 self.clients = {}
158 self.hearts = {}
159 # self.mia = set()
160
161 # self.sockets = {}
162 self.loop = loop
163 self.session = session
164 self.registrar = registrar
165 self.clientele = clientele
166 self.queue = queue
167 self.heartbeat = heartbeat
168 self.notifier = notifier
169 self.db = db
170
171 # validate connection dicts:
172 self.client_addrs = client_addrs
173 assert isinstance(client_addrs['queue'], str)
174 assert isinstance(client_addrs['control'], str)
175 # self.hb_addrs = hb_addrs
176 self.engine_addrs = engine_addrs
177 assert isinstance(engine_addrs['queue'], str)
178 assert isinstance(client_addrs['control'], str)
179 assert len(engine_addrs['heartbeat']) == 2
180
181 # register our callbacks
182 self.registrar.on_recv(self.dispatch_register_request)
183 self.clientele.on_recv(self.dispatch_client_msg)
184 self.queue.on_recv(self.dispatch_queue_traffic)
185
186 if heartbeat is not None:
187 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
188 heartbeat.add_new_heart_handler(self.handle_new_heart)
189
190 self.queue_handlers = { 'in' : self.save_queue_request,
191 'out': self.save_queue_result,
192 'intask': self.save_task_request,
193 'outtask': self.save_task_result,
194 'tracktask': self.save_task_destination,
195 'incontrol': _passer,
196 'outcontrol': _passer,
197 }
198
199 self.client_handlers = {'queue_request': self.queue_status,
200 'result_request': self.get_results,
201 'purge_request': self.purge_results,
202 'load_request': self.check_load,
203 'resubmit_request': self.resubmit_task,
204 'shutdown_request': self.shutdown_request,
205 }
206
207 self.registrar_handlers = {'registration_request' : self.register_engine,
208 'unregistration_request' : self.unregister_engine,
209 'connection_request': self.connection_request,
210 }
211 self.registration_timeout = max(5000, 2*self.heartbeat.period)
212 # this is the stuff that will move to DB:
213 # self.results = {} # completed results
214 self.pending = set() # pending messages, keyed by msg_id
215 self.queues = {} # pending msg_ids keyed by engine_id
216 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
217 self.completed = {} # completed msg_ids keyed by engine_id
218 self.all_completed = set()
219
220 logger.info("controller::created controller")
221
222 def _new_id(self):
223 """gemerate a new ID"""
224 newid = 0
225 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
226 # print newid, self.ids, self.incoming_registrations
227 while newid in self.ids or newid in incoming:
228 newid += 1
229 return newid
230
231 #-----------------------------------------------------------------------------
232 # message validation
233 #-----------------------------------------------------------------------------
234
235 def _validate_targets(self, targets):
236 """turn any valid targets argument into a list of integer ids"""
237 if targets is None:
238 # default to all
239 targets = self.ids
240
241 if isinstance(targets, (int,str,unicode)):
242 # only one target specified
243 targets = [targets]
244 _targets = []
245 for t in targets:
246 # map raw identities to ids
247 if isinstance(t, (str,unicode)):
248 t = self.by_ident.get(t, t)
249 _targets.append(t)
250 targets = _targets
251 bad_targets = [ t for t in targets if t not in self.ids ]
252 if bad_targets:
253 raise IndexError("No Such Engine: %r"%bad_targets)
254 if not targets:
255 raise IndexError("No Engines Registered")
256 return targets
257
258 def _validate_client_msg(self, msg):
259 """validates and unpacks headers of a message. Returns False if invalid,
260 (ident, header, parent, content)"""
261 client_id = msg[0]
262 try:
263 msg = self.session.unpack_message(msg[1:], content=True)
264 except:
265 logger.error("client::Invalid Message %s"%msg)
266 return False
267
268 msg_type = msg.get('msg_type', None)
269 if msg_type is None:
270 return False
271 header = msg.get('header')
272 # session doesn't handle split content for now:
273 return client_id, msg
274
275
276 #-----------------------------------------------------------------------------
277 # dispatch methods (1 per stream)
278 #-----------------------------------------------------------------------------
279
280 def dispatch_register_request(self, msg):
281 """"""
282 logger.debug("registration::dispatch_register_request(%s)"%msg)
283 idents,msg = self.session.feed_identities(msg)
284 if not idents:
285 logger.error("Bad Queue Message: %s"%msg, exc_info=True)
286 return
287 try:
288 msg = self.session.unpack_message(msg,content=True)
289 except:
290 logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
291 return
292
293 msg_type = msg['msg_type']
294 content = msg['content']
295
296 handler = self.registrar_handlers.get(msg_type, None)
297 if handler is None:
298 logger.error("registration::got bad registration message: %s"%msg)
299 else:
300 handler(idents, msg)
301
302 def dispatch_queue_traffic(self, msg):
303 """all ME and Task queue messages come through here"""
304 logger.debug("queue traffic: %s"%msg[:2])
305 switch = msg[0]
306 idents, msg = self.session.feed_identities(msg[1:])
307 if not idents:
308 logger.error("Bad Queue Message: %s"%msg)
309 return
310 handler = self.queue_handlers.get(switch, None)
311 if handler is not None:
312 handler(idents, msg)
313 else:
314 logger.error("Invalid message topic: %s"%switch)
315
316
317 def dispatch_client_msg(self, msg):
318 """Route messages from clients"""
319 idents, msg = self.session.feed_identities(msg)
320 if not idents:
321 logger.error("Bad Client Message: %s"%msg)
322 return
323 client_id = idents[0]
324 try:
325 msg = self.session.unpack_message(msg, content=True)
326 except:
327 content = wrap_exception()
328 logger.error("Bad Client Message: %s"%msg, exc_info=True)
329 self.session.send(self.clientele, "controller_error", ident=client_id,
330 content=content)
331 return
332
333 # print client_id, header, parent, content
334 #switch on message type:
335 msg_type = msg['msg_type']
336 logger.info("client:: client %s requested %s"%(client_id, msg_type))
337 handler = self.client_handlers.get(msg_type, None)
338 try:
339 assert handler is not None, "Bad Message Type: %s"%msg_type
340 except:
341 content = wrap_exception()
342 logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
343 self.session.send(self.clientele, "controller_error", ident=client_id,
344 content=content)
345 return
346 else:
347 handler(client_id, msg)
348
349 def dispatch_db(self, msg):
350 """"""
351 raise NotImplementedError
352
353 #---------------------------------------------------------------------------
354 # handler methods (1 per event)
355 #---------------------------------------------------------------------------
356
357 #----------------------- Heartbeat --------------------------------------
358
359 def handle_new_heart(self, heart):
360 """handler to attach to heartbeater.
361 Called when a new heart starts to beat.
362 Triggers completion of registration."""
363 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
364 if heart not in self.incoming_registrations:
365 logger.info("heartbeat::ignoring new heart: %r"%heart)
366 else:
367 self.finish_registration(heart)
368
369
370 def handle_heart_failure(self, heart):
371 """handler to attach to heartbeater.
372 called when a previously registered heart fails to respond to beat request.
373 triggers unregistration"""
374 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
375 eid = self.hearts.get(heart, None)
376 queue = self.engines[eid].queue
377 if eid is None:
378 logger.info("heartbeat::ignoring heart failure %r"%heart)
379 else:
380 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
381
382 #----------------------- MUX Queue Traffic ------------------------------
383
384 def save_queue_request(self, idents, msg):
385 if len(idents) < 2:
386 logger.error("invalid identity prefix: %s"%idents)
387 return
388 queue_id, client_id = idents[:2]
389 try:
390 msg = self.session.unpack_message(msg, content=False)
391 except:
392 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
393 return
394
395 eid = self.by_ident.get(queue_id, None)
396 if eid is None:
397 logger.error("queue::target %r not registered"%queue_id)
398 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
399 return
400
401 header = msg['header']
402 msg_id = header['msg_id']
403 record = init_record(msg)
404 record['engine_uuid'] = queue_id
405 record['client_uuid'] = client_id
406 record['queue'] = 'mux'
407 if MongoDB is not None and isinstance(self.db, MongoDB):
408 record['buffers'] = map(Binary, record['buffers'])
409 self.pending.add(msg_id)
410 self.queues[eid].append(msg_id)
411 self.db.add_record(msg_id, record)
412
413 def save_queue_result(self, idents, msg):
414 if len(idents) < 2:
415 logger.error("invalid identity prefix: %s"%idents)
416 return
417
418 client_id, queue_id = idents[:2]
419 try:
420 msg = self.session.unpack_message(msg, content=False)
421 except:
422 logger.error("queue::engine %r sent invalid message to %r: %s"%(
423 queue_id,client_id, msg), exc_info=True)
424 return
425
426 eid = self.by_ident.get(queue_id, None)
427 if eid is None:
428 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
429 logger.debug("queue:: %s"%msg[2:])
430 return
431
432 parent = msg['parent_header']
433 if not parent:
434 return
435 msg_id = parent['msg_id']
436 if msg_id in self.pending:
437 self.pending.remove(msg_id)
438 self.all_completed.add(msg_id)
439 self.queues[eid].remove(msg_id)
440 self.completed[eid].append(msg_id)
441 rheader = msg['header']
442 completed = datetime.strptime(rheader['date'], ISO8601)
443 started = rheader.get('started', None)
444 if started is not None:
445 started = datetime.strptime(started, ISO8601)
446 result = {
447 'result_header' : rheader,
448 'result_content': msg['content'],
449 'started' : started,
450 'completed' : completed
451 }
452 if MongoDB is not None and isinstance(self.db, MongoDB):
453 result['result_buffers'] = map(Binary, msg['buffers'])
454 else:
455 result['result_buffers'] = msg['buffers']
456 self.db.update_record(msg_id, result)
457 else:
458 logger.debug("queue:: unknown msg finished %s"%msg_id)
459
460 #--------------------- Task Queue Traffic ------------------------------
461
462 def save_task_request(self, idents, msg):
463 """Save the submission of a task."""
464 client_id = idents[0]
465
466 try:
467 msg = self.session.unpack_message(msg, content=False)
468 except:
469 logger.error("task::client %r sent invalid task message: %s"%(
470 client_id, msg), exc_info=True)
471 return
472 record = init_record(msg)
473 if MongoDB is not None and isinstance(self.db, MongoDB):
474 record['buffers'] = map(Binary, record['buffers'])
475 record['client_uuid'] = client_id
476 record['queue'] = 'task'
477 header = msg['header']
478 msg_id = header['msg_id']
479 self.pending.add(msg_id)
480 self.db.add_record(msg_id, record)
481
482 def save_task_result(self, idents, msg):
483 """save the result of a completed task."""
484 client_id = idents[0]
485 try:
486 msg = self.session.unpack_message(msg, content=False)
487 except:
488 logger.error("task::invalid task result message send to %r: %s"%(
489 client_id, msg))
490 raise
491 return
492
493 parent = msg['parent_header']
494 if not parent:
495 # print msg
496 logger.warn("Task %r had no parent!"%msg)
497 return
498 msg_id = parent['msg_id']
499
500 header = msg['header']
501 engine_uuid = header.get('engine', None)
502 eid = self.by_ident.get(engine_uuid, None)
503
504 if msg_id in self.pending:
505 self.pending.remove(msg_id)
506 self.all_completed.add(msg_id)
507 if eid is not None:
508 self.completed[eid].append(msg_id)
509 if msg_id in self.tasks[eid]:
510 self.tasks[eid].remove(msg_id)
511 completed = datetime.strptime(header['date'], ISO8601)
512 started = header.get('started', None)
513 if started is not None:
514 started = datetime.strptime(started, ISO8601)
515 result = {
516 'result_header' : header,
517 'result_content': msg['content'],
518 'started' : started,
519 'completed' : completed,
520 'engine_uuid': engine_uuid
521 }
522 if MongoDB is not None and isinstance(self.db, MongoDB):
523 result['result_buffers'] = map(Binary, msg['buffers'])
524 else:
525 result['result_buffers'] = msg['buffers']
526 self.db.update_record(msg_id, result)
527
528 else:
529 logger.debug("task::unknown task %s finished"%msg_id)
530
531 def save_task_destination(self, idents, msg):
532 try:
533 msg = self.session.unpack_message(msg, content=True)
534 except:
535 logger.error("task::invalid task tracking message")
536 return
537 content = msg['content']
538 print (content)
539 msg_id = content['msg_id']
540 engine_uuid = content['engine_id']
541 eid = self.by_ident[engine_uuid]
542
543 logger.info("task::task %s arrived on %s"%(msg_id, eid))
544 # if msg_id in self.mia:
545 # self.mia.remove(msg_id)
546 # else:
547 # logger.debug("task::task %s not listed as MIA?!"%(msg_id))
548
549 self.tasks[eid].append(msg_id)
550 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
551 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
552
553 def mia_task_request(self, idents, msg):
554 raise NotImplementedError
555 client_id = idents[0]
556 # content = dict(mia=self.mia,status='ok')
557 # self.session.send('mia_reply', content=content, idents=client_id)
558
559
560
561 #-------------------------------------------------------------------------
562 # Registration requests
563 #-------------------------------------------------------------------------
564
565 def connection_request(self, client_id, msg):
566 """Reply with connection addresses for clients."""
567 logger.info("client::client %s connected"%client_id)
568 content = dict(status='ok')
569 content.update(self.client_addrs)
570 jsonable = {}
571 for k,v in self.keytable.iteritems():
572 jsonable[str(k)] = v
573 content['engines'] = jsonable
574 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
575
576 def register_engine(self, reg, msg):
577 """Register a new engine."""
578 content = msg['content']
579 try:
580 queue = content['queue']
581 except KeyError:
582 logger.error("registration::queue not specified")
583 return
584 heart = content.get('heartbeat', None)
585 """register a new engine, and create the socket(s) necessary"""
586 eid = self._new_id()
587 # print (eid, queue, reg, heart)
588
589 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
590
591 content = dict(id=eid,status='ok')
592 content.update(self.engine_addrs)
593 # check if requesting available IDs:
594 if queue in self.by_ident:
595 try:
596 raise KeyError("queue_id %r in use"%queue)
597 except:
598 content = wrap_exception()
599 elif heart in self.hearts: # need to check unique hearts?
600 try:
601 raise KeyError("heart_id %r in use"%heart)
602 except:
603 content = wrap_exception()
604 else:
605 for h, pack in self.incoming_registrations.iteritems():
606 if heart == h:
607 try:
608 raise KeyError("heart_id %r in use"%heart)
609 except:
610 content = wrap_exception()
611 break
612 elif queue == pack[1]:
613 try:
614 raise KeyError("queue_id %r in use"%queue)
615 except:
616 content = wrap_exception()
617 break
618
619 msg = self.session.send(self.registrar, "registration_reply",
620 content=content,
621 ident=reg)
622
623 if content['status'] == 'ok':
624 if heart in self.heartbeat.hearts:
625 # already beating
626 self.incoming_registrations[heart] = (eid,queue,reg,None)
627 self.finish_registration(heart)
628 else:
629 purge = lambda : self._purge_stalled_registration(heart)
630 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
631 dc.start()
632 self.incoming_registrations[heart] = (eid,queue,reg,dc)
633 else:
634 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
635 return eid
636
637 def unregister_engine(self, ident, msg):
638 """Unregister an engine that explicitly requested to leave."""
639 try:
640 eid = msg['content']['id']
641 except:
642 logger.error("registration::bad engine id for unregistration: %s"%ident)
643 return
644 logger.info("registration::unregister_engine(%s)"%eid)
645 content=dict(id=eid, queue=self.engines[eid].queue)
646 self.ids.remove(eid)
647 self.keytable.pop(eid)
648 ec = self.engines.pop(eid)
649 self.hearts.pop(ec.heartbeat)
650 self.by_ident.pop(ec.queue)
651 self.completed.pop(eid)
652 for msg_id in self.queues.pop(eid):
653 msg = self.pending.remove(msg_id)
654 ############## TODO: HANDLE IT ################
655
656 if self.notifier:
657 self.session.send(self.notifier, "unregistration_notification", content=content)
658
659 def finish_registration(self, heart):
660 """Second half of engine registration, called after our HeartMonitor
661 has received a beat from the Engine's Heart."""
662 try:
663 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
664 except KeyError:
665 logger.error("registration::tried to finish nonexistant registration")
666 return
667 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
668 if purge is not None:
669 purge.stop()
670 control = queue
671 self.ids.add(eid)
672 self.keytable[eid] = queue
673 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
674 self.by_ident[queue] = eid
675 self.queues[eid] = list()
676 self.tasks[eid] = list()
677 self.completed[eid] = list()
678 self.hearts[heart] = eid
679 content = dict(id=eid, queue=self.engines[eid].queue)
680 if self.notifier:
681 self.session.send(self.notifier, "registration_notification", content=content)
682
683 def _purge_stalled_registration(self, heart):
684 if heart in self.incoming_registrations:
685 eid = self.incoming_registrations.pop(heart)[0]
686 logger.info("registration::purging stalled registration: %i"%eid)
687 else:
688 pass
689
690 #-------------------------------------------------------------------------
691 # Client Requests
692 #-------------------------------------------------------------------------
693
694 def shutdown_request(self, client_id, msg):
695 """handle shutdown request."""
696 # s = self.context.socket(zmq.XREQ)
697 # s.connect(self.client_connections['mux'])
698 # time.sleep(0.1)
699 # for eid,ec in self.engines.iteritems():
700 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
701 # time.sleep(1)
702 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
703 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
704 dc.start()
705
706 def _shutdown(self):
707 logger.info("controller::controller shutting down.")
708 time.sleep(0.1)
709 sys.exit(0)
710
711
712 def check_load(self, client_id, msg):
713 content = msg['content']
714 try:
715 targets = content['targets']
716 targets = self._validate_targets(targets)
717 except:
718 content = wrap_exception()
719 self.session.send(self.clientele, "controller_error",
720 content=content, ident=client_id)
721 return
722
723 content = dict(status='ok')
724 # loads = {}
725 for t in targets:
726 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
727 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
728
729
730 def queue_status(self, client_id, msg):
731 """Return the Queue status of one or more targets.
732 if verbose: return the msg_ids
733 else: return len of each type.
734 keys: queue (pending MUX jobs)
735 tasks (pending Task jobs)
736 completed (finished jobs from both queues)"""
737 content = msg['content']
738 targets = content['targets']
739 try:
740 targets = self._validate_targets(targets)
741 except:
742 content = wrap_exception()
743 self.session.send(self.clientele, "controller_error",
744 content=content, ident=client_id)
745 return
746 verbose = content.get('verbose', False)
747 content = dict(status='ok')
748 for t in targets:
749 queue = self.queues[t]
750 completed = self.completed[t]
751 tasks = self.tasks[t]
752 if not verbose:
753 queue = len(queue)
754 completed = len(completed)
755 tasks = len(tasks)
756 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
757 # pending
758 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
759
760 def purge_results(self, client_id, msg):
761 """Purge results from memory. This method is more valuable before we move
762 to a DB based message storage mechanism."""
763 content = msg['content']
764 msg_ids = content.get('msg_ids', [])
765 reply = dict(status='ok')
766 if msg_ids == 'all':
767 self.db.drop_matching_records(dict(completed={'$ne':None}))
768 else:
769 for msg_id in msg_ids:
770 if msg_id in self.all_completed:
771 self.db.drop_record(msg_id)
772 else:
773 if msg_id in self.pending:
774 try:
775 raise IndexError("msg pending: %r"%msg_id)
776 except:
777 reply = wrap_exception()
778 else:
779 try:
780 raise IndexError("No such msg: %r"%msg_id)
781 except:
782 reply = wrap_exception()
783 break
784 eids = content.get('engine_ids', [])
785 for eid in eids:
786 if eid not in self.engines:
787 try:
788 raise IndexError("No such engine: %i"%eid)
789 except:
790 reply = wrap_exception()
791 break
792 msg_ids = self.completed.pop(eid)
793 uid = self.engines[eid].queue
794 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
795
796 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
797
798 def resubmit_task(self, client_id, msg, buffers):
799 """Resubmit a task."""
800 raise NotImplementedError
801
802 def get_results(self, client_id, msg):
803 """Get the result of 1 or more messages."""
804 content = msg['content']
805 msg_ids = sorted(set(content['msg_ids']))
806 statusonly = content.get('status_only', False)
807 pending = []
808 completed = []
809 content = dict(status='ok')
810 content['pending'] = pending
811 content['completed'] = completed
812 buffers = []
813 if not statusonly:
814 content['results'] = {}
815 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
816 for msg_id in msg_ids:
817 if msg_id in self.pending:
818 pending.append(msg_id)
819 elif msg_id in self.all_completed:
820 completed.append(msg_id)
821 if not statusonly:
822 rec = records[msg_id]
823 content[msg_id] = { 'result_content': rec['result_content'],
824 'header': rec['header'],
825 'result_header' : rec['result_header'],
826 }
827 buffers.extend(map(str, rec['result_buffers']))
828 else:
829 try:
830 raise KeyError('No such message: '+msg_id)
831 except:
832 content = wrap_exception()
833 break
834 self.session.send(self.clientele, "result_reply", content=content,
835 parent=msg, ident=client_id,
836 buffers=buffers)
837
This diff has been collapsed as it changes many lines, (834 lines changed) Show them Hide them
@@ -1,1035 +1,227 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller with 0MQ
2 """The IPython Controller with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
19 import os
18 import os
20 from datetime import datetime
21 import logging
22 import time
19 import time
23 import uuid
20 from multiprocessing import Process
24
21
25 import zmq
22 import zmq
26 from zmq.eventloop import zmqstream, ioloop
23 from zmq.eventloop import ioloop
24 from zmq.eventloop.zmqstream import ZMQStream
25 from zmq.devices import ProcessMonitoredQueue
27
26
28 # internal:
27 # internal:
29 from IPython.zmq.log import logger # a Logger object
30 from IPython.zmq.entry_point import bind_port
28 from IPython.zmq.entry_point import bind_port
31
29
32 from streamsession import Message, wrap_exception, ISO8601
30 from hub import Hub
33 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
31 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
34 connect_logger, parse_url, signal_children, generate_exec_key)
32 connect_logger, parse_url, signal_children, generate_exec_key)
33
34
35 import streamsession as session
36 import heartmonitor
37 from scheduler import launch_scheduler
38
35 from dictdb import DictDB
39 from dictdb import DictDB
36 try:
40 try:
37 from pymongo.binary import Binary
41 import pymongo
38 except ImportError:
42 except ImportError:
39 MongoDB=None
43 MongoDB=None
40 else:
44 else:
41 from mongodb import MongoDB
45 from mongodb import MongoDB
42
46
43 #-----------------------------------------------------------------------------
44 # Code
45 #-----------------------------------------------------------------------------
46
47 def _passer(*args, **kwargs):
48 return
49
50 def init_record(msg):
51 """return an empty TaskRecord dict, with all keys initialized with None."""
52 header = msg['header']
53 return {
54 'msg_id' : header['msg_id'],
55 'header' : header,
56 'content': msg['content'],
57 'buffers': msg['buffers'],
58 'submitted': datetime.strptime(header['date'], ISO8601),
59 'client_uuid' : None,
60 'engine_uuid' : None,
61 'started': None,
62 'completed': None,
63 'resubmitted': None,
64 'result_header' : None,
65 'result_content' : None,
66 'result_buffers' : None,
67 'queue' : None
68 }
69
70
71 class EngineConnector(object):
72 """A simple object for accessing the various zmq connections of an object.
73 Attributes are:
74 id (int): engine ID
75 uuid (str): uuid (unused?)
76 queue (str): identity of queue's XREQ socket
77 registration (str): identity of registration XREQ socket
78 heartbeat (str): identity of heartbeat XREQ socket
79 """
80 id=0
81 queue=None
82 control=None
83 registration=None
84 heartbeat=None
85 pending=None
86
87 def __init__(self, id, queue, registration, control, heartbeat=None):
88 logger.info("engine::Engine Connected: %i"%id)
89 self.id = id
90 self.queue = queue
91 self.registration = registration
92 self.control = control
93 self.heartbeat = heartbeat
94
95 class Controller(object):
96 """The IPython Controller with 0MQ connections
97
98 Parameters
99 ==========
100 loop: zmq IOLoop instance
101 session: StreamSession object
102 <removed> context: zmq context for creating new connections (?)
103 queue: ZMQStream for monitoring the command queue (SUB)
104 registrar: ZMQStream for engine registration requests (XREP)
105 heartbeat: HeartMonitor object checking the pulse of the engines
106 clientele: ZMQStream for client connections (XREP)
107 not used for jobs, only query/control commands
108 notifier: ZMQStream for broadcasting engine registration changes (PUB)
109 db: connection to db for out of memory logging of commands
110 NotImplemented
111 engine_addrs: dict of zmq connection information for engines to connect
112 to the queues.
113 client_addrs: dict of zmq connection information for engines to connect
114 to the queues.
115 """
116 # internal data structures:
117 ids=None # engine IDs
118 keytable=None
119 engines=None
120 clients=None
121 hearts=None
122 pending=None
123 results=None
124 tasks=None
125 completed=None
126 mia=None
127 incoming_registrations=None
128 registration_timeout=None
129
130 #objects from constructor:
131 loop=None
132 registrar=None
133 clientelle=None
134 queue=None
135 heartbeat=None
136 notifier=None
137 db=None
138 client_addr=None
139 engine_addrs=None
140
141
142 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
143 """
144 # universal:
145 loop: IOLoop for creating future connections
146 session: streamsession for sending serialized data
147 # engine:
148 queue: ZMQStream for monitoring queue messages
149 registrar: ZMQStream for engine registration
150 heartbeat: HeartMonitor object for tracking engines
151 # client:
152 clientele: ZMQStream for client connections
153 # extra:
154 db: ZMQStream for db connection (NotImplemented)
155 engine_addrs: zmq address/protocol dict for engine connections
156 client_addrs: zmq address/protocol dict for client connections
157 """
158 self.ids = set()
159 self.keytable={}
160 self.incoming_registrations={}
161 self.engines = {}
162 self.by_ident = {}
163 self.clients = {}
164 self.hearts = {}
165 # self.mia = set()
166
167 # self.sockets = {}
168 self.loop = loop
169 self.session = session
170 self.registrar = registrar
171 self.clientele = clientele
172 self.queue = queue
173 self.heartbeat = heartbeat
174 self.notifier = notifier
175 self.db = db
176
177 # validate connection dicts:
178 self.client_addrs = client_addrs
179 assert isinstance(client_addrs['queue'], str)
180 assert isinstance(client_addrs['control'], str)
181 # self.hb_addrs = hb_addrs
182 self.engine_addrs = engine_addrs
183 assert isinstance(engine_addrs['queue'], str)
184 assert isinstance(client_addrs['control'], str)
185 assert len(engine_addrs['heartbeat']) == 2
186
187 # register our callbacks
188 self.registrar.on_recv(self.dispatch_register_request)
189 self.clientele.on_recv(self.dispatch_client_msg)
190 self.queue.on_recv(self.dispatch_queue_traffic)
191
192 if heartbeat is not None:
193 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
194 heartbeat.add_new_heart_handler(self.handle_new_heart)
195
196 self.queue_handlers = { 'in' : self.save_queue_request,
197 'out': self.save_queue_result,
198 'intask': self.save_task_request,
199 'outtask': self.save_task_result,
200 'tracktask': self.save_task_destination,
201 'incontrol': _passer,
202 'outcontrol': _passer,
203 }
204
205 self.client_handlers = {'queue_request': self.queue_status,
206 'result_request': self.get_results,
207 'purge_request': self.purge_results,
208 'load_request': self.check_load,
209 'resubmit_request': self.resubmit_task,
210 'shutdown_request': self.shutdown_request,
211 }
212
213 self.registrar_handlers = {'registration_request' : self.register_engine,
214 'unregistration_request' : self.unregister_engine,
215 'connection_request': self.connection_request,
216 }
217 self.registration_timeout = max(5000, 2*self.heartbeat.period)
218 # this is the stuff that will move to DB:
219 # self.results = {} # completed results
220 self.pending = set() # pending messages, keyed by msg_id
221 self.queues = {} # pending msg_ids keyed by engine_id
222 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
223 self.completed = {} # completed msg_ids keyed by engine_id
224 self.all_completed = set()
225
226 logger.info("controller::created controller")
227
228 def _new_id(self):
229 """gemerate a new ID"""
230 newid = 0
231 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
232 # print newid, self.ids, self.incoming_registrations
233 while newid in self.ids or newid in incoming:
234 newid += 1
235 return newid
236
237 #-----------------------------------------------------------------------------
238 # message validation
239 #-----------------------------------------------------------------------------
240
241 def _validate_targets(self, targets):
242 """turn any valid targets argument into a list of integer ids"""
243 if targets is None:
244 # default to all
245 targets = self.ids
246
247 if isinstance(targets, (int,str,unicode)):
248 # only one target specified
249 targets = [targets]
250 _targets = []
251 for t in targets:
252 # map raw identities to ids
253 if isinstance(t, (str,unicode)):
254 t = self.by_ident.get(t, t)
255 _targets.append(t)
256 targets = _targets
257 bad_targets = [ t for t in targets if t not in self.ids ]
258 if bad_targets:
259 raise IndexError("No Such Engine: %r"%bad_targets)
260 if not targets:
261 raise IndexError("No Engines Registered")
262 return targets
263
264 def _validate_client_msg(self, msg):
265 """validates and unpacks headers of a message. Returns False if invalid,
266 (ident, header, parent, content)"""
267 client_id = msg[0]
268 try:
269 msg = self.session.unpack_message(msg[1:], content=True)
270 except:
271 logger.error("client::Invalid Message %s"%msg)
272 return False
273
274 msg_type = msg.get('msg_type', None)
275 if msg_type is None:
276 return False
277 header = msg.get('header')
278 # session doesn't handle split content for now:
279 return client_id, msg
280
281
282 #-----------------------------------------------------------------------------
283 # dispatch methods (1 per stream)
284 #-----------------------------------------------------------------------------
285
286 def dispatch_register_request(self, msg):
287 """"""
288 logger.debug("registration::dispatch_register_request(%s)"%msg)
289 idents,msg = self.session.feed_identities(msg)
290 if not idents:
291 logger.error("Bad Queue Message: %s"%msg, exc_info=True)
292 return
293 try:
294 msg = self.session.unpack_message(msg,content=True)
295 except:
296 logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
297 return
298
299 msg_type = msg['msg_type']
300 content = msg['content']
301
302 handler = self.registrar_handlers.get(msg_type, None)
303 if handler is None:
304 logger.error("registration::got bad registration message: %s"%msg)
305 else:
306 handler(idents, msg)
307
308 def dispatch_queue_traffic(self, msg):
309 """all ME and Task queue messages come through here"""
310 logger.debug("queue traffic: %s"%msg[:2])
311 switch = msg[0]
312 idents, msg = self.session.feed_identities(msg[1:])
313 if not idents:
314 logger.error("Bad Queue Message: %s"%msg)
315 return
316 handler = self.queue_handlers.get(switch, None)
317 if handler is not None:
318 handler(idents, msg)
319 else:
320 logger.error("Invalid message topic: %s"%switch)
321
322
323 def dispatch_client_msg(self, msg):
324 """Route messages from clients"""
325 idents, msg = self.session.feed_identities(msg)
326 if not idents:
327 logger.error("Bad Client Message: %s"%msg)
328 return
329 client_id = idents[0]
330 try:
331 msg = self.session.unpack_message(msg, content=True)
332 except:
333 content = wrap_exception()
334 logger.error("Bad Client Message: %s"%msg, exc_info=True)
335 self.session.send(self.clientele, "controller_error", ident=client_id,
336 content=content)
337 return
338
339 # print client_id, header, parent, content
340 #switch on message type:
341 msg_type = msg['msg_type']
342 logger.info("client:: client %s requested %s"%(client_id, msg_type))
343 handler = self.client_handlers.get(msg_type, None)
344 try:
345 assert handler is not None, "Bad Message Type: %s"%msg_type
346 except:
347 content = wrap_exception()
348 logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
349 self.session.send(self.clientele, "controller_error", ident=client_id,
350 content=content)
351 return
352 else:
353 handler(client_id, msg)
354
355 def dispatch_db(self, msg):
356 """"""
357 raise NotImplementedError
358
359 #---------------------------------------------------------------------------
360 # handler methods (1 per event)
361 #---------------------------------------------------------------------------
362
363 #----------------------- Heartbeat --------------------------------------
364
365 def handle_new_heart(self, heart):
366 """handler to attach to heartbeater.
367 Called when a new heart starts to beat.
368 Triggers completion of registration."""
369 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
370 if heart not in self.incoming_registrations:
371 logger.info("heartbeat::ignoring new heart: %r"%heart)
372 else:
373 self.finish_registration(heart)
374
375
376 def handle_heart_failure(self, heart):
377 """handler to attach to heartbeater.
378 called when a previously registered heart fails to respond to beat request.
379 triggers unregistration"""
380 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
381 eid = self.hearts.get(heart, None)
382 queue = self.engines[eid].queue
383 if eid is None:
384 logger.info("heartbeat::ignoring heart failure %r"%heart)
385 else:
386 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
387
388 #----------------------- MUX Queue Traffic ------------------------------
389
390 def save_queue_request(self, idents, msg):
391 if len(idents) < 2:
392 logger.error("invalid identity prefix: %s"%idents)
393 return
394 queue_id, client_id = idents[:2]
395 try:
396 msg = self.session.unpack_message(msg, content=False)
397 except:
398 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
399 return
400
401 eid = self.by_ident.get(queue_id, None)
402 if eid is None:
403 logger.error("queue::target %r not registered"%queue_id)
404 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
405 return
406
407 header = msg['header']
408 msg_id = header['msg_id']
409 record = init_record(msg)
410 record['engine_uuid'] = queue_id
411 record['client_uuid'] = client_id
412 record['queue'] = 'mux'
413 if MongoDB is not None and isinstance(self.db, MongoDB):
414 record['buffers'] = map(Binary, record['buffers'])
415 self.pending.add(msg_id)
416 self.queues[eid].append(msg_id)
417 self.db.add_record(msg_id, record)
418
419 def save_queue_result(self, idents, msg):
420 if len(idents) < 2:
421 logger.error("invalid identity prefix: %s"%idents)
422 return
423
424 client_id, queue_id = idents[:2]
425 try:
426 msg = self.session.unpack_message(msg, content=False)
427 except:
428 logger.error("queue::engine %r sent invalid message to %r: %s"%(
429 queue_id,client_id, msg), exc_info=True)
430 return
431
432 eid = self.by_ident.get(queue_id, None)
433 if eid is None:
434 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
435 logger.debug("queue:: %s"%msg[2:])
436 return
437
438 parent = msg['parent_header']
439 if not parent:
440 return
441 msg_id = parent['msg_id']
442 if msg_id in self.pending:
443 self.pending.remove(msg_id)
444 self.all_completed.add(msg_id)
445 self.queues[eid].remove(msg_id)
446 self.completed[eid].append(msg_id)
447 rheader = msg['header']
448 completed = datetime.strptime(rheader['date'], ISO8601)
449 started = rheader.get('started', None)
450 if started is not None:
451 started = datetime.strptime(started, ISO8601)
452 result = {
453 'result_header' : rheader,
454 'result_content': msg['content'],
455 'started' : started,
456 'completed' : completed
457 }
458 if MongoDB is not None and isinstance(self.db, MongoDB):
459 result['result_buffers'] = map(Binary, msg['buffers'])
460 else:
461 result['result_buffers'] = msg['buffers']
462 self.db.update_record(msg_id, result)
463 else:
464 logger.debug("queue:: unknown msg finished %s"%msg_id)
465
466 #--------------------- Task Queue Traffic ------------------------------
467
468 def save_task_request(self, idents, msg):
469 """Save the submission of a task."""
470 client_id = idents[0]
471
472 try:
473 msg = self.session.unpack_message(msg, content=False)
474 except:
475 logger.error("task::client %r sent invalid task message: %s"%(
476 client_id, msg), exc_info=True)
477 return
478 record = init_record(msg)
479 if MongoDB is not None and isinstance(self.db, MongoDB):
480 record['buffers'] = map(Binary, record['buffers'])
481 record['client_uuid'] = client_id
482 record['queue'] = 'task'
483 header = msg['header']
484 msg_id = header['msg_id']
485 self.pending.add(msg_id)
486 self.db.add_record(msg_id, record)
487
488 def save_task_result(self, idents, msg):
489 """save the result of a completed task."""
490 client_id = idents[0]
491 try:
492 msg = self.session.unpack_message(msg, content=False)
493 except:
494 logger.error("task::invalid task result message send to %r: %s"%(
495 client_id, msg))
496 raise
497 return
498
499 parent = msg['parent_header']
500 if not parent:
501 # print msg
502 logger.warn("Task %r had no parent!"%msg)
503 return
504 msg_id = parent['msg_id']
505
506 header = msg['header']
507 engine_uuid = header.get('engine', None)
508 eid = self.by_ident.get(engine_uuid, None)
509
510 if msg_id in self.pending:
511 self.pending.remove(msg_id)
512 self.all_completed.add(msg_id)
513 if eid is not None:
514 self.completed[eid].append(msg_id)
515 if msg_id in self.tasks[eid]:
516 self.tasks[eid].remove(msg_id)
517 completed = datetime.strptime(header['date'], ISO8601)
518 started = header.get('started', None)
519 if started is not None:
520 started = datetime.strptime(started, ISO8601)
521 result = {
522 'result_header' : header,
523 'result_content': msg['content'],
524 'started' : started,
525 'completed' : completed,
526 'engine_uuid': engine_uuid
527 }
528 if MongoDB is not None and isinstance(self.db, MongoDB):
529 result['result_buffers'] = map(Binary, msg['buffers'])
530 else:
531 result['result_buffers'] = msg['buffers']
532 self.db.update_record(msg_id, result)
533
534 else:
535 logger.debug("task::unknown task %s finished"%msg_id)
536
537 def save_task_destination(self, idents, msg):
538 try:
539 msg = self.session.unpack_message(msg, content=True)
540 except:
541 logger.error("task::invalid task tracking message")
542 return
543 content = msg['content']
544 print (content)
545 msg_id = content['msg_id']
546 engine_uuid = content['engine_id']
547 eid = self.by_ident[engine_uuid]
548
549 logger.info("task::task %s arrived on %s"%(msg_id, eid))
550 # if msg_id in self.mia:
551 # self.mia.remove(msg_id)
552 # else:
553 # logger.debug("task::task %s not listed as MIA?!"%(msg_id))
554
555 self.tasks[eid].append(msg_id)
556 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
557 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
558
559 def mia_task_request(self, idents, msg):
560 raise NotImplementedError
561 client_id = idents[0]
562 # content = dict(mia=self.mia,status='ok')
563 # self.session.send('mia_reply', content=content, idents=client_id)
564
565
566
567 #-------------------------------------------------------------------------
568 # Registration requests
569 #-------------------------------------------------------------------------
570
571 def connection_request(self, client_id, msg):
572 """Reply with connection addresses for clients."""
573 logger.info("client::client %s connected"%client_id)
574 content = dict(status='ok')
575 content.update(self.client_addrs)
576 jsonable = {}
577 for k,v in self.keytable.iteritems():
578 jsonable[str(k)] = v
579 content['engines'] = jsonable
580 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
581
582 def register_engine(self, reg, msg):
583 """Register a new engine."""
584 content = msg['content']
585 try:
586 queue = content['queue']
587 except KeyError:
588 logger.error("registration::queue not specified")
589 return
590 heart = content.get('heartbeat', None)
591 """register a new engine, and create the socket(s) necessary"""
592 eid = self._new_id()
593 # print (eid, queue, reg, heart)
594
595 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
596
597 content = dict(id=eid,status='ok')
598 content.update(self.engine_addrs)
599 # check if requesting available IDs:
600 if queue in self.by_ident:
601 try:
602 raise KeyError("queue_id %r in use"%queue)
603 except:
604 content = wrap_exception()
605 elif heart in self.hearts: # need to check unique hearts?
606 try:
607 raise KeyError("heart_id %r in use"%heart)
608 except:
609 content = wrap_exception()
610 else:
611 for h, pack in self.incoming_registrations.iteritems():
612 if heart == h:
613 try:
614 raise KeyError("heart_id %r in use"%heart)
615 except:
616 content = wrap_exception()
617 break
618 elif queue == pack[1]:
619 try:
620 raise KeyError("queue_id %r in use"%queue)
621 except:
622 content = wrap_exception()
623 break
624
625 msg = self.session.send(self.registrar, "registration_reply",
626 content=content,
627 ident=reg)
628
629 if content['status'] == 'ok':
630 if heart in self.heartbeat.hearts:
631 # already beating
632 self.incoming_registrations[heart] = (eid,queue,reg,None)
633 self.finish_registration(heart)
634 else:
635 purge = lambda : self._purge_stalled_registration(heart)
636 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
637 dc.start()
638 self.incoming_registrations[heart] = (eid,queue,reg,dc)
639 else:
640 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
641 return eid
642
643 def unregister_engine(self, ident, msg):
644 """Unregister an engine that explicitly requested to leave."""
645 try:
646 eid = msg['content']['id']
647 except:
648 logger.error("registration::bad engine id for unregistration: %s"%ident)
649 return
650 logger.info("registration::unregister_engine(%s)"%eid)
651 content=dict(id=eid, queue=self.engines[eid].queue)
652 self.ids.remove(eid)
653 self.keytable.pop(eid)
654 ec = self.engines.pop(eid)
655 self.hearts.pop(ec.heartbeat)
656 self.by_ident.pop(ec.queue)
657 self.completed.pop(eid)
658 for msg_id in self.queues.pop(eid):
659 msg = self.pending.remove(msg_id)
660 ############## TODO: HANDLE IT ################
661
662 if self.notifier:
663 self.session.send(self.notifier, "unregistration_notification", content=content)
664
665 def finish_registration(self, heart):
666 """Second half of engine registration, called after our HeartMonitor
667 has received a beat from the Engine's Heart."""
668 try:
669 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
670 except KeyError:
671 logger.error("registration::tried to finish nonexistant registration")
672 return
673 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
674 if purge is not None:
675 purge.stop()
676 control = queue
677 self.ids.add(eid)
678 self.keytable[eid] = queue
679 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
680 self.by_ident[queue] = eid
681 self.queues[eid] = list()
682 self.tasks[eid] = list()
683 self.completed[eid] = list()
684 self.hearts[heart] = eid
685 content = dict(id=eid, queue=self.engines[eid].queue)
686 if self.notifier:
687 self.session.send(self.notifier, "registration_notification", content=content)
688
689 def _purge_stalled_registration(self, heart):
690 if heart in self.incoming_registrations:
691 eid = self.incoming_registrations.pop(heart)[0]
692 logger.info("registration::purging stalled registration: %i"%eid)
693 else:
694 pass
695
696 #-------------------------------------------------------------------------
697 # Client Requests
698 #-------------------------------------------------------------------------
699
700 def shutdown_request(self, client_id, msg):
701 """handle shutdown request."""
702 # s = self.context.socket(zmq.XREQ)
703 # s.connect(self.client_connections['mux'])
704 # time.sleep(0.1)
705 # for eid,ec in self.engines.iteritems():
706 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
707 # time.sleep(1)
708 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
709 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
710 dc.start()
711
712 def _shutdown(self):
713 logger.info("controller::controller shutting down.")
714 time.sleep(0.1)
715 sys.exit(0)
716
717
718 def check_load(self, client_id, msg):
719 content = msg['content']
720 try:
721 targets = content['targets']
722 targets = self._validate_targets(targets)
723 except:
724 content = wrap_exception()
725 self.session.send(self.clientele, "controller_error",
726 content=content, ident=client_id)
727 return
728
729 content = dict(status='ok')
730 # loads = {}
731 for t in targets:
732 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
733 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
734
735
736 def queue_status(self, client_id, msg):
737 """Return the Queue status of one or more targets.
738 if verbose: return the msg_ids
739 else: return len of each type.
740 keys: queue (pending MUX jobs)
741 tasks (pending Task jobs)
742 completed (finished jobs from both queues)"""
743 content = msg['content']
744 targets = content['targets']
745 try:
746 targets = self._validate_targets(targets)
747 except:
748 content = wrap_exception()
749 self.session.send(self.clientele, "controller_error",
750 content=content, ident=client_id)
751 return
752 verbose = content.get('verbose', False)
753 content = dict(status='ok')
754 for t in targets:
755 queue = self.queues[t]
756 completed = self.completed[t]
757 tasks = self.tasks[t]
758 if not verbose:
759 queue = len(queue)
760 completed = len(completed)
761 tasks = len(tasks)
762 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
763 # pending
764 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
765
766 def purge_results(self, client_id, msg):
767 """Purge results from memory. This method is more valuable before we move
768 to a DB based message storage mechanism."""
769 content = msg['content']
770 msg_ids = content.get('msg_ids', [])
771 reply = dict(status='ok')
772 if msg_ids == 'all':
773 self.db.drop_matching_records(dict(completed={'$ne':None}))
774 else:
775 for msg_id in msg_ids:
776 if msg_id in self.all_completed:
777 self.db.drop_record(msg_id)
778 else:
779 if msg_id in self.pending:
780 try:
781 raise IndexError("msg pending: %r"%msg_id)
782 except:
783 reply = wrap_exception()
784 else:
785 try:
786 raise IndexError("No such msg: %r"%msg_id)
787 except:
788 reply = wrap_exception()
789 break
790 eids = content.get('engine_ids', [])
791 for eid in eids:
792 if eid not in self.engines:
793 try:
794 raise IndexError("No such engine: %i"%eid)
795 except:
796 reply = wrap_exception()
797 break
798 msg_ids = self.completed.pop(eid)
799 uid = self.engines[eid].queue
800 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
801
802 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
803
804 def resubmit_task(self, client_id, msg, buffers):
805 """Resubmit a task."""
806 raise NotImplementedError
807
808 def get_results(self, client_id, msg):
809 """Get the result of 1 or more messages."""
810 content = msg['content']
811 msg_ids = sorted(set(content['msg_ids']))
812 statusonly = content.get('status_only', False)
813 pending = []
814 completed = []
815 content = dict(status='ok')
816 content['pending'] = pending
817 content['completed'] = completed
818 buffers = []
819 if not statusonly:
820 content['results'] = {}
821 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
822 for msg_id in msg_ids:
823 if msg_id in self.pending:
824 pending.append(msg_id)
825 elif msg_id in self.all_completed:
826 completed.append(msg_id)
827 if not statusonly:
828 rec = records[msg_id]
829 content[msg_id] = { 'result_content': rec['result_content'],
830 'header': rec['header'],
831 'result_header' : rec['result_header'],
832 }
833 buffers.extend(map(str, rec['result_buffers']))
834 else:
835 try:
836 raise KeyError('No such message: '+msg_id)
837 except:
838 content = wrap_exception()
839 break
840 self.session.send(self.clientele, "result_reply", content=content,
841 parent=msg, ident=client_id,
842 buffers=buffers)
843
844
845 #-------------------------------------------------------------------------
47 #-------------------------------------------------------------------------
846 # Entry Point
48 # Entry Point
847 #-------------------------------------------------------------------------
49 #-------------------------------------------------------------------------
848
50
849 def make_argument_parser():
51 def make_argument_parser():
850 """Make an argument parser"""
52 """Make an argument parser"""
851 parser = make_base_argument_parser()
53 parser = make_base_argument_parser()
852
54
853 parser.add_argument('--client', type=int, metavar='PORT', default=0,
55 parser.add_argument('--client', type=int, metavar='PORT', default=0,
854 help='set the XREP port for clients [default: random]')
56 help='set the XREP port for clients [default: random]')
855 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
57 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
856 help='set the PUB socket for registration notification [default: random]')
58 help='set the PUB socket for registration notification [default: random]')
857 parser.add_argument('--hb', type=str, metavar='PORTS',
59 parser.add_argument('--hb', type=str, metavar='PORTS',
858 help='set the 2 ports for heartbeats [default: random]')
60 help='set the 2 ports for heartbeats [default: random]')
859 parser.add_argument('--ping', type=int, default=3000,
61 parser.add_argument('--ping', type=int, default=3000,
860 help='set the heartbeat period in ms [default: 3000]')
62 help='set the heartbeat period in ms [default: 3000]')
861 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
63 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
862 help='set the SUB port for queue monitoring [default: random]')
64 help='set the SUB port for queue monitoring [default: random]')
863 parser.add_argument('--mux', type=str, metavar='PORTS',
65 parser.add_argument('--mux', type=str, metavar='PORTS',
864 help='set the XREP ports for the MUX queue [default: random]')
66 help='set the XREP ports for the MUX queue [default: random]')
865 parser.add_argument('--task', type=str, metavar='PORTS',
67 parser.add_argument('--task', type=str, metavar='PORTS',
866 help='set the XREP/XREQ ports for the task queue [default: random]')
68 help='set the XREP/XREQ ports for the task queue [default: random]')
867 parser.add_argument('--control', type=str, metavar='PORTS',
69 parser.add_argument('--control', type=str, metavar='PORTS',
868 help='set the XREP ports for the control queue [default: random]')
70 help='set the XREP ports for the control queue [default: random]')
869 parser.add_argument('--scheduler', type=str, default='pure',
71 parser.add_argument('--scheduler', type=str, default='pure',
870 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
72 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
871 help='select the task scheduler [default: pure ZMQ]')
73 help='select the task scheduler [default: pure ZMQ]')
872 parser.add_argument('--mongodb', action='store_true',
74 parser.add_argument('--mongodb', action='store_true',
873 help='Use MongoDB task storage [default: in-memory]')
75 help='Use MongoDB task storage [default: in-memory]')
874
76
875 return parser
77 return parser
876
78
877 def main(argv=None):
79 def main(argv=None):
878 import time
879 from multiprocessing import Process
880
881 from zmq.eventloop.zmqstream import ZMQStream
882 from zmq.devices import ProcessMonitoredQueue
883 from zmq.log import handlers
884
885 import streamsession as session
886 import heartmonitor
887 from scheduler import launch_scheduler
888
80
889 parser = make_argument_parser()
81 parser = make_argument_parser()
890
82
891 args = parser.parse_args(argv)
83 args = parser.parse_args(argv)
892 parse_url(args)
84 parse_url(args)
893
85
894 iface="%s://%s"%(args.transport,args.ip)+':%i'
86 iface="%s://%s"%(args.transport,args.ip)+':%i'
895
87
896 random_ports = 0
88 random_ports = 0
897 if args.hb:
89 if args.hb:
898 hb = split_ports(args.hb, 2)
90 hb = split_ports(args.hb, 2)
899 else:
91 else:
900 hb = select_random_ports(2)
92 hb = select_random_ports(2)
901 if args.mux:
93 if args.mux:
902 mux = split_ports(args.mux, 2)
94 mux = split_ports(args.mux, 2)
903 else:
95 else:
904 mux = None
96 mux = None
905 random_ports += 2
97 random_ports += 2
906 if args.task:
98 if args.task:
907 task = split_ports(args.task, 2)
99 task = split_ports(args.task, 2)
908 else:
100 else:
909 task = None
101 task = None
910 random_ports += 2
102 random_ports += 2
911 if args.control:
103 if args.control:
912 control = split_ports(args.control, 2)
104 control = split_ports(args.control, 2)
913 else:
105 else:
914 control = None
106 control = None
915 random_ports += 2
107 random_ports += 2
916
108
917 ctx = zmq.Context()
109 ctx = zmq.Context()
918 loop = ioloop.IOLoop.instance()
110 loop = ioloop.IOLoop.instance()
919
111
920 # setup logging
112 # setup logging
921 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
113 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
922
114
923 # Registrar socket
115 # Registrar socket
924 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
116 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
925 regport = bind_port(reg, args.ip, args.regport)
117 regport = bind_port(reg, args.ip, args.regport)
926
118
927 ### Engine connections ###
119 ### Engine connections ###
928
120
929 # heartbeat
121 # heartbeat
930 hpub = ctx.socket(zmq.PUB)
122 hpub = ctx.socket(zmq.PUB)
931 bind_port(hpub, args.ip, hb[0])
123 bind_port(hpub, args.ip, hb[0])
932 hrep = ctx.socket(zmq.XREP)
124 hrep = ctx.socket(zmq.XREP)
933 bind_port(hrep, args.ip, hb[1])
125 bind_port(hrep, args.ip, hb[1])
934
126
935 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
127 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
936 hmon.start()
128 hmon.start()
937
129
938 ### Client connections ###
130 ### Client connections ###
939 # Clientele socket
131 # Clientele socket
940 c = ZMQStream(ctx.socket(zmq.XREP), loop)
132 c = ZMQStream(ctx.socket(zmq.XREP), loop)
941 cport = bind_port(c, args.ip, args.client)
133 cport = bind_port(c, args.ip, args.client)
942 # Notifier socket
134 # Notifier socket
943 n = ZMQStream(ctx.socket(zmq.PUB), loop)
135 n = ZMQStream(ctx.socket(zmq.PUB), loop)
944 nport = bind_port(n, args.ip, args.notice)
136 nport = bind_port(n, args.ip, args.notice)
945
137
946 ### Key File ###
138 ### Key File ###
947 if args.execkey and not os.path.isfile(args.execkey):
139 if args.execkey and not os.path.isfile(args.execkey):
948 generate_exec_key(args.execkey)
140 generate_exec_key(args.execkey)
949
141
950 thesession = session.StreamSession(username=args.ident or "controller", keyfile=args.execkey)
142 thesession = session.StreamSession(username=args.ident or "controller", keyfile=args.execkey)
951
143
952 ### build and launch the queues ###
144 ### build and launch the queues ###
953
145
954 # monitor socket
146 # monitor socket
955 sub = ctx.socket(zmq.SUB)
147 sub = ctx.socket(zmq.SUB)
956 sub.setsockopt(zmq.SUBSCRIBE, "")
148 sub.setsockopt(zmq.SUBSCRIBE, "")
957 monport = bind_port(sub, args.ip, args.monitor)
149 monport = bind_port(sub, args.ip, args.monitor)
958 sub = ZMQStream(sub, loop)
150 sub = ZMQStream(sub, loop)
959
151
960 ports = select_random_ports(random_ports)
152 ports = select_random_ports(random_ports)
961 children = []
153 children = []
962 # Multiplexer Queue (in a Process)
154 # Multiplexer Queue (in a Process)
963 if not mux:
155 if not mux:
964 mux = (ports.pop(),ports.pop())
156 mux = (ports.pop(),ports.pop())
965 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
157 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
966 q.bind_in(iface%mux[0])
158 q.bind_in(iface%mux[0])
967 q.bind_out(iface%mux[1])
159 q.bind_out(iface%mux[1])
968 q.connect_mon(iface%monport)
160 q.connect_mon(iface%monport)
969 q.daemon=True
161 q.daemon=True
970 q.start()
162 q.start()
971 children.append(q.launcher)
163 children.append(q.launcher)
972
164
973 # Control Queue (in a Process)
165 # Control Queue (in a Process)
974 if not control:
166 if not control:
975 control = (ports.pop(),ports.pop())
167 control = (ports.pop(),ports.pop())
976 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
168 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
977 q.bind_in(iface%control[0])
169 q.bind_in(iface%control[0])
978 q.bind_out(iface%control[1])
170 q.bind_out(iface%control[1])
979 q.connect_mon(iface%monport)
171 q.connect_mon(iface%monport)
980 q.daemon=True
172 q.daemon=True
981 q.start()
173 q.start()
982 children.append(q.launcher)
174 children.append(q.launcher)
983 # Task Queue (in a Process)
175 # Task Queue (in a Process)
984 if not task:
176 if not task:
985 task = (ports.pop(),ports.pop())
177 task = (ports.pop(),ports.pop())
986 if args.scheduler == 'pure':
178 if args.scheduler == 'pure':
987 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
179 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
988 q.bind_in(iface%task[0])
180 q.bind_in(iface%task[0])
989 q.bind_out(iface%task[1])
181 q.bind_out(iface%task[1])
990 q.connect_mon(iface%monport)
182 q.connect_mon(iface%monport)
991 q.daemon=True
183 q.daemon=True
992 q.start()
184 q.start()
993 children.append(q.launcher)
185 children.append(q.launcher)
994 else:
186 else:
995 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
187 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
996 print (sargs)
188 print (sargs)
997 q = Process(target=launch_scheduler, args=sargs)
189 q = Process(target=launch_scheduler, args=sargs)
998 q.daemon=True
190 q.daemon=True
999 q.start()
191 q.start()
1000 children.append(q)
192 children.append(q)
1001
193
1002 if args.mongodb:
194 if args.mongodb:
1003 from mongodb import MongoDB
195 from mongodb import MongoDB
1004 db = MongoDB(thesession.session)
196 db = MongoDB(thesession.session)
1005 else:
197 else:
1006 db = DictDB()
198 db = DictDB()
1007 time.sleep(.25)
199 time.sleep(.25)
1008
200
1009 # build connection dicts
201 # build connection dicts
1010 engine_addrs = {
202 engine_addrs = {
1011 'control' : iface%control[1],
203 'control' : iface%control[1],
1012 'queue': iface%mux[1],
204 'queue': iface%mux[1],
1013 'heartbeat': (iface%hb[0], iface%hb[1]),
205 'heartbeat': (iface%hb[0], iface%hb[1]),
1014 'task' : iface%task[1],
206 'task' : iface%task[1],
1015 'monitor' : iface%monport,
207 'monitor' : iface%monport,
1016 }
208 }
1017
209
1018 client_addrs = {
210 client_addrs = {
1019 'control' : iface%control[0],
211 'control' : iface%control[0],
1020 'query': iface%cport,
212 'query': iface%cport,
1021 'queue': iface%mux[0],
213 'queue': iface%mux[0],
1022 'task' : iface%task[0],
214 'task' : iface%task[0],
1023 'notification': iface%nport
215 'notification': iface%nport
1024 }
216 }
1025 signal_children(children)
217 signal_children(children)
1026 con = Controller(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
218 hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
1027 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
219 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
1028 dc.start()
220 dc.start()
1029 loop.start()
221 loop.start()
1030
222
1031
223
1032
224
1033
225
1034 if __name__ == '__main__':
226 if __name__ == '__main__':
1035 main()
227 main()
General Comments 0
You need to be logged in to leave comments. Login now