@@ -252,13 +252,14 @@ class Client(HasTraits):
252 | 252 |
253 | 253 |
254 | 254 |     block = Bool(False)
255 |     |     outstanding=Set()
256 |     |     results = Dict()
257 |     |     metadata = Dict()
    | 255 |     outstanding = Set()
    | 256 |     results = Instance('collections.defaultdict', (dict,))
    | 257 |     metadata = Instance('collections.defaultdict', (Metadata,))
258 | 258 |     history = List()
259 | 259 |     debug = Bool(False)
260 | 260 |     profile=CUnicode('default')
261 | 261 |
    | 262 |     _outstanding_dict = Instance('collections.defaultdict', (set,))
262 | 263 |     _ids = List()
263 | 264 |     _connected=Bool(False)
264 | 265 |     _ssh=Bool(False)
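The hunk above replaces the plain Dict traits with defaultdict instances, so that self.results[msg_id] and self.metadata[msg_id] create an empty entry on first access instead of raising KeyError, and _outstanding_dict keeps a per-engine set of in-flight message ids. A minimal sketch of the behaviour this relies on, using plain collections.defaultdict and a hypothetical stand-in for the Metadata class (not the actual traitlets-based declaration):

    from collections import defaultdict

    class Metadata(dict):
        """Hypothetical stand-in for IPython.parallel's Metadata dict."""
        pass

    results = defaultdict(dict)               # results[msg_id] starts as {}
    metadata = defaultdict(Metadata)          # metadata[msg_id] starts as an empty Metadata
    outstanding_by_engine = defaultdict(set)  # engine uuid -> set of in-flight msg_ids

    md = metadata['some-new-msg-id']   # no KeyError: a fresh Metadata is created
    md['submitted'] = None             # and can be filled in incrementally
    print(len(metadata))               # 1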
@@ -498,23 +499,6 @@ class Client(HasTraits):
498 | 499 |         e.engine_info['engine_id'] = eid
499 | 500 |         return e
500 | 501 |
501 |     |     def _register_engine(self, msg):
502 |     |         """Register a new engine, and update our connection info."""
503 |     |         content = msg['content']
504 |     |         eid = content['id']
505 |     |         d = {eid : content['queue']}
506 |     |         self._update_engines(d)
507 |     |
508 |     |     def _unregister_engine(self, msg):
509 |     |         """Unregister an engine that has died."""
510 |     |         content = msg['content']
511 |     |         eid = int(content['id'])
512 |     |         if eid in self._ids:
513 |     |             self._ids.remove(eid)
514 |     |             self._engines.pop(eid)
515 |     |         if self._task_socket and self._task_scheme == 'pure':
516 |     |             self._stop_scheduling_tasks()
517 |     |
518 | 502 |     def _extract_metadata(self, header, parent, content):
519 | 503 |         md = {'msg_id' : parent['msg_id'],
520 | 504 |               'received' : datetime.now(),
@@ -535,6 +519,54 @@ class Client(HasTraits):
535 | 519 |             md['completed'] = datetime.strptime(header['date'], util.ISO8601)
536 | 520 |         return md
537 | 521 |
    | 522 |     def _register_engine(self, msg):
    | 523 |         """Register a new engine, and update our connection info."""
    | 524 |         content = msg['content']
    | 525 |         eid = content['id']
    | 526 |         d = {eid : content['queue']}
    | 527 |         self._update_engines(d)
    | 528 |
    | 529 |     def _unregister_engine(self, msg):
    | 530 |         """Unregister an engine that has died."""
    | 531 |         content = msg['content']
    | 532 |         eid = int(content['id'])
    | 533 |         if eid in self._ids:
    | 534 |             self._ids.remove(eid)
    | 535 |             uuid = self._engines.pop(eid)
    | 536 |
    | 537 |             self._handle_stranded_msgs(eid, uuid)
    | 538 |
    | 539 |         if self._task_socket and self._task_scheme == 'pure':
    | 540 |             self._stop_scheduling_tasks()
    | 541 |
    | 542 |     def _handle_stranded_msgs(self, eid, uuid):
    | 543 |         """Handle messages known to be on an engine when the engine unregisters.
    | 544 |
    | 545 |         It is possible that this will fire prematurely - that is, an engine will
    | 546 |         go down after completing a result, and the client will be notified
    | 547 |         of the unregistration and later receive the successful result.
    | 548 |         """
    | 549 |
    | 550 |         outstanding = self._outstanding_dict[uuid]
    | 551 |
    | 552 |         for msg_id in list(outstanding):
    | 553 |             print msg_id
    | 554 |             if msg_id in self.results:
    | 555 |                 # we already have the result
    | 556 |                 continue
    | 557 |             try:
    | 558 |                 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
    | 559 |             except:
    | 560 |                 content = error.wrap_exception()
    | 561 |             # build a fake message:
    | 562 |             parent = {}
    | 563 |             header = {}
    | 564 |             parent['msg_id'] = msg_id
    | 565 |             header['engine'] = uuid
    | 566 |             header['date'] = datetime.now().strftime(util.ISO8601)
    | 567 |             msg = dict(parent_header=parent, header=header, content=content)
    | 568 |             self._handle_apply_reply(msg)
    | 569 |
538 | 570 |     def _handle_execute_reply(self, msg):
539 | 571 |         """Save the reply to an execute_request into our results.
540 | 572 |
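When an engine unregisters, the new client-side _handle_stranded_msgs turns every message still outstanding on that engine into a synthetic failure reply, so waiting callers get an EngineError instead of blocking forever. The raise inside try/except exists only to give error.wrap_exception() a live exception (and traceback) to serialize. A rough standalone sketch of that idiom, with an illustrative wrapper in place of IPython's real error module:

    import sys
    import traceback
    from datetime import datetime

    def wrap_current_exception():
        """Capture the active exception as a serializable dict (illustrative)."""
        etype, evalue, tb = sys.exc_info()
        return {
            'status'   : 'error',
            'ename'    : etype.__name__,
            'evalue'   : str(evalue),
            'traceback': traceback.format_exception(etype, evalue, tb),
        }

    def fake_engine_death_reply(msg_id, engine_uuid, eid):
        """Build a reply shaped like an apply_reply for a task lost with its engine."""
        try:
            raise RuntimeError("Engine %r died while running task %r" % (eid, msg_id))
        except RuntimeError:
            content = wrap_current_exception()
        parent = {'msg_id': msg_id}
        header = {'engine': engine_uuid, 'date': datetime.now().isoformat()}
        return dict(parent_header=parent, header=header, content=content)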
@@ -569,10 +601,15 @@ class Client(HasTraits):
569 | 601 |         header = msg['header']
570 | 602 |
571 | 603 |         # construct metadata:
572 |     |         md = self.metadata
    | 604 |         md = self.metadata[msg_id]
573 | 605 |         md.update(self._extract_metadata(header, parent, content))
    | 606 |         # is this redundant?
574 | 607 |         self.metadata[msg_id] = md
575 | 608 |
    | 609 |         e_outstanding = self._outstanding_dict[md['engine_uuid']]
    | 610 |         if msg_id in e_outstanding:
    | 611 |             e_outstanding.remove(msg_id)
    | 612 |
576 | 613 |         # construct result:
577 | 614 |         if content['status'] == 'ok':
578 | 615 |             self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
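Removing the msg_id from _outstanding_dict[engine_uuid] here is the read side of the per-engine bookkeeping: ids are added at submission time (see the apply hunks further down), dropped when a reply arrives, and the whole set is swept when the engine dies. A small illustrative sketch of that lifecycle, independent of the Client class:

    from collections import defaultdict

    outstanding_by_engine = defaultdict(set)   # engine uuid -> msg_ids in flight

    def on_submit(engine_uuid, msg_id):
        outstanding_by_engine[engine_uuid].add(msg_id)

    def on_reply(engine_uuid, msg_id):
        # a reply can race with unregistration, so discard() rather than remove()
        outstanding_by_engine[engine_uuid].discard(msg_id)

    def on_engine_death(engine_uuid):
        # whatever is left is stranded and needs a synthetic failure result
        return outstanding_by_engine.pop(engine_uuid, set())

    on_submit('uuid-1', 'msg-a')
    on_submit('uuid-1', 'msg-b')
    on_reply('uuid-1', 'msg-a')
    print(on_engine_death('uuid-1'))   # only 'msg-b' is stranded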
@@ -642,7 +679,7 @@ class Client(HasTraits):
642 | 679 |             msg_type = msg['msg_type']
643 | 680 |
644 | 681 |             # init metadata:
645 |     |             md = self.metadata
    | 682 |             md = self.metadata[msg_id]
646 | 683 |
647 | 684 |             if msg_type == 'stream':
648 | 685 |                 name = content['name']
@@ -653,6 +690,7 @@ class Client(HasTraits):
653 | 690 |             else:
654 | 691 |                 md.update({msg_type : content['data']})
655 | 692 |
    | 693 |             # redundant?
656 | 694 |             self.metadata[msg_id] = md
657 | 695 |
658 | 696 |             msg = self.session.recv(sock, mode=zmq.NOBLOCK)
@@ -1067,6 +1105,8 @@ class Client(HasTraits):
1067 | 1105 |         msg_id = msg['msg_id']
1068 | 1106 |         self.outstanding.add(msg_id)
1069 | 1107 |         self.history.append(msg_id)
     | 1108 |         self.metadata[msg_id]['submitted'] = datetime.now()
     | 1109 |
1070 | 1110 |         ar = AsyncResult(self, [msg_id], fname=f.__name__)
1071 | 1111 |         if block:
1072 | 1112 |             try:
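Stamping metadata[msg_id]['submitted'] at send time complements the 'completed' timestamp that _extract_metadata already parses from the reply header, so a caller can measure round-trip time per message. A hedged example of the arithmetic this enables, assuming both fields have been filled in:

    from datetime import datetime

    md = {
        'submitted': datetime(2011, 4, 1, 12, 0, 0),
        'completed': datetime(2011, 4, 1, 12, 0, 3, 500000),
    }
    if md['submitted'] is not None and md['completed'] is not None:
        print("round trip: %s" % (md['completed'] - md['submitted']))
        # round trip: 0:00:03.500000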
@@ -1099,6 +1139,7 @@ class Client(HasTraits):
1099 | 1139 |                     content=content, buffers=bufs, ident=ident, subheader=subheader)
1100 | 1140 |             msg_id = msg['msg_id']
1101 | 1141 |             self.outstanding.add(msg_id)
     | 1142 |             self._outstanding_dict[ident].add(msg_id)
1102 | 1143 |             self.history.append(msg_id)
1103 | 1144 |             msg_ids.append(msg_id)
1104 | 1145 |         ar = AsyncResult(self, msg_ids, fname=f.__name__)
@@ -1345,7 +1386,7 @@ class Client(HasTraits):
1345 | 1386 |         is False, then completed results will be keyed by their `msg_id`.
1346 | 1387 |         """
1347 | 1388 |         if not isinstance(msg_ids, (list,tuple)):
1348 |      |
     | 1389 |             msg_ids = [msg_ids]
1349 | 1390 |
1350 | 1391 |         theids = []
1351 | 1392 |         for msg_id in msg_ids:
@@ -1398,7 +1439,7 @@ class Client(HasTraits):
1398 | 1439 |             if isinstance(rcontent, str):
1399 | 1440 |                 rcontent = self.session.unpack(rcontent)
1400 | 1441 |
1401 |      |             md = self.metadata
     | 1442 |             md = self.metadata[msg_id]
1402 | 1443 |             md.update(self._extract_metadata(header, parent, rcontent))
1403 | 1444 |             md.update(iodict)
1404 | 1445 |
@@ -231,7 +231,6 @@ class HubFactory(RegistrationFactory):
231 | 231 |         # connect the db
232 | 232 |         self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
233 | 233 |         cdir = self.config.Global.cluster_dir
234 |     |         print (cdir)
235 | 234 |         self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
236 | 235 |         time.sleep(.25)
237 | 236 |
@@ -415,24 +414,6 @@ class Hub(LoggingFactory):
415 | 414 |             raise IndexError("No Engines Registered")
416 | 415 |         return targets
417 | 416 |
418 |     |     def _validate_client_msg(self, msg):
419 |     |         """validates and unpacks headers of a message. Returns False if invalid,
420 |     |         (ident, header, parent, content)"""
421 |     |         client_id = msg[0]
422 |     |         try:
423 |     |             msg = self.session.unpack_message(msg[1:], content=True)
424 |     |         except:
425 |     |             self.log.error("client::Invalid Message %s"%msg, exc_info=True)
426 |     |             return False
427 |     |
428 |     |         msg_type = msg.get('msg_type', None)
429 |     |         if msg_type is None:
430 |     |             return False
431 |     |         header = msg.get('header')
432 |     |         # session doesn't handle split content for now:
433 |     |         return client_id, msg
434 |     |
435 |     |
436 | 417 |     #-----------------------------------------------------------------------------
437 | 418 |     # dispatch methods (1 per stream)
438 | 419 |     #-----------------------------------------------------------------------------
@@ -598,22 +579,27 @@ class Hub(LoggingFactory):
598 | 579 |             self.all_completed.add(msg_id)
599 | 580 |             self.queues[eid].remove(msg_id)
600 | 581 |             self.completed[eid].append(msg_id)
601 |     |             rheader = msg['header']
602 |     |             completed = datetime.strptime(rheader['date'], ISO8601)
603 |     |             started = rheader.get('started', None)
604 |     |             if started is not None:
605 |     |                 started = datetime.strptime(started, ISO8601)
606 |     |             result = {
607 |     |                 'result_header' : rheader,
608 |     |                 'result_content': msg['content'],
609 |     |                 'started' : started,
610 |     |                 'completed' : completed
611 |     |             }
    | 582 |         elif msg_id not in self.all_completed:
    | 583 |             # it could be a result from a dead engine that died before delivering the
    | 584 |             # result
    | 585 |             self.log.warn("queue:: unknown msg finished %s"%msg_id)
    | 586 |             return
    | 587 |         # update record anyway, because the unregistration could have been premature
    | 588 |         rheader = msg['header']
    | 589 |         completed = datetime.strptime(rheader['date'], ISO8601)
    | 590 |         started = rheader.get('started', None)
    | 591 |         if started is not None:
    | 592 |             started = datetime.strptime(started, ISO8601)
    | 593 |         result = {
    | 594 |             'result_header' : rheader,
    | 595 |             'result_content': msg['content'],
    | 596 |             'started' : started,
    | 597 |             'completed' : completed
    | 598 |         }
612 | 599 |
613 |     |
614 |     |
615 |     |
616 |     |             self.log.debug("queue:: unknown msg finished %s"%msg_id)
    | 600 |         result['result_buffers'] = msg['buffers']
    | 601 |         self.db.update_record(msg_id, result)
    | 602 |
617 | 603 |
618 | 604 |     #--------------------- Task Queue Traffic ------------------------------
619 | 605 |
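The reordering above makes the Hub update the database record even when msg_id is already in all_completed, because the engine may have been unregistered prematurely (leaving a synthetic failure record behind) and the genuine result can still arrive afterwards; only truly unknown messages are dropped with a warning. A simplified sketch of the resulting branching, not the actual Hub method:

    def save_queue_result(pending, all_completed, db_update, msg_id, record):
        """Illustrative control flow for handling an incoming result record."""
        if msg_id in pending:
            # normal case: first time this result is seen
            pending.discard(msg_id)
            all_completed.add(msg_id)
        elif msg_id not in all_completed:
            # truly unknown message: nothing to update
            print("warn: unknown msg finished %s" % msg_id)
            return
        # known message (possibly already marked complete with a synthetic
        # failure record by a premature unregistration): store the real result anyway
        db_update(msg_id, record)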
@@ -841,20 +827,46 @@ class Hub(LoggingFactory):
841 | 827 |             self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
842 | 828 |             return
843 | 829 |         self.log.info("registration::unregister_engine(%s)"%eid)
    | 830 |         # print (eid)
844 | 831 |         content=dict(id=eid, queue=self.engines[eid].queue)
845 | 832 |         self.ids.remove(eid)
846 |     |         self.keytable.pop(eid)
    | 833 |         uuid = self.keytable.pop(eid)
847 | 834 |         ec = self.engines.pop(eid)
848 | 835 |         self.hearts.pop(ec.heartbeat)
849 | 836 |         self.by_ident.pop(ec.queue)
850 | 837 |         self.completed.pop(eid)
851 |     |         for msg_id in self.queues.pop(eid):
852 |     |             msg = self.pending.remove(msg_id)
    | 838 |         self._handle_stranded_msgs(eid, uuid)
853 | 839 |         ############## TODO: HANDLE IT ################
854 | 840 |
855 | 841 |         if self.notifier:
856 | 842 |             self.session.send(self.notifier, "unregistration_notification", content=content)
857 | 843 |
    | 844 |     def _handle_stranded_msgs(self, eid, uuid):
    | 845 |         """Handle messages known to be on an engine when the engine unregisters.
    | 846 |
    | 847 |         It is possible that this will fire prematurely - that is, an engine will
    | 848 |         go down after completing a result, and the client will be notified
    | 849 |         that the result failed and later receive the actual result.
    | 850 |         """
    | 851 |
    | 852 |         outstanding = self.queues.pop(eid)
    | 853 |
    | 854 |         for msg_id in outstanding:
    | 855 |             self.pending.remove(msg_id)
    | 856 |             self.all_completed.add(msg_id)
    | 857 |             try:
    | 858 |                 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
    | 859 |             except:
    | 860 |                 content = error.wrap_exception()
    | 861 |             # build a fake header:
    | 862 |             header = {}
    | 863 |             header['engine'] = uuid
    | 864 |             header['date'] = datetime.now().strftime(ISO8601)
    | 865 |             rec = dict(result_content=content, result_header=header, result_buffers=[])
    | 866 |             rec['completed'] = header['date']
    | 867 |             rec['engine_uuid'] = uuid
    | 868 |             self.db.update_record(msg_id, rec)
    | 869 |
858 | 870 |     def finish_registration(self, heart):
859 | 871 |         """Second half of engine registration, called after our HeartMonitor
860 | 872 |         has received a beat from the Engine's Heart."""
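The Hub-side _handle_stranded_msgs mirrors the client-side one, but instead of faking a reply message it writes a synthetic failure record straight into the database, with the unregistration time standing in for the completion time. A sketch of the shape of that record, with an assumed ISO8601 format string and a placeholder for the wrapped EngineError payload:

    from datetime import datetime

    ISO8601 = "%Y-%m-%dT%H:%M:%S.%f"   # assumed to match this module's constant

    def stranded_task_record(engine_uuid, wrapped_error):
        """Illustrative: the record stored for a task lost with its engine."""
        now = datetime.now().strftime(ISO8601)
        header = {'engine': engine_uuid, 'date': now}
        return {
            'result_content': wrapped_error,   # serialized EngineError payload
            'result_header' : header,
            'result_buffers': [],
            'completed'     : now,   # time of unregistration, not real completion
            'engine_uuid'   : engine_uuid,
        }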
@@ -1029,7 +1041,8 @@ class Hub(LoggingFactory):
1029 | 1041 |                             'result_header' : rec['result_header'],
1030 | 1042 |                             'io' : io_dict,
1031 | 1043 |                           }
1032 |      |
     | 1044 |                 if rec['result_buffers']:
     | 1045 |                     buffers.extend(map(str, rec['result_buffers']))
1033 | 1046 |             else:
1034 | 1047 |                 try:
1035 | 1048 |                     raise KeyError('No such message: '+msg_id)