##// END OF EJS Templates
remove all PAIR sockets, Merge registration+query
MinRK -
Show More
@@ -265,7 +265,7 b' class Client(HasTraits):'
265 _context = Instance('zmq.Context')
265 _context = Instance('zmq.Context')
266 _config = Dict()
266 _config = Dict()
267 _engines=Instance(ReverseDict, (), {})
267 _engines=Instance(ReverseDict, (), {})
268 _registration_socket=Instance('zmq.Socket')
268 # _hub_socket=Instance('zmq.Socket')
269 _query_socket=Instance('zmq.Socket')
269 _query_socket=Instance('zmq.Socket')
270 _control_socket=Instance('zmq.Socket')
270 _control_socket=Instance('zmq.Socket')
271 _iopub_socket=Instance('zmq.Socket')
271 _iopub_socket=Instance('zmq.Socket')
@@ -339,12 +339,12 b' class Client(HasTraits):'
339 self.session = ss.StreamSession(**key_arg)
339 self.session = ss.StreamSession(**key_arg)
340 else:
340 else:
341 self.session = ss.StreamSession(username, **key_arg)
341 self.session = ss.StreamSession(username, **key_arg)
342 self._registration_socket = self._context.socket(zmq.XREQ)
342 self._query_socket = self._context.socket(zmq.XREQ)
343 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
343 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
344 if self._ssh:
344 if self._ssh:
345 tunnel.tunnel_connection(self._registration_socket, url, sshserver, **ssh_kwargs)
345 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
346 else:
346 else:
347 self._registration_socket.connect(url)
347 self._query_socket.connect(url)
348
348
349 self.session.debug = self.debug
349 self.session.debug = self.debug
350
350
@@ -449,8 +449,8 b' class Client(HasTraits):'
449 else:
449 else:
450 return s.connect(url)
450 return s.connect(url)
451
451
452 self.session.send(self._registration_socket, 'connection_request')
452 self.session.send(self._query_socket, 'connection_request')
453 idents,msg = self.session.recv(self._registration_socket,mode=0)
453 idents,msg = self.session.recv(self._query_socket,mode=0)
454 if self.debug:
454 if self.debug:
455 pprint(msg)
455 pprint(msg)
456 msg = ss.Message(msg)
456 msg = ss.Message(msg)
@@ -458,29 +458,29 b' class Client(HasTraits):'
458 self._config['registration'] = dict(content)
458 self._config['registration'] = dict(content)
459 if content.status == 'ok':
459 if content.status == 'ok':
460 if content.mux:
460 if content.mux:
461 self._mux_socket = self._context.socket(zmq.PAIR)
461 self._mux_socket = self._context.socket(zmq.XREQ)
462 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
462 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
463 connect_socket(self._mux_socket, content.mux)
463 connect_socket(self._mux_socket, content.mux)
464 if content.task:
464 if content.task:
465 self._task_scheme, task_addr = content.task
465 self._task_scheme, task_addr = content.task
466 self._task_socket = self._context.socket(zmq.PAIR)
466 self._task_socket = self._context.socket(zmq.XREQ)
467 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
467 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
468 connect_socket(self._task_socket, task_addr)
468 connect_socket(self._task_socket, task_addr)
469 if content.notification:
469 if content.notification:
470 self._notification_socket = self._context.socket(zmq.SUB)
470 self._notification_socket = self._context.socket(zmq.SUB)
471 connect_socket(self._notification_socket, content.notification)
471 connect_socket(self._notification_socket, content.notification)
472 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
472 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
473 if content.query:
473 # if content.query:
474 self._query_socket = self._context.socket(zmq.PAIR)
474 # self._query_socket = self._context.socket(zmq.XREQ)
475 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
475 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
476 connect_socket(self._query_socket, content.query)
476 # connect_socket(self._query_socket, content.query)
477 if content.control:
477 if content.control:
478 self._control_socket = self._context.socket(zmq.PAIR)
478 self._control_socket = self._context.socket(zmq.XREQ)
479 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
479 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
480 connect_socket(self._control_socket, content.control)
480 connect_socket(self._control_socket, content.control)
481 if content.iopub:
481 if content.iopub:
482 self._iopub_socket = self._context.socket(zmq.SUB)
482 self._iopub_socket = self._context.socket(zmq.SUB)
483 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
483 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
484 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
484 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
485 connect_socket(self._iopub_socket, content.iopub)
485 connect_socket(self._iopub_socket, content.iopub)
486 self._update_engines(dict(content.engines))
486 self._update_engines(dict(content.engines))
@@ -496,6 +496,7 b' class Client(HasTraits):'
496 def _unwrap_exception(self, content):
496 def _unwrap_exception(self, content):
497 """unwrap exception, and remap engineid to int."""
497 """unwrap exception, and remap engineid to int."""
498 e = error.unwrap_exception(content)
498 e = error.unwrap_exception(content)
499 print e.traceback
499 if e.engine_info:
500 if e.engine_info:
500 e_uuid = e.engine_info['engine_uuid']
501 e_uuid = e.engine_info['engine_uuid']
501 eid = self._engines[e_uuid]
502 eid = self._engines[e_uuid]
@@ -41,7 +41,7 b' class EngineFactory(RegistrationFactory):'
41 super(EngineFactory, self).__init__(**kwargs)
41 super(EngineFactory, self).__init__(**kwargs)
42 ctx = self.context
42 ctx = self.context
43
43
44 reg = ctx.socket(zmq.PAIR)
44 reg = ctx.socket(zmq.XREQ)
45 reg.setsockopt(zmq.IDENTITY, self.ident)
45 reg.setsockopt(zmq.IDENTITY, self.ident)
46 reg.connect(self.url)
46 reg.connect(self.url)
47 self.registrar = zmqstream.ZMQStream(reg, self.loop)
47 self.registrar = zmqstream.ZMQStream(reg, self.loop)
@@ -74,16 +74,26 b' class EngineFactory(RegistrationFactory):'
74 task_addr = msg.content.task
74 task_addr = msg.content.task
75 if task_addr:
75 if task_addr:
76 shell_addrs.append(str(task_addr))
76 shell_addrs.append(str(task_addr))
77 shell_streams = []
77
78 # Uncomment this to go back to two-socket model
79 # shell_streams = []
80 # for addr in shell_addrs:
81 # stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
82 # stream.setsockopt(zmq.IDENTITY, identity)
83 # stream.connect(disambiguate_url(addr, self.location))
84 # shell_streams.append(stream)
85
86 # Now use only one shell stream for mux and tasks
87 stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
88 stream.setsockopt(zmq.IDENTITY, identity)
89 shell_streams = [stream]
78 for addr in shell_addrs:
90 for addr in shell_addrs:
79 stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
80 stream.setsockopt(zmq.IDENTITY, identity)
81 stream.connect(disambiguate_url(addr, self.location))
91 stream.connect(disambiguate_url(addr, self.location))
82 shell_streams.append(stream)
92 # end single stream-socket
83
93
84 # control stream:
94 # control stream:
85 control_addr = str(msg.content.control)
95 control_addr = str(msg.content.control)
86 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
96 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
87 control_stream.setsockopt(zmq.IDENTITY, identity)
97 control_stream.setsockopt(zmq.IDENTITY, identity)
88 control_stream.connect(disambiguate_url(control_addr, self.location))
98 control_stream.connect(disambiguate_url(control_addr, self.location))
89
99
@@ -119,10 +119,6 b' class HubFactory(RegistrationFactory):'
119 def _mon_port_default(self):
119 def _mon_port_default(self):
120 return select_random_ports(1)[0]
120 return select_random_ports(1)[0]
121
121
122 query_port = Instance(int, config=True)
123 def _query_port_default(self):
124 return select_random_ports(1)[0]
125
126 notifier_port = Instance(int, config=True)
122 notifier_port = Instance(int, config=True)
127 def _notifier_port_default(self):
123 def _notifier_port_default(self):
128 return select_random_ports(1)[0]
124 return select_random_ports(1)[0]
@@ -194,11 +190,11 b' class HubFactory(RegistrationFactory):'
194 loop = self.loop
190 loop = self.loop
195
191
196 # Registrar socket
192 # Registrar socket
197 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
193 q = ZMQStream(ctx.socket(zmq.XREP), loop)
198 reg.bind(client_iface % self.regport)
194 q.bind(client_iface % self.regport)
199 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
195 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
200 if self.client_ip != self.engine_ip:
196 if self.client_ip != self.engine_ip:
201 reg.bind(engine_iface % self.regport)
197 q.bind(engine_iface % self.regport)
202 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
198 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
203
199
204 ### Engine connections ###
200 ### Engine connections ###
@@ -212,9 +208,6 b' class HubFactory(RegistrationFactory):'
212 period=self.ping, logname=self.log.name)
208 period=self.ping, logname=self.log.name)
213
209
214 ### Client connections ###
210 ### Client connections ###
215 # Clientele socket
216 c = ZMQStream(ctx.socket(zmq.XREP), loop)
217 c.bind(client_iface%self.query_port)
218 # Notifier socket
211 # Notifier socket
219 n = ZMQStream(ctx.socket(zmq.PUB), loop)
212 n = ZMQStream(ctx.socket(zmq.PUB), loop)
220 n.bind(client_iface%self.notifier_port)
213 n.bind(client_iface%self.notifier_port)
@@ -230,7 +223,7 b' class HubFactory(RegistrationFactory):'
230
223
231 # connect the db
224 # connect the db
232 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
225 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
233 cdir = self.config.Global.cluster_dir
226 # cdir = self.config.Global.cluster_dir
234 self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
227 self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
235 time.sleep(.25)
228 time.sleep(.25)
236
229
@@ -246,7 +239,6 b' class HubFactory(RegistrationFactory):'
246
239
247 self.client_info = {
240 self.client_info = {
248 'control' : client_iface%self.control[0],
241 'control' : client_iface%self.control[0],
249 'query': client_iface%self.query_port,
250 'mux': client_iface%self.mux[0],
242 'mux': client_iface%self.mux[0],
251 'task' : (self.scheme, client_iface%self.task[0]),
243 'task' : (self.scheme, client_iface%self.task[0]),
252 'iopub' : client_iface%self.iopub[0],
244 'iopub' : client_iface%self.iopub[0],
@@ -255,7 +247,7 b' class HubFactory(RegistrationFactory):'
255 self.log.debug("Hub engine addrs: %s"%self.engine_info)
247 self.log.debug("Hub engine addrs: %s"%self.engine_info)
256 self.log.debug("Hub client addrs: %s"%self.client_info)
248 self.log.debug("Hub client addrs: %s"%self.client_info)
257 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
249 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
258 registrar=reg, clientele=c, notifier=n, db=self.db,
250 query=q, notifier=n, db=self.db,
259 engine_info=self.engine_info, client_info=self.client_info,
251 engine_info=self.engine_info, client_info=self.client_info,
260 logname=self.log.name)
252 logname=self.log.name)
261
253
@@ -269,10 +261,8 b' class Hub(LoggingFactory):'
269 session: StreamSession object
261 session: StreamSession object
270 <removed> context: zmq context for creating new connections (?)
262 <removed> context: zmq context for creating new connections (?)
271 queue: ZMQStream for monitoring the command queue (SUB)
263 queue: ZMQStream for monitoring the command queue (SUB)
272 registrar: ZMQStream for engine registration requests (XREP)
264 query: ZMQStream for engine registration and client queries requests (XREP)
273 heartbeat: HeartMonitor object checking the pulse of the engines
265 heartbeat: HeartMonitor object checking the pulse of the engines
274 clientele: ZMQStream for client connections (XREP)
275 not used for jobs, only query/control commands
276 notifier: ZMQStream for broadcasting engine registration changes (PUB)
266 notifier: ZMQStream for broadcasting engine registration changes (PUB)
277 db: connection to db for out of memory logging of commands
267 db: connection to db for out of memory logging of commands
278 NotImplemented
268 NotImplemented
@@ -300,8 +290,7 b' class Hub(LoggingFactory):'
300
290
301 # objects from constructor:
291 # objects from constructor:
302 loop=Instance(ioloop.IOLoop)
292 loop=Instance(ioloop.IOLoop)
303 registrar=Instance(ZMQStream)
293 query=Instance(ZMQStream)
304 clientele=Instance(ZMQStream)
305 monitor=Instance(ZMQStream)
294 monitor=Instance(ZMQStream)
306 heartmonitor=Instance(HeartMonitor)
295 heartmonitor=Instance(HeartMonitor)
307 notifier=Instance(ZMQStream)
296 notifier=Instance(ZMQStream)
@@ -317,10 +306,8 b' class Hub(LoggingFactory):'
317 session: streamsession for sending serialized data
306 session: streamsession for sending serialized data
318 # engine:
307 # engine:
319 queue: ZMQStream for monitoring queue messages
308 queue: ZMQStream for monitoring queue messages
320 registrar: ZMQStream for engine registration
309 query: ZMQStream for engine+client registration and client requests
321 heartbeat: HeartMonitor object for tracking engines
310 heartbeat: HeartMonitor object for tracking engines
322 # client:
323 clientele: ZMQStream for client connections
324 # extra:
311 # extra:
325 db: ZMQStream for db connection (NotImplemented)
312 db: ZMQStream for db connection (NotImplemented)
326 engine_info: zmq address/protocol dict for engine connections
313 engine_info: zmq address/protocol dict for engine connections
@@ -340,8 +327,7 b' class Hub(LoggingFactory):'
340 validate_url_container(self.engine_info)
327 validate_url_container(self.engine_info)
341
328
342 # register our callbacks
329 # register our callbacks
343 self.registrar.on_recv(self.dispatch_register_request)
330 self.query.on_recv(self.dispatch_query)
344 self.clientele.on_recv(self.dispatch_client_msg)
345 self.monitor.on_recv(self.dispatch_monitor_traffic)
331 self.monitor.on_recv(self.dispatch_monitor_traffic)
346
332
347 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
333 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
@@ -357,15 +343,13 b' class Hub(LoggingFactory):'
357 'iopub': self.save_iopub_message,
343 'iopub': self.save_iopub_message,
358 }
344 }
359
345
360 self.client_handlers = {'queue_request': self.queue_status,
346 self.query_handlers = {'queue_request': self.queue_status,
361 'result_request': self.get_results,
347 'result_request': self.get_results,
362 'purge_request': self.purge_results,
348 'purge_request': self.purge_results,
363 'load_request': self.check_load,
349 'load_request': self.check_load,
364 'resubmit_request': self.resubmit_task,
350 'resubmit_request': self.resubmit_task,
365 'shutdown_request': self.shutdown_request,
351 'shutdown_request': self.shutdown_request,
366 }
352 'registration_request' : self.register_engine,
367
368 self.registrar_handlers = {'registration_request' : self.register_engine,
369 'unregistration_request' : self.unregister_engine,
353 'unregistration_request' : self.unregister_engine,
370 'connection_request': self.connection_request,
354 'connection_request': self.connection_request,
371 }
355 }
@@ -418,27 +402,27 b' class Hub(LoggingFactory):'
418 # dispatch methods (1 per stream)
402 # dispatch methods (1 per stream)
419 #-----------------------------------------------------------------------------
403 #-----------------------------------------------------------------------------
420
404
421 def dispatch_register_request(self, msg):
405 # def dispatch_registration_request(self, msg):
422 """"""
406 # """"""
423 self.log.debug("registration::dispatch_register_request(%s)"%msg)
407 # self.log.debug("registration::dispatch_register_request(%s)"%msg)
424 idents,msg = self.session.feed_identities(msg)
408 # idents,msg = self.session.feed_identities(msg)
425 if not idents:
409 # if not idents:
426 self.log.error("Bad Queue Message: %s"%msg, exc_info=True)
410 # self.log.error("Bad Query Message: %s"%msg, exc_info=True)
427 return
411 # return
428 try:
412 # try:
429 msg = self.session.unpack_message(msg,content=True)
413 # msg = self.session.unpack_message(msg,content=True)
430 except:
414 # except:
431 self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
415 # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
432 return
416 # return
433
417 #
434 msg_type = msg['msg_type']
418 # msg_type = msg['msg_type']
435 content = msg['content']
419 # content = msg['content']
436
420 #
437 handler = self.registrar_handlers.get(msg_type, None)
421 # handler = self.query_handlers.get(msg_type, None)
438 if handler is None:
422 # if handler is None:
439 self.log.error("registration::got bad registration message: %s"%msg)
423 # self.log.error("registration::got bad registration message: %s"%msg)
440 else:
424 # else:
441 handler(idents, msg)
425 # handler(idents, msg)
442
426
443 def dispatch_monitor_traffic(self, msg):
427 def dispatch_monitor_traffic(self, msg):
444 """all ME and Task queue messages come through here, as well as
428 """all ME and Task queue messages come through here, as well as
@@ -456,37 +440,37 b' class Hub(LoggingFactory):'
456 self.log.error("Invalid monitor topic: %s"%switch)
440 self.log.error("Invalid monitor topic: %s"%switch)
457
441
458
442
459 def dispatch_client_msg(self, msg):
443 def dispatch_query(self, msg):
460 """Route messages from clients"""
444 """Route registration requests and queries from clients."""
461 idents, msg = self.session.feed_identities(msg)
445 idents, msg = self.session.feed_identities(msg)
462 if not idents:
446 if not idents:
463 self.log.error("Bad Client Message: %s"%msg)
447 self.log.error("Bad Query Message: %s"%msg)
464 return
448 return
465 client_id = idents[0]
449 client_id = idents[0]
466 try:
450 try:
467 msg = self.session.unpack_message(msg, content=True)
451 msg = self.session.unpack_message(msg, content=True)
468 except:
452 except:
469 content = error.wrap_exception()
453 content = error.wrap_exception()
470 self.log.error("Bad Client Message: %s"%msg, exc_info=True)
454 self.log.error("Bad Query Message: %s"%msg, exc_info=True)
471 self.session.send(self.clientele, "hub_error", ident=client_id,
455 self.session.send(self.query, "hub_error", ident=client_id,
472 content=content)
456 content=content)
473 return
457 return
474
458
475 # print client_id, header, parent, content
459 # print client_id, header, parent, content
476 #switch on message type:
460 #switch on message type:
477 msg_type = msg['msg_type']
461 msg_type = msg['msg_type']
478 self.log.info("client:: client %s requested %s"%(client_id, msg_type))
462 self.log.info("client::client %s requested %s"%(client_id, msg_type))
479 handler = self.client_handlers.get(msg_type, None)
463 handler = self.query_handlers.get(msg_type, None)
480 try:
464 try:
481 assert handler is not None, "Bad Message Type: %s"%msg_type
465 assert handler is not None, "Bad Message Type: %s"%msg_type
482 except:
466 except:
483 content = error.wrap_exception()
467 content = error.wrap_exception()
484 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
468 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
485 self.session.send(self.clientele, "hub_error", ident=client_id,
469 self.session.send(self.query, "hub_error", ident=client_id,
486 content=content)
470 content=content)
487 return
471 return
488 else:
472 else:
489 handler(client_id, msg)
473 handler(idents, msg)
490
474
491 def dispatch_db(self, msg):
475 def dispatch_db(self, msg):
492 """"""
476 """"""
@@ -752,7 +736,7 b' class Hub(LoggingFactory):'
752 for k,v in self.keytable.iteritems():
736 for k,v in self.keytable.iteritems():
753 jsonable[str(k)] = v
737 jsonable[str(k)] = v
754 content['engines'] = jsonable
738 content['engines'] = jsonable
755 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
739 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
756
740
757 def register_engine(self, reg, msg):
741 def register_engine(self, reg, msg):
758 """Register a new engine."""
742 """Register a new engine."""
@@ -801,7 +785,7 b' class Hub(LoggingFactory):'
801 content = error.wrap_exception()
785 content = error.wrap_exception()
802 break
786 break
803
787
804 msg = self.session.send(self.registrar, "registration_reply",
788 msg = self.session.send(self.query, "registration_reply",
805 content=content,
789 content=content,
806 ident=reg)
790 ident=reg)
807
791
@@ -912,7 +896,7 b' class Hub(LoggingFactory):'
912 # for eid,ec in self.engines.iteritems():
896 # for eid,ec in self.engines.iteritems():
913 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
897 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
914 # time.sleep(1)
898 # time.sleep(1)
915 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
899 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
916 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
900 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
917 dc.start()
901 dc.start()
918
902
@@ -929,7 +913,7 b' class Hub(LoggingFactory):'
929 targets = self._validate_targets(targets)
913 targets = self._validate_targets(targets)
930 except:
914 except:
931 content = error.wrap_exception()
915 content = error.wrap_exception()
932 self.session.send(self.clientele, "hub_error",
916 self.session.send(self.query, "hub_error",
933 content=content, ident=client_id)
917 content=content, ident=client_id)
934 return
918 return
935
919
@@ -937,7 +921,7 b' class Hub(LoggingFactory):'
937 # loads = {}
921 # loads = {}
938 for t in targets:
922 for t in targets:
939 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
923 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
940 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
924 self.session.send(self.query, "load_reply", content=content, ident=client_id)
941
925
942
926
943 def queue_status(self, client_id, msg):
927 def queue_status(self, client_id, msg):
@@ -953,7 +937,7 b' class Hub(LoggingFactory):'
953 targets = self._validate_targets(targets)
937 targets = self._validate_targets(targets)
954 except:
938 except:
955 content = error.wrap_exception()
939 content = error.wrap_exception()
956 self.session.send(self.clientele, "hub_error",
940 self.session.send(self.query, "hub_error",
957 content=content, ident=client_id)
941 content=content, ident=client_id)
958 return
942 return
959 verbose = content.get('verbose', False)
943 verbose = content.get('verbose', False)
@@ -968,7 +952,7 b' class Hub(LoggingFactory):'
968 tasks = len(tasks)
952 tasks = len(tasks)
969 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
953 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
970 # pending
954 # pending
971 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
955 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
972
956
973 def purge_results(self, client_id, msg):
957 def purge_results(self, client_id, msg):
974 """Purge results from memory. This method is more valuable before we move
958 """Purge results from memory. This method is more valuable before we move
@@ -1006,7 +990,7 b' class Hub(LoggingFactory):'
1006 uid = self.engines[eid].queue
990 uid = self.engines[eid].queue
1007 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
991 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1008
992
1009 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
993 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1010
994
1011 def resubmit_task(self, client_id, msg, buffers):
995 def resubmit_task(self, client_id, msg, buffers):
1012 """Resubmit a task."""
996 """Resubmit a task."""
@@ -1049,7 +1033,7 b' class Hub(LoggingFactory):'
1049 except:
1033 except:
1050 content = error.wrap_exception()
1034 content = error.wrap_exception()
1051 break
1035 break
1052 self.session.send(self.clientele, "result_reply", content=content,
1036 self.session.send(self.query, "result_reply", content=content,
1053 parent=msg, ident=client_id,
1037 parent=msg, ident=client_id,
1054 buffers=buffers)
1038 buffers=buffers)
1055
1039
General Comments 0
You need to be logged in to leave comments. Login now