Show More
@@ -265,7 +265,7 b' class Client(HasTraits):' | |||||
265 | _context = Instance('zmq.Context') |
|
265 | _context = Instance('zmq.Context') | |
266 | _config = Dict() |
|
266 | _config = Dict() | |
267 | _engines=Instance(ReverseDict, (), {}) |
|
267 | _engines=Instance(ReverseDict, (), {}) | |
268 |
|
|
268 | # _hub_socket=Instance('zmq.Socket') | |
269 | _query_socket=Instance('zmq.Socket') |
|
269 | _query_socket=Instance('zmq.Socket') | |
270 | _control_socket=Instance('zmq.Socket') |
|
270 | _control_socket=Instance('zmq.Socket') | |
271 | _iopub_socket=Instance('zmq.Socket') |
|
271 | _iopub_socket=Instance('zmq.Socket') | |
@@ -339,12 +339,12 b' class Client(HasTraits):' | |||||
339 | self.session = ss.StreamSession(**key_arg) |
|
339 | self.session = ss.StreamSession(**key_arg) | |
340 | else: |
|
340 | else: | |
341 | self.session = ss.StreamSession(username, **key_arg) |
|
341 | self.session = ss.StreamSession(username, **key_arg) | |
342 |
self._ |
|
342 | self._query_socket = self._context.socket(zmq.XREQ) | |
343 |
self._ |
|
343 | self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
344 | if self._ssh: |
|
344 | if self._ssh: | |
345 |
tunnel.tunnel_connection(self._ |
|
345 | tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs) | |
346 | else: |
|
346 | else: | |
347 |
self._ |
|
347 | self._query_socket.connect(url) | |
348 |
|
348 | |||
349 | self.session.debug = self.debug |
|
349 | self.session.debug = self.debug | |
350 |
|
350 | |||
@@ -449,8 +449,8 b' class Client(HasTraits):' | |||||
449 | else: |
|
449 | else: | |
450 | return s.connect(url) |
|
450 | return s.connect(url) | |
451 |
|
451 | |||
452 |
self.session.send(self._ |
|
452 | self.session.send(self._query_socket, 'connection_request') | |
453 |
idents,msg = self.session.recv(self._ |
|
453 | idents,msg = self.session.recv(self._query_socket,mode=0) | |
454 | if self.debug: |
|
454 | if self.debug: | |
455 | pprint(msg) |
|
455 | pprint(msg) | |
456 | msg = ss.Message(msg) |
|
456 | msg = ss.Message(msg) | |
@@ -458,29 +458,29 b' class Client(HasTraits):' | |||||
458 | self._config['registration'] = dict(content) |
|
458 | self._config['registration'] = dict(content) | |
459 | if content.status == 'ok': |
|
459 | if content.status == 'ok': | |
460 | if content.mux: |
|
460 | if content.mux: | |
461 |
self._mux_socket = self._context.socket(zmq. |
|
461 | self._mux_socket = self._context.socket(zmq.XREQ) | |
462 | self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
462 | self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
463 | connect_socket(self._mux_socket, content.mux) |
|
463 | connect_socket(self._mux_socket, content.mux) | |
464 | if content.task: |
|
464 | if content.task: | |
465 | self._task_scheme, task_addr = content.task |
|
465 | self._task_scheme, task_addr = content.task | |
466 |
self._task_socket = self._context.socket(zmq. |
|
466 | self._task_socket = self._context.socket(zmq.XREQ) | |
467 | self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
467 | self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
468 | connect_socket(self._task_socket, task_addr) |
|
468 | connect_socket(self._task_socket, task_addr) | |
469 | if content.notification: |
|
469 | if content.notification: | |
470 | self._notification_socket = self._context.socket(zmq.SUB) |
|
470 | self._notification_socket = self._context.socket(zmq.SUB) | |
471 | connect_socket(self._notification_socket, content.notification) |
|
471 | connect_socket(self._notification_socket, content.notification) | |
472 |
self._notification_socket.setsockopt(zmq.SUBSCRIBE, |
|
472 | self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'') | |
473 | if content.query: |
|
473 | # if content.query: | |
474 |
self._query_socket = self._context.socket(zmq. |
|
474 | # self._query_socket = self._context.socket(zmq.XREQ) | |
475 | self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
475 | # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
476 | connect_socket(self._query_socket, content.query) |
|
476 | # connect_socket(self._query_socket, content.query) | |
477 | if content.control: |
|
477 | if content.control: | |
478 |
self._control_socket = self._context.socket(zmq. |
|
478 | self._control_socket = self._context.socket(zmq.XREQ) | |
479 | self._control_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
479 | self._control_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
480 | connect_socket(self._control_socket, content.control) |
|
480 | connect_socket(self._control_socket, content.control) | |
481 | if content.iopub: |
|
481 | if content.iopub: | |
482 | self._iopub_socket = self._context.socket(zmq.SUB) |
|
482 | self._iopub_socket = self._context.socket(zmq.SUB) | |
483 | self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '') |
|
483 | self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'') | |
484 | self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
484 | self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
485 | connect_socket(self._iopub_socket, content.iopub) |
|
485 | connect_socket(self._iopub_socket, content.iopub) | |
486 | self._update_engines(dict(content.engines)) |
|
486 | self._update_engines(dict(content.engines)) | |
@@ -496,6 +496,7 b' class Client(HasTraits):' | |||||
496 | def _unwrap_exception(self, content): |
|
496 | def _unwrap_exception(self, content): | |
497 | """unwrap exception, and remap engineid to int.""" |
|
497 | """unwrap exception, and remap engineid to int.""" | |
498 | e = error.unwrap_exception(content) |
|
498 | e = error.unwrap_exception(content) | |
|
499 | print e.traceback | |||
499 | if e.engine_info: |
|
500 | if e.engine_info: | |
500 | e_uuid = e.engine_info['engine_uuid'] |
|
501 | e_uuid = e.engine_info['engine_uuid'] | |
501 | eid = self._engines[e_uuid] |
|
502 | eid = self._engines[e_uuid] |
@@ -41,7 +41,7 b' class EngineFactory(RegistrationFactory):' | |||||
41 | super(EngineFactory, self).__init__(**kwargs) |
|
41 | super(EngineFactory, self).__init__(**kwargs) | |
42 | ctx = self.context |
|
42 | ctx = self.context | |
43 |
|
43 | |||
44 |
reg = ctx.socket(zmq. |
|
44 | reg = ctx.socket(zmq.XREQ) | |
45 | reg.setsockopt(zmq.IDENTITY, self.ident) |
|
45 | reg.setsockopt(zmq.IDENTITY, self.ident) | |
46 | reg.connect(self.url) |
|
46 | reg.connect(self.url) | |
47 | self.registrar = zmqstream.ZMQStream(reg, self.loop) |
|
47 | self.registrar = zmqstream.ZMQStream(reg, self.loop) | |
@@ -74,16 +74,26 b' class EngineFactory(RegistrationFactory):' | |||||
74 | task_addr = msg.content.task |
|
74 | task_addr = msg.content.task | |
75 | if task_addr: |
|
75 | if task_addr: | |
76 | shell_addrs.append(str(task_addr)) |
|
76 | shell_addrs.append(str(task_addr)) | |
77 | shell_streams = [] |
|
77 | ||
|
78 | # Uncomment this to go back to two-socket model | |||
|
79 | # shell_streams = [] | |||
|
80 | # for addr in shell_addrs: | |||
|
81 | # stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop) | |||
|
82 | # stream.setsockopt(zmq.IDENTITY, identity) | |||
|
83 | # stream.connect(disambiguate_url(addr, self.location)) | |||
|
84 | # shell_streams.append(stream) | |||
|
85 | ||||
|
86 | # Now use only one shell stream for mux and tasks | |||
|
87 | stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop) | |||
|
88 | stream.setsockopt(zmq.IDENTITY, identity) | |||
|
89 | shell_streams = [stream] | |||
78 | for addr in shell_addrs: |
|
90 | for addr in shell_addrs: | |
79 | stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop) |
|
|||
80 | stream.setsockopt(zmq.IDENTITY, identity) |
|
|||
81 | stream.connect(disambiguate_url(addr, self.location)) |
|
91 | stream.connect(disambiguate_url(addr, self.location)) | |
82 |
|
|
92 | # end single stream-socket | |
83 |
|
93 | |||
84 | # control stream: |
|
94 | # control stream: | |
85 | control_addr = str(msg.content.control) |
|
95 | control_addr = str(msg.content.control) | |
86 |
control_stream = zmqstream.ZMQStream(ctx.socket(zmq. |
|
96 | control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop) | |
87 | control_stream.setsockopt(zmq.IDENTITY, identity) |
|
97 | control_stream.setsockopt(zmq.IDENTITY, identity) | |
88 | control_stream.connect(disambiguate_url(control_addr, self.location)) |
|
98 | control_stream.connect(disambiguate_url(control_addr, self.location)) | |
89 |
|
99 |
@@ -119,10 +119,6 b' class HubFactory(RegistrationFactory):' | |||||
119 | def _mon_port_default(self): |
|
119 | def _mon_port_default(self): | |
120 | return select_random_ports(1)[0] |
|
120 | return select_random_ports(1)[0] | |
121 |
|
121 | |||
122 | query_port = Instance(int, config=True) |
|
|||
123 | def _query_port_default(self): |
|
|||
124 | return select_random_ports(1)[0] |
|
|||
125 |
|
||||
126 | notifier_port = Instance(int, config=True) |
|
122 | notifier_port = Instance(int, config=True) | |
127 | def _notifier_port_default(self): |
|
123 | def _notifier_port_default(self): | |
128 | return select_random_ports(1)[0] |
|
124 | return select_random_ports(1)[0] | |
@@ -194,11 +190,11 b' class HubFactory(RegistrationFactory):' | |||||
194 | loop = self.loop |
|
190 | loop = self.loop | |
195 |
|
191 | |||
196 | # Registrar socket |
|
192 | # Registrar socket | |
197 |
|
|
193 | q = ZMQStream(ctx.socket(zmq.XREP), loop) | |
198 |
|
|
194 | q.bind(client_iface % self.regport) | |
199 | self.log.info("Hub listening on %s for registration."%(client_iface%self.regport)) |
|
195 | self.log.info("Hub listening on %s for registration."%(client_iface%self.regport)) | |
200 | if self.client_ip != self.engine_ip: |
|
196 | if self.client_ip != self.engine_ip: | |
201 |
|
|
197 | q.bind(engine_iface % self.regport) | |
202 | self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport)) |
|
198 | self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport)) | |
203 |
|
199 | |||
204 | ### Engine connections ### |
|
200 | ### Engine connections ### | |
@@ -212,9 +208,6 b' class HubFactory(RegistrationFactory):' | |||||
212 | period=self.ping, logname=self.log.name) |
|
208 | period=self.ping, logname=self.log.name) | |
213 |
|
209 | |||
214 | ### Client connections ### |
|
210 | ### Client connections ### | |
215 | # Clientele socket |
|
|||
216 | c = ZMQStream(ctx.socket(zmq.XREP), loop) |
|
|||
217 | c.bind(client_iface%self.query_port) |
|
|||
218 | # Notifier socket |
|
211 | # Notifier socket | |
219 | n = ZMQStream(ctx.socket(zmq.PUB), loop) |
|
212 | n = ZMQStream(ctx.socket(zmq.PUB), loop) | |
220 | n.bind(client_iface%self.notifier_port) |
|
213 | n.bind(client_iface%self.notifier_port) | |
@@ -230,7 +223,7 b' class HubFactory(RegistrationFactory):' | |||||
230 |
|
223 | |||
231 | # connect the db |
|
224 | # connect the db | |
232 | self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1])) |
|
225 | self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1])) | |
233 | cdir = self.config.Global.cluster_dir |
|
226 | # cdir = self.config.Global.cluster_dir | |
234 | self.db = import_item(self.db_class)(session=self.session.session, config=self.config) |
|
227 | self.db = import_item(self.db_class)(session=self.session.session, config=self.config) | |
235 | time.sleep(.25) |
|
228 | time.sleep(.25) | |
236 |
|
229 | |||
@@ -246,7 +239,6 b' class HubFactory(RegistrationFactory):' | |||||
246 |
|
239 | |||
247 | self.client_info = { |
|
240 | self.client_info = { | |
248 | 'control' : client_iface%self.control[0], |
|
241 | 'control' : client_iface%self.control[0], | |
249 | 'query': client_iface%self.query_port, |
|
|||
250 | 'mux': client_iface%self.mux[0], |
|
242 | 'mux': client_iface%self.mux[0], | |
251 | 'task' : (self.scheme, client_iface%self.task[0]), |
|
243 | 'task' : (self.scheme, client_iface%self.task[0]), | |
252 | 'iopub' : client_iface%self.iopub[0], |
|
244 | 'iopub' : client_iface%self.iopub[0], | |
@@ -255,7 +247,7 b' class HubFactory(RegistrationFactory):' | |||||
255 | self.log.debug("Hub engine addrs: %s"%self.engine_info) |
|
247 | self.log.debug("Hub engine addrs: %s"%self.engine_info) | |
256 | self.log.debug("Hub client addrs: %s"%self.client_info) |
|
248 | self.log.debug("Hub client addrs: %s"%self.client_info) | |
257 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
|
249 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, | |
258 |
|
|
250 | query=q, notifier=n, db=self.db, | |
259 | engine_info=self.engine_info, client_info=self.client_info, |
|
251 | engine_info=self.engine_info, client_info=self.client_info, | |
260 | logname=self.log.name) |
|
252 | logname=self.log.name) | |
261 |
|
253 | |||
@@ -269,10 +261,8 b' class Hub(LoggingFactory):' | |||||
269 | session: StreamSession object |
|
261 | session: StreamSession object | |
270 | <removed> context: zmq context for creating new connections (?) |
|
262 | <removed> context: zmq context for creating new connections (?) | |
271 | queue: ZMQStream for monitoring the command queue (SUB) |
|
263 | queue: ZMQStream for monitoring the command queue (SUB) | |
272 |
|
|
264 | query: ZMQStream for engine registration and client queries requests (XREP) | |
273 | heartbeat: HeartMonitor object checking the pulse of the engines |
|
265 | heartbeat: HeartMonitor object checking the pulse of the engines | |
274 | clientele: ZMQStream for client connections (XREP) |
|
|||
275 | not used for jobs, only query/control commands |
|
|||
276 | notifier: ZMQStream for broadcasting engine registration changes (PUB) |
|
266 | notifier: ZMQStream for broadcasting engine registration changes (PUB) | |
277 | db: connection to db for out of memory logging of commands |
|
267 | db: connection to db for out of memory logging of commands | |
278 | NotImplemented |
|
268 | NotImplemented | |
@@ -300,8 +290,7 b' class Hub(LoggingFactory):' | |||||
300 |
|
290 | |||
301 | # objects from constructor: |
|
291 | # objects from constructor: | |
302 | loop=Instance(ioloop.IOLoop) |
|
292 | loop=Instance(ioloop.IOLoop) | |
303 |
|
|
293 | query=Instance(ZMQStream) | |
304 | clientele=Instance(ZMQStream) |
|
|||
305 | monitor=Instance(ZMQStream) |
|
294 | monitor=Instance(ZMQStream) | |
306 | heartmonitor=Instance(HeartMonitor) |
|
295 | heartmonitor=Instance(HeartMonitor) | |
307 | notifier=Instance(ZMQStream) |
|
296 | notifier=Instance(ZMQStream) | |
@@ -317,10 +306,8 b' class Hub(LoggingFactory):' | |||||
317 | session: streamsession for sending serialized data |
|
306 | session: streamsession for sending serialized data | |
318 | # engine: |
|
307 | # engine: | |
319 | queue: ZMQStream for monitoring queue messages |
|
308 | queue: ZMQStream for monitoring queue messages | |
320 |
|
|
309 | query: ZMQStream for engine+client registration and client requests | |
321 | heartbeat: HeartMonitor object for tracking engines |
|
310 | heartbeat: HeartMonitor object for tracking engines | |
322 | # client: |
|
|||
323 | clientele: ZMQStream for client connections |
|
|||
324 | # extra: |
|
311 | # extra: | |
325 | db: ZMQStream for db connection (NotImplemented) |
|
312 | db: ZMQStream for db connection (NotImplemented) | |
326 | engine_info: zmq address/protocol dict for engine connections |
|
313 | engine_info: zmq address/protocol dict for engine connections | |
@@ -340,8 +327,7 b' class Hub(LoggingFactory):' | |||||
340 | validate_url_container(self.engine_info) |
|
327 | validate_url_container(self.engine_info) | |
341 |
|
328 | |||
342 | # register our callbacks |
|
329 | # register our callbacks | |
343 |
self. |
|
330 | self.query.on_recv(self.dispatch_query) | |
344 | self.clientele.on_recv(self.dispatch_client_msg) |
|
|||
345 | self.monitor.on_recv(self.dispatch_monitor_traffic) |
|
331 | self.monitor.on_recv(self.dispatch_monitor_traffic) | |
346 |
|
332 | |||
347 | self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure) |
|
333 | self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure) | |
@@ -357,15 +343,13 b' class Hub(LoggingFactory):' | |||||
357 | 'iopub': self.save_iopub_message, |
|
343 | 'iopub': self.save_iopub_message, | |
358 | } |
|
344 | } | |
359 |
|
345 | |||
360 |
self. |
|
346 | self.query_handlers = {'queue_request': self.queue_status, | |
361 | 'result_request': self.get_results, |
|
347 | 'result_request': self.get_results, | |
362 | 'purge_request': self.purge_results, |
|
348 | 'purge_request': self.purge_results, | |
363 | 'load_request': self.check_load, |
|
349 | 'load_request': self.check_load, | |
364 | 'resubmit_request': self.resubmit_task, |
|
350 | 'resubmit_request': self.resubmit_task, | |
365 | 'shutdown_request': self.shutdown_request, |
|
351 | 'shutdown_request': self.shutdown_request, | |
366 | } |
|
352 | 'registration_request' : self.register_engine, | |
367 |
|
||||
368 | self.registrar_handlers = {'registration_request' : self.register_engine, |
|
|||
369 | 'unregistration_request' : self.unregister_engine, |
|
353 | 'unregistration_request' : self.unregister_engine, | |
370 | 'connection_request': self.connection_request, |
|
354 | 'connection_request': self.connection_request, | |
371 | } |
|
355 | } | |
@@ -418,27 +402,27 b' class Hub(LoggingFactory):' | |||||
418 | # dispatch methods (1 per stream) |
|
402 | # dispatch methods (1 per stream) | |
419 | #----------------------------------------------------------------------------- |
|
403 | #----------------------------------------------------------------------------- | |
420 |
|
404 | |||
421 |
def dispatch_regist |
|
405 | # def dispatch_registration_request(self, msg): | |
422 | """""" |
|
406 | # """""" | |
423 | self.log.debug("registration::dispatch_register_request(%s)"%msg) |
|
407 | # self.log.debug("registration::dispatch_register_request(%s)"%msg) | |
424 | idents,msg = self.session.feed_identities(msg) |
|
408 | # idents,msg = self.session.feed_identities(msg) | |
425 | if not idents: |
|
409 | # if not idents: | |
426 |
self.log.error("Bad Que |
|
410 | # self.log.error("Bad Query Message: %s"%msg, exc_info=True) | |
427 | return |
|
411 | # return | |
428 | try: |
|
412 | # try: | |
429 | msg = self.session.unpack_message(msg,content=True) |
|
413 | # msg = self.session.unpack_message(msg,content=True) | |
430 | except: |
|
414 | # except: | |
431 | self.log.error("registration::got bad registration message: %s"%msg, exc_info=True) |
|
415 | # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True) | |
432 | return |
|
416 | # return | |
433 |
|
417 | # | ||
434 | msg_type = msg['msg_type'] |
|
418 | # msg_type = msg['msg_type'] | |
435 | content = msg['content'] |
|
419 | # content = msg['content'] | |
436 |
|
420 | # | ||
437 |
handler = self. |
|
421 | # handler = self.query_handlers.get(msg_type, None) | |
438 | if handler is None: |
|
422 | # if handler is None: | |
439 | self.log.error("registration::got bad registration message: %s"%msg) |
|
423 | # self.log.error("registration::got bad registration message: %s"%msg) | |
440 | else: |
|
424 | # else: | |
441 | handler(idents, msg) |
|
425 | # handler(idents, msg) | |
442 |
|
426 | |||
443 | def dispatch_monitor_traffic(self, msg): |
|
427 | def dispatch_monitor_traffic(self, msg): | |
444 | """all ME and Task queue messages come through here, as well as |
|
428 | """all ME and Task queue messages come through here, as well as | |
@@ -456,37 +440,37 b' class Hub(LoggingFactory):' | |||||
456 | self.log.error("Invalid monitor topic: %s"%switch) |
|
440 | self.log.error("Invalid monitor topic: %s"%switch) | |
457 |
|
441 | |||
458 |
|
442 | |||
459 |
def dispatch_ |
|
443 | def dispatch_query(self, msg): | |
460 |
"""Route |
|
444 | """Route registration requests and queries from clients.""" | |
461 | idents, msg = self.session.feed_identities(msg) |
|
445 | idents, msg = self.session.feed_identities(msg) | |
462 | if not idents: |
|
446 | if not idents: | |
463 |
self.log.error("Bad |
|
447 | self.log.error("Bad Query Message: %s"%msg) | |
464 | return |
|
448 | return | |
465 | client_id = idents[0] |
|
449 | client_id = idents[0] | |
466 | try: |
|
450 | try: | |
467 | msg = self.session.unpack_message(msg, content=True) |
|
451 | msg = self.session.unpack_message(msg, content=True) | |
468 | except: |
|
452 | except: | |
469 | content = error.wrap_exception() |
|
453 | content = error.wrap_exception() | |
470 |
self.log.error("Bad |
|
454 | self.log.error("Bad Query Message: %s"%msg, exc_info=True) | |
471 |
self.session.send(self. |
|
455 | self.session.send(self.query, "hub_error", ident=client_id, | |
472 | content=content) |
|
456 | content=content) | |
473 | return |
|
457 | return | |
474 |
|
458 | |||
475 | # print client_id, header, parent, content |
|
459 | # print client_id, header, parent, content | |
476 | #switch on message type: |
|
460 | #switch on message type: | |
477 | msg_type = msg['msg_type'] |
|
461 | msg_type = msg['msg_type'] | |
478 |
self.log.info("client:: |
|
462 | self.log.info("client::client %s requested %s"%(client_id, msg_type)) | |
479 |
handler = self. |
|
463 | handler = self.query_handlers.get(msg_type, None) | |
480 | try: |
|
464 | try: | |
481 | assert handler is not None, "Bad Message Type: %s"%msg_type |
|
465 | assert handler is not None, "Bad Message Type: %s"%msg_type | |
482 | except: |
|
466 | except: | |
483 | content = error.wrap_exception() |
|
467 | content = error.wrap_exception() | |
484 | self.log.error("Bad Message Type: %s"%msg_type, exc_info=True) |
|
468 | self.log.error("Bad Message Type: %s"%msg_type, exc_info=True) | |
485 |
self.session.send(self. |
|
469 | self.session.send(self.query, "hub_error", ident=client_id, | |
486 | content=content) |
|
470 | content=content) | |
487 | return |
|
471 | return | |
488 | else: |
|
472 | else: | |
489 |
handler( |
|
473 | handler(idents, msg) | |
490 |
|
474 | |||
491 | def dispatch_db(self, msg): |
|
475 | def dispatch_db(self, msg): | |
492 | """""" |
|
476 | """""" | |
@@ -752,7 +736,7 b' class Hub(LoggingFactory):' | |||||
752 | for k,v in self.keytable.iteritems(): |
|
736 | for k,v in self.keytable.iteritems(): | |
753 | jsonable[str(k)] = v |
|
737 | jsonable[str(k)] = v | |
754 | content['engines'] = jsonable |
|
738 | content['engines'] = jsonable | |
755 |
self.session.send(self. |
|
739 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) | |
756 |
|
740 | |||
757 | def register_engine(self, reg, msg): |
|
741 | def register_engine(self, reg, msg): | |
758 | """Register a new engine.""" |
|
742 | """Register a new engine.""" | |
@@ -801,7 +785,7 b' class Hub(LoggingFactory):' | |||||
801 | content = error.wrap_exception() |
|
785 | content = error.wrap_exception() | |
802 | break |
|
786 | break | |
803 |
|
787 | |||
804 |
msg = self.session.send(self. |
|
788 | msg = self.session.send(self.query, "registration_reply", | |
805 | content=content, |
|
789 | content=content, | |
806 | ident=reg) |
|
790 | ident=reg) | |
807 |
|
791 | |||
@@ -912,7 +896,7 b' class Hub(LoggingFactory):' | |||||
912 | # for eid,ec in self.engines.iteritems(): |
|
896 | # for eid,ec in self.engines.iteritems(): | |
913 | # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue) |
|
897 | # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue) | |
914 | # time.sleep(1) |
|
898 | # time.sleep(1) | |
915 |
self.session.send(self. |
|
899 | self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id) | |
916 | dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop) |
|
900 | dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop) | |
917 | dc.start() |
|
901 | dc.start() | |
918 |
|
902 | |||
@@ -929,7 +913,7 b' class Hub(LoggingFactory):' | |||||
929 | targets = self._validate_targets(targets) |
|
913 | targets = self._validate_targets(targets) | |
930 | except: |
|
914 | except: | |
931 | content = error.wrap_exception() |
|
915 | content = error.wrap_exception() | |
932 |
self.session.send(self. |
|
916 | self.session.send(self.query, "hub_error", | |
933 | content=content, ident=client_id) |
|
917 | content=content, ident=client_id) | |
934 | return |
|
918 | return | |
935 |
|
919 | |||
@@ -937,7 +921,7 b' class Hub(LoggingFactory):' | |||||
937 | # loads = {} |
|
921 | # loads = {} | |
938 | for t in targets: |
|
922 | for t in targets: | |
939 | content[bytes(t)] = len(self.queues[t])+len(self.tasks[t]) |
|
923 | content[bytes(t)] = len(self.queues[t])+len(self.tasks[t]) | |
940 |
self.session.send(self. |
|
924 | self.session.send(self.query, "load_reply", content=content, ident=client_id) | |
941 |
|
925 | |||
942 |
|
926 | |||
943 | def queue_status(self, client_id, msg): |
|
927 | def queue_status(self, client_id, msg): | |
@@ -953,7 +937,7 b' class Hub(LoggingFactory):' | |||||
953 | targets = self._validate_targets(targets) |
|
937 | targets = self._validate_targets(targets) | |
954 | except: |
|
938 | except: | |
955 | content = error.wrap_exception() |
|
939 | content = error.wrap_exception() | |
956 |
self.session.send(self. |
|
940 | self.session.send(self.query, "hub_error", | |
957 | content=content, ident=client_id) |
|
941 | content=content, ident=client_id) | |
958 | return |
|
942 | return | |
959 | verbose = content.get('verbose', False) |
|
943 | verbose = content.get('verbose', False) | |
@@ -968,7 +952,7 b' class Hub(LoggingFactory):' | |||||
968 | tasks = len(tasks) |
|
952 | tasks = len(tasks) | |
969 | content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks} |
|
953 | content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks} | |
970 | # pending |
|
954 | # pending | |
971 |
self.session.send(self. |
|
955 | self.session.send(self.query, "queue_reply", content=content, ident=client_id) | |
972 |
|
956 | |||
973 | def purge_results(self, client_id, msg): |
|
957 | def purge_results(self, client_id, msg): | |
974 | """Purge results from memory. This method is more valuable before we move |
|
958 | """Purge results from memory. This method is more valuable before we move | |
@@ -1006,7 +990,7 b' class Hub(LoggingFactory):' | |||||
1006 | uid = self.engines[eid].queue |
|
990 | uid = self.engines[eid].queue | |
1007 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) |
|
991 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) | |
1008 |
|
992 | |||
1009 |
self.session.send(self. |
|
993 | self.session.send(self.query, 'purge_reply', content=reply, ident=client_id) | |
1010 |
|
994 | |||
1011 | def resubmit_task(self, client_id, msg, buffers): |
|
995 | def resubmit_task(self, client_id, msg, buffers): | |
1012 | """Resubmit a task.""" |
|
996 | """Resubmit a task.""" | |
@@ -1049,7 +1033,7 b' class Hub(LoggingFactory):' | |||||
1049 | except: |
|
1033 | except: | |
1050 | content = error.wrap_exception() |
|
1034 | content = error.wrap_exception() | |
1051 | break |
|
1035 | break | |
1052 |
self.session.send(self. |
|
1036 | self.session.send(self.query, "result_reply", content=content, | |
1053 | parent=msg, ident=client_id, |
|
1037 | parent=msg, ident=client_id, | |
1054 | buffers=buffers) |
|
1038 | buffers=buffers) | |
1055 |
|
1039 |
General Comments 0
You need to be logged in to leave comments.
Login now