##// END OF EJS Templates
use execute_request for parallel execute, instead of apply
MinRK -
Show More
@@ -628,7 +628,30 b' class Client(HasTraits):'
628 print ("got unknown result: %s"%msg_id)
628 print ("got unknown result: %s"%msg_id)
629 else:
629 else:
630 self.outstanding.remove(msg_id)
630 self.outstanding.remove(msg_id)
631 self.results[msg_id] = self._unwrap_exception(msg['content'])
631
632 content = msg['content']
633 header = msg['header']
634
635 # construct metadata:
636 md = self.metadata[msg_id]
637 md.update(self._extract_metadata(header, parent, content))
638 # is this redundant?
639 self.metadata[msg_id] = md
640
641 e_outstanding = self._outstanding_dict[md['engine_uuid']]
642 if msg_id in e_outstanding:
643 e_outstanding.remove(msg_id)
644
645 # construct result:
646 if content['status'] == 'ok':
647 self.results[msg_id] = content
648 elif content['status'] == 'aborted':
649 self.results[msg_id] = error.TaskAborted(msg_id)
650 elif content['status'] == 'resubmitted':
651 # TODO: handle resubmission
652 pass
653 else:
654 self.results[msg_id] = self._unwrap_exception(content)
632
655
633 def _handle_apply_reply(self, msg):
656 def _handle_apply_reply(self, msg):
634 """Save the reply to an apply_request into our results."""
657 """Save the reply to an apply_request into our results."""
@@ -1024,7 +1047,7 b' class Client(HasTraits):'
1024
1047
1025 return result
1048 return result
1026
1049
1027 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1050 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1028 ident=None):
1051 ident=None):
1029 """construct and send an apply message via a socket.
1052 """construct and send an apply message via a socket.
1030
1053
@@ -1066,6 +1089,41 b' class Client(HasTraits):'
1066
1089
1067 return msg
1090 return msg
1068
1091
1092 def send_execute_request(self, socket, code, silent=False, subheader=None, ident=None):
1093 """construct and send an execute request via a socket.
1094
1095 """
1096
1097 assert not self._closed, "cannot use me anymore, I'm closed!"
1098 # defaults:
1099 subheader = subheader if subheader is not None else {}
1100
1101 # validate arguments
1102 if not isinstance(code, basestring):
1103 raise TypeError("code must be text, not %s" % type(code))
1104 if not isinstance(subheader, dict):
1105 raise TypeError("subheader must be dict, not %s" % type(subheader))
1106
1107 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1108
1109
1110 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1111 subheader=subheader)
1112
1113 msg_id = msg['header']['msg_id']
1114 self.outstanding.add(msg_id)
1115 if ident:
1116 # possibly routed to a specific engine
1117 if isinstance(ident, list):
1118 ident = ident[-1]
1119 if ident in self._engines.values():
1120 # save for later, in case of engine death
1121 self._outstanding_dict[ident].add(msg_id)
1122 self.history.append(msg_id)
1123 self.metadata[msg_id]['submitted'] = datetime.now()
1124
1125 return msg
1126
1069 #--------------------------------------------------------------------------
1127 #--------------------------------------------------------------------------
1070 # construct a View object
1128 # construct a View object
1071 #--------------------------------------------------------------------------
1129 #--------------------------------------------------------------------------
@@ -195,7 +195,7 b' class View(HasTraits):'
195 @sync_results
195 @sync_results
196 @save_ids
196 @save_ids
197 def _really_apply(self, f, args, kwargs, block=None, **options):
197 def _really_apply(self, f, args, kwargs, block=None, **options):
198 """wrapper for client.send_apply_message"""
198 """wrapper for client.send_apply_request"""
199 raise NotImplementedError("Implement in subclasses")
199 raise NotImplementedError("Implement in subclasses")
200
200
201 def apply(self, f, *args, **kwargs):
201 def apply(self, f, *args, **kwargs):
@@ -533,7 +533,7 b' class DirectView(View):'
533 msg_ids = []
533 msg_ids = []
534 trackers = []
534 trackers = []
535 for ident in _idents:
535 for ident in _idents:
536 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
536 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
537 ident=ident)
537 ident=ident)
538 if track:
538 if track:
539 trackers.append(msg['tracker'])
539 trackers.append(msg['tracker'])
@@ -547,6 +547,7 b' class DirectView(View):'
547 pass
547 pass
548 return ar
548 return ar
549
549
550
550 @spin_after
551 @spin_after
551 def map(self, f, *sequences, **kwargs):
552 def map(self, f, *sequences, **kwargs):
552 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
553 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
@@ -590,6 +591,8 b' class DirectView(View):'
590 pf = ParallelFunction(self, f, block=block, **kwargs)
591 pf = ParallelFunction(self, f, block=block, **kwargs)
591 return pf.map(*sequences)
592 return pf.map(*sequences)
592
593
594 @sync_results
595 @save_ids
593 def execute(self, code, targets=None, block=None):
596 def execute(self, code, targets=None, block=None):
594 """Executes `code` on `targets` in blocking or nonblocking manner.
597 """Executes `code` on `targets` in blocking or nonblocking manner.
595
598
@@ -604,6 +607,23 b' class DirectView(View):'
604 whether or not to wait until done to return
607 whether or not to wait until done to return
605 default: self.block
608 default: self.block
606 """
609 """
610 block = self.block if block is None else block
611 targets = self.targets if targets is None else targets
612
613 _idents = self.client._build_targets(targets)[0]
614 msg_ids = []
615 trackers = []
616 for ident in _idents:
617 msg = self.client.send_execute_request(self._socket, code, ident=ident)
618 msg_ids.append(msg['header']['msg_id'])
619 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets)
620 if block:
621 try:
622 ar.get()
623 except KeyboardInterrupt:
624 pass
625 return ar
626
607 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
627 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
608
628
609 def run(self, filename, targets=None, block=None):
629 def run(self, filename, targets=None, block=None):
@@ -996,7 +1016,7 b' class LoadBalancedView(View):'
996 follow = self._render_dependency(follow)
1016 follow = self._render_dependency(follow)
997 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1017 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
998
1018
999 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
1019 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1000 subheader=subheader)
1020 subheader=subheader)
1001 tracker = None if track is False else msg['tracker']
1021 tracker = None if track is False else msg['tracker']
1002
1022
General Comments 0
You need to be logged in to leave comments. Login now