Show More
@@ -628,7 +628,30 b' class Client(HasTraits):' | |||||
628 | print ("got unknown result: %s"%msg_id) |
|
628 | print ("got unknown result: %s"%msg_id) | |
629 | else: |
|
629 | else: | |
630 | self.outstanding.remove(msg_id) |
|
630 | self.outstanding.remove(msg_id) | |
631 | self.results[msg_id] = self._unwrap_exception(msg['content']) |
|
631 | ||
|
632 | content = msg['content'] | |||
|
633 | header = msg['header'] | |||
|
634 | ||||
|
635 | # construct metadata: | |||
|
636 | md = self.metadata[msg_id] | |||
|
637 | md.update(self._extract_metadata(header, parent, content)) | |||
|
638 | # is this redundant? | |||
|
639 | self.metadata[msg_id] = md | |||
|
640 | ||||
|
641 | e_outstanding = self._outstanding_dict[md['engine_uuid']] | |||
|
642 | if msg_id in e_outstanding: | |||
|
643 | e_outstanding.remove(msg_id) | |||
|
644 | ||||
|
645 | # construct result: | |||
|
646 | if content['status'] == 'ok': | |||
|
647 | self.results[msg_id] = content | |||
|
648 | elif content['status'] == 'aborted': | |||
|
649 | self.results[msg_id] = error.TaskAborted(msg_id) | |||
|
650 | elif content['status'] == 'resubmitted': | |||
|
651 | # TODO: handle resubmission | |||
|
652 | pass | |||
|
653 | else: | |||
|
654 | self.results[msg_id] = self._unwrap_exception(content) | |||
632 |
|
655 | |||
633 | def _handle_apply_reply(self, msg): |
|
656 | def _handle_apply_reply(self, msg): | |
634 | """Save the reply to an apply_request into our results.""" |
|
657 | """Save the reply to an apply_request into our results.""" | |
@@ -1024,7 +1047,7 b' class Client(HasTraits):' | |||||
1024 |
|
1047 | |||
1025 | return result |
|
1048 | return result | |
1026 |
|
1049 | |||
1027 |
def send_apply_ |
|
1050 | def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False, | |
1028 | ident=None): |
|
1051 | ident=None): | |
1029 | """construct and send an apply message via a socket. |
|
1052 | """construct and send an apply message via a socket. | |
1030 |
|
1053 | |||
@@ -1066,6 +1089,41 b' class Client(HasTraits):' | |||||
1066 |
|
1089 | |||
1067 | return msg |
|
1090 | return msg | |
1068 |
|
1091 | |||
|
1092 | def send_execute_request(self, socket, code, silent=False, subheader=None, ident=None): | |||
|
1093 | """construct and send an execute request via a socket. | |||
|
1094 | ||||
|
1095 | """ | |||
|
1096 | ||||
|
1097 | assert not self._closed, "cannot use me anymore, I'm closed!" | |||
|
1098 | # defaults: | |||
|
1099 | subheader = subheader if subheader is not None else {} | |||
|
1100 | ||||
|
1101 | # validate arguments | |||
|
1102 | if not isinstance(code, basestring): | |||
|
1103 | raise TypeError("code must be text, not %s" % type(code)) | |||
|
1104 | if not isinstance(subheader, dict): | |||
|
1105 | raise TypeError("subheader must be dict, not %s" % type(subheader)) | |||
|
1106 | ||||
|
1107 | content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={}) | |||
|
1108 | ||||
|
1109 | ||||
|
1110 | msg = self.session.send(socket, "execute_request", content=content, ident=ident, | |||
|
1111 | subheader=subheader) | |||
|
1112 | ||||
|
1113 | msg_id = msg['header']['msg_id'] | |||
|
1114 | self.outstanding.add(msg_id) | |||
|
1115 | if ident: | |||
|
1116 | # possibly routed to a specific engine | |||
|
1117 | if isinstance(ident, list): | |||
|
1118 | ident = ident[-1] | |||
|
1119 | if ident in self._engines.values(): | |||
|
1120 | # save for later, in case of engine death | |||
|
1121 | self._outstanding_dict[ident].add(msg_id) | |||
|
1122 | self.history.append(msg_id) | |||
|
1123 | self.metadata[msg_id]['submitted'] = datetime.now() | |||
|
1124 | ||||
|
1125 | return msg | |||
|
1126 | ||||
1069 | #-------------------------------------------------------------------------- |
|
1127 | #-------------------------------------------------------------------------- | |
1070 | # construct a View object |
|
1128 | # construct a View object | |
1071 | #-------------------------------------------------------------------------- |
|
1129 | #-------------------------------------------------------------------------- |
@@ -195,7 +195,7 b' class View(HasTraits):' | |||||
195 | @sync_results |
|
195 | @sync_results | |
196 | @save_ids |
|
196 | @save_ids | |
197 | def _really_apply(self, f, args, kwargs, block=None, **options): |
|
197 | def _really_apply(self, f, args, kwargs, block=None, **options): | |
198 |
"""wrapper for client.send_apply_ |
|
198 | """wrapper for client.send_apply_request""" | |
199 | raise NotImplementedError("Implement in subclasses") |
|
199 | raise NotImplementedError("Implement in subclasses") | |
200 |
|
200 | |||
201 | def apply(self, f, *args, **kwargs): |
|
201 | def apply(self, f, *args, **kwargs): | |
@@ -533,7 +533,7 b' class DirectView(View):' | |||||
533 | msg_ids = [] |
|
533 | msg_ids = [] | |
534 | trackers = [] |
|
534 | trackers = [] | |
535 | for ident in _idents: |
|
535 | for ident in _idents: | |
536 |
msg = self.client.send_apply_ |
|
536 | msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track, | |
537 | ident=ident) |
|
537 | ident=ident) | |
538 | if track: |
|
538 | if track: | |
539 | trackers.append(msg['tracker']) |
|
539 | trackers.append(msg['tracker']) | |
@@ -547,6 +547,7 b' class DirectView(View):' | |||||
547 | pass |
|
547 | pass | |
548 | return ar |
|
548 | return ar | |
549 |
|
549 | |||
|
550 | ||||
550 | @spin_after |
|
551 | @spin_after | |
551 | def map(self, f, *sequences, **kwargs): |
|
552 | def map(self, f, *sequences, **kwargs): | |
552 | """view.map(f, *sequences, block=self.block) => list|AsyncMapResult |
|
553 | """view.map(f, *sequences, block=self.block) => list|AsyncMapResult | |
@@ -590,6 +591,8 b' class DirectView(View):' | |||||
590 | pf = ParallelFunction(self, f, block=block, **kwargs) |
|
591 | pf = ParallelFunction(self, f, block=block, **kwargs) | |
591 | return pf.map(*sequences) |
|
592 | return pf.map(*sequences) | |
592 |
|
593 | |||
|
594 | @sync_results | |||
|
595 | @save_ids | |||
593 | def execute(self, code, targets=None, block=None): |
|
596 | def execute(self, code, targets=None, block=None): | |
594 | """Executes `code` on `targets` in blocking or nonblocking manner. |
|
597 | """Executes `code` on `targets` in blocking or nonblocking manner. | |
595 |
|
598 | |||
@@ -604,6 +607,23 b' class DirectView(View):' | |||||
604 | whether or not to wait until done to return |
|
607 | whether or not to wait until done to return | |
605 | default: self.block |
|
608 | default: self.block | |
606 | """ |
|
609 | """ | |
|
610 | block = self.block if block is None else block | |||
|
611 | targets = self.targets if targets is None else targets | |||
|
612 | ||||
|
613 | _idents = self.client._build_targets(targets)[0] | |||
|
614 | msg_ids = [] | |||
|
615 | trackers = [] | |||
|
616 | for ident in _idents: | |||
|
617 | msg = self.client.send_execute_request(self._socket, code, ident=ident) | |||
|
618 | msg_ids.append(msg['header']['msg_id']) | |||
|
619 | ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets) | |||
|
620 | if block: | |||
|
621 | try: | |||
|
622 | ar.get() | |||
|
623 | except KeyboardInterrupt: | |||
|
624 | pass | |||
|
625 | return ar | |||
|
626 | ||||
607 | return self._really_apply(util._execute, args=(code,), block=block, targets=targets) |
|
627 | return self._really_apply(util._execute, args=(code,), block=block, targets=targets) | |
608 |
|
628 | |||
609 | def run(self, filename, targets=None, block=None): |
|
629 | def run(self, filename, targets=None, block=None): | |
@@ -996,7 +1016,7 b' class LoadBalancedView(View):' | |||||
996 | follow = self._render_dependency(follow) |
|
1016 | follow = self._render_dependency(follow) | |
997 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries) |
|
1017 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries) | |
998 |
|
1018 | |||
999 |
msg = self.client.send_apply_ |
|
1019 | msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track, | |
1000 | subheader=subheader) |
|
1020 | subheader=subheader) | |
1001 | tracker = None if track is False else msg['tracker'] |
|
1021 | tracker = None if track is False else msg['tracker'] | |
1002 |
|
1022 |
General Comments 0
You need to be logged in to leave comments.
Login now