##// END OF EJS Templates
update connections and diagrams for reduced sockets
MinRK -
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -270,8 +270,9 b' class Client(HasTraits):'
270 _control_socket=Instance('zmq.Socket')
270 _control_socket=Instance('zmq.Socket')
271 _iopub_socket=Instance('zmq.Socket')
271 _iopub_socket=Instance('zmq.Socket')
272 _notification_socket=Instance('zmq.Socket')
272 _notification_socket=Instance('zmq.Socket')
273 _mux_socket=Instance('zmq.Socket')
273 _apply_socket=Instance('zmq.Socket')
274 _task_socket=Instance('zmq.Socket')
274 _mux_ident=Str()
275 _task_ident=Str()
275 _task_scheme=Str()
276 _task_scheme=Str()
276 _balanced_views=Dict()
277 _balanced_views=Dict()
277 _direct_views=Dict()
278 _direct_views=Dict()
@@ -401,16 +402,16 b' class Client(HasTraits):'
401 self._ids.append(eid)
402 self._ids.append(eid)
402 self._ids = sorted(self._ids)
403 self._ids = sorted(self._ids)
403 if sorted(self._engines.keys()) != range(len(self._engines)) and \
404 if sorted(self._engines.keys()) != range(len(self._engines)) and \
404 self._task_scheme == 'pure' and self._task_socket:
405 self._task_scheme == 'pure' and self._task_ident:
405 self._stop_scheduling_tasks()
406 self._stop_scheduling_tasks()
406
407
407 def _stop_scheduling_tasks(self):
408 def _stop_scheduling_tasks(self):
408 """Stop scheduling tasks because an engine has been unregistered
409 """Stop scheduling tasks because an engine has been unregistered
409 from a pure ZMQ scheduler.
410 from a pure ZMQ scheduler.
410 """
411 """
411
412 self._task_ident = ''
412 self._task_socket.close()
413 # self._task_socket.close()
413 self._task_socket = None
414 # self._task_socket = None
414 msg = "An engine has been unregistered, and we are using pure " +\
415 msg = "An engine has been unregistered, and we are using pure " +\
415 "ZMQ task scheduling. Task farming will be disabled."
416 "ZMQ task scheduling. Task farming will be disabled."
416 if self.outstanding:
417 if self.outstanding:
@@ -457,15 +458,18 b' class Client(HasTraits):'
457 content = msg.content
458 content = msg.content
458 self._config['registration'] = dict(content)
459 self._config['registration'] = dict(content)
459 if content.status == 'ok':
460 if content.status == 'ok':
461 self._apply_socket = self._context.socket(zmq.XREP)
462 self._apply_socket.setsockopt(zmq.IDENTITY, self.session.session)
460 if content.mux:
463 if content.mux:
461 self._mux_socket = self._context.socket(zmq.XREQ)
464 # self._mux_socket = self._context.socket(zmq.XREQ)
462 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
465 self._mux_ident = 'mux'
463 connect_socket(self._mux_socket, content.mux)
466 connect_socket(self._apply_socket, content.mux)
464 if content.task:
467 if content.task:
465 self._task_scheme, task_addr = content.task
468 self._task_scheme, task_addr = content.task
466 self._task_socket = self._context.socket(zmq.XREQ)
469 # self._task_socket = self._context.socket(zmq.XREQ)
467 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
470 # self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
468 connect_socket(self._task_socket, task_addr)
471 connect_socket(self._apply_socket, task_addr)
472 self._task_ident = 'task'
469 if content.notification:
473 if content.notification:
470 self._notification_socket = self._context.socket(zmq.SUB)
474 self._notification_socket = self._context.socket(zmq.SUB)
471 connect_socket(self._notification_socket, content.notification)
475 connect_socket(self._notification_socket, content.notification)
@@ -484,7 +488,8 b' class Client(HasTraits):'
484 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
488 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
485 connect_socket(self._iopub_socket, content.iopub)
489 connect_socket(self._iopub_socket, content.iopub)
486 self._update_engines(dict(content.engines))
490 self._update_engines(dict(content.engines))
487
491 # give XREP apply_socket some time to connect
492 time.sleep(0.25)
488 else:
493 else:
489 self._connected = False
494 self._connected = False
490 raise Exception("Failed to connect!")
495 raise Exception("Failed to connect!")
@@ -496,7 +501,7 b' class Client(HasTraits):'
496 def _unwrap_exception(self, content):
501 def _unwrap_exception(self, content):
497 """unwrap exception, and remap engineid to int."""
502 """unwrap exception, and remap engineid to int."""
498 e = error.unwrap_exception(content)
503 e = error.unwrap_exception(content)
499 print e.traceback
504 # print e.traceback
500 if e.engine_info:
505 if e.engine_info:
501 e_uuid = e.engine_info['engine_uuid']
506 e_uuid = e.engine_info['engine_uuid']
502 eid = self._engines[e_uuid]
507 eid = self._engines[e_uuid]
@@ -540,7 +545,7 b' class Client(HasTraits):'
540
545
541 self._handle_stranded_msgs(eid, uuid)
546 self._handle_stranded_msgs(eid, uuid)
542
547
543 if self._task_socket and self._task_scheme == 'pure':
548 if self._task_ident and self._task_scheme == 'pure':
544 self._stop_scheduling_tasks()
549 self._stop_scheduling_tasks()
545
550
546 def _handle_stranded_msgs(self, eid, uuid):
551 def _handle_stranded_msgs(self, eid, uuid):
@@ -725,10 +730,8 b' class Client(HasTraits):'
725 """
730 """
726 if self._notification_socket:
731 if self._notification_socket:
727 self._flush_notifications()
732 self._flush_notifications()
728 if self._mux_socket:
733 if self._apply_socket:
729 self._flush_results(self._mux_socket)
734 self._flush_results(self._apply_socket)
730 if self._task_socket:
731 self._flush_results(self._task_socket)
732 if self._control_socket:
735 if self._control_socket:
733 self._flush_control(self._control_socket)
736 self._flush_control(self._control_socket)
734 if self._iopub_socket:
737 if self._iopub_socket:
@@ -1030,6 +1033,12 b' class Client(HasTraits):'
1030 args = args if args is not None else []
1033 args = args if args is not None else []
1031 kwargs = kwargs if kwargs is not None else {}
1034 kwargs = kwargs if kwargs is not None else {}
1032
1035
1036 if not self._ids:
1037 # flush notification socket if no engines yet
1038 any_ids = self.ids
1039 if not any_ids:
1040 raise error.NoEnginesRegistered("Can't execute without any connected engines.")
1041
1033 if balanced is None:
1042 if balanced is None:
1034 if targets is None:
1043 if targets is None:
1035 # default to balanced if targets unspecified
1044 # default to balanced if targets unspecified
@@ -1074,7 +1083,7 b' class Client(HasTraits):'
1074 for name in ('bound', 'block', 'track'):
1083 for name in ('bound', 'block', 'track'):
1075 assert loc[name] is not None, "kwarg %r must be specified!"%name
1084 assert loc[name] is not None, "kwarg %r must be specified!"%name
1076
1085
1077 if self._task_socket is None:
1086 if not self._task_ident:
1078 msg = "Task farming is disabled"
1087 msg = "Task farming is disabled"
1079 if self._task_scheme == 'pure':
1088 if self._task_scheme == 'pure':
1080 msg += " because the pure ZMQ scheduler cannot handle"
1089 msg += " because the pure ZMQ scheduler cannot handle"
@@ -1106,7 +1115,7 b' class Client(HasTraits):'
1106 bufs = util.pack_apply_message(f,args,kwargs)
1115 bufs = util.pack_apply_message(f,args,kwargs)
1107 content = dict(bound=bound)
1116 content = dict(bound=bound)
1108
1117
1109 msg = self.session.send(self._task_socket, "apply_request",
1118 msg = self.session.send(self._apply_socket, "apply_request", ident=self._task_ident,
1110 content=content, buffers=bufs, subheader=subheader, track=track)
1119 content=content, buffers=bufs, subheader=subheader, track=track)
1111 msg_id = msg['msg_id']
1120 msg_id = msg['msg_id']
1112 self.outstanding.add(msg_id)
1121 self.outstanding.add(msg_id)
@@ -1130,6 +1139,11 b' class Client(HasTraits):'
1130 This is a private method, see `apply` for details.
1139 This is a private method, see `apply` for details.
1131 Not to be called directly!
1140 Not to be called directly!
1132 """
1141 """
1142
1143 if not self._mux_ident:
1144 msg = "Multiplexing is disabled"
1145 raise RuntimeError(msg)
1146
1133 loc = locals()
1147 loc = locals()
1134 for name in ('bound', 'block', 'targets', 'track'):
1148 for name in ('bound', 'block', 'targets', 'track'):
1135 assert loc[name] is not None, "kwarg %r must be specified!"%name
1149 assert loc[name] is not None, "kwarg %r must be specified!"%name
@@ -1143,8 +1157,8 b' class Client(HasTraits):'
1143 msg_ids = []
1157 msg_ids = []
1144 trackers = []
1158 trackers = []
1145 for ident in idents:
1159 for ident in idents:
1146 msg = self.session.send(self._mux_socket, "apply_request",
1160 msg = self.session.send(self._apply_socket, "apply_request",
1147 content=content, buffers=bufs, ident=ident, subheader=subheader,
1161 content=content, buffers=bufs, ident=[self._mux_ident, ident], subheader=subheader,
1148 track=track)
1162 track=track)
1149 if track:
1163 if track:
1150 trackers.append(msg['tracker'])
1164 trackers.append(msg['tracker'])
@@ -80,6 +80,7 b' class ControllerFactory(HubFactory):'
80 # Multiplexer Queue (in a Process)
80 # Multiplexer Queue (in a Process)
81 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
81 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
82 q.bind_in(self.client_info['mux'])
82 q.bind_in(self.client_info['mux'])
83 q.setsockopt_in(zmq.IDENTITY, 'mux')
83 q.bind_out(self.engine_info['mux'])
84 q.bind_out(self.engine_info['mux'])
84 q.connect_mon(maybe_inproc)
85 q.connect_mon(maybe_inproc)
85 q.daemon=True
86 q.daemon=True
@@ -88,6 +89,7 b' class ControllerFactory(HubFactory):'
88 # Control Queue (in a Process)
89 # Control Queue (in a Process)
89 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
90 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
90 q.bind_in(self.client_info['control'])
91 q.bind_in(self.client_info['control'])
92 q.setsockopt_in(zmq.IDENTITY, 'control')
91 q.bind_out(self.engine_info['control'])
93 q.bind_out(self.engine_info['control'])
92 q.connect_mon(maybe_inproc)
94 q.connect_mon(maybe_inproc)
93 q.daemon=True
95 q.daemon=True
@@ -98,6 +100,7 b' class ControllerFactory(HubFactory):'
98 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
100 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
99 q.setsockopt_out(zmq.HWM, self.hwm)
101 q.setsockopt_out(zmq.HWM, self.hwm)
100 q.bind_in(self.client_info['task'][1])
102 q.bind_in(self.client_info['task'][1])
103 q.setsockopt_in(zmq.IDENTITY, 'task')
101 q.bind_out(self.engine_info['task'])
104 q.bind_out(self.engine_info['task'])
102 q.connect_mon(maybe_inproc)
105 q.connect_mon(maybe_inproc)
103 q.daemon=True
106 q.daemon=True
@@ -544,7 +544,8 b' class TaskScheduler(SessionFactory):'
544
544
545
545
546 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
546 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
547 log_addr=None, loglevel=logging.DEBUG, scheme='lru'):
547 log_addr=None, loglevel=logging.DEBUG, scheme='lru',
548 identity=b'task'):
548 from zmq.eventloop import ioloop
549 from zmq.eventloop import ioloop
549 from zmq.eventloop.zmqstream import ZMQStream
550 from zmq.eventloop.zmqstream import ZMQStream
550
551
@@ -552,8 +553,11 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='
552 loop = ioloop.IOLoop()
553 loop = ioloop.IOLoop()
553 print (in_addr, out_addr, mon_addr, not_addr)
554 print (in_addr, out_addr, mon_addr, not_addr)
554 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
555 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
556 ins.setsockopt(zmq.IDENTITY, identity)
555 ins.bind(in_addr)
557 ins.bind(in_addr)
558
556 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
559 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
560 outs.setsockopt(zmq.IDENTITY, identity)
557 outs.bind(out_addr)
561 outs.bind(out_addr)
558 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
562 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
559 mons.connect(mon_addr)
563 mons.connect(mon_addr)
@@ -12,7 +12,7 b' blackhole = tempfile.TemporaryFile()'
12 # nose setup/teardown
12 # nose setup/teardown
13
13
14 def setup():
14 def setup():
15 cp = Popen('ipcontrollerz --profile iptest -r --log-level 40'.split(), stdout=blackhole, stderr=STDOUT)
15 cp = Popen('ipcontrollerz --profile iptest -r --log-level 10 --log-to-file'.split(), stdout=blackhole, stderr=STDOUT)
16 processes.append(cp)
16 processes.append(cp)
17 time.sleep(.5)
17 time.sleep(.5)
18 add_engine()
18 add_engine()
@@ -22,7 +22,7 b' def setup():'
22 c.spin()
22 c.spin()
23
23
24 def add_engine(profile='iptest'):
24 def add_engine(profile='iptest'):
25 ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '40'], stdout=blackhole, stderr=STDOUT)
25 ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT)
26 # ep.start()
26 # ep.start()
27 processes.append(ep)
27 processes.append(ep)
28 return ep
28 return ep
@@ -1,3 +1,5 b''
1 import sys
2 import tempfile
1 import time
3 import time
2 from signal import SIGINT
4 from signal import SIGINT
3 from multiprocessing import Process
5 from multiprocessing import Process
@@ -62,7 +64,7 b' class ClusterTestCase(BaseZMQTestCase):'
62 while time.time()-tic < timeout and len(self.client.ids) < n:
64 while time.time()-tic < timeout and len(self.client.ids) < n:
63 time.sleep(0.1)
65 time.sleep(0.1)
64
66
65 assert not self.client.ids < n, "waiting for engines timed out"
67 assert not len(self.client.ids) < n, "waiting for engines timed out"
66
68
67 def connect_client(self):
69 def connect_client(self):
68 """connect a client with my Context, and track its sockets for cleanup"""
70 """connect a client with my Context, and track its sockets for cleanup"""
@@ -89,12 +91,15 b' class ClusterTestCase(BaseZMQTestCase):'
89 self.engines=[]
91 self.engines=[]
90
92
91 def tearDown(self):
93 def tearDown(self):
94
95 # close fds:
96 for e in filter(lambda e: e.poll() is not None, processes):
97 processes.remove(e)
98
92 self.client.close()
99 self.client.close()
93 BaseZMQTestCase.tearDown(self)
100 BaseZMQTestCase.tearDown(self)
94 # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
101 # this will be superfluous when pyzmq merges PR #88
95 # [ e.wait() for e in self.engines ]
102 self.context.term()
96 # while len(self.client.ids) > self.base_engine_count:
103 print tempfile.TemporaryFile().fileno(),
97 # time.sleep(.1)
104 sys.stdout.flush()
98 # del self.engines
99 # BaseZMQTestCase.tearDown(self)
100 No newline at end of file
105
@@ -17,7 +17,6 b' class TestClient(ClusterTestCase):'
17 n = len(self.client.ids)
17 n = len(self.client.ids)
18 self.add_engines(3)
18 self.add_engines(3)
19 self.assertEquals(len(self.client.ids), n+3)
19 self.assertEquals(len(self.client.ids), n+3)
20 self.assertTrue
21
20
22 def test_segfault_task(self):
21 def test_segfault_task(self):
23 """test graceful handling of engine death (balanced)"""
22 """test graceful handling of engine death (balanced)"""
@@ -179,8 +178,9 b' class TestClient(ClusterTestCase):'
179 def test_get_result(self):
178 def test_get_result(self):
180 """test getting results from the Hub."""
179 """test getting results from the Hub."""
181 c = clientmod.Client(profile='iptest')
180 c = clientmod.Client(profile='iptest')
182 t = self.client.ids[-1]
181 self.add_engines(1)
183 ar = c.apply(wait, (1,), block=False, targets=t)
182 ar = c.apply(wait, (1,), block=False, targets=t)
183 # give the monitor time to notice the message
184 time.sleep(.25)
184 time.sleep(.25)
185 ahr = self.client.get_result(ar.msg_ids)
185 ahr = self.client.get_result(ar.msg_ids)
186 self.assertTrue(isinstance(ahr, AsyncHubResult))
186 self.assertTrue(isinstance(ahr, AsyncHubResult))
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: file was removed, binary diff hidden
NO CONTENT: file was removed, binary diff hidden
1 NO CONTENT: file was removed, binary diff hidden
NO CONTENT: file was removed, binary diff hidden
General Comments 0
You need to be logged in to leave comments. Login now