Show More
@@ -265,7 +265,7 b' class Client(HasTraits):' | |||
|
265 | 265 | _context = Instance('zmq.Context') |
|
266 | 266 | _config = Dict() |
|
267 | 267 | _engines=Instance(ReverseDict, (), {}) |
|
268 |
|
|
|
268 | # _hub_socket=Instance('zmq.Socket') | |
|
269 | 269 | _query_socket=Instance('zmq.Socket') |
|
270 | 270 | _control_socket=Instance('zmq.Socket') |
|
271 | 271 | _iopub_socket=Instance('zmq.Socket') |
@@ -339,12 +339,12 b' class Client(HasTraits):' | |||
|
339 | 339 | self.session = ss.StreamSession(**key_arg) |
|
340 | 340 | else: |
|
341 | 341 | self.session = ss.StreamSession(username, **key_arg) |
|
342 |
self._ |
|
|
343 |
self._ |
|
|
342 | self._query_socket = self._context.socket(zmq.XREQ) | |
|
343 | self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
344 | 344 | if self._ssh: |
|
345 |
tunnel.tunnel_connection(self._ |
|
|
345 | tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs) | |
|
346 | 346 | else: |
|
347 |
self._ |
|
|
347 | self._query_socket.connect(url) | |
|
348 | 348 | |
|
349 | 349 | self.session.debug = self.debug |
|
350 | 350 | |
@@ -449,8 +449,8 b' class Client(HasTraits):' | |||
|
449 | 449 | else: |
|
450 | 450 | return s.connect(url) |
|
451 | 451 | |
|
452 |
self.session.send(self._ |
|
|
453 |
idents,msg = self.session.recv(self._ |
|
|
452 | self.session.send(self._query_socket, 'connection_request') | |
|
453 | idents,msg = self.session.recv(self._query_socket,mode=0) | |
|
454 | 454 | if self.debug: |
|
455 | 455 | pprint(msg) |
|
456 | 456 | msg = ss.Message(msg) |
@@ -458,29 +458,29 b' class Client(HasTraits):' | |||
|
458 | 458 | self._config['registration'] = dict(content) |
|
459 | 459 | if content.status == 'ok': |
|
460 | 460 | if content.mux: |
|
461 |
self._mux_socket = self._context.socket(zmq. |
|
|
461 | self._mux_socket = self._context.socket(zmq.XREQ) | |
|
462 | 462 | self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
463 | 463 | connect_socket(self._mux_socket, content.mux) |
|
464 | 464 | if content.task: |
|
465 | 465 | self._task_scheme, task_addr = content.task |
|
466 |
self._task_socket = self._context.socket(zmq. |
|
|
466 | self._task_socket = self._context.socket(zmq.XREQ) | |
|
467 | 467 | self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
468 | 468 | connect_socket(self._task_socket, task_addr) |
|
469 | 469 | if content.notification: |
|
470 | 470 | self._notification_socket = self._context.socket(zmq.SUB) |
|
471 | 471 | connect_socket(self._notification_socket, content.notification) |
|
472 |
self._notification_socket.setsockopt(zmq.SUBSCRIBE, |
|
|
473 | if content.query: | |
|
474 |
self._query_socket = self._context.socket(zmq. |
|
|
475 | self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
476 | connect_socket(self._query_socket, content.query) | |
|
472 | self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'') | |
|
473 | # if content.query: | |
|
474 | # self._query_socket = self._context.socket(zmq.XREQ) | |
|
475 | # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
476 | # connect_socket(self._query_socket, content.query) | |
|
477 | 477 | if content.control: |
|
478 |
self._control_socket = self._context.socket(zmq. |
|
|
478 | self._control_socket = self._context.socket(zmq.XREQ) | |
|
479 | 479 | self._control_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
480 | 480 | connect_socket(self._control_socket, content.control) |
|
481 | 481 | if content.iopub: |
|
482 | 482 | self._iopub_socket = self._context.socket(zmq.SUB) |
|
483 | self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '') | |
|
483 | self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'') | |
|
484 | 484 | self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
485 | 485 | connect_socket(self._iopub_socket, content.iopub) |
|
486 | 486 | self._update_engines(dict(content.engines)) |
@@ -496,6 +496,7 b' class Client(HasTraits):' | |||
|
496 | 496 | def _unwrap_exception(self, content): |
|
497 | 497 | """unwrap exception, and remap engineid to int.""" |
|
498 | 498 | e = error.unwrap_exception(content) |
|
499 | print e.traceback | |
|
499 | 500 | if e.engine_info: |
|
500 | 501 | e_uuid = e.engine_info['engine_uuid'] |
|
501 | 502 | eid = self._engines[e_uuid] |
@@ -41,7 +41,7 b' class EngineFactory(RegistrationFactory):' | |||
|
41 | 41 | super(EngineFactory, self).__init__(**kwargs) |
|
42 | 42 | ctx = self.context |
|
43 | 43 | |
|
44 |
reg = ctx.socket(zmq. |
|
|
44 | reg = ctx.socket(zmq.XREQ) | |
|
45 | 45 | reg.setsockopt(zmq.IDENTITY, self.ident) |
|
46 | 46 | reg.connect(self.url) |
|
47 | 47 | self.registrar = zmqstream.ZMQStream(reg, self.loop) |
@@ -74,16 +74,26 b' class EngineFactory(RegistrationFactory):' | |||
|
74 | 74 | task_addr = msg.content.task |
|
75 | 75 | if task_addr: |
|
76 | 76 | shell_addrs.append(str(task_addr)) |
|
77 | shell_streams = [] | |
|
77 | ||
|
78 | # Uncomment this to go back to two-socket model | |
|
79 | # shell_streams = [] | |
|
80 | # for addr in shell_addrs: | |
|
81 | # stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop) | |
|
82 | # stream.setsockopt(zmq.IDENTITY, identity) | |
|
83 | # stream.connect(disambiguate_url(addr, self.location)) | |
|
84 | # shell_streams.append(stream) | |
|
85 | ||
|
86 | # Now use only one shell stream for mux and tasks | |
|
87 | stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop) | |
|
88 | stream.setsockopt(zmq.IDENTITY, identity) | |
|
89 | shell_streams = [stream] | |
|
78 | 90 | for addr in shell_addrs: |
|
79 | stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop) | |
|
80 | stream.setsockopt(zmq.IDENTITY, identity) | |
|
81 | 91 | stream.connect(disambiguate_url(addr, self.location)) |
|
82 |
|
|
|
92 | # end single stream-socket | |
|
83 | 93 | |
|
84 | 94 | # control stream: |
|
85 | 95 | control_addr = str(msg.content.control) |
|
86 |
control_stream = zmqstream.ZMQStream(ctx.socket(zmq. |
|
|
96 | control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop) | |
|
87 | 97 | control_stream.setsockopt(zmq.IDENTITY, identity) |
|
88 | 98 | control_stream.connect(disambiguate_url(control_addr, self.location)) |
|
89 | 99 |
@@ -119,10 +119,6 b' class HubFactory(RegistrationFactory):' | |||
|
119 | 119 | def _mon_port_default(self): |
|
120 | 120 | return select_random_ports(1)[0] |
|
121 | 121 | |
|
122 | query_port = Instance(int, config=True) | |
|
123 | def _query_port_default(self): | |
|
124 | return select_random_ports(1)[0] | |
|
125 | ||
|
126 | 122 | notifier_port = Instance(int, config=True) |
|
127 | 123 | def _notifier_port_default(self): |
|
128 | 124 | return select_random_ports(1)[0] |
@@ -194,11 +190,11 b' class HubFactory(RegistrationFactory):' | |||
|
194 | 190 | loop = self.loop |
|
195 | 191 | |
|
196 | 192 | # Registrar socket |
|
197 |
|
|
|
198 |
|
|
|
193 | q = ZMQStream(ctx.socket(zmq.XREP), loop) | |
|
194 | q.bind(client_iface % self.regport) | |
|
199 | 195 | self.log.info("Hub listening on %s for registration."%(client_iface%self.regport)) |
|
200 | 196 | if self.client_ip != self.engine_ip: |
|
201 |
|
|
|
197 | q.bind(engine_iface % self.regport) | |
|
202 | 198 | self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport)) |
|
203 | 199 | |
|
204 | 200 | ### Engine connections ### |
@@ -212,9 +208,6 b' class HubFactory(RegistrationFactory):' | |||
|
212 | 208 | period=self.ping, logname=self.log.name) |
|
213 | 209 | |
|
214 | 210 | ### Client connections ### |
|
215 | # Clientele socket | |
|
216 | c = ZMQStream(ctx.socket(zmq.XREP), loop) | |
|
217 | c.bind(client_iface%self.query_port) | |
|
218 | 211 | # Notifier socket |
|
219 | 212 | n = ZMQStream(ctx.socket(zmq.PUB), loop) |
|
220 | 213 | n.bind(client_iface%self.notifier_port) |
@@ -230,7 +223,7 b' class HubFactory(RegistrationFactory):' | |||
|
230 | 223 | |
|
231 | 224 | # connect the db |
|
232 | 225 | self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1])) |
|
233 | cdir = self.config.Global.cluster_dir | |
|
226 | # cdir = self.config.Global.cluster_dir | |
|
234 | 227 | self.db = import_item(self.db_class)(session=self.session.session, config=self.config) |
|
235 | 228 | time.sleep(.25) |
|
236 | 229 | |
@@ -246,7 +239,6 b' class HubFactory(RegistrationFactory):' | |||
|
246 | 239 | |
|
247 | 240 | self.client_info = { |
|
248 | 241 | 'control' : client_iface%self.control[0], |
|
249 | 'query': client_iface%self.query_port, | |
|
250 | 242 | 'mux': client_iface%self.mux[0], |
|
251 | 243 | 'task' : (self.scheme, client_iface%self.task[0]), |
|
252 | 244 | 'iopub' : client_iface%self.iopub[0], |
@@ -255,7 +247,7 b' class HubFactory(RegistrationFactory):' | |||
|
255 | 247 | self.log.debug("Hub engine addrs: %s"%self.engine_info) |
|
256 | 248 | self.log.debug("Hub client addrs: %s"%self.client_info) |
|
257 | 249 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
|
258 |
|
|
|
250 | query=q, notifier=n, db=self.db, | |
|
259 | 251 | engine_info=self.engine_info, client_info=self.client_info, |
|
260 | 252 | logname=self.log.name) |
|
261 | 253 | |
@@ -269,10 +261,8 b' class Hub(LoggingFactory):' | |||
|
269 | 261 | session: StreamSession object |
|
270 | 262 | <removed> context: zmq context for creating new connections (?) |
|
271 | 263 | queue: ZMQStream for monitoring the command queue (SUB) |
|
272 |
|
|
|
264 | query: ZMQStream for engine registration and client queries requests (XREP) | |
|
273 | 265 | heartbeat: HeartMonitor object checking the pulse of the engines |
|
274 | clientele: ZMQStream for client connections (XREP) | |
|
275 | not used for jobs, only query/control commands | |
|
276 | 266 | notifier: ZMQStream for broadcasting engine registration changes (PUB) |
|
277 | 267 | db: connection to db for out of memory logging of commands |
|
278 | 268 | NotImplemented |
@@ -300,8 +290,7 b' class Hub(LoggingFactory):' | |||
|
300 | 290 | |
|
301 | 291 | # objects from constructor: |
|
302 | 292 | loop=Instance(ioloop.IOLoop) |
|
303 |
|
|
|
304 | clientele=Instance(ZMQStream) | |
|
293 | query=Instance(ZMQStream) | |
|
305 | 294 | monitor=Instance(ZMQStream) |
|
306 | 295 | heartmonitor=Instance(HeartMonitor) |
|
307 | 296 | notifier=Instance(ZMQStream) |
@@ -317,10 +306,8 b' class Hub(LoggingFactory):' | |||
|
317 | 306 | session: streamsession for sending serialized data |
|
318 | 307 | # engine: |
|
319 | 308 | queue: ZMQStream for monitoring queue messages |
|
320 |
|
|
|
309 | query: ZMQStream for engine+client registration and client requests | |
|
321 | 310 | heartbeat: HeartMonitor object for tracking engines |
|
322 | # client: | |
|
323 | clientele: ZMQStream for client connections | |
|
324 | 311 | # extra: |
|
325 | 312 | db: ZMQStream for db connection (NotImplemented) |
|
326 | 313 | engine_info: zmq address/protocol dict for engine connections |
@@ -340,8 +327,7 b' class Hub(LoggingFactory):' | |||
|
340 | 327 | validate_url_container(self.engine_info) |
|
341 | 328 | |
|
342 | 329 | # register our callbacks |
|
343 |
self. |
|
|
344 | self.clientele.on_recv(self.dispatch_client_msg) | |
|
330 | self.query.on_recv(self.dispatch_query) | |
|
345 | 331 | self.monitor.on_recv(self.dispatch_monitor_traffic) |
|
346 | 332 | |
|
347 | 333 | self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure) |
@@ -357,15 +343,13 b' class Hub(LoggingFactory):' | |||
|
357 | 343 | 'iopub': self.save_iopub_message, |
|
358 | 344 | } |
|
359 | 345 | |
|
360 |
self. |
|
|
346 | self.query_handlers = {'queue_request': self.queue_status, | |
|
361 | 347 | 'result_request': self.get_results, |
|
362 | 348 | 'purge_request': self.purge_results, |
|
363 | 349 | 'load_request': self.check_load, |
|
364 | 350 | 'resubmit_request': self.resubmit_task, |
|
365 | 351 | 'shutdown_request': self.shutdown_request, |
|
366 | } | |
|
367 | ||
|
368 | self.registrar_handlers = {'registration_request' : self.register_engine, | |
|
352 | 'registration_request' : self.register_engine, | |
|
369 | 353 | 'unregistration_request' : self.unregister_engine, |
|
370 | 354 | 'connection_request': self.connection_request, |
|
371 | 355 | } |
@@ -418,27 +402,27 b' class Hub(LoggingFactory):' | |||
|
418 | 402 | # dispatch methods (1 per stream) |
|
419 | 403 | #----------------------------------------------------------------------------- |
|
420 | 404 | |
|
421 |
def dispatch_regist |
|
|
422 | """""" | |
|
423 | self.log.debug("registration::dispatch_register_request(%s)"%msg) | |
|
424 | idents,msg = self.session.feed_identities(msg) | |
|
425 | if not idents: | |
|
426 |
self.log.error("Bad Que |
|
|
427 | return | |
|
428 | try: | |
|
429 | msg = self.session.unpack_message(msg,content=True) | |
|
430 | except: | |
|
431 | self.log.error("registration::got bad registration message: %s"%msg, exc_info=True) | |
|
432 | return | |
|
433 | ||
|
434 | msg_type = msg['msg_type'] | |
|
435 | content = msg['content'] | |
|
436 | ||
|
437 |
handler = self. |
|
|
438 | if handler is None: | |
|
439 | self.log.error("registration::got bad registration message: %s"%msg) | |
|
440 | else: | |
|
441 | handler(idents, msg) | |
|
405 | # def dispatch_registration_request(self, msg): | |
|
406 | # """""" | |
|
407 | # self.log.debug("registration::dispatch_register_request(%s)"%msg) | |
|
408 | # idents,msg = self.session.feed_identities(msg) | |
|
409 | # if not idents: | |
|
410 | # self.log.error("Bad Query Message: %s"%msg, exc_info=True) | |
|
411 | # return | |
|
412 | # try: | |
|
413 | # msg = self.session.unpack_message(msg,content=True) | |
|
414 | # except: | |
|
415 | # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True) | |
|
416 | # return | |
|
417 | # | |
|
418 | # msg_type = msg['msg_type'] | |
|
419 | # content = msg['content'] | |
|
420 | # | |
|
421 | # handler = self.query_handlers.get(msg_type, None) | |
|
422 | # if handler is None: | |
|
423 | # self.log.error("registration::got bad registration message: %s"%msg) | |
|
424 | # else: | |
|
425 | # handler(idents, msg) | |
|
442 | 426 | |
|
443 | 427 | def dispatch_monitor_traffic(self, msg): |
|
444 | 428 | """all ME and Task queue messages come through here, as well as |
@@ -456,37 +440,37 b' class Hub(LoggingFactory):' | |||
|
456 | 440 | self.log.error("Invalid monitor topic: %s"%switch) |
|
457 | 441 | |
|
458 | 442 | |
|
459 |
def dispatch_ |
|
|
460 |
"""Route |
|
|
443 | def dispatch_query(self, msg): | |
|
444 | """Route registration requests and queries from clients.""" | |
|
461 | 445 | idents, msg = self.session.feed_identities(msg) |
|
462 | 446 | if not idents: |
|
463 |
self.log.error("Bad |
|
|
447 | self.log.error("Bad Query Message: %s"%msg) | |
|
464 | 448 | return |
|
465 | 449 | client_id = idents[0] |
|
466 | 450 | try: |
|
467 | 451 | msg = self.session.unpack_message(msg, content=True) |
|
468 | 452 | except: |
|
469 | 453 | content = error.wrap_exception() |
|
470 |
self.log.error("Bad |
|
|
471 |
self.session.send(self. |
|
|
454 | self.log.error("Bad Query Message: %s"%msg, exc_info=True) | |
|
455 | self.session.send(self.query, "hub_error", ident=client_id, | |
|
472 | 456 | content=content) |
|
473 | 457 | return |
|
474 | 458 | |
|
475 | 459 | # print client_id, header, parent, content |
|
476 | 460 | #switch on message type: |
|
477 | 461 | msg_type = msg['msg_type'] |
|
478 |
self.log.info("client:: |
|
|
479 |
handler = self. |
|
|
462 | self.log.info("client::client %s requested %s"%(client_id, msg_type)) | |
|
463 | handler = self.query_handlers.get(msg_type, None) | |
|
480 | 464 | try: |
|
481 | 465 | assert handler is not None, "Bad Message Type: %s"%msg_type |
|
482 | 466 | except: |
|
483 | 467 | content = error.wrap_exception() |
|
484 | 468 | self.log.error("Bad Message Type: %s"%msg_type, exc_info=True) |
|
485 |
self.session.send(self. |
|
|
469 | self.session.send(self.query, "hub_error", ident=client_id, | |
|
486 | 470 | content=content) |
|
487 | 471 | return |
|
488 | 472 | else: |
|
489 |
handler( |
|
|
473 | handler(idents, msg) | |
|
490 | 474 | |
|
491 | 475 | def dispatch_db(self, msg): |
|
492 | 476 | """""" |
@@ -752,7 +736,7 b' class Hub(LoggingFactory):' | |||
|
752 | 736 | for k,v in self.keytable.iteritems(): |
|
753 | 737 | jsonable[str(k)] = v |
|
754 | 738 | content['engines'] = jsonable |
|
755 |
self.session.send(self. |
|
|
739 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) | |
|
756 | 740 | |
|
757 | 741 | def register_engine(self, reg, msg): |
|
758 | 742 | """Register a new engine.""" |
@@ -801,7 +785,7 b' class Hub(LoggingFactory):' | |||
|
801 | 785 | content = error.wrap_exception() |
|
802 | 786 | break |
|
803 | 787 | |
|
804 |
msg = self.session.send(self. |
|
|
788 | msg = self.session.send(self.query, "registration_reply", | |
|
805 | 789 | content=content, |
|
806 | 790 | ident=reg) |
|
807 | 791 | |
@@ -912,7 +896,7 b' class Hub(LoggingFactory):' | |||
|
912 | 896 | # for eid,ec in self.engines.iteritems(): |
|
913 | 897 | # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue) |
|
914 | 898 | # time.sleep(1) |
|
915 |
self.session.send(self. |
|
|
899 | self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id) | |
|
916 | 900 | dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop) |
|
917 | 901 | dc.start() |
|
918 | 902 | |
@@ -929,7 +913,7 b' class Hub(LoggingFactory):' | |||
|
929 | 913 | targets = self._validate_targets(targets) |
|
930 | 914 | except: |
|
931 | 915 | content = error.wrap_exception() |
|
932 |
self.session.send(self. |
|
|
916 | self.session.send(self.query, "hub_error", | |
|
933 | 917 | content=content, ident=client_id) |
|
934 | 918 | return |
|
935 | 919 | |
@@ -937,7 +921,7 b' class Hub(LoggingFactory):' | |||
|
937 | 921 | # loads = {} |
|
938 | 922 | for t in targets: |
|
939 | 923 | content[bytes(t)] = len(self.queues[t])+len(self.tasks[t]) |
|
940 |
self.session.send(self. |
|
|
924 | self.session.send(self.query, "load_reply", content=content, ident=client_id) | |
|
941 | 925 | |
|
942 | 926 | |
|
943 | 927 | def queue_status(self, client_id, msg): |
@@ -953,7 +937,7 b' class Hub(LoggingFactory):' | |||
|
953 | 937 | targets = self._validate_targets(targets) |
|
954 | 938 | except: |
|
955 | 939 | content = error.wrap_exception() |
|
956 |
self.session.send(self. |
|
|
940 | self.session.send(self.query, "hub_error", | |
|
957 | 941 | content=content, ident=client_id) |
|
958 | 942 | return |
|
959 | 943 | verbose = content.get('verbose', False) |
@@ -968,7 +952,7 b' class Hub(LoggingFactory):' | |||
|
968 | 952 | tasks = len(tasks) |
|
969 | 953 | content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks} |
|
970 | 954 | # pending |
|
971 |
self.session.send(self. |
|
|
955 | self.session.send(self.query, "queue_reply", content=content, ident=client_id) | |
|
972 | 956 | |
|
973 | 957 | def purge_results(self, client_id, msg): |
|
974 | 958 | """Purge results from memory. This method is more valuable before we move |
@@ -1006,7 +990,7 b' class Hub(LoggingFactory):' | |||
|
1006 | 990 | uid = self.engines[eid].queue |
|
1007 | 991 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) |
|
1008 | 992 | |
|
1009 |
self.session.send(self. |
|
|
993 | self.session.send(self.query, 'purge_reply', content=reply, ident=client_id) | |
|
1010 | 994 | |
|
1011 | 995 | def resubmit_task(self, client_id, msg, buffers): |
|
1012 | 996 | """Resubmit a task.""" |
@@ -1049,7 +1033,7 b' class Hub(LoggingFactory):' | |||
|
1049 | 1033 | except: |
|
1050 | 1034 | content = error.wrap_exception() |
|
1051 | 1035 | break |
|
1052 |
self.session.send(self. |
|
|
1036 | self.session.send(self.query, "result_reply", content=content, | |
|
1053 | 1037 | parent=msg, ident=client_id, |
|
1054 | 1038 | buffers=buffers) |
|
1055 | 1039 |
General Comments 0
You need to be logged in to leave comments.
Login now