##// END OF EJS Templates
Remove all PAIR sockets; merge the registration and query sockets
MinRK -
Show More
@@ -265,7 +265,7 b' class Client(HasTraits):'
265 265 _context = Instance('zmq.Context')
266 266 _config = Dict()
267 267 _engines=Instance(ReverseDict, (), {})
268 _registration_socket=Instance('zmq.Socket')
268 # _hub_socket=Instance('zmq.Socket')
269 269 _query_socket=Instance('zmq.Socket')
270 270 _control_socket=Instance('zmq.Socket')
271 271 _iopub_socket=Instance('zmq.Socket')
@@ -339,12 +339,12 b' class Client(HasTraits):'
339 339 self.session = ss.StreamSession(**key_arg)
340 340 else:
341 341 self.session = ss.StreamSession(username, **key_arg)
342 self._registration_socket = self._context.socket(zmq.XREQ)
343 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
342 self._query_socket = self._context.socket(zmq.XREQ)
343 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
344 344 if self._ssh:
345 tunnel.tunnel_connection(self._registration_socket, url, sshserver, **ssh_kwargs)
345 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
346 346 else:
347 self._registration_socket.connect(url)
347 self._query_socket.connect(url)
348 348
349 349 self.session.debug = self.debug
350 350
@@ -449,8 +449,8 b' class Client(HasTraits):'
449 449 else:
450 450 return s.connect(url)
451 451
452 self.session.send(self._registration_socket, 'connection_request')
453 idents,msg = self.session.recv(self._registration_socket,mode=0)
452 self.session.send(self._query_socket, 'connection_request')
453 idents,msg = self.session.recv(self._query_socket,mode=0)
454 454 if self.debug:
455 455 pprint(msg)
456 456 msg = ss.Message(msg)
@@ -458,29 +458,29 b' class Client(HasTraits):'
458 458 self._config['registration'] = dict(content)
459 459 if content.status == 'ok':
460 460 if content.mux:
461 self._mux_socket = self._context.socket(zmq.PAIR)
461 self._mux_socket = self._context.socket(zmq.XREQ)
462 462 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
463 463 connect_socket(self._mux_socket, content.mux)
464 464 if content.task:
465 465 self._task_scheme, task_addr = content.task
466 self._task_socket = self._context.socket(zmq.PAIR)
466 self._task_socket = self._context.socket(zmq.XREQ)
467 467 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
468 468 connect_socket(self._task_socket, task_addr)
469 469 if content.notification:
470 470 self._notification_socket = self._context.socket(zmq.SUB)
471 471 connect_socket(self._notification_socket, content.notification)
472 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
473 if content.query:
474 self._query_socket = self._context.socket(zmq.PAIR)
475 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
476 connect_socket(self._query_socket, content.query)
472 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
473 # if content.query:
474 # self._query_socket = self._context.socket(zmq.XREQ)
475 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
476 # connect_socket(self._query_socket, content.query)
477 477 if content.control:
478 self._control_socket = self._context.socket(zmq.PAIR)
478 self._control_socket = self._context.socket(zmq.XREQ)
479 479 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
480 480 connect_socket(self._control_socket, content.control)
481 481 if content.iopub:
482 482 self._iopub_socket = self._context.socket(zmq.SUB)
483 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
483 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
484 484 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
485 485 connect_socket(self._iopub_socket, content.iopub)
486 486 self._update_engines(dict(content.engines))
@@ -496,6 +496,7 b' class Client(HasTraits):'
496 496 def _unwrap_exception(self, content):
497 497 """unwrap exception, and remap engineid to int."""
498 498 e = error.unwrap_exception(content)
499 print e.traceback
499 500 if e.engine_info:
500 501 e_uuid = e.engine_info['engine_uuid']
501 502 eid = self._engines[e_uuid]
@@ -41,7 +41,7 b' class EngineFactory(RegistrationFactory):'
41 41 super(EngineFactory, self).__init__(**kwargs)
42 42 ctx = self.context
43 43
44 reg = ctx.socket(zmq.PAIR)
44 reg = ctx.socket(zmq.XREQ)
45 45 reg.setsockopt(zmq.IDENTITY, self.ident)
46 46 reg.connect(self.url)
47 47 self.registrar = zmqstream.ZMQStream(reg, self.loop)
@@ -74,16 +74,26 b' class EngineFactory(RegistrationFactory):'
74 74 task_addr = msg.content.task
75 75 if task_addr:
76 76 shell_addrs.append(str(task_addr))
77 shell_streams = []
77
78 # Uncomment this to go back to two-socket model
79 # shell_streams = []
80 # for addr in shell_addrs:
81 # stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
82 # stream.setsockopt(zmq.IDENTITY, identity)
83 # stream.connect(disambiguate_url(addr, self.location))
84 # shell_streams.append(stream)
85
86 # Now use only one shell stream for mux and tasks
87 stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
88 stream.setsockopt(zmq.IDENTITY, identity)
89 shell_streams = [stream]
78 90 for addr in shell_addrs:
79 stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
80 stream.setsockopt(zmq.IDENTITY, identity)
81 91 stream.connect(disambiguate_url(addr, self.location))
82 shell_streams.append(stream)
92 # end single stream-socket
83 93
84 94 # control stream:
85 95 control_addr = str(msg.content.control)
86 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
96 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
87 97 control_stream.setsockopt(zmq.IDENTITY, identity)
88 98 control_stream.connect(disambiguate_url(control_addr, self.location))
89 99
@@ -119,10 +119,6 b' class HubFactory(RegistrationFactory):'
119 119 def _mon_port_default(self):
120 120 return select_random_ports(1)[0]
121 121
122 query_port = Instance(int, config=True)
123 def _query_port_default(self):
124 return select_random_ports(1)[0]
125
126 122 notifier_port = Instance(int, config=True)
127 123 def _notifier_port_default(self):
128 124 return select_random_ports(1)[0]
@@ -194,11 +190,11 b' class HubFactory(RegistrationFactory):'
194 190 loop = self.loop
195 191
196 192 # Registrar socket
197 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
198 reg.bind(client_iface % self.regport)
193 q = ZMQStream(ctx.socket(zmq.XREP), loop)
194 q.bind(client_iface % self.regport)
199 195 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
200 196 if self.client_ip != self.engine_ip:
201 reg.bind(engine_iface % self.regport)
197 q.bind(engine_iface % self.regport)
202 198 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
203 199
204 200 ### Engine connections ###
@@ -212,9 +208,6 b' class HubFactory(RegistrationFactory):'
212 208 period=self.ping, logname=self.log.name)
213 209
214 210 ### Client connections ###
215 # Clientele socket
216 c = ZMQStream(ctx.socket(zmq.XREP), loop)
217 c.bind(client_iface%self.query_port)
218 211 # Notifier socket
219 212 n = ZMQStream(ctx.socket(zmq.PUB), loop)
220 213 n.bind(client_iface%self.notifier_port)
@@ -230,7 +223,7 b' class HubFactory(RegistrationFactory):'
230 223
231 224 # connect the db
232 225 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
233 cdir = self.config.Global.cluster_dir
226 # cdir = self.config.Global.cluster_dir
234 227 self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
235 228 time.sleep(.25)
236 229
@@ -246,7 +239,6 b' class HubFactory(RegistrationFactory):'
246 239
247 240 self.client_info = {
248 241 'control' : client_iface%self.control[0],
249 'query': client_iface%self.query_port,
250 242 'mux': client_iface%self.mux[0],
251 243 'task' : (self.scheme, client_iface%self.task[0]),
252 244 'iopub' : client_iface%self.iopub[0],
@@ -255,7 +247,7 b' class HubFactory(RegistrationFactory):'
255 247 self.log.debug("Hub engine addrs: %s"%self.engine_info)
256 248 self.log.debug("Hub client addrs: %s"%self.client_info)
257 249 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
258 registrar=reg, clientele=c, notifier=n, db=self.db,
250 query=q, notifier=n, db=self.db,
259 251 engine_info=self.engine_info, client_info=self.client_info,
260 252 logname=self.log.name)
261 253
@@ -269,10 +261,8 b' class Hub(LoggingFactory):'
269 261 session: StreamSession object
270 262 <removed> context: zmq context for creating new connections (?)
271 263 queue: ZMQStream for monitoring the command queue (SUB)
272 registrar: ZMQStream for engine registration requests (XREP)
264 query: ZMQStream for engine registration and client query requests (XREP)
273 265 heartbeat: HeartMonitor object checking the pulse of the engines
274 clientele: ZMQStream for client connections (XREP)
275 not used for jobs, only query/control commands
276 266 notifier: ZMQStream for broadcasting engine registration changes (PUB)
277 267 db: connection to db for out of memory logging of commands
278 268 NotImplemented
@@ -300,8 +290,7 b' class Hub(LoggingFactory):'
300 290
301 291 # objects from constructor:
302 292 loop=Instance(ioloop.IOLoop)
303 registrar=Instance(ZMQStream)
304 clientele=Instance(ZMQStream)
293 query=Instance(ZMQStream)
305 294 monitor=Instance(ZMQStream)
306 295 heartmonitor=Instance(HeartMonitor)
307 296 notifier=Instance(ZMQStream)
@@ -317,10 +306,8 b' class Hub(LoggingFactory):'
317 306 session: streamsession for sending serialized data
318 307 # engine:
319 308 queue: ZMQStream for monitoring queue messages
320 registrar: ZMQStream for engine registration
309 query: ZMQStream for engine+client registration and client requests
321 310 heartbeat: HeartMonitor object for tracking engines
322 # client:
323 clientele: ZMQStream for client connections
324 311 # extra:
325 312 db: ZMQStream for db connection (NotImplemented)
326 313 engine_info: zmq address/protocol dict for engine connections
@@ -340,8 +327,7 b' class Hub(LoggingFactory):'
340 327 validate_url_container(self.engine_info)
341 328
342 329 # register our callbacks
343 self.registrar.on_recv(self.dispatch_register_request)
344 self.clientele.on_recv(self.dispatch_client_msg)
330 self.query.on_recv(self.dispatch_query)
345 331 self.monitor.on_recv(self.dispatch_monitor_traffic)
346 332
347 333 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
@@ -357,15 +343,13 b' class Hub(LoggingFactory):'
357 343 'iopub': self.save_iopub_message,
358 344 }
359 345
360 self.client_handlers = {'queue_request': self.queue_status,
346 self.query_handlers = {'queue_request': self.queue_status,
361 347 'result_request': self.get_results,
362 348 'purge_request': self.purge_results,
363 349 'load_request': self.check_load,
364 350 'resubmit_request': self.resubmit_task,
365 351 'shutdown_request': self.shutdown_request,
366 }
367
368 self.registrar_handlers = {'registration_request' : self.register_engine,
352 'registration_request' : self.register_engine,
369 353 'unregistration_request' : self.unregister_engine,
370 354 'connection_request': self.connection_request,
371 355 }
@@ -418,27 +402,27 b' class Hub(LoggingFactory):'
418 402 # dispatch methods (1 per stream)
419 403 #-----------------------------------------------------------------------------
420 404
421 def dispatch_register_request(self, msg):
422 """"""
423 self.log.debug("registration::dispatch_register_request(%s)"%msg)
424 idents,msg = self.session.feed_identities(msg)
425 if not idents:
426 self.log.error("Bad Queue Message: %s"%msg, exc_info=True)
427 return
428 try:
429 msg = self.session.unpack_message(msg,content=True)
430 except:
431 self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
432 return
433
434 msg_type = msg['msg_type']
435 content = msg['content']
436
437 handler = self.registrar_handlers.get(msg_type, None)
438 if handler is None:
439 self.log.error("registration::got bad registration message: %s"%msg)
440 else:
441 handler(idents, msg)
405 # def dispatch_registration_request(self, msg):
406 # """"""
407 # self.log.debug("registration::dispatch_register_request(%s)"%msg)
408 # idents,msg = self.session.feed_identities(msg)
409 # if not idents:
410 # self.log.error("Bad Query Message: %s"%msg, exc_info=True)
411 # return
412 # try:
413 # msg = self.session.unpack_message(msg,content=True)
414 # except:
415 # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
416 # return
417 #
418 # msg_type = msg['msg_type']
419 # content = msg['content']
420 #
421 # handler = self.query_handlers.get(msg_type, None)
422 # if handler is None:
423 # self.log.error("registration::got bad registration message: %s"%msg)
424 # else:
425 # handler(idents, msg)
442 426
443 427 def dispatch_monitor_traffic(self, msg):
444 428 """all ME and Task queue messages come through here, as well as
@@ -456,37 +440,37 b' class Hub(LoggingFactory):'
456 440 self.log.error("Invalid monitor topic: %s"%switch)
457 441
458 442
459 def dispatch_client_msg(self, msg):
460 """Route messages from clients"""
443 def dispatch_query(self, msg):
444 """Route registration requests and queries from clients."""
461 445 idents, msg = self.session.feed_identities(msg)
462 446 if not idents:
463 self.log.error("Bad Client Message: %s"%msg)
447 self.log.error("Bad Query Message: %s"%msg)
464 448 return
465 449 client_id = idents[0]
466 450 try:
467 451 msg = self.session.unpack_message(msg, content=True)
468 452 except:
469 453 content = error.wrap_exception()
470 self.log.error("Bad Client Message: %s"%msg, exc_info=True)
471 self.session.send(self.clientele, "hub_error", ident=client_id,
454 self.log.error("Bad Query Message: %s"%msg, exc_info=True)
455 self.session.send(self.query, "hub_error", ident=client_id,
472 456 content=content)
473 457 return
474 458
475 459 # print client_id, header, parent, content
476 460 #switch on message type:
477 461 msg_type = msg['msg_type']
478 self.log.info("client:: client %s requested %s"%(client_id, msg_type))
479 handler = self.client_handlers.get(msg_type, None)
462 self.log.info("client::client %s requested %s"%(client_id, msg_type))
463 handler = self.query_handlers.get(msg_type, None)
480 464 try:
481 465 assert handler is not None, "Bad Message Type: %s"%msg_type
482 466 except:
483 467 content = error.wrap_exception()
484 468 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
485 self.session.send(self.clientele, "hub_error", ident=client_id,
469 self.session.send(self.query, "hub_error", ident=client_id,
486 470 content=content)
487 471 return
488 472 else:
489 handler(client_id, msg)
473 handler(idents, msg)
490 474
491 475 def dispatch_db(self, msg):
492 476 """"""
@@ -752,7 +736,7 b' class Hub(LoggingFactory):'
752 736 for k,v in self.keytable.iteritems():
753 737 jsonable[str(k)] = v
754 738 content['engines'] = jsonable
755 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
739 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
756 740
757 741 def register_engine(self, reg, msg):
758 742 """Register a new engine."""
@@ -801,7 +785,7 b' class Hub(LoggingFactory):'
801 785 content = error.wrap_exception()
802 786 break
803 787
804 msg = self.session.send(self.registrar, "registration_reply",
788 msg = self.session.send(self.query, "registration_reply",
805 789 content=content,
806 790 ident=reg)
807 791
@@ -912,7 +896,7 b' class Hub(LoggingFactory):'
912 896 # for eid,ec in self.engines.iteritems():
913 897 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
914 898 # time.sleep(1)
915 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
899 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
916 900 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
917 901 dc.start()
918 902
@@ -929,7 +913,7 b' class Hub(LoggingFactory):'
929 913 targets = self._validate_targets(targets)
930 914 except:
931 915 content = error.wrap_exception()
932 self.session.send(self.clientele, "hub_error",
916 self.session.send(self.query, "hub_error",
933 917 content=content, ident=client_id)
934 918 return
935 919
@@ -937,7 +921,7 b' class Hub(LoggingFactory):'
937 921 # loads = {}
938 922 for t in targets:
939 923 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
940 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
924 self.session.send(self.query, "load_reply", content=content, ident=client_id)
941 925
942 926
943 927 def queue_status(self, client_id, msg):
@@ -953,7 +937,7 b' class Hub(LoggingFactory):'
953 937 targets = self._validate_targets(targets)
954 938 except:
955 939 content = error.wrap_exception()
956 self.session.send(self.clientele, "hub_error",
940 self.session.send(self.query, "hub_error",
957 941 content=content, ident=client_id)
958 942 return
959 943 verbose = content.get('verbose', False)
@@ -968,7 +952,7 b' class Hub(LoggingFactory):'
968 952 tasks = len(tasks)
969 953 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
970 954 # pending
971 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
955 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
972 956
973 957 def purge_results(self, client_id, msg):
974 958 """Purge results from memory. This method is more valuable before we move
@@ -1006,7 +990,7 b' class Hub(LoggingFactory):'
1006 990 uid = self.engines[eid].queue
1007 991 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1008 992
1009 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
993 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1010 994
1011 995 def resubmit_task(self, client_id, msg, buffers):
1012 996 """Resubmit a task."""
@@ -1049,7 +1033,7 b' class Hub(LoggingFactory):'
1049 1033 except:
1050 1034 content = error.wrap_exception()
1051 1035 break
1052 self.session.send(self.clientele, "result_reply", content=content,
1036 self.session.send(self.query, "result_reply", content=content,
1053 1037 parent=msg, ident=client_id,
1054 1038 buffers=buffers)
1055 1039
General Comments 0
You need to be logged in to leave comments. Login now