##// END OF EJS Templates
update connections and diagrams for reduced sockets
MinRK -
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -270,8 +270,9 b' class Client(HasTraits):'
270 270 _control_socket=Instance('zmq.Socket')
271 271 _iopub_socket=Instance('zmq.Socket')
272 272 _notification_socket=Instance('zmq.Socket')
273 _mux_socket=Instance('zmq.Socket')
274 _task_socket=Instance('zmq.Socket')
273 _apply_socket=Instance('zmq.Socket')
274 _mux_ident=Str()
275 _task_ident=Str()
275 276 _task_scheme=Str()
276 277 _balanced_views=Dict()
277 278 _direct_views=Dict()
@@ -401,16 +402,16 b' class Client(HasTraits):'
401 402 self._ids.append(eid)
402 403 self._ids = sorted(self._ids)
403 404 if sorted(self._engines.keys()) != range(len(self._engines)) and \
404 self._task_scheme == 'pure' and self._task_socket:
405 self._task_scheme == 'pure' and self._task_ident:
405 406 self._stop_scheduling_tasks()
406 407
407 408 def _stop_scheduling_tasks(self):
408 409 """Stop scheduling tasks because an engine has been unregistered
409 410 from a pure ZMQ scheduler.
410 411 """
411
412 self._task_socket.close()
413 self._task_socket = None
412 self._task_ident = ''
413 # self._task_socket.close()
414 # self._task_socket = None
414 415 msg = "An engine has been unregistered, and we are using pure " +\
415 416 "ZMQ task scheduling. Task farming will be disabled."
416 417 if self.outstanding:
@@ -457,15 +458,18 b' class Client(HasTraits):'
457 458 content = msg.content
458 459 self._config['registration'] = dict(content)
459 460 if content.status == 'ok':
461 self._apply_socket = self._context.socket(zmq.XREP)
462 self._apply_socket.setsockopt(zmq.IDENTITY, self.session.session)
460 463 if content.mux:
461 self._mux_socket = self._context.socket(zmq.XREQ)
462 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
463 connect_socket(self._mux_socket, content.mux)
464 # self._mux_socket = self._context.socket(zmq.XREQ)
465 self._mux_ident = 'mux'
466 connect_socket(self._apply_socket, content.mux)
464 467 if content.task:
465 468 self._task_scheme, task_addr = content.task
466 self._task_socket = self._context.socket(zmq.XREQ)
467 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
468 connect_socket(self._task_socket, task_addr)
469 # self._task_socket = self._context.socket(zmq.XREQ)
470 # self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
471 connect_socket(self._apply_socket, task_addr)
472 self._task_ident = 'task'
469 473 if content.notification:
470 474 self._notification_socket = self._context.socket(zmq.SUB)
471 475 connect_socket(self._notification_socket, content.notification)
@@ -484,7 +488,8 b' class Client(HasTraits):'
484 488 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
485 489 connect_socket(self._iopub_socket, content.iopub)
486 490 self._update_engines(dict(content.engines))
487
491 # give XREP apply_socket some time to connect
492 time.sleep(0.25)
488 493 else:
489 494 self._connected = False
490 495 raise Exception("Failed to connect!")
@@ -496,7 +501,7 b' class Client(HasTraits):'
496 501 def _unwrap_exception(self, content):
497 502 """unwrap exception, and remap engineid to int."""
498 503 e = error.unwrap_exception(content)
499 print e.traceback
504 # print e.traceback
500 505 if e.engine_info:
501 506 e_uuid = e.engine_info['engine_uuid']
502 507 eid = self._engines[e_uuid]
@@ -540,7 +545,7 b' class Client(HasTraits):'
540 545
541 546 self._handle_stranded_msgs(eid, uuid)
542 547
543 if self._task_socket and self._task_scheme == 'pure':
548 if self._task_ident and self._task_scheme == 'pure':
544 549 self._stop_scheduling_tasks()
545 550
546 551 def _handle_stranded_msgs(self, eid, uuid):
@@ -725,10 +730,8 b' class Client(HasTraits):'
725 730 """
726 731 if self._notification_socket:
727 732 self._flush_notifications()
728 if self._mux_socket:
729 self._flush_results(self._mux_socket)
730 if self._task_socket:
731 self._flush_results(self._task_socket)
733 if self._apply_socket:
734 self._flush_results(self._apply_socket)
732 735 if self._control_socket:
733 736 self._flush_control(self._control_socket)
734 737 if self._iopub_socket:
@@ -1030,6 +1033,12 b' class Client(HasTraits):'
1030 1033 args = args if args is not None else []
1031 1034 kwargs = kwargs if kwargs is not None else {}
1032 1035
1036 if not self._ids:
1037 # flush notification socket if no engines yet
1038 any_ids = self.ids
1039 if not any_ids:
1040 raise error.NoEnginesRegistered("Can't execute without any connected engines.")
1041
1033 1042 if balanced is None:
1034 1043 if targets is None:
1035 1044 # default to balanced if targets unspecified
@@ -1074,7 +1083,7 b' class Client(HasTraits):'
1074 1083 for name in ('bound', 'block', 'track'):
1075 1084 assert loc[name] is not None, "kwarg %r must be specified!"%name
1076 1085
1077 if self._task_socket is None:
1086 if not self._task_ident:
1078 1087 msg = "Task farming is disabled"
1079 1088 if self._task_scheme == 'pure':
1080 1089 msg += " because the pure ZMQ scheduler cannot handle"
@@ -1106,7 +1115,7 b' class Client(HasTraits):'
1106 1115 bufs = util.pack_apply_message(f,args,kwargs)
1107 1116 content = dict(bound=bound)
1108 1117
1109 msg = self.session.send(self._task_socket, "apply_request",
1118 msg = self.session.send(self._apply_socket, "apply_request", ident=self._task_ident,
1110 1119 content=content, buffers=bufs, subheader=subheader, track=track)
1111 1120 msg_id = msg['msg_id']
1112 1121 self.outstanding.add(msg_id)
@@ -1130,6 +1139,11 b' class Client(HasTraits):'
1130 1139 This is a private method, see `apply` for details.
1131 1140 Not to be called directly!
1132 1141 """
1142
1143 if not self._mux_ident:
1144 msg = "Multiplexing is disabled"
1145 raise RuntimeError(msg)
1146
1133 1147 loc = locals()
1134 1148 for name in ('bound', 'block', 'targets', 'track'):
1135 1149 assert loc[name] is not None, "kwarg %r must be specified!"%name
@@ -1143,8 +1157,8 b' class Client(HasTraits):'
1143 1157 msg_ids = []
1144 1158 trackers = []
1145 1159 for ident in idents:
1146 msg = self.session.send(self._mux_socket, "apply_request",
1147 content=content, buffers=bufs, ident=ident, subheader=subheader,
1160 msg = self.session.send(self._apply_socket, "apply_request",
1161 content=content, buffers=bufs, ident=[self._mux_ident, ident], subheader=subheader,
1148 1162 track=track)
1149 1163 if track:
1150 1164 trackers.append(msg['tracker'])
@@ -80,6 +80,7 b' class ControllerFactory(HubFactory):'
80 80 # Multiplexer Queue (in a Process)
81 81 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
82 82 q.bind_in(self.client_info['mux'])
83 q.setsockopt_in(zmq.IDENTITY, 'mux')
83 84 q.bind_out(self.engine_info['mux'])
84 85 q.connect_mon(maybe_inproc)
85 86 q.daemon=True
@@ -88,6 +89,7 b' class ControllerFactory(HubFactory):'
88 89 # Control Queue (in a Process)
89 90 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
90 91 q.bind_in(self.client_info['control'])
92 q.setsockopt_in(zmq.IDENTITY, 'control')
91 93 q.bind_out(self.engine_info['control'])
92 94 q.connect_mon(maybe_inproc)
93 95 q.daemon=True
@@ -98,6 +100,7 b' class ControllerFactory(HubFactory):'
98 100 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
99 101 q.setsockopt_out(zmq.HWM, self.hwm)
100 102 q.bind_in(self.client_info['task'][1])
103 q.setsockopt_in(zmq.IDENTITY, 'task')
101 104 q.bind_out(self.engine_info['task'])
102 105 q.connect_mon(maybe_inproc)
103 106 q.daemon=True
@@ -544,7 +544,8 b' class TaskScheduler(SessionFactory):'
544 544
545 545
546 546 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
547 log_addr=None, loglevel=logging.DEBUG, scheme='lru'):
547 log_addr=None, loglevel=logging.DEBUG, scheme='lru',
548 identity=b'task'):
548 549 from zmq.eventloop import ioloop
549 550 from zmq.eventloop.zmqstream import ZMQStream
550 551
@@ -552,8 +553,11 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='
552 553 loop = ioloop.IOLoop()
553 554 print (in_addr, out_addr, mon_addr, not_addr)
554 555 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
556 ins.setsockopt(zmq.IDENTITY, identity)
555 557 ins.bind(in_addr)
558
556 559 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
560 outs.setsockopt(zmq.IDENTITY, identity)
557 561 outs.bind(out_addr)
558 562 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
559 563 mons.connect(mon_addr)
@@ -12,7 +12,7 b' blackhole = tempfile.TemporaryFile()'
12 12 # nose setup/teardown
13 13
14 14 def setup():
15 cp = Popen('ipcontrollerz --profile iptest -r --log-level 40'.split(), stdout=blackhole, stderr=STDOUT)
15 cp = Popen('ipcontrollerz --profile iptest -r --log-level 10 --log-to-file'.split(), stdout=blackhole, stderr=STDOUT)
16 16 processes.append(cp)
17 17 time.sleep(.5)
18 18 add_engine()
@@ -22,7 +22,7 b' def setup():'
22 22 c.spin()
23 23
24 24 def add_engine(profile='iptest'):
25 ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '40'], stdout=blackhole, stderr=STDOUT)
25 ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT)
26 26 # ep.start()
27 27 processes.append(ep)
28 28 return ep
@@ -1,3 +1,5 b''
1 import sys
2 import tempfile
1 3 import time
2 4 from signal import SIGINT
3 5 from multiprocessing import Process
@@ -62,7 +64,7 b' class ClusterTestCase(BaseZMQTestCase):'
62 64 while time.time()-tic < timeout and len(self.client.ids) < n:
63 65 time.sleep(0.1)
64 66
65 assert not self.client.ids < n, "waiting for engines timed out"
67 assert not len(self.client.ids) < n, "waiting for engines timed out"
66 68
67 69 def connect_client(self):
68 70 """connect a client with my Context, and track its sockets for cleanup"""
@@ -89,12 +91,15 b' class ClusterTestCase(BaseZMQTestCase):'
89 91 self.engines=[]
90 92
91 93 def tearDown(self):
94
95 # close fds:
96 for e in filter(lambda e: e.poll() is not None, processes):
97 processes.remove(e)
98
92 99 self.client.close()
93 100 BaseZMQTestCase.tearDown(self)
94 # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
95 # [ e.wait() for e in self.engines ]
96 # while len(self.client.ids) > self.base_engine_count:
97 # time.sleep(.1)
98 # del self.engines
99 # BaseZMQTestCase.tearDown(self)
101 # this will be superfluous when pyzmq merges PR #88
102 self.context.term()
103 print tempfile.TemporaryFile().fileno(),
104 sys.stdout.flush()
100 105 No newline at end of file
@@ -17,7 +17,6 b' class TestClient(ClusterTestCase):'
17 17 n = len(self.client.ids)
18 18 self.add_engines(3)
19 19 self.assertEquals(len(self.client.ids), n+3)
20 self.assertTrue
21 20
22 21 def test_segfault_task(self):
23 22 """test graceful handling of engine death (balanced)"""
@@ -179,8 +178,9 b' class TestClient(ClusterTestCase):'
179 178 def test_get_result(self):
180 179 """test getting results from the Hub."""
181 180 c = clientmod.Client(profile='iptest')
182 t = self.client.ids[-1]
181 self.add_engines(1)
183 182 ar = c.apply(wait, (1,), block=False, targets=t)
183 # give the monitor time to notice the message
184 184 time.sleep(.25)
185 185 ahr = self.client.get_result(ar.msg_ids)
186 186 self.assertTrue(isinstance(ahr, AsyncHubResult))
1 NO CONTENT: modified file, binary diff hidden
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: modified file, binary diff hidden
1 NO CONTENT: modified file, binary diff hidden
1 NO CONTENT: modified file, binary diff hidden
1 NO CONTENT: modified file, binary diff hidden
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
1 NO CONTENT: file was removed, binary diff hidden
1 NO CONTENT: file was removed, binary diff hidden
General Comments 0
You need to be logged in to leave comments. Login now