##// END OF EJS Templates
more graceful handling of dying engines
MinRK -
Show More
@@ -252,13 +252,14 class Client(HasTraits):
252 252
253 253
254 254 block = Bool(False)
255 outstanding=Set()
256 results = Dict()
257 metadata = Dict()
255 outstanding = Set()
256 results = Instance('collections.defaultdict', (dict,))
257 metadata = Instance('collections.defaultdict', (Metadata,))
258 258 history = List()
259 259 debug = Bool(False)
260 260 profile=CUnicode('default')
261 261
262 _outstanding_dict = Instance('collections.defaultdict', (set,))
262 263 _ids = List()
263 264 _connected=Bool(False)
264 265 _ssh=Bool(False)
@@ -498,23 +499,6 class Client(HasTraits):
498 499 e.engine_info['engine_id'] = eid
499 500 return e
500 501
501 def _register_engine(self, msg):
502 """Register a new engine, and update our connection info."""
503 content = msg['content']
504 eid = content['id']
505 d = {eid : content['queue']}
506 self._update_engines(d)
507
508 def _unregister_engine(self, msg):
509 """Unregister an engine that has died."""
510 content = msg['content']
511 eid = int(content['id'])
512 if eid in self._ids:
513 self._ids.remove(eid)
514 self._engines.pop(eid)
515 if self._task_socket and self._task_scheme == 'pure':
516 self._stop_scheduling_tasks()
517
518 502 def _extract_metadata(self, header, parent, content):
519 503 md = {'msg_id' : parent['msg_id'],
520 504 'received' : datetime.now(),
@@ -535,6 +519,54 class Client(HasTraits):
535 519 md['completed'] = datetime.strptime(header['date'], util.ISO8601)
536 520 return md
537 521
522 def _register_engine(self, msg):
523 """Register a new engine, and update our connection info."""
524 content = msg['content']
525 eid = content['id']
526 d = {eid : content['queue']}
527 self._update_engines(d)
528
529 def _unregister_engine(self, msg):
530 """Unregister an engine that has died."""
531 content = msg['content']
532 eid = int(content['id'])
533 if eid in self._ids:
534 self._ids.remove(eid)
535 uuid = self._engines.pop(eid)
536
537 self._handle_stranded_msgs(eid, uuid)
538
539 if self._task_socket and self._task_scheme == 'pure':
540 self._stop_scheduling_tasks()
541
542 def _handle_stranded_msgs(self, eid, uuid):
543 """Handle messages known to be on an engine when the engine unregisters.
544
545 It is possible that this will fire prematurely - that is, an engine will
546 go down after completing a result, and the client will be notified
547 of the unregistration and later receive the successful result.
548 """
549
550 outstanding = self._outstanding_dict[uuid]
551
552 for msg_id in list(outstanding):
553 print msg_id
554 if msg_id in self.results:
555 # we already
556 continue
557 try:
558 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
559 except:
560 content = error.wrap_exception()
561 # build a fake message:
562 parent = {}
563 header = {}
564 parent['msg_id'] = msg_id
565 header['engine'] = uuid
566 header['date'] = datetime.now().strftime(util.ISO8601)
567 msg = dict(parent_header=parent, header=header, content=content)
568 self._handle_apply_reply(msg)
569
538 570 def _handle_execute_reply(self, msg):
539 571 """Save the reply to an execute_request into our results.
540 572
@@ -569,10 +601,15 class Client(HasTraits):
569 601 header = msg['header']
570 602
571 603 # construct metadata:
572 md = self.metadata.setdefault(msg_id, Metadata())
604 md = self.metadata[msg_id]
573 605 md.update(self._extract_metadata(header, parent, content))
606 # is this redundant?
574 607 self.metadata[msg_id] = md
575 608
609 e_outstanding = self._outstanding_dict[md['engine_uuid']]
610 if msg_id in e_outstanding:
611 e_outstanding.remove(msg_id)
612
576 613 # construct result:
577 614 if content['status'] == 'ok':
578 615 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
@@ -642,7 +679,7 class Client(HasTraits):
642 679 msg_type = msg['msg_type']
643 680
644 681 # init metadata:
645 md = self.metadata.setdefault(msg_id, Metadata())
682 md = self.metadata[msg_id]
646 683
647 684 if msg_type == 'stream':
648 685 name = content['name']
@@ -653,6 +690,7 class Client(HasTraits):
653 690 else:
654 691 md.update({msg_type : content['data']})
655 692
693 # reduntant?
656 694 self.metadata[msg_id] = md
657 695
658 696 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
@@ -1067,6 +1105,8 class Client(HasTraits):
1067 1105 msg_id = msg['msg_id']
1068 1106 self.outstanding.add(msg_id)
1069 1107 self.history.append(msg_id)
1108 self.metadata[msg_id]['submitted'] = datetime.now()
1109
1070 1110 ar = AsyncResult(self, [msg_id], fname=f.__name__)
1071 1111 if block:
1072 1112 try:
@@ -1099,6 +1139,7 class Client(HasTraits):
1099 1139 content=content, buffers=bufs, ident=ident, subheader=subheader)
1100 1140 msg_id = msg['msg_id']
1101 1141 self.outstanding.add(msg_id)
1142 self._outstanding_dict[ident].add(msg_id)
1102 1143 self.history.append(msg_id)
1103 1144 msg_ids.append(msg_id)
1104 1145 ar = AsyncResult(self, msg_ids, fname=f.__name__)
@@ -1345,7 +1386,7 class Client(HasTraits):
1345 1386 is False, then completed results will be keyed by their `msg_id`.
1346 1387 """
1347 1388 if not isinstance(msg_ids, (list,tuple)):
1348 indices_or_msg_ids = [msg_ids]
1389 msg_ids = [msg_ids]
1349 1390
1350 1391 theids = []
1351 1392 for msg_id in msg_ids:
@@ -1398,7 +1439,7 class Client(HasTraits):
1398 1439 if isinstance(rcontent, str):
1399 1440 rcontent = self.session.unpack(rcontent)
1400 1441
1401 md = self.metadata.setdefault(msg_id, Metadata())
1442 md = self.metadata[msg_id]
1402 1443 md.update(self._extract_metadata(header, parent, rcontent))
1403 1444 md.update(iodict)
1404 1445
@@ -231,7 +231,6 class HubFactory(RegistrationFactory):
231 231 # connect the db
232 232 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
233 233 cdir = self.config.Global.cluster_dir
234 print (cdir)
235 234 self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
236 235 time.sleep(.25)
237 236
@@ -415,24 +414,6 class Hub(LoggingFactory):
415 414 raise IndexError("No Engines Registered")
416 415 return targets
417 416
418 def _validate_client_msg(self, msg):
419 """validates and unpacks headers of a message. Returns False if invalid,
420 (ident, header, parent, content)"""
421 client_id = msg[0]
422 try:
423 msg = self.session.unpack_message(msg[1:], content=True)
424 except:
425 self.log.error("client::Invalid Message %s"%msg, exc_info=True)
426 return False
427
428 msg_type = msg.get('msg_type', None)
429 if msg_type is None:
430 return False
431 header = msg.get('header')
432 # session doesn't handle split content for now:
433 return client_id, msg
434
435
436 417 #-----------------------------------------------------------------------------
437 418 # dispatch methods (1 per stream)
438 419 #-----------------------------------------------------------------------------
@@ -598,22 +579,27 class Hub(LoggingFactory):
598 579 self.all_completed.add(msg_id)
599 580 self.queues[eid].remove(msg_id)
600 581 self.completed[eid].append(msg_id)
601 rheader = msg['header']
602 completed = datetime.strptime(rheader['date'], ISO8601)
603 started = rheader.get('started', None)
604 if started is not None:
605 started = datetime.strptime(started, ISO8601)
606 result = {
607 'result_header' : rheader,
608 'result_content': msg['content'],
609 'started' : started,
610 'completed' : completed
611 }
582 elif msg_id not in self.all_completed:
583 # it could be a result from a dead engine that died before delivering the
584 # result
585 self.log.warn("queue:: unknown msg finished %s"%msg_id)
586 return
587 # update record anyway, because the unregistration could have been premature
588 rheader = msg['header']
589 completed = datetime.strptime(rheader['date'], ISO8601)
590 started = rheader.get('started', None)
591 if started is not None:
592 started = datetime.strptime(started, ISO8601)
593 result = {
594 'result_header' : rheader,
595 'result_content': msg['content'],
596 'started' : started,
597 'completed' : completed
598 }
612 599
613 result['result_buffers'] = msg['buffers']
614 self.db.update_record(msg_id, result)
615 else:
616 self.log.debug("queue:: unknown msg finished %s"%msg_id)
600 result['result_buffers'] = msg['buffers']
601 self.db.update_record(msg_id, result)
602
617 603
618 604 #--------------------- Task Queue Traffic ------------------------------
619 605
@@ -841,20 +827,46 class Hub(LoggingFactory):
841 827 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
842 828 return
843 829 self.log.info("registration::unregister_engine(%s)"%eid)
830 # print (eid)
844 831 content=dict(id=eid, queue=self.engines[eid].queue)
845 832 self.ids.remove(eid)
846 self.keytable.pop(eid)
833 uuid = self.keytable.pop(eid)
847 834 ec = self.engines.pop(eid)
848 835 self.hearts.pop(ec.heartbeat)
849 836 self.by_ident.pop(ec.queue)
850 837 self.completed.pop(eid)
851 for msg_id in self.queues.pop(eid):
852 msg = self.pending.remove(msg_id)
838 self._handle_stranded_msgs(eid, uuid)
853 839 ############## TODO: HANDLE IT ################
854 840
855 841 if self.notifier:
856 842 self.session.send(self.notifier, "unregistration_notification", content=content)
857 843
844 def _handle_stranded_msgs(self, eid, uuid):
845 """Handle messages known to be on an engine when the engine unregisters.
846
847 It is possible that this will fire prematurely - that is, an engine will
848 go down after completing a result, and the client will be notified
849 that the result failed and later receive the actual result.
850 """
851
852 outstanding = self.queues.pop(eid)
853
854 for msg_id in outstanding:
855 self.pending.remove(msg_id)
856 self.all_completed.add(msg_id)
857 try:
858 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
859 except:
860 content = error.wrap_exception()
861 # build a fake header:
862 header = {}
863 header['engine'] = uuid
864 header['date'] = datetime.now().strftime(ISO8601)
865 rec = dict(result_content=content, result_header=header, result_buffers=[])
866 rec['completed'] = header['date']
867 rec['engine_uuid'] = uuid
868 self.db.update_record(msg_id, rec)
869
858 870 def finish_registration(self, heart):
859 871 """Second half of engine registration, called after our HeartMonitor
860 872 has received a beat from the Engine's Heart."""
@@ -1029,7 +1041,8 class Hub(LoggingFactory):
1029 1041 'result_header' : rec['result_header'],
1030 1042 'io' : io_dict,
1031 1043 }
1032 buffers.extend(map(str, rec['result_buffers']))
1044 if rec['result_buffers']:
1045 buffers.extend(map(str, rec['result_buffers']))
1033 1046 else:
1034 1047 try:
1035 1048 raise KeyError('No such message: '+msg_id)
General Comments 0
You need to be logged in to leave comments. Login now