Show More
The requested changes are too big and content was truncated. Show full diff
@@ -270,8 +270,9 b' class Client(HasTraits):' | |||
|
270 | 270 | _control_socket=Instance('zmq.Socket') |
|
271 | 271 | _iopub_socket=Instance('zmq.Socket') |
|
272 | 272 | _notification_socket=Instance('zmq.Socket') |
|
273 |
_ |
|
|
274 | _task_socket=Instance('zmq.Socket') | |
|
273 | _apply_socket=Instance('zmq.Socket') | |
|
274 | _mux_ident=Str() | |
|
275 | _task_ident=Str() | |
|
275 | 276 | _task_scheme=Str() |
|
276 | 277 | _balanced_views=Dict() |
|
277 | 278 | _direct_views=Dict() |
@@ -401,16 +402,16 b' class Client(HasTraits):' | |||
|
401 | 402 | self._ids.append(eid) |
|
402 | 403 | self._ids = sorted(self._ids) |
|
403 | 404 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ |
|
404 |
self._task_scheme == 'pure' and self._task_ |
|
|
405 | self._task_scheme == 'pure' and self._task_ident: | |
|
405 | 406 | self._stop_scheduling_tasks() |
|
406 | 407 | |
|
407 | 408 | def _stop_scheduling_tasks(self): |
|
408 | 409 | """Stop scheduling tasks because an engine has been unregistered |
|
409 | 410 | from a pure ZMQ scheduler. |
|
410 | 411 | """ |
|
411 | ||
|
412 | self._task_socket.close() | |
|
413 | self._task_socket = None | |
|
412 | self._task_ident = '' | |
|
413 | # self._task_socket.close() | |
|
414 | # self._task_socket = None | |
|
414 | 415 | msg = "An engine has been unregistered, and we are using pure " +\ |
|
415 | 416 | "ZMQ task scheduling. Task farming will be disabled." |
|
416 | 417 | if self.outstanding: |
@@ -457,15 +458,18 b' class Client(HasTraits):' | |||
|
457 | 458 | content = msg.content |
|
458 | 459 | self._config['registration'] = dict(content) |
|
459 | 460 | if content.status == 'ok': |
|
461 | self._apply_socket = self._context.socket(zmq.XREP) | |
|
462 | self._apply_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
460 | 463 | if content.mux: |
|
461 | self._mux_socket = self._context.socket(zmq.XREQ) | |
|
462 | self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
463 |
connect_socket(self._ |
|
|
464 | # self._mux_socket = self._context.socket(zmq.XREQ) | |
|
465 | self._mux_ident = 'mux' | |
|
466 | connect_socket(self._apply_socket, content.mux) | |
|
464 | 467 | if content.task: |
|
465 | 468 | self._task_scheme, task_addr = content.task |
|
466 | self._task_socket = self._context.socket(zmq.XREQ) | |
|
467 | self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
468 |
connect_socket(self._ |
|
|
469 | # self._task_socket = self._context.socket(zmq.XREQ) | |
|
470 | # self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
471 | connect_socket(self._apply_socket, task_addr) | |
|
472 | self._task_ident = 'task' | |
|
469 | 473 | if content.notification: |
|
470 | 474 | self._notification_socket = self._context.socket(zmq.SUB) |
|
471 | 475 | connect_socket(self._notification_socket, content.notification) |
@@ -484,7 +488,8 b' class Client(HasTraits):' | |||
|
484 | 488 | self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
485 | 489 | connect_socket(self._iopub_socket, content.iopub) |
|
486 | 490 | self._update_engines(dict(content.engines)) |
|
487 | ||
|
491 | # give XREP apply_socket some time to connect | |
|
492 | time.sleep(0.25) | |
|
488 | 493 | else: |
|
489 | 494 | self._connected = False |
|
490 | 495 | raise Exception("Failed to connect!") |
@@ -496,7 +501,7 b' class Client(HasTraits):' | |||
|
496 | 501 | def _unwrap_exception(self, content): |
|
497 | 502 | """unwrap exception, and remap engineid to int.""" |
|
498 | 503 | e = error.unwrap_exception(content) |
|
499 | print e.traceback | |
|
504 | # print e.traceback | |
|
500 | 505 | if e.engine_info: |
|
501 | 506 | e_uuid = e.engine_info['engine_uuid'] |
|
502 | 507 | eid = self._engines[e_uuid] |
@@ -540,7 +545,7 b' class Client(HasTraits):' | |||
|
540 | 545 | |
|
541 | 546 | self._handle_stranded_msgs(eid, uuid) |
|
542 | 547 | |
|
543 |
if self._task_ |
|
|
548 | if self._task_ident and self._task_scheme == 'pure': | |
|
544 | 549 | self._stop_scheduling_tasks() |
|
545 | 550 | |
|
546 | 551 | def _handle_stranded_msgs(self, eid, uuid): |
@@ -725,10 +730,8 b' class Client(HasTraits):' | |||
|
725 | 730 | """ |
|
726 | 731 | if self._notification_socket: |
|
727 | 732 | self._flush_notifications() |
|
728 |
if self._ |
|
|
729 |
self._flush_results(self._ |
|
|
730 | if self._task_socket: | |
|
731 | self._flush_results(self._task_socket) | |
|
733 | if self._apply_socket: | |
|
734 | self._flush_results(self._apply_socket) | |
|
732 | 735 | if self._control_socket: |
|
733 | 736 | self._flush_control(self._control_socket) |
|
734 | 737 | if self._iopub_socket: |
@@ -1030,6 +1033,12 b' class Client(HasTraits):' | |||
|
1030 | 1033 | args = args if args is not None else [] |
|
1031 | 1034 | kwargs = kwargs if kwargs is not None else {} |
|
1032 | 1035 | |
|
1036 | if not self._ids: | |
|
1037 | # flush notification socket if no engines yet | |
|
1038 | any_ids = self.ids | |
|
1039 | if not any_ids: | |
|
1040 | raise error.NoEnginesRegistered("Can't execute without any connected engines.") | |
|
1041 | ||
|
1033 | 1042 | if balanced is None: |
|
1034 | 1043 | if targets is None: |
|
1035 | 1044 | # default to balanced if targets unspecified |
@@ -1074,7 +1083,7 b' class Client(HasTraits):' | |||
|
1074 | 1083 | for name in ('bound', 'block', 'track'): |
|
1075 | 1084 | assert loc[name] is not None, "kwarg %r must be specified!"%name |
|
1076 | 1085 | |
|
1077 |
if self._task_ |
|
|
1086 | if not self._task_ident: | |
|
1078 | 1087 | msg = "Task farming is disabled" |
|
1079 | 1088 | if self._task_scheme == 'pure': |
|
1080 | 1089 | msg += " because the pure ZMQ scheduler cannot handle" |
@@ -1106,7 +1115,7 b' class Client(HasTraits):' | |||
|
1106 | 1115 | bufs = util.pack_apply_message(f,args,kwargs) |
|
1107 | 1116 | content = dict(bound=bound) |
|
1108 | 1117 | |
|
1109 |
msg = self.session.send(self._ |
|
|
1118 | msg = self.session.send(self._apply_socket, "apply_request", ident=self._task_ident, | |
|
1110 | 1119 | content=content, buffers=bufs, subheader=subheader, track=track) |
|
1111 | 1120 | msg_id = msg['msg_id'] |
|
1112 | 1121 | self.outstanding.add(msg_id) |
@@ -1130,6 +1139,11 b' class Client(HasTraits):' | |||
|
1130 | 1139 | This is a private method, see `apply` for details. |
|
1131 | 1140 | Not to be called directly! |
|
1132 | 1141 | """ |
|
1142 | ||
|
1143 | if not self._mux_ident: | |
|
1144 | msg = "Multiplexing is disabled" | |
|
1145 | raise RuntimeError(msg) | |
|
1146 | ||
|
1133 | 1147 | loc = locals() |
|
1134 | 1148 | for name in ('bound', 'block', 'targets', 'track'): |
|
1135 | 1149 | assert loc[name] is not None, "kwarg %r must be specified!"%name |
@@ -1143,8 +1157,8 b' class Client(HasTraits):' | |||
|
1143 | 1157 | msg_ids = [] |
|
1144 | 1158 | trackers = [] |
|
1145 | 1159 | for ident in idents: |
|
1146 |
msg = self.session.send(self._ |
|
|
1147 | content=content, buffers=bufs, ident=ident, subheader=subheader, | |
|
1160 | msg = self.session.send(self._apply_socket, "apply_request", | |
|
1161 | content=content, buffers=bufs, ident=[self._mux_ident, ident], subheader=subheader, | |
|
1148 | 1162 | track=track) |
|
1149 | 1163 | if track: |
|
1150 | 1164 | trackers.append(msg['tracker']) |
@@ -80,6 +80,7 b' class ControllerFactory(HubFactory):' | |||
|
80 | 80 | # Multiplexer Queue (in a Process) |
|
81 | 81 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') |
|
82 | 82 | q.bind_in(self.client_info['mux']) |
|
83 | q.setsockopt_in(zmq.IDENTITY, 'mux') | |
|
83 | 84 | q.bind_out(self.engine_info['mux']) |
|
84 | 85 | q.connect_mon(maybe_inproc) |
|
85 | 86 | q.daemon=True |
@@ -88,6 +89,7 b' class ControllerFactory(HubFactory):' | |||
|
88 | 89 | # Control Queue (in a Process) |
|
89 | 90 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') |
|
90 | 91 | q.bind_in(self.client_info['control']) |
|
92 | q.setsockopt_in(zmq.IDENTITY, 'control') | |
|
91 | 93 | q.bind_out(self.engine_info['control']) |
|
92 | 94 | q.connect_mon(maybe_inproc) |
|
93 | 95 | q.daemon=True |
@@ -98,6 +100,7 b' class ControllerFactory(HubFactory):' | |||
|
98 | 100 | q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') |
|
99 | 101 | q.setsockopt_out(zmq.HWM, self.hwm) |
|
100 | 102 | q.bind_in(self.client_info['task'][1]) |
|
103 | q.setsockopt_in(zmq.IDENTITY, 'task') | |
|
101 | 104 | q.bind_out(self.engine_info['task']) |
|
102 | 105 | q.connect_mon(maybe_inproc) |
|
103 | 106 | q.daemon=True |
@@ -544,7 +544,8 b' class TaskScheduler(SessionFactory):' | |||
|
544 | 544 | |
|
545 | 545 | |
|
546 | 546 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', |
|
547 |
log_addr=None, loglevel=logging.DEBUG, scheme='lru' |
|
|
547 | log_addr=None, loglevel=logging.DEBUG, scheme='lru', | |
|
548 | identity=b'task'): | |
|
548 | 549 | from zmq.eventloop import ioloop |
|
549 | 550 | from zmq.eventloop.zmqstream import ZMQStream |
|
550 | 551 | |
@@ -552,8 +553,11 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname=' | |||
|
552 | 553 | loop = ioloop.IOLoop() |
|
553 | 554 | print (in_addr, out_addr, mon_addr, not_addr) |
|
554 | 555 | ins = ZMQStream(ctx.socket(zmq.XREP),loop) |
|
556 | ins.setsockopt(zmq.IDENTITY, identity) | |
|
555 | 557 | ins.bind(in_addr) |
|
558 | ||
|
556 | 559 | outs = ZMQStream(ctx.socket(zmq.XREP),loop) |
|
560 | outs.setsockopt(zmq.IDENTITY, identity) | |
|
557 | 561 | outs.bind(out_addr) |
|
558 | 562 | mons = ZMQStream(ctx.socket(zmq.PUB),loop) |
|
559 | 563 | mons.connect(mon_addr) |
@@ -12,7 +12,7 b' blackhole = tempfile.TemporaryFile()' | |||
|
12 | 12 | # nose setup/teardown |
|
13 | 13 | |
|
14 | 14 | def setup(): |
|
15 |
cp = Popen('ipcontrollerz --profile iptest -r --log-level |
|
|
15 | cp = Popen('ipcontrollerz --profile iptest -r --log-level 10 --log-to-file'.split(), stdout=blackhole, stderr=STDOUT) | |
|
16 | 16 | processes.append(cp) |
|
17 | 17 | time.sleep(.5) |
|
18 | 18 | add_engine() |
@@ -22,7 +22,7 b' def setup():' | |||
|
22 | 22 | c.spin() |
|
23 | 23 | |
|
24 | 24 | def add_engine(profile='iptest'): |
|
25 |
ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', ' |
|
|
25 | ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT) | |
|
26 | 26 | # ep.start() |
|
27 | 27 | processes.append(ep) |
|
28 | 28 | return ep |
@@ -1,3 +1,5 b'' | |||
|
1 | import sys | |
|
2 | import tempfile | |
|
1 | 3 | import time |
|
2 | 4 | from signal import SIGINT |
|
3 | 5 | from multiprocessing import Process |
@@ -62,7 +64,7 b' class ClusterTestCase(BaseZMQTestCase):' | |||
|
62 | 64 | while time.time()-tic < timeout and len(self.client.ids) < n: |
|
63 | 65 | time.sleep(0.1) |
|
64 | 66 | |
|
65 | assert not self.client.ids < n, "waiting for engines timed out" | |
|
67 | assert not len(self.client.ids) < n, "waiting for engines timed out" | |
|
66 | 68 | |
|
67 | 69 | def connect_client(self): |
|
68 | 70 | """connect a client with my Context, and track its sockets for cleanup""" |
@@ -89,12 +91,15 b' class ClusterTestCase(BaseZMQTestCase):' | |||
|
89 | 91 | self.engines=[] |
|
90 | 92 | |
|
91 | 93 | def tearDown(self): |
|
94 | ||
|
95 | # close fds: | |
|
96 | for e in filter(lambda e: e.poll() is not None, processes): | |
|
97 | processes.remove(e) | |
|
98 | ||
|
92 | 99 | self.client.close() |
|
93 | 100 | BaseZMQTestCase.tearDown(self) |
|
94 | # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ] | |
|
95 | # [ e.wait() for e in self.engines ] | |
|
96 | # while len(self.client.ids) > self.base_engine_count: | |
|
97 | # time.sleep(.1) | |
|
98 | # del self.engines | |
|
99 | # BaseZMQTestCase.tearDown(self) | |
|
101 | # this will be superfluous when pyzmq merges PR #88 | |
|
102 | self.context.term() | |
|
103 | print tempfile.TemporaryFile().fileno(), | |
|
104 | sys.stdout.flush() | |
|
100 | 105 | No newline at end of file |
@@ -17,7 +17,6 b' class TestClient(ClusterTestCase):' | |||
|
17 | 17 | n = len(self.client.ids) |
|
18 | 18 | self.add_engines(3) |
|
19 | 19 | self.assertEquals(len(self.client.ids), n+3) |
|
20 | self.assertTrue | |
|
21 | 20 | |
|
22 | 21 | def test_segfault_task(self): |
|
23 | 22 | """test graceful handling of engine death (balanced)""" |
@@ -179,8 +178,9 b' class TestClient(ClusterTestCase):' | |||
|
179 | 178 | def test_get_result(self): |
|
180 | 179 | """test getting results from the Hub.""" |
|
181 | 180 | c = clientmod.Client(profile='iptest') |
|
182 |
|
|
|
181 | self.add_engines(1) | |
|
183 | 182 | ar = c.apply(wait, (1,), block=False, targets=t) |
|
183 | # give the monitor time to notice the message | |
|
184 | 184 | time.sleep(.25) |
|
185 | 185 | ahr = self.client.get_result(ar.msg_ids) |
|
186 | 186 | self.assertTrue(isinstance(ahr, AsyncHubResult)) |
|
1 | NO CONTENT: modified file, binary diff hidden |
|
1 | NO CONTENT: modified file | |
The requested commit or file is too big and content was truncated. Show full diff |
|
1 | NO CONTENT: modified file, binary diff hidden |
|
1 | NO CONTENT: modified file, binary diff hidden |
|
1 | NO CONTENT: modified file, binary diff hidden |
|
1 | NO CONTENT: modified file, binary diff hidden |
|
1 | NO CONTENT: modified file | |
The requested commit or file is too big and content was truncated. Show full diff |
|
1 | NO CONTENT: modified file | |
The requested commit or file is too big and content was truncated. Show full diff |
|
1 | NO CONTENT: file was removed, binary diff hidden |
|
1 | NO CONTENT: file was removed, binary diff hidden |
General Comments 0
You need to be logged in to leave comments.
Login now