Show More
The requested changes are too big and content was truncated. Show full diff
@@ -270,8 +270,9 b' class Client(HasTraits):' | |||||
270 | _control_socket=Instance('zmq.Socket') |
|
270 | _control_socket=Instance('zmq.Socket') | |
271 | _iopub_socket=Instance('zmq.Socket') |
|
271 | _iopub_socket=Instance('zmq.Socket') | |
272 | _notification_socket=Instance('zmq.Socket') |
|
272 | _notification_socket=Instance('zmq.Socket') | |
273 |
_ |
|
273 | _apply_socket=Instance('zmq.Socket') | |
274 | _task_socket=Instance('zmq.Socket') |
|
274 | _mux_ident=Str() | |
|
275 | _task_ident=Str() | |||
275 | _task_scheme=Str() |
|
276 | _task_scheme=Str() | |
276 | _balanced_views=Dict() |
|
277 | _balanced_views=Dict() | |
277 | _direct_views=Dict() |
|
278 | _direct_views=Dict() | |
@@ -401,16 +402,16 b' class Client(HasTraits):' | |||||
401 | self._ids.append(eid) |
|
402 | self._ids.append(eid) | |
402 | self._ids = sorted(self._ids) |
|
403 | self._ids = sorted(self._ids) | |
403 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ |
|
404 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ | |
404 |
self._task_scheme == 'pure' and self._task_ |
|
405 | self._task_scheme == 'pure' and self._task_ident: | |
405 | self._stop_scheduling_tasks() |
|
406 | self._stop_scheduling_tasks() | |
406 |
|
407 | |||
407 | def _stop_scheduling_tasks(self): |
|
408 | def _stop_scheduling_tasks(self): | |
408 | """Stop scheduling tasks because an engine has been unregistered |
|
409 | """Stop scheduling tasks because an engine has been unregistered | |
409 | from a pure ZMQ scheduler. |
|
410 | from a pure ZMQ scheduler. | |
410 | """ |
|
411 | """ | |
411 |
|
412 | self._task_ident = '' | ||
412 | self._task_socket.close() |
|
413 | # self._task_socket.close() | |
413 | self._task_socket = None |
|
414 | # self._task_socket = None | |
414 | msg = "An engine has been unregistered, and we are using pure " +\ |
|
415 | msg = "An engine has been unregistered, and we are using pure " +\ | |
415 | "ZMQ task scheduling. Task farming will be disabled." |
|
416 | "ZMQ task scheduling. Task farming will be disabled." | |
416 | if self.outstanding: |
|
417 | if self.outstanding: | |
@@ -457,15 +458,18 b' class Client(HasTraits):' | |||||
457 | content = msg.content |
|
458 | content = msg.content | |
458 | self._config['registration'] = dict(content) |
|
459 | self._config['registration'] = dict(content) | |
459 | if content.status == 'ok': |
|
460 | if content.status == 'ok': | |
|
461 | self._apply_socket = self._context.socket(zmq.XREP) | |||
|
462 | self._apply_socket.setsockopt(zmq.IDENTITY, self.session.session) | |||
460 | if content.mux: |
|
463 | if content.mux: | |
461 | self._mux_socket = self._context.socket(zmq.XREQ) |
|
464 | # self._mux_socket = self._context.socket(zmq.XREQ) | |
462 | self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
465 | self._mux_ident = 'mux' | |
463 |
connect_socket(self._ |
|
466 | connect_socket(self._apply_socket, content.mux) | |
464 | if content.task: |
|
467 | if content.task: | |
465 | self._task_scheme, task_addr = content.task |
|
468 | self._task_scheme, task_addr = content.task | |
466 | self._task_socket = self._context.socket(zmq.XREQ) |
|
469 | # self._task_socket = self._context.socket(zmq.XREQ) | |
467 | self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
470 | # self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
468 |
connect_socket(self._ |
|
471 | connect_socket(self._apply_socket, task_addr) | |
|
472 | self._task_ident = 'task' | |||
469 | if content.notification: |
|
473 | if content.notification: | |
470 | self._notification_socket = self._context.socket(zmq.SUB) |
|
474 | self._notification_socket = self._context.socket(zmq.SUB) | |
471 | connect_socket(self._notification_socket, content.notification) |
|
475 | connect_socket(self._notification_socket, content.notification) | |
@@ -484,7 +488,8 b' class Client(HasTraits):' | |||||
484 | self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
488 | self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
485 | connect_socket(self._iopub_socket, content.iopub) |
|
489 | connect_socket(self._iopub_socket, content.iopub) | |
486 | self._update_engines(dict(content.engines)) |
|
490 | self._update_engines(dict(content.engines)) | |
487 |
|
491 | # give XREP apply_socket some time to connect | ||
|
492 | time.sleep(0.25) | |||
488 | else: |
|
493 | else: | |
489 | self._connected = False |
|
494 | self._connected = False | |
490 | raise Exception("Failed to connect!") |
|
495 | raise Exception("Failed to connect!") | |
@@ -496,7 +501,7 b' class Client(HasTraits):' | |||||
496 | def _unwrap_exception(self, content): |
|
501 | def _unwrap_exception(self, content): | |
497 | """unwrap exception, and remap engineid to int.""" |
|
502 | """unwrap exception, and remap engineid to int.""" | |
498 | e = error.unwrap_exception(content) |
|
503 | e = error.unwrap_exception(content) | |
499 | print e.traceback |
|
504 | # print e.traceback | |
500 | if e.engine_info: |
|
505 | if e.engine_info: | |
501 | e_uuid = e.engine_info['engine_uuid'] |
|
506 | e_uuid = e.engine_info['engine_uuid'] | |
502 | eid = self._engines[e_uuid] |
|
507 | eid = self._engines[e_uuid] | |
@@ -540,7 +545,7 b' class Client(HasTraits):' | |||||
540 |
|
545 | |||
541 | self._handle_stranded_msgs(eid, uuid) |
|
546 | self._handle_stranded_msgs(eid, uuid) | |
542 |
|
547 | |||
543 |
if self._task_ |
|
548 | if self._task_ident and self._task_scheme == 'pure': | |
544 | self._stop_scheduling_tasks() |
|
549 | self._stop_scheduling_tasks() | |
545 |
|
550 | |||
546 | def _handle_stranded_msgs(self, eid, uuid): |
|
551 | def _handle_stranded_msgs(self, eid, uuid): | |
@@ -725,10 +730,8 b' class Client(HasTraits):' | |||||
725 | """ |
|
730 | """ | |
726 | if self._notification_socket: |
|
731 | if self._notification_socket: | |
727 | self._flush_notifications() |
|
732 | self._flush_notifications() | |
728 |
if self._ |
|
733 | if self._apply_socket: | |
729 |
self._flush_results(self._ |
|
734 | self._flush_results(self._apply_socket) | |
730 | if self._task_socket: |
|
|||
731 | self._flush_results(self._task_socket) |
|
|||
732 | if self._control_socket: |
|
735 | if self._control_socket: | |
733 | self._flush_control(self._control_socket) |
|
736 | self._flush_control(self._control_socket) | |
734 | if self._iopub_socket: |
|
737 | if self._iopub_socket: | |
@@ -1030,6 +1033,12 b' class Client(HasTraits):' | |||||
1030 | args = args if args is not None else [] |
|
1033 | args = args if args is not None else [] | |
1031 | kwargs = kwargs if kwargs is not None else {} |
|
1034 | kwargs = kwargs if kwargs is not None else {} | |
1032 |
|
1035 | |||
|
1036 | if not self._ids: | |||
|
1037 | # flush notification socket if no engines yet | |||
|
1038 | any_ids = self.ids | |||
|
1039 | if not any_ids: | |||
|
1040 | raise error.NoEnginesRegistered("Can't execute without any connected engines.") | |||
|
1041 | ||||
1033 | if balanced is None: |
|
1042 | if balanced is None: | |
1034 | if targets is None: |
|
1043 | if targets is None: | |
1035 | # default to balanced if targets unspecified |
|
1044 | # default to balanced if targets unspecified | |
@@ -1074,7 +1083,7 b' class Client(HasTraits):' | |||||
1074 | for name in ('bound', 'block', 'track'): |
|
1083 | for name in ('bound', 'block', 'track'): | |
1075 | assert loc[name] is not None, "kwarg %r must be specified!"%name |
|
1084 | assert loc[name] is not None, "kwarg %r must be specified!"%name | |
1076 |
|
1085 | |||
1077 |
if self._task_ |
|
1086 | if not self._task_ident: | |
1078 | msg = "Task farming is disabled" |
|
1087 | msg = "Task farming is disabled" | |
1079 | if self._task_scheme == 'pure': |
|
1088 | if self._task_scheme == 'pure': | |
1080 | msg += " because the pure ZMQ scheduler cannot handle" |
|
1089 | msg += " because the pure ZMQ scheduler cannot handle" | |
@@ -1106,7 +1115,7 b' class Client(HasTraits):' | |||||
1106 | bufs = util.pack_apply_message(f,args,kwargs) |
|
1115 | bufs = util.pack_apply_message(f,args,kwargs) | |
1107 | content = dict(bound=bound) |
|
1116 | content = dict(bound=bound) | |
1108 |
|
1117 | |||
1109 |
msg = self.session.send(self._ |
|
1118 | msg = self.session.send(self._apply_socket, "apply_request", ident=self._task_ident, | |
1110 | content=content, buffers=bufs, subheader=subheader, track=track) |
|
1119 | content=content, buffers=bufs, subheader=subheader, track=track) | |
1111 | msg_id = msg['msg_id'] |
|
1120 | msg_id = msg['msg_id'] | |
1112 | self.outstanding.add(msg_id) |
|
1121 | self.outstanding.add(msg_id) | |
@@ -1130,6 +1139,11 b' class Client(HasTraits):' | |||||
1130 | This is a private method, see `apply` for details. |
|
1139 | This is a private method, see `apply` for details. | |
1131 | Not to be called directly! |
|
1140 | Not to be called directly! | |
1132 | """ |
|
1141 | """ | |
|
1142 | ||||
|
1143 | if not self._mux_ident: | |||
|
1144 | msg = "Multiplexing is disabled" | |||
|
1145 | raise RuntimeError(msg) | |||
|
1146 | ||||
1133 | loc = locals() |
|
1147 | loc = locals() | |
1134 | for name in ('bound', 'block', 'targets', 'track'): |
|
1148 | for name in ('bound', 'block', 'targets', 'track'): | |
1135 | assert loc[name] is not None, "kwarg %r must be specified!"%name |
|
1149 | assert loc[name] is not None, "kwarg %r must be specified!"%name | |
@@ -1143,8 +1157,8 b' class Client(HasTraits):' | |||||
1143 | msg_ids = [] |
|
1157 | msg_ids = [] | |
1144 | trackers = [] |
|
1158 | trackers = [] | |
1145 | for ident in idents: |
|
1159 | for ident in idents: | |
1146 |
msg = self.session.send(self._ |
|
1160 | msg = self.session.send(self._apply_socket, "apply_request", | |
1147 | content=content, buffers=bufs, ident=ident, subheader=subheader, |
|
1161 | content=content, buffers=bufs, ident=[self._mux_ident, ident], subheader=subheader, | |
1148 | track=track) |
|
1162 | track=track) | |
1149 | if track: |
|
1163 | if track: | |
1150 | trackers.append(msg['tracker']) |
|
1164 | trackers.append(msg['tracker']) |
@@ -80,6 +80,7 b' class ControllerFactory(HubFactory):' | |||||
80 | # Multiplexer Queue (in a Process) |
|
80 | # Multiplexer Queue (in a Process) | |
81 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') |
|
81 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') | |
82 | q.bind_in(self.client_info['mux']) |
|
82 | q.bind_in(self.client_info['mux']) | |
|
83 | q.setsockopt_in(zmq.IDENTITY, 'mux') | |||
83 | q.bind_out(self.engine_info['mux']) |
|
84 | q.bind_out(self.engine_info['mux']) | |
84 | q.connect_mon(maybe_inproc) |
|
85 | q.connect_mon(maybe_inproc) | |
85 | q.daemon=True |
|
86 | q.daemon=True | |
@@ -88,6 +89,7 b' class ControllerFactory(HubFactory):' | |||||
88 | # Control Queue (in a Process) |
|
89 | # Control Queue (in a Process) | |
89 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') |
|
90 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') | |
90 | q.bind_in(self.client_info['control']) |
|
91 | q.bind_in(self.client_info['control']) | |
|
92 | q.setsockopt_in(zmq.IDENTITY, 'control') | |||
91 | q.bind_out(self.engine_info['control']) |
|
93 | q.bind_out(self.engine_info['control']) | |
92 | q.connect_mon(maybe_inproc) |
|
94 | q.connect_mon(maybe_inproc) | |
93 | q.daemon=True |
|
95 | q.daemon=True | |
@@ -98,6 +100,7 b' class ControllerFactory(HubFactory):' | |||||
98 | q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') |
|
100 | q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') | |
99 | q.setsockopt_out(zmq.HWM, self.hwm) |
|
101 | q.setsockopt_out(zmq.HWM, self.hwm) | |
100 | q.bind_in(self.client_info['task'][1]) |
|
102 | q.bind_in(self.client_info['task'][1]) | |
|
103 | q.setsockopt_in(zmq.IDENTITY, 'task') | |||
101 | q.bind_out(self.engine_info['task']) |
|
104 | q.bind_out(self.engine_info['task']) | |
102 | q.connect_mon(maybe_inproc) |
|
105 | q.connect_mon(maybe_inproc) | |
103 | q.daemon=True |
|
106 | q.daemon=True |
@@ -544,7 +544,8 b' class TaskScheduler(SessionFactory):' | |||||
544 |
|
544 | |||
545 |
|
545 | |||
546 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', |
|
546 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', | |
547 |
log_addr=None, loglevel=logging.DEBUG, scheme='lru' |
|
547 | log_addr=None, loglevel=logging.DEBUG, scheme='lru', | |
|
548 | identity=b'task'): | |||
548 | from zmq.eventloop import ioloop |
|
549 | from zmq.eventloop import ioloop | |
549 | from zmq.eventloop.zmqstream import ZMQStream |
|
550 | from zmq.eventloop.zmqstream import ZMQStream | |
550 |
|
551 | |||
@@ -552,8 +553,11 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname=' | |||||
552 | loop = ioloop.IOLoop() |
|
553 | loop = ioloop.IOLoop() | |
553 | print (in_addr, out_addr, mon_addr, not_addr) |
|
554 | print (in_addr, out_addr, mon_addr, not_addr) | |
554 | ins = ZMQStream(ctx.socket(zmq.XREP),loop) |
|
555 | ins = ZMQStream(ctx.socket(zmq.XREP),loop) | |
|
556 | ins.setsockopt(zmq.IDENTITY, identity) | |||
555 | ins.bind(in_addr) |
|
557 | ins.bind(in_addr) | |
|
558 | ||||
556 | outs = ZMQStream(ctx.socket(zmq.XREP),loop) |
|
559 | outs = ZMQStream(ctx.socket(zmq.XREP),loop) | |
|
560 | outs.setsockopt(zmq.IDENTITY, identity) | |||
557 | outs.bind(out_addr) |
|
561 | outs.bind(out_addr) | |
558 | mons = ZMQStream(ctx.socket(zmq.PUB),loop) |
|
562 | mons = ZMQStream(ctx.socket(zmq.PUB),loop) | |
559 | mons.connect(mon_addr) |
|
563 | mons.connect(mon_addr) |
@@ -12,7 +12,7 b' blackhole = tempfile.TemporaryFile()' | |||||
12 | # nose setup/teardown |
|
12 | # nose setup/teardown | |
13 |
|
13 | |||
14 | def setup(): |
|
14 | def setup(): | |
15 |
cp = Popen('ipcontrollerz --profile iptest -r --log-level |
|
15 | cp = Popen('ipcontrollerz --profile iptest -r --log-level 10 --log-to-file'.split(), stdout=blackhole, stderr=STDOUT) | |
16 | processes.append(cp) |
|
16 | processes.append(cp) | |
17 | time.sleep(.5) |
|
17 | time.sleep(.5) | |
18 | add_engine() |
|
18 | add_engine() | |
@@ -22,7 +22,7 b' def setup():' | |||||
22 | c.spin() |
|
22 | c.spin() | |
23 |
|
23 | |||
24 | def add_engine(profile='iptest'): |
|
24 | def add_engine(profile='iptest'): | |
25 |
ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', ' |
|
25 | ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT) | |
26 | # ep.start() |
|
26 | # ep.start() | |
27 | processes.append(ep) |
|
27 | processes.append(ep) | |
28 | return ep |
|
28 | return ep |
@@ -1,3 +1,5 b'' | |||||
|
1 | import sys | |||
|
2 | import tempfile | |||
1 | import time |
|
3 | import time | |
2 | from signal import SIGINT |
|
4 | from signal import SIGINT | |
3 | from multiprocessing import Process |
|
5 | from multiprocessing import Process | |
@@ -62,7 +64,7 b' class ClusterTestCase(BaseZMQTestCase):' | |||||
62 | while time.time()-tic < timeout and len(self.client.ids) < n: |
|
64 | while time.time()-tic < timeout and len(self.client.ids) < n: | |
63 | time.sleep(0.1) |
|
65 | time.sleep(0.1) | |
64 |
|
66 | |||
65 | assert not self.client.ids < n, "waiting for engines timed out" |
|
67 | assert not len(self.client.ids) < n, "waiting for engines timed out" | |
66 |
|
68 | |||
67 | def connect_client(self): |
|
69 | def connect_client(self): | |
68 | """connect a client with my Context, and track its sockets for cleanup""" |
|
70 | """connect a client with my Context, and track its sockets for cleanup""" | |
@@ -89,12 +91,15 b' class ClusterTestCase(BaseZMQTestCase):' | |||||
89 | self.engines=[] |
|
91 | self.engines=[] | |
90 |
|
92 | |||
91 | def tearDown(self): |
|
93 | def tearDown(self): | |
|
94 | ||||
|
95 | # close fds: | |||
|
96 | for e in filter(lambda e: e.poll() is not None, processes): | |||
|
97 | processes.remove(e) | |||
|
98 | ||||
92 | self.client.close() |
|
99 | self.client.close() | |
93 | BaseZMQTestCase.tearDown(self) |
|
100 | BaseZMQTestCase.tearDown(self) | |
94 | # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ] |
|
101 | # this will be superfluous when pyzmq merges PR #88 | |
95 | # [ e.wait() for e in self.engines ] |
|
102 | self.context.term() | |
96 | # while len(self.client.ids) > self.base_engine_count: |
|
103 | print tempfile.TemporaryFile().fileno(), | |
97 | # time.sleep(.1) |
|
104 | sys.stdout.flush() | |
98 | # del self.engines |
|
|||
99 | # BaseZMQTestCase.tearDown(self) |
|
|||
100 | No newline at end of file |
|
105 |
@@ -17,7 +17,6 b' class TestClient(ClusterTestCase):' | |||||
17 | n = len(self.client.ids) |
|
17 | n = len(self.client.ids) | |
18 | self.add_engines(3) |
|
18 | self.add_engines(3) | |
19 | self.assertEquals(len(self.client.ids), n+3) |
|
19 | self.assertEquals(len(self.client.ids), n+3) | |
20 | self.assertTrue |
|
|||
21 |
|
20 | |||
22 | def test_segfault_task(self): |
|
21 | def test_segfault_task(self): | |
23 | """test graceful handling of engine death (balanced)""" |
|
22 | """test graceful handling of engine death (balanced)""" | |
@@ -179,8 +178,9 b' class TestClient(ClusterTestCase):' | |||||
179 | def test_get_result(self): |
|
178 | def test_get_result(self): | |
180 | """test getting results from the Hub.""" |
|
179 | """test getting results from the Hub.""" | |
181 | c = clientmod.Client(profile='iptest') |
|
180 | c = clientmod.Client(profile='iptest') | |
182 |
|
|
181 | self.add_engines(1) | |
183 | ar = c.apply(wait, (1,), block=False, targets=t) |
|
182 | ar = c.apply(wait, (1,), block=False, targets=t) | |
|
183 | # give the monitor time to notice the message | |||
184 | time.sleep(.25) |
|
184 | time.sleep(.25) | |
185 | ahr = self.client.get_result(ar.msg_ids) |
|
185 | ahr = self.client.get_result(ar.msg_ids) | |
186 | self.assertTrue(isinstance(ahr, AsyncHubResult)) |
|
186 | self.assertTrue(isinstance(ahr, AsyncHubResult)) |
1 | NO CONTENT: modified file, binary diff hidden |
|
NO CONTENT: modified file, binary diff hidden |
1 | NO CONTENT: modified file |
|
NO CONTENT: modified file | ||
The requested commit or file is too big and content was truncated. Show full diff |
1 | NO CONTENT: modified file, binary diff hidden |
|
NO CONTENT: modified file, binary diff hidden |
1 | NO CONTENT: modified file, binary diff hidden |
|
NO CONTENT: modified file, binary diff hidden |
1 | NO CONTENT: modified file, binary diff hidden |
|
NO CONTENT: modified file, binary diff hidden |
1 | NO CONTENT: modified file, binary diff hidden |
|
NO CONTENT: modified file, binary diff hidden |
1 | NO CONTENT: modified file |
|
NO CONTENT: modified file | ||
The requested commit or file is too big and content was truncated. Show full diff |
1 | NO CONTENT: modified file |
|
NO CONTENT: modified file | ||
The requested commit or file is too big and content was truncated. Show full diff |
1 | NO CONTENT: file was removed, binary diff hidden |
|
NO CONTENT: file was removed, binary diff hidden |
1 | NO CONTENT: file was removed, binary diff hidden |
|
NO CONTENT: file was removed, binary diff hidden |
General Comments 0
You need to be logged in to leave comments.
Login now