Show More
This diff has been collapsed as it changes many lines, (837 lines changed) Show them Hide them | |||||
@@ -0,0 +1,837 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | """The IPython Controller Hub with 0MQ | |||
|
3 | This is the master object that handles connections from engines and clients, | |||
|
4 | and monitors traffic through the various queues. | |||
|
5 | """ | |||
|
6 | #----------------------------------------------------------------------------- | |||
|
7 | # Copyright (C) 2010 The IPython Development Team | |||
|
8 | # | |||
|
9 | # Distributed under the terms of the BSD License. The full license is in | |||
|
10 | # the file COPYING, distributed as part of this software. | |||
|
11 | #----------------------------------------------------------------------------- | |||
|
12 | ||||
|
13 | #----------------------------------------------------------------------------- | |||
|
14 | # Imports | |||
|
15 | #----------------------------------------------------------------------------- | |||
|
16 | from __future__ import print_function | |||
|
17 | ||||
|
18 | import sys | |||
|
19 | from datetime import datetime | |||
|
20 | import time | |||
|
21 | ||||
|
22 | import zmq | |||
|
23 | from zmq.eventloop import ioloop | |||
|
24 | ||||
|
25 | # internal: | |||
|
26 | from IPython.zmq.log import logger # a Logger object | |||
|
27 | ||||
|
28 | from streamsession import Message, wrap_exception, ISO8601 | |||
|
29 | ||||
|
30 | try: | |||
|
31 | from pymongo.binary import Binary | |||
|
32 | except ImportError: | |||
|
33 | MongoDB=None | |||
|
34 | else: | |||
|
35 | from mongodb import MongoDB | |||
|
36 | ||||
|
37 | #----------------------------------------------------------------------------- | |||
|
38 | # Code | |||
|
39 | #----------------------------------------------------------------------------- | |||
|
40 | ||||
|
41 | def _passer(*args, **kwargs): | |||
|
42 | return | |||
|
43 | ||||
|
def init_record(msg):
    """Build the initial TaskRecord dict for a newly submitted message.

    msg_id, header, content, buffers and the submission time are taken from
    *msg*; every field that only becomes known later (uuids, timestamps,
    result parts, queue name) starts out as None.
    """
    hdr = msg['header']
    # fields that are filled in later all start as None
    record = dict.fromkeys((
        'client_uuid', 'engine_uuid',
        'started', 'completed', 'resubmitted',
        'result_header', 'result_content', 'result_buffers',
        'queue',
    ))
    record['msg_id'] = hdr['msg_id']
    record['header'] = hdr
    record['content'] = msg['content']
    record['buffers'] = msg['buffers']
    # the header's date string is parsed with the ISO8601 format from streamsession
    record['submitted'] = datetime.strptime(hdr['date'], ISO8601)
    return record
|
63 | ||||
|
64 | ||||
|
class EngineConnector(object):
    """A simple record of the zmq identities that belong to one engine.

    Attributes are:
    id (int): engine ID
    queue (str): identity of queue's XREQ socket
    control: identity used for the control channel (stored as given)
    registration (str): identity of registration XREQ socket
    heartbeat (str): identity of heartbeat XREQ socket
    pending: class-level default None; not set by the constructor
    """
    id = 0
    queue = control = registration = heartbeat = pending = None

    def __init__(self, id, queue, registration, control, heartbeat=None):
        # announce the connection first, then store everything as given
        logger.info("engine::Engine Connected: %i"%id)
        self.id, self.queue = id, queue
        self.registration, self.control = registration, control
        self.heartbeat = heartbeat
|
88 | ||||
|
class Hub(object):
    """The IPython Controller Hub with 0MQ connections

    Parameters
    ==========
    loop: zmq IOLoop instance
    session: StreamSession object
    queue: ZMQStream for monitoring the command queue (SUB)
    registrar: ZMQStream for engine registration requests (XREP)
    heartbeat: HeartMonitor object checking the pulse of the engines
    clientele: ZMQStream for client connections (XREP)
                not used for jobs, only query/control commands
    notifier: ZMQStream for broadcasting engine registration changes (PUB)
    db: record store for out of memory logging of commands; the handlers
                call add_record / update_record / drop_record on it
    engine_addrs: dict of zmq connection information for engines to connect
                to the queues.
    client_addrs: dict of zmq connection information for clients to connect
                to the queues.
    """
    # internal data structures (all replaced per instance by __init__):
    ids = None  # engine IDs
    keytable = None
    engines = None
    clients = None
    hearts = None
    pending = None
    results = None
    tasks = None
    completed = None
    mia = None
    incoming_registrations = None
    registration_timeout = None

    # objects from constructor:
    loop = None
    registrar = None
    clientele = None
    queue = None
    heartbeat = None
    notifier = None
    db = None
    client_addrs = None
    engine_addrs = None

    # misspelled legacy names of `clientele` / `client_addrs`; the constructor
    # never assigns them, they are kept only so existing lookups do not break
    clientelle = None
    client_addr = None
|
134 | ||||
|
135 | ||||
|
def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
    """Set up the Hub's bookkeeping and attach it to its streams.

    # universal:
    loop: IOLoop for creating future connections
    session: streamsession for sending serialized data
    # engine:
    queue: ZMQStream for monitoring queue messages
    registrar: ZMQStream for engine registration
    heartbeat: HeartMonitor object for tracking engines; when None, no
        heart handlers are attached and the minimum registration timeout
        is used
    # client:
    clientele: ZMQStream for client connections
    notifier: stream on which (un)registration notifications are sent
    # extra:
    db: record store that the queue/task handlers write to
    engine_addrs: zmq address/protocol dict for engine connections
    client_addrs: zmq address/protocol dict for client connections

    Raises AssertionError when the address dicts do not have the expected
    shape (str 'queue'/'control' entries, two-element 'heartbeat').
    """
    # engine bookkeeping
    self.ids = set()
    self.keytable = {}
    self.incoming_registrations = {}
    self.engines = {}
    self.by_ident = {}
    self.clients = {}
    self.hearts = {}

    self.loop = loop
    self.session = session
    self.registrar = registrar
    self.clientele = clientele
    self.queue = queue
    self.heartbeat = heartbeat
    self.notifier = notifier
    self.db = db

    # validate connection dicts:
    self.client_addrs = client_addrs
    assert isinstance(client_addrs['queue'], str)
    assert isinstance(client_addrs['control'], str)
    self.engine_addrs = engine_addrs
    assert isinstance(engine_addrs['queue'], str)
    # NOTE(review): this repeats the client_addrs check above; it was
    # presumably meant to validate engine_addrs['control'] -- confirm that
    # engine_addrs always carries a 'control' entry before changing it.
    assert isinstance(client_addrs['control'], str)
    assert len(engine_addrs['heartbeat']) == 2

    # register our callbacks
    self.registrar.on_recv(self.dispatch_register_request)
    self.clientele.on_recv(self.dispatch_client_msg)
    self.queue.on_recv(self.dispatch_queue_traffic)

    if heartbeat is not None:
        heartbeat.add_heart_failure_handler(self.handle_heart_failure)
        heartbeat.add_new_heart_handler(self.handle_new_heart)

    # topic (first frame on the monitored queue stream) -> handler
    self.queue_handlers = { 'in' : self.save_queue_request,
                            'out': self.save_queue_result,
                            'intask': self.save_task_request,
                            'outtask': self.save_task_result,
                            'tracktask': self.save_task_destination,
                            'incontrol': _passer,
                            'outcontrol': _passer,
    }

    # msg_type of a client message -> handler
    self.client_handlers = {'queue_request': self.queue_status,
                            'result_request': self.get_results,
                            'purge_request': self.purge_results,
                            'load_request': self.check_load,
                            'resubmit_request': self.resubmit_task,
                            'shutdown_request': self.shutdown_request,
    }

    # msg_type of a registrar message -> handler
    self.registrar_handlers = {'registration_request' : self.register_engine,
                               'unregistration_request' : self.unregister_engine,
                               'connection_request': self.connection_request,
    }

    # The original dereferenced self.heartbeat.period unconditionally even
    # though a None heartbeat is tolerated above; fall back to the 5000
    # floor of the max() in that case.
    if heartbeat is not None:
        self.registration_timeout = max(5000, 2*heartbeat.period)
    else:
        self.registration_timeout = 5000

    # this is the stuff that will move to DB:
    self.pending = set()  # msg_ids of pending messages
    self.queues = {}  # pending msg_ids keyed by engine_id
    self.tasks = {}  # msg_ids of tasks that arrived on an engine, keyed by engine_id
    self.completed = {}  # completed msg_ids keyed by engine_id
    self.all_completed = set()

    logger.info("controller::created controller")
|
221 | ||||
|
222 | def _new_id(self): | |||
|
223 | """gemerate a new ID""" | |||
|
224 | newid = 0 | |||
|
225 | incoming = [id[0] for id in self.incoming_registrations.itervalues()] | |||
|
226 | # print newid, self.ids, self.incoming_registrations | |||
|
227 | while newid in self.ids or newid in incoming: | |||
|
228 | newid += 1 | |||
|
229 | return newid | |||
|
230 | ||||
|
231 | #----------------------------------------------------------------------------- | |||
|
232 | # message validation | |||
|
233 | #----------------------------------------------------------------------------- | |||
|
234 | ||||
|
235 | def _validate_targets(self, targets): | |||
|
236 | """turn any valid targets argument into a list of integer ids""" | |||
|
237 | if targets is None: | |||
|
238 | # default to all | |||
|
239 | targets = self.ids | |||
|
240 | ||||
|
241 | if isinstance(targets, (int,str,unicode)): | |||
|
242 | # only one target specified | |||
|
243 | targets = [targets] | |||
|
244 | _targets = [] | |||
|
245 | for t in targets: | |||
|
246 | # map raw identities to ids | |||
|
247 | if isinstance(t, (str,unicode)): | |||
|
248 | t = self.by_ident.get(t, t) | |||
|
249 | _targets.append(t) | |||
|
250 | targets = _targets | |||
|
251 | bad_targets = [ t for t in targets if t not in self.ids ] | |||
|
252 | if bad_targets: | |||
|
253 | raise IndexError("No Such Engine: %r"%bad_targets) | |||
|
254 | if not targets: | |||
|
255 | raise IndexError("No Engines Registered") | |||
|
256 | return targets | |||
|
257 | ||||
|
258 | def _validate_client_msg(self, msg): | |||
|
259 | """validates and unpacks headers of a message. Returns False if invalid, | |||
|
260 | (ident, header, parent, content)""" | |||
|
261 | client_id = msg[0] | |||
|
262 | try: | |||
|
263 | msg = self.session.unpack_message(msg[1:], content=True) | |||
|
264 | except: | |||
|
265 | logger.error("client::Invalid Message %s"%msg) | |||
|
266 | return False | |||
|
267 | ||||
|
268 | msg_type = msg.get('msg_type', None) | |||
|
269 | if msg_type is None: | |||
|
270 | return False | |||
|
271 | header = msg.get('header') | |||
|
272 | # session doesn't handle split content for now: | |||
|
273 | return client_id, msg | |||
|
274 | ||||
|
275 | ||||
|
276 | #----------------------------------------------------------------------------- | |||
|
277 | # dispatch methods (1 per stream) | |||
|
278 | #----------------------------------------------------------------------------- | |||
|
279 | ||||
|
def dispatch_register_request(self, msg):
    """Route a message from the registrar stream to its handler.

    The identities are split off with session.feed_identities, the rest is
    unpacked, and the handler registered for the message's msg_type in
    self.registrar_handlers is called as handler(idents, msg).  Messages
    without identities, messages that fail to unpack, and unknown message
    types are logged and dropped.
    """
    logger.debug("registration::dispatch_register_request(%s)"%msg)
    idents, msg = self.session.feed_identities(msg)
    if not idents:
        # no exception is active here, so exc_info is not requested; the
        # original also mislabelled this as a queue message
        logger.error("registration::Bad Registration Message: %s"%msg)
        return
    try:
        msg = self.session.unpack_message(msg, content=True)
    except Exception:
        logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
        return

    msg_type = msg['msg_type']
    handler = self.registrar_handlers.get(msg_type, None)
    if handler is None:
        logger.error("registration::got bad registration message: %s"%msg)
    else:
        handler(idents, msg)
|
301 | ||||
|
def dispatch_queue_traffic(self, msg):
    """Entry point for every message seen on the monitored MUX/Task queues.

    The first frame is the topic that selects a handler from
    self.queue_handlers; the remaining frames are split into identities
    and message parts and passed to that handler.
    """
    logger.debug("queue traffic: %s"%msg[:2])
    topic, frames = msg[0], msg[1:]
    idents, msg = self.session.feed_identities(frames)
    if not idents:
        logger.error("Bad Queue Message: %s"%msg)
        return
    handler = self.queue_handlers.get(topic, None)
    if handler is None:
        logger.error("Invalid message topic: %s"%topic)
        return
    handler(idents, msg)
|
315 | ||||
|
316 | ||||
|
def dispatch_client_msg(self, msg):
    """Route messages from clients.

    The first identity is used as the client id.  The handler registered
    for the message's msg_type in self.client_handlers is called as
    handler(client_id, msg).  If the message cannot be unpacked, or its
    type has no handler, a "controller_error" reply carrying the wrapped
    exception is sent back to the client instead.
    """
    idents, msg = self.session.feed_identities(msg)
    if not idents:
        logger.error("Bad Client Message: %s"%msg)
        return
    client_id = idents[0]
    try:
        msg = self.session.unpack_message(msg, content=True)
    except Exception:
        content = wrap_exception()
        logger.error("Bad Client Message: %s"%msg, exc_info=True)
        self.session.send(self.clientele, "controller_error", ident=client_id,
                content=content)
        return

    # switch on message type:
    msg_type = msg['msg_type']
    logger.info("client:: client %s requested %s"%(client_id, msg_type))
    handler = self.client_handlers.get(msg_type, None)
    if handler is None:
        try:
            # Raised explicitly rather than via `assert`, which is removed
            # under `python -O` and would let None be called below.  The
            # exception type reported to the client stays AssertionError,
            # and wrap_exception() is still called inside the except block.
            raise AssertionError("Bad Message Type: %s"%msg_type)
        except AssertionError:
            content = wrap_exception()
            logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
            self.session.send(self.clientele, "controller_error", ident=client_id,
                    content=content)
        return
    handler(client_id, msg)
|
348 | ||||
|
349 | def dispatch_db(self, msg): | |||
|
350 | """""" | |||
|
351 | raise NotImplementedError | |||
|
352 | ||||
|
353 | #--------------------------------------------------------------------------- | |||
|
354 | # handler methods (1 per event) | |||
|
355 | #--------------------------------------------------------------------------- | |||
|
356 | ||||
|
357 | #----------------------- Heartbeat -------------------------------------- | |||
|
358 | ||||
|
def handle_new_heart(self, heart):
    """Handler to attach to the heartbeater.

    Called when a new heart starts to beat.  If a registration is waiting
    for this heart, it is completed; otherwise the heart is ignored.
    """
    logger.debug("heartbeat::handle_new_heart(%r)"%heart)
    if heart in self.incoming_registrations:
        self.finish_registration(heart)
        return
    logger.info("heartbeat::ignoring new heart: %r"%heart)
|
368 | ||||
|
369 | ||||
|
def handle_heart_failure(self, heart):
    """Handler to attach to the heartbeater.

    Called when a previously registered heart fails to respond to a beat
    request.  Triggers unregistration of the engine that owns the heart;
    hearts that do not belong to a registered engine are ignored.
    """
    logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
    eid = self.hearts.get(heart, None)
    if eid is None:
        logger.info("heartbeat::ignoring heart failure %r"%heart)
        return
    # only look the engine up once the heart is known to be registered; the
    # original indexed self.engines first and raised KeyError for unknown
    # hearts instead of ignoring them
    queue = self.engines[eid].queue
    self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
|
381 | ||||
|
382 | #----------------------- MUX Queue Traffic ------------------------------ | |||
|
383 | ||||
|
def save_queue_request(self, idents, msg):
    """Record a request that was submitted through the MUX queue.

    idents must start with (queue_id, client_id).  The message is unpacked
    without its content, a record is built with init_record, the msg_id is
    marked pending and appended to the target engine's queue list, and the
    record is added to the db.  Messages with too few identities, messages
    that fail to unpack, and messages for unregistered queues are logged
    and dropped.
    """
    if len(idents) < 2:
        logger.error("invalid identity prefix: %s"%idents)
        return
    queue_id, client_id = idents[:2]
    try:
        msg = self.session.unpack_message(msg, content=False)
    except Exception:
        logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
        return

    eid = self.by_ident.get(queue_id, None)
    if eid is None:
        logger.error("queue::target %r not registered"%queue_id)
        logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
        return

    msg_id = msg['header']['msg_id']
    record = init_record(msg)
    record['engine_uuid'] = queue_id
    record['client_uuid'] = client_id
    record['queue'] = 'mux'
    if MongoDB is not None and isinstance(self.db, MongoDB):
        # buffers are wrapped in Binary for the MongoDB backend; built as a
        # real list so the record holds the same type on Python 2 and 3
        record['buffers'] = [Binary(buf) for buf in record['buffers']]
    self.pending.add(msg_id)
    self.queues[eid].append(msg_id)
    self.db.add_record(msg_id, record)
|
412 | ||||
|
413 | def save_queue_result(self, idents, msg): | |||
|
414 | if len(idents) < 2: | |||
|
415 | logger.error("invalid identity prefix: %s"%idents) | |||
|
416 | return | |||
|
417 | ||||
|
418 | client_id, queue_id = idents[:2] | |||
|
419 | try: | |||
|
420 | msg = self.session.unpack_message(msg, content=False) | |||
|
421 | except: | |||
|
422 | logger.error("queue::engine %r sent invalid message to %r: %s"%( | |||
|
423 | queue_id,client_id, msg), exc_info=True) | |||
|
424 | return | |||
|
425 | ||||
|
426 | eid = self.by_ident.get(queue_id, None) | |||
|
427 | if eid is None: | |||
|
428 | logger.error("queue::unknown engine %r is sending a reply: "%queue_id) | |||
|
429 | logger.debug("queue:: %s"%msg[2:]) | |||
|
430 | return | |||
|
431 | ||||
|
432 | parent = msg['parent_header'] | |||
|
433 | if not parent: | |||
|
434 | return | |||
|
435 | msg_id = parent['msg_id'] | |||
|
436 | if msg_id in self.pending: | |||
|
437 | self.pending.remove(msg_id) | |||
|
438 | self.all_completed.add(msg_id) | |||
|
439 | self.queues[eid].remove(msg_id) | |||
|
440 | self.completed[eid].append(msg_id) | |||
|
441 | rheader = msg['header'] | |||
|
442 | completed = datetime.strptime(rheader['date'], ISO8601) | |||
|
443 | started = rheader.get('started', None) | |||
|
444 | if started is not None: | |||
|
445 | started = datetime.strptime(started, ISO8601) | |||
|
446 | result = { | |||
|
447 | 'result_header' : rheader, | |||
|
448 | 'result_content': msg['content'], | |||
|
449 | 'started' : started, | |||
|
450 | 'completed' : completed | |||
|
451 | } | |||
|
452 | if MongoDB is not None and isinstance(self.db, MongoDB): | |||
|
453 | result['result_buffers'] = map(Binary, msg['buffers']) | |||
|
454 | else: | |||
|
455 | result['result_buffers'] = msg['buffers'] | |||
|
456 | self.db.update_record(msg_id, result) | |||
|
457 | else: | |||
|
458 | logger.debug("queue:: unknown msg finished %s"%msg_id) | |||
|
459 | ||||
|
460 | #--------------------- Task Queue Traffic ------------------------------ | |||
|
461 | ||||
|
def save_task_request(self, idents, msg):
    """Save the submission of a task.

    The first identity is the submitting client.  The message is unpacked
    without its content, a record is built with init_record, the msg_id is
    marked pending and the record is added to the db.  Messages that fail
    to unpack are logged and dropped.
    """
    client_id = idents[0]

    try:
        msg = self.session.unpack_message(msg, content=False)
    except Exception:
        logger.error("task::client %r sent invalid task message: %s"%(
                client_id, msg), exc_info=True)
        return
    record = init_record(msg)
    if MongoDB is not None and isinstance(self.db, MongoDB):
        # buffers are wrapped in Binary for the MongoDB backend; built as a
        # real list so the record holds the same type on Python 2 and 3
        record['buffers'] = [Binary(buf) for buf in record['buffers']]
    record['client_uuid'] = client_id
    record['queue'] = 'task'
    msg_id = msg['header']['msg_id']
    self.pending.add(msg_id)
    self.db.add_record(msg_id, record)
|
481 | ||||
|
def save_task_result(self, idents, msg):
    """Save the result of a completed task.

    If the reply's parent msg_id is pending, it is moved to the completed
    bookkeeping (per engine when the header's 'engine' identity is
    registered) and the db record is updated with the result header,
    content, buffers, timestamps and engine uuid.  Replies without a
    parent header or for unknown msg_ids are only logged.

    A message that fails to unpack is logged and the exception is
    re-raised to the caller.
    """
    client_id = idents[0]
    try:
        msg = self.session.unpack_message(msg, content=False)
    except Exception:
        logger.error("task::invalid task result message send to %r: %s"%(
                client_id, msg))
        # unlike the sibling handlers this one propagates the failure; the
        # unreachable `return` that followed the raise has been dropped
        raise

    parent = msg['parent_header']
    if not parent:
        logger.warn("Task %r had no parent!"%msg)
        return
    msg_id = parent['msg_id']

    header = msg['header']
    engine_uuid = header.get('engine', None)
    eid = self.by_ident.get(engine_uuid, None)

    if msg_id not in self.pending:
        logger.debug("task::unknown task %s finished"%msg_id)
        return

    self.pending.remove(msg_id)
    self.all_completed.add(msg_id)
    if eid is not None:
        self.completed[eid].append(msg_id)
        if msg_id in self.tasks[eid]:
            self.tasks[eid].remove(msg_id)
    completed = datetime.strptime(header['date'], ISO8601)
    started = header.get('started', None)
    if started is not None:
        started = datetime.strptime(started, ISO8601)
    result = {
        'result_header' : header,
        'result_content': msg['content'],
        'started' : started,
        'completed' : completed,
        'engine_uuid': engine_uuid
    }
    if MongoDB is not None and isinstance(self.db, MongoDB):
        result['result_buffers'] = [Binary(buf) for buf in msg['buffers']]
    else:
        result['result_buffers'] = msg['buffers']
    self.db.update_record(msg_id, result)
|
530 | ||||
|
def save_task_destination(self, idents, msg):
    """Record which engine a task was assigned to.

    The tracking message's content carries 'msg_id' and 'engine_id' (the
    engine's identity).  The msg_id is appended to that engine's task list
    and the db record is updated with the engine uuid.  Messages that fail
    to unpack, or that name an engine that is not registered, are logged
    and dropped.
    """
    try:
        msg = self.session.unpack_message(msg, content=True)
    except Exception:
        logger.error("task::invalid task tracking message")
        return
    content = msg['content']
    # was a bare print() to stdout; routed through the logger instead
    logger.debug("task::tracking message content: %r"%(content,))
    msg_id = content['msg_id']
    engine_uuid = content['engine_id']
    # same lookup-and-log pattern as save_queue_request, instead of letting
    # a KeyError escape for an unknown engine identity
    eid = self.by_ident.get(engine_uuid, None)
    if eid is None:
        logger.error("task::task %s arrived on unregistered engine %r"%(msg_id, engine_uuid))
        return

    logger.info("task::task %s arrived on %s"%(msg_id, eid))
    self.tasks[eid].append(msg_id)
    self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
|
552 | ||||
|
553 | def mia_task_request(self, idents, msg): | |||
|
554 | raise NotImplementedError | |||
|
555 | client_id = idents[0] | |||
|
556 | # content = dict(mia=self.mia,status='ok') | |||
|
557 | # self.session.send('mia_reply', content=content, idents=client_id) | |||
|
558 | ||||
|
559 | ||||
|
560 | ||||
|
561 | #------------------------------------------------------------------------- | |||
|
562 | # Registration requests | |||
|
563 | #------------------------------------------------------------------------- | |||
|
564 | ||||
|
def connection_request(self, client_id, msg):
    """Reply with connection addresses for clients.

    The reply carries status 'ok', every entry of self.client_addrs, and
    an 'engines' table copied from self.keytable.  It is sent on the
    registrar stream as a 'connection_reply'.
    """
    logger.info("client::client %s connected"%client_id)
    reply = dict(status='ok')
    reply.update(self.client_addrs)
    # keys are converted with str() -- presumably so the table can be
    # serialized; confirm against the session's packer
    reply['engines'] = dict(
        (str(eid), ident) for eid, ident in self.keytable.iteritems()
    )
    self.session.send(self.registrar, 'connection_reply', reply, parent=msg, ident=client_id)
|
575 | ||||
|
def register_engine(self, reg, msg):
    """Register a new engine (first half of registration).

    The request content must carry 'queue' (the engine's queue identity)
    and may carry 'heartbeat'.  A new id is reserved and a
    'registration_reply' is sent on the registrar stream: either status
    'ok' plus self.engine_addrs, or a wrapped KeyError when the queue or
    heart identity is already registered or waiting to be registered.

    On success the registration is parked in self.incoming_registrations
    until the heart beats (or completed right away when it already does),
    with a delayed callback that purges it after
    self.registration_timeout.

    Returns the reserved engine id, or None when 'queue' is missing.
    """
    request = msg['content']
    try:
        queue = request['queue']
    except KeyError:
        logger.error("registration::queue not specified")
        return
    heart = request.get('heartbeat', None)
    eid = self._new_id()

    logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))

    content = dict(id=eid,status='ok')
    content.update(self.engine_addrs)

    # work out whether the requested identities clash with anything that is
    # registered or still waiting to be registered
    conflict = None
    if queue in self.by_ident:
        conflict = "queue_id %r in use"%queue
    elif heart in self.hearts: # need to check unique hearts?
        conflict = "heart_id %r in use"%heart
    else:
        for h, pack in self.incoming_registrations.iteritems():
            if heart == h:
                conflict = "heart_id %r in use"%heart
                break
            elif queue == pack[1]:
                conflict = "queue_id %r in use"%queue
                break

    if conflict is not None:
        # raised and caught on the spot so that wrap_exception() is called
        # from inside the except block
        try:
            raise KeyError(conflict)
        except:
            content = wrap_exception()

    msg = self.session.send(self.registrar, "registration_reply",
            content=content,
            ident=reg)

    if content['status'] != 'ok':
        logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
        return eid

    if heart in self.heartbeat.hearts:
        # already beating
        self.incoming_registrations[heart] = (eid,queue,reg,None)
        self.finish_registration(heart)
    else:
        purge = lambda : self._purge_stalled_registration(heart)
        dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
        dc.start()
        self.incoming_registrations[heart] = (eid,queue,reg,dc)
    return eid
|
636 | ||||
|
def unregister_engine(self, ident, msg):
    """Unregister an engine.

    Called for explicit unregistration requests and from
    handle_heart_failure.  msg['content']['id'] names the engine.  All
    per-engine bookkeeping is dropped, the msg_ids still in the engine's
    queue list are removed from the pending set, and an
    'unregistration_notification' is sent when a notifier is configured.
    Requests without a usable id, or for an engine that is not registered,
    are logged and ignored.
    """
    try:
        eid = msg['content']['id']
    except Exception:
        logger.error("registration::bad engine id for unregistration: %s"%ident)
        return
    logger.info("registration::unregister_engine(%s)"%eid)
    if eid not in self.engines:
        # e.g. a heart failure arriving after an explicit unregistration;
        # the original raised KeyError here
        logger.error("registration::engine %s is not registered"%eid)
        return

    ec = self.engines.pop(eid)
    content = dict(id=eid, queue=ec.queue)
    self.ids.remove(eid)
    self.keytable.pop(eid)
    self.hearts.pop(ec.heartbeat)
    self.by_ident.pop(ec.queue)
    self.completed.pop(eid)
    # the original left this entry behind; msg_ids of tasks on the engine
    # are deliberately left in self.pending, as before
    self.tasks.pop(eid, None)
    for msg_id in self.queues.pop(eid):
        # discard: a msg_id already gone from pending must not abort the
        # unregistration half-way
        self.pending.discard(msg_id)
    ############## TODO: HANDLE IT ################

    if self.notifier:
        self.session.send(self.notifier, "unregistration_notification", content=content)
|
658 | ||||
|
def finish_registration(self, heart):
    """Second half of engine registration.

    Called once the HeartMonitor has received a beat from the engine's
    heart.  The parked registration is turned into a registered engine:
    its purge callback is stopped, the per-engine bookkeeping is created,
    and a 'registration_notification' is sent when a notifier is
    configured.  An unknown heart is logged and ignored.
    """
    try:
        eid, queue, reg, purge = self.incoming_registrations.pop(heart)
    except KeyError:
        logger.error("registration::tried to finish nonexistant registration")
        return
    logger.info("registration::finished registering engine %i:%r"%(eid,queue))
    if purge is not None:
        purge.stop()

    self.ids.add(eid)
    self.keytable[eid] = queue
    # the queue identity doubles as the control identity
    self.engines[eid] = EngineConnector(eid, queue, reg, queue, heart)
    self.by_ident[queue] = eid
    self.queues[eid] = []
    self.tasks[eid] = []
    self.completed[eid] = []
    self.hearts[heart] = eid

    if self.notifier:
        notice = dict(id=eid, queue=self.engines[eid].queue)
        self.session.send(self.notifier, "registration_notification", content=notice)
|
682 | ||||
|
683 | def _purge_stalled_registration(self, heart): | |||
|
684 | if heart in self.incoming_registrations: | |||
|
685 | eid = self.incoming_registrations.pop(heart)[0] | |||
|
686 | logger.info("registration::purging stalled registration: %i"%eid) | |||
|
687 | else: | |||
|
688 | pass | |||
|
689 | ||||
|
690 | #------------------------------------------------------------------------- | |||
|
691 | # Client Requests | |||
|
692 | #------------------------------------------------------------------------- | |||
|
693 | ||||
|
def shutdown_request(self, client_id, msg):
    """Handle a shutdown request from a client.

    Replies 'shutdown_reply' with status 'ok' and schedules self._shutdown
    through a DelayedCallback with a delay of 1000 on self.loop.  Engines
    are not contacted here (the code that would forward the request to
    them is disabled in the original).
    """
    self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
    delayed = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
    delayed.start()
|
705 | ||||
|
def _shutdown(self):
    """Log the shutdown, pause for 0.1 s, then exit with status 0."""
    logger.info("controller::controller shutting down.")
    time.sleep(0.1)
    # equivalent to sys.exit(0)
    raise SystemExit(0)
|
710 | ||||
|
711 | ||||
|
712 | def check_load(self, client_id, msg): | |||
|
713 | content = msg['content'] | |||
|
714 | try: | |||
|
715 | targets = content['targets'] | |||
|
716 | targets = self._validate_targets(targets) | |||
|
717 | except: | |||
|
718 | content = wrap_exception() | |||
|
719 | self.session.send(self.clientele, "controller_error", | |||
|
720 | content=content, ident=client_id) | |||
|
721 | return | |||
|
722 | ||||
|
723 | content = dict(status='ok') | |||
|
724 | # loads = {} | |||
|
725 | for t in targets: | |||
|
726 | content[bytes(t)] = len(self.queues[t])+len(self.tasks[t]) | |||
|
727 | self.session.send(self.clientele, "load_reply", content=content, ident=client_id) | |||
|
728 | ||||
|
729 | ||||
|
def queue_status(self, client_id, msg):
    """Return the Queue status of one or more targets.

    If the request content has a true 'verbose' value, the reply holds the
    msg_ids themselves; otherwise it holds the length of each list.

    Per-target reply keys:
        queue     (pending MUX jobs)
        tasks     (pending Task jobs)
        completed (finished jobs from both queues)

    A missing or invalid 'targets' entry produces a "controller_error"
    reply carrying the wrapped exception instead of the "queue_reply".
    """
    content = msg['content']
    try:
        # The 'targets' lookup lives inside the try (as in check_load), so
        # a request without it gets an error reply rather than an uncaught
        # KeyError in the dispatcher.
        targets = self._validate_targets(content['targets'])
    except Exception:
        self.session.send(self.clientele, "controller_error",
                          content=wrap_exception(), ident=client_id)
        return

    verbose = content.get('verbose', False)
    reply = dict(status='ok')
    for t in targets:
        queue = self.queues[t]
        completed = self.completed[t]
        tasks = self.tasks[t]
        if not verbose:
            # counts only, not the msg_id lists
            queue = len(queue)
            completed = len(completed)
            tasks = len(tasks)
        reply[bytes(t)] = {'queue': queue, 'completed': completed, 'tasks': tasks}
    self.session.send(self.clientele, "queue_reply", content=reply, ident=client_id)
|
759 | ||||
|
def purge_results(self, client_id, msg):
    """Drop stored results from the db and report the outcome to the client.

    Request content keys read here:
        msg_ids    : 'all', or an iterable of msg_ids to drop (default [])
        engine_ids : engines whose completed records are all dropped
                     (default [])

    Each loop stops at its first failure (a pending or unknown msg_id, an
    unknown engine); the 'purge_reply' then carries the wrapped exception
    instead of status 'ok'.
    """
    request = msg['content']
    msg_ids = request.get('msg_ids', [])
    reply = dict(status='ok')

    if msg_ids == 'all':
        # every record that has a completion date
        self.db.drop_matching_records(dict(completed={'$ne':None}))
    else:
        for msg_id in msg_ids:
            if msg_id in self.all_completed:
                self.db.drop_record(msg_id)
                continue
            # raise-then-catch: presumably wrap_exception() reads the
            # exception currently being handled
            if msg_id in self.pending:
                try:
                    raise IndexError("msg pending: %r"%msg_id)
                except:
                    reply = wrap_exception()
            else:
                try:
                    raise IndexError("No such msg: %r"%msg_id)
                except:
                    reply = wrap_exception()
            break

    for eid in request.get('engine_ids', []):
        if eid not in self.engines:
            try:
                raise IndexError("No such engine: %i"%eid)
            except:
                reply = wrap_exception()
            break
        # forget the engine's completed list, then drop its finished records
        self.completed.pop(eid)
        engine_uuid = self.engines[eid].queue
        self.db.drop_matching_records(dict(engine_uuid=engine_uuid, completed={'$ne':None}))

    self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
|
797 | ||||
|
def resubmit_task(self, client_id, msg, buffers):
    """Resubmit a task.

    Not implemented yet: always raises NotImplementedError.
    """
    raise NotImplementedError
|
801 | ||||
|
def get_results(self, client_id, msg):
    """Get the result of 1 or more messages.

    Request content keys read here:
        msg_ids     : the msg_ids to look up (de-duplicated and sorted)
        status_only : if true, only report pending/completed membership
                      and skip fetching the records (default False)

    The 'result_reply' content has 'status', 'pending' and 'completed';
    unless status_only, each completed msg_id also gets its own entry with
    'result_content', 'header' and 'result_header', and its result buffers
    are appended to the reply's buffers.  The first msg_id that is neither
    pending nor completed ends the scan, and the reply content becomes the
    wrapped KeyError.
    """
    request = msg['content']
    msg_ids = sorted(set(request['msg_ids']))
    statusonly = request.get('status_only', False)
    pending = []
    completed = []
    content = dict(status='ok')
    content['pending'] = pending
    content['completed'] = completed
    buffers = []
    if not statusonly:
        content['results'] = {}
        records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
    for msg_id in msg_ids:
        if msg_id in self.pending:
            pending.append(msg_id)
        elif msg_id in self.all_completed:
            completed.append(msg_id)
            if not statusonly:
                rec = records[msg_id]
                content[msg_id] = { 'result_content': rec['result_content'],
                                    'header': rec['header'],
                                    'result_header' : rec['result_header'],
                                  }
                # records start out with result_buffers=None (init_record),
                # so only extend when there is something to iterate
                if rec['result_buffers']:
                    buffers.extend(map(str, rec['result_buffers']))
        else:
            # raise-then-catch: presumably wrap_exception() reads the
            # exception currently being handled
            try:
                raise KeyError('No such message: '+msg_id)
            except Exception:
                content = wrap_exception()
            break
    self.session.send(self.clientele, "result_reply", content=content,
                                        parent=msg, ident=client_id,
                                        buffers=buffers)
|
837 |
This diff has been collapsed as it changes many lines, (834 lines changed) Show them Hide them | |||||
@@ -15,833 +15,35 b' and monitors traffic through the various queues.' | |||||
15 | #----------------------------------------------------------------------------- |
|
15 | #----------------------------------------------------------------------------- | |
16 | from __future__ import print_function |
|
16 | from __future__ import print_function | |
17 |
|
17 | |||
18 | import sys |
|
|||
19 | import os |
|
18 | import os | |
20 | from datetime import datetime |
|
|||
21 | import logging |
|
|||
22 | import time |
|
19 | import time | |
23 | import uuid |
|
20 | from multiprocessing import Process | |
24 |
|
21 | |||
25 | import zmq |
|
22 | import zmq | |
26 |
from zmq.eventloop import |
|
23 | from zmq.eventloop import ioloop | |
|
24 | from zmq.eventloop.zmqstream import ZMQStream | |||
|
25 | from zmq.devices import ProcessMonitoredQueue | |||
27 |
|
26 | |||
28 | # internal: |
|
27 | # internal: | |
29 | from IPython.zmq.log import logger # a Logger object |
|
|||
30 | from IPython.zmq.entry_point import bind_port |
|
28 | from IPython.zmq.entry_point import bind_port | |
31 |
|
29 | |||
32 | from streamsession import Message, wrap_exception, ISO8601 |
|
30 | from hub import Hub | |
33 | from entry_point import (make_base_argument_parser, select_random_ports, split_ports, |
|
31 | from entry_point import (make_base_argument_parser, select_random_ports, split_ports, | |
34 | connect_logger, parse_url, signal_children, generate_exec_key) |
|
32 | connect_logger, parse_url, signal_children, generate_exec_key) | |
|
33 | ||||
|
34 | ||||
|
35 | import streamsession as session | |||
|
36 | import heartmonitor | |||
|
37 | from scheduler import launch_scheduler | |||
|
38 | ||||
35 | from dictdb import DictDB |
|
39 | from dictdb import DictDB | |
36 | try: |
|
40 | try: | |
37 | from pymongo.binary import Binary |
|
41 | import pymongo | |
38 | except ImportError: |
|
42 | except ImportError: | |
39 | MongoDB=None |
|
43 | MongoDB=None | |
40 | else: |
|
44 | else: | |
41 | from mongodb import MongoDB |
|
45 | from mongodb import MongoDB | |
42 |
|
46 | |||
43 | #----------------------------------------------------------------------------- |
|
|||
44 | # Code |
|
|||
45 | #----------------------------------------------------------------------------- |
|
|||
46 |
|
||||
def _passer(*args, **kwargs):
    """Accept any arguments, do nothing, and return None (no-op handler)."""
    return None
|
|||
49 |
|
||||
def init_record(msg):
    """Build a fresh TaskRecord dict for a newly submitted message.

    'msg_id', 'header', 'content' and 'buffers' are copied from the
    message, 'submitted' is the header's 'date' parsed with the ISO8601
    format, and every other key is initialized with None.
    """
    header = msg['header']
    record = {
        'msg_id': header['msg_id'],
        'header': header,
        'content': msg['content'],
        'buffers': msg['buffers'],
        'submitted': datetime.strptime(header['date'], ISO8601),
    }
    # nothing else is known at submission time
    for key in ('client_uuid', 'engine_uuid', 'started', 'completed',
                'resubmitted', 'result_header', 'result_content',
                'result_buffers', 'queue'):
        record[key] = None
    return record
|
|||
69 |
|
||||
70 |
|
||||
class EngineConnector(object):
    """A simple record of the zmq identities belonging to one engine.

    Attributes are:
    id (int): engine ID
    queue (str): identity of queue's XREQ socket
    control (str): identity of the control socket
    registration (str): identity of registration XREQ socket
    heartbeat (str): identity of heartbeat XREQ socket
    pending: never assigned by the constructor; stays None unless set
        elsewhere
    """
    # class-level defaults, shadowed per instance by __init__
    id = 0
    queue = None
    control = None
    registration = None
    heartbeat = None
    pending = None

    def __init__(self, id, queue, registration, control, heartbeat=None):
        """Announce the new engine in the log and store its identities."""
        logger.info("engine::Engine Connected: %i"%id)
        self.id = id
        self.queue, self.control = queue, control
        self.registration = registration
        self.heartbeat = heartbeat
|
|||
94 |
|
||||
95 | class Controller(object): |
|
|||
96 | """The IPython Controller with 0MQ connections |
|
|||
97 |
|
||||
98 | Parameters |
|
|||
99 | ========== |
|
|||
100 | loop: zmq IOLoop instance |
|
|||
101 | session: StreamSession object |
|
|||
102 | <removed> context: zmq context for creating new connections (?) |
|
|||
103 | queue: ZMQStream for monitoring the command queue (SUB) |
|
|||
104 | registrar: ZMQStream for engine registration requests (XREP) |
|
|||
105 | heartbeat: HeartMonitor object checking the pulse of the engines |
|
|||
106 | clientele: ZMQStream for client connections (XREP) |
|
|||
107 | not used for jobs, only query/control commands |
|
|||
108 | notifier: ZMQStream for broadcasting engine registration changes (PUB) |
|
|||
109 | db: connection to db for out of memory logging of commands |
|
|||
110 | NotImplemented |
|
|||
111 | engine_addrs: dict of zmq connection information for engines to connect |
|
|||
112 | to the queues. |
|
|||
113 | client_addrs: dict of zmq connection information for engines to connect |
|
|||
114 | to the queues. |
|
|||
115 | """ |
|
|||
116 | # internal data structures: |
|
|||
117 | ids=None # engine IDs |
|
|||
118 | keytable=None |
|
|||
119 | engines=None |
|
|||
120 | clients=None |
|
|||
121 | hearts=None |
|
|||
122 | pending=None |
|
|||
123 | results=None |
|
|||
124 | tasks=None |
|
|||
125 | completed=None |
|
|||
126 | mia=None |
|
|||
127 | incoming_registrations=None |
|
|||
128 | registration_timeout=None |
|
|||
129 |
|
||||
130 | #objects from constructor: |
|
|||
131 | loop=None |
|
|||
132 | registrar=None |
|
|||
133 | clientelle=None |
|
|||
134 | queue=None |
|
|||
135 | heartbeat=None |
|
|||
136 | notifier=None |
|
|||
137 | db=None |
|
|||
138 | client_addr=None |
|
|||
139 | engine_addrs=None |
|
|||
140 |
|
||||
141 |
|
||||
def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
    """
    # universal:
    loop: IOLoop for creating future connections
    session: streamsession for sending serialized data
    # engine:
    queue: ZMQStream for monitoring queue messages
    registrar: ZMQStream for engine registration
    heartbeat: HeartMonitor object for tracking engines; may be None, in
        which case no heart handlers are attached and the registration
        timeout falls back to 5000
    # client:
    clientele: ZMQStream for client connections
    # extra:
    notifier: stream used to publish (un)registration notifications
    db: record store; the handlers call add_record / update_record /
        drop_record / find_records on it
    engine_addrs: zmq address/protocol dict for engine connections
    client_addrs: zmq address/protocol dict for client connections
    """
    self.ids = set()
    self.keytable={}
    self.incoming_registrations={}
    self.engines = {}
    self.by_ident = {}
    self.clients = {}
    self.hearts = {}

    self.loop = loop
    self.session = session
    self.registrar = registrar
    self.clientele = clientele
    self.queue = queue
    self.heartbeat = heartbeat
    self.notifier = notifier
    self.db = db

    # validate connection dicts:
    self.client_addrs = client_addrs
    assert isinstance(client_addrs['queue'], str)
    assert isinstance(client_addrs['control'], str)
    self.engine_addrs = engine_addrs
    assert isinstance(engine_addrs['queue'], str)
    # NOTE(review): this repeats the client_addrs['control'] check above;
    # it looks like engine_addrs['control'] was intended -- confirm that
    # engine_addrs always has a 'control' entry before changing it.
    assert isinstance(client_addrs['control'], str)
    assert len(engine_addrs['heartbeat']) == 2

    # register our callbacks
    self.registrar.on_recv(self.dispatch_register_request)
    self.clientele.on_recv(self.dispatch_client_msg)
    self.queue.on_recv(self.dispatch_queue_traffic)

    if heartbeat is not None:
        heartbeat.add_heart_failure_handler(self.handle_heart_failure)
        heartbeat.add_new_heart_handler(self.handle_new_heart)

    # monitored-queue topic -> handler
    self.queue_handlers = { 'in' : self.save_queue_request,
                            'out': self.save_queue_result,
                            'intask': self.save_task_request,
                            'outtask': self.save_task_result,
                            'tracktask': self.save_task_destination,
                            'incontrol': _passer,
                            'outcontrol': _passer,
                          }

    # client msg_type -> handler
    self.client_handlers = {'queue_request': self.queue_status,
                            'result_request': self.get_results,
                            'purge_request': self.purge_results,
                            'load_request': self.check_load,
                            'resubmit_request': self.resubmit_task,
                            'shutdown_request': self.shutdown_request,
                            }

    # registrar msg_type -> handler
    self.registrar_handlers = {'registration_request' : self.register_engine,
                               'unregistration_request' : self.unregister_engine,
                               'connection_request': self.connection_request,
                               }
    # heartbeat is treated as optional above, so don't dereference it
    # unconditionally here
    if heartbeat is not None:
        self.registration_timeout = max(5000, 2*heartbeat.period)
    else:
        self.registration_timeout = 5000
    # this is the stuff that will move to DB:
    self.pending = set() # pending messages, keyed by msg_id
    self.queues = {} # pending msg_ids keyed by engine_id
    self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
    self.completed = {} # completed msg_ids keyed by engine_id
    self.all_completed = set()

    logger.info("controller::created controller")
|
|||
227 |
|
||||
def _new_id(self):
    """generate a new ID

    Returns the smallest non-negative integer that is neither a registered
    engine id (self.ids) nor reserved by a registration still in progress
    (first item of each self.incoming_registrations value).
    """
    reserved = [pack[0] for pack in self.incoming_registrations.itervalues()]
    candidate = 0
    while candidate in self.ids or candidate in reserved:
        candidate += 1
    return candidate
|
|||
236 |
|
||||
237 | #----------------------------------------------------------------------------- |
|
|||
238 | # message validation |
|
|||
239 | #----------------------------------------------------------------------------- |
|
|||
240 |
|
||||
def _validate_targets(self, targets):
    """turn any valid targets argument into a list of integer ids

    None means all registered engines; a single int or string is treated
    as a one-element list; strings found in self.by_ident are replaced by
    the engine id they map to.

    Raises IndexError if any resulting target is not a registered id, or
    if the resulting list is empty.
    """
    if targets is None:
        # default to all
        targets = self.ids

    if isinstance(targets, (int,str,unicode)):
        # only one target specified
        targets = [targets]

    # map raw identities to ids; anything else passes through unchanged
    resolved = [
        self.by_ident.get(t, t) if isinstance(t, (str,unicode)) else t
        for t in targets
    ]
    unknown = [ t for t in resolved if t not in self.ids ]
    if unknown:
        raise IndexError("No Such Engine: %r"%unknown)
    if not resolved:
        raise IndexError("No Engines Registered")
    return resolved
|
|||
263 |
|
||||
def _validate_client_msg(self, msg):
    """Validate and unpack a raw client message.

    msg[0] is taken as the client identity and msg[1:] is handed to
    self.session.unpack_message(..., content=True).

    Returns False if unpacking fails or the unpacked message has no
    'msg_type'; otherwise returns the tuple (client_id, unpacked_msg).
    """
    client_id = msg[0]
    try:
        msg = self.session.unpack_message(msg[1:], content=True)
    except Exception:
        logger.error("client::Invalid Message %s"%msg)
        return False

    if msg.get('msg_type', None) is None:
        return False
    # session doesn't handle split content for now:
    return client_id, msg
|
|||
280 |
|
||||
281 |
|
||||
282 | #----------------------------------------------------------------------------- |
|
|||
283 | # dispatch methods (1 per stream) |
|
|||
284 | #----------------------------------------------------------------------------- |
|
|||
285 |
|
||||
def dispatch_register_request(self, msg):
    """Route a message arriving on the registrar stream.

    Splits off the identities, unpacks the message and calls the handler
    registered for its msg_type in self.registrar_handlers with
    (idents, msg).  Malformed messages and unknown message types are
    logged and dropped.
    """
    logger.debug("registration::dispatch_register_request(%s)"%msg)
    idents,msg = self.session.feed_identities(msg)
    if not idents:
        # no exception is active here, so no exc_info
        logger.error("Bad Queue Message: %s"%msg)
        return
    try:
        msg = self.session.unpack_message(msg,content=True)
    except Exception:
        logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
        return

    msg_type = msg['msg_type']

    handler = self.registrar_handlers.get(msg_type, None)
    if handler is None:
        logger.error("registration::got bad registration message: %s"%msg)
    else:
        handler(idents, msg)
|
|||
307 |
|
||||
def dispatch_queue_traffic(self, msg):
    """all ME and Task queue messages come through here

    The first frame is the topic that selects a handler from
    self.queue_handlers; the remaining frames are split into identities
    and message, and the handler is called with (idents, msg).
    """
    logger.debug("queue traffic: %s"%msg[:2])
    topic = msg[0]
    idents, msg = self.session.feed_identities(msg[1:])
    if not idents:
        logger.error("Bad Queue Message: %s"%msg)
        return
    handler = self.queue_handlers.get(topic, None)
    if handler is None:
        logger.error("Invalid message topic: %s"%topic)
        return
    handler(idents, msg)
|
|||
321 |
|
||||
322 |
|
||||
def dispatch_client_msg(self, msg):
    """Route messages from clients

    The first identity is the client id.  The handler registered for the
    message's msg_type in self.client_handlers is called with
    (client_id, msg).  A message that cannot be unpacked, or whose type
    has no handler, is answered with a "controller_error" reply carrying
    the wrapped exception.
    """
    idents, msg = self.session.feed_identities(msg)
    if not idents:
        logger.error("Bad Client Message: %s"%msg)
        return
    client_id = idents[0]
    try:
        msg = self.session.unpack_message(msg, content=True)
    except Exception:
        content = wrap_exception()
        logger.error("Bad Client Message: %s"%msg, exc_info=True)
        self.session.send(self.clientele, "controller_error", ident=client_id,
                content=content)
        return

    #switch on message type:
    msg_type = msg['msg_type']
    logger.info("client:: client %s requested %s"%(client_id, msg_type))
    handler = self.client_handlers.get(msg_type, None)
    if handler is None:
        # Raise explicitly instead of using `assert`: asserts are stripped
        # under -O, which would let a None handler be called.  The error
        # type and message sent to the client are unchanged.
        try:
            raise AssertionError("Bad Message Type: %s"%msg_type)
        except AssertionError:
            content = wrap_exception()
            logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
        self.session.send(self.clientele, "controller_error", ident=client_id,
                content=content)
        return
    handler(client_id, msg)
|
|||
354 |
|
||||
def dispatch_db(self, msg):
    """Dispatcher for db messages.

    Not implemented: always raises NotImplementedError.
    """
    raise NotImplementedError
|
|||
358 |
|
||||
359 | #--------------------------------------------------------------------------- |
|
|||
360 | # handler methods (1 per event) |
|
|||
361 | #--------------------------------------------------------------------------- |
|
|||
362 |
|
||||
363 | #----------------------- Heartbeat -------------------------------------- |
|
|||
364 |
|
||||
def handle_new_heart(self, heart):
    """handler to attach to heartbeater.

    Called when a new heart starts to beat.  If that heart belongs to a
    registration still waiting in self.incoming_registrations, the
    registration is completed; any other heart is ignored.
    """
    logger.debug("heartbeat::handle_new_heart(%r)"%heart)
    if heart in self.incoming_registrations:
        self.finish_registration(heart)
    else:
        logger.info("heartbeat::ignoring new heart: %r"%heart)
|
|||
374 |
|
||||
375 |
|
||||
def handle_heart_failure(self, heart):
    """handler to attach to heartbeater.

    Called when a previously registered heart fails to respond to a beat
    request.  Triggers unregistration of the engine that owns the heart;
    a heart that maps to no engine in self.hearts is ignored.
    """
    logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
    eid = self.hearts.get(heart, None)
    if eid is None:
        # check before touching self.engines: indexing it with None would
        # raise KeyError instead of ignoring the unknown heart
        logger.info("heartbeat::ignoring heart failure %r"%heart)
        return
    queue = self.engines[eid].queue
    self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
|
|||
387 |
|
||||
388 | #----------------------- MUX Queue Traffic ------------------------------ |
|
|||
389 |
|
||||
def save_queue_request(self, idents, msg):
    """Record a MUX request on its way from a client to an engine.

    idents must start with (queue_id, client_id).  The message is added to
    self.pending and to the target engine's queue, and a new record is
    stored in the db.  Malformed messages and unregistered targets are
    logged and dropped.
    """
    if len(idents) < 2:
        logger.error("invalid identity prefix: %s"%idents)
        return
    queue_id, client_id = idents[:2]
    try:
        msg = self.session.unpack_message(msg, content=False)
    except:
        logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
        return

    eid = self.by_ident.get(queue_id, None)
    if eid is None:
        logger.error("queue::target %r not registered"%queue_id)
        logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
        return

    msg_id = msg['header']['msg_id']
    record = init_record(msg)
    record.update(engine_uuid=queue_id, client_uuid=client_id, queue='mux')
    if MongoDB is not None and isinstance(self.db, MongoDB):
        # wrap raw buffers for storage when the backend is MongoDB
        record['buffers'] = map(Binary, record['buffers'])
    self.pending.add(msg_id)
    self.queues[eid].append(msg_id)
    self.db.add_record(msg_id, record)
|
|||
418 |
|
||||
def save_queue_result(self, idents, msg):
    """Record the reply to a MUX request on its way back to the client.

    idents must start with (client_id, queue_id).  If the reply's parent
    msg_id is pending, it is moved from pending to completed (globally and
    for the engine) and the db record is updated with the result header,
    content, buffers and the started/completed timestamps.  Malformed
    messages, unknown engines, parentless replies and unknown msg_ids are
    logged and/or dropped.
    """
    if len(idents) < 2:
        logger.error("invalid identity prefix: %s"%idents)
        return

    client_id, queue_id = idents[:2]
    try:
        msg = self.session.unpack_message(msg, content=False)
    except:
        logger.error("queue::engine %r sent invalid message to %r: %s"%(
                queue_id,client_id, msg), exc_info=True)
        return

    eid = self.by_ident.get(queue_id, None)
    if eid is None:
        logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
        logger.debug("queue:: %s"%msg[2:])
        return

    parent = msg['parent_header']
    if not parent:
        return
    msg_id = parent['msg_id']
    if msg_id not in self.pending:
        logger.debug("queue:: unknown msg finished %s"%msg_id)
        return

    # bookkeeping: pending -> completed
    self.pending.remove(msg_id)
    self.all_completed.add(msg_id)
    self.queues[eid].remove(msg_id)
    self.completed[eid].append(msg_id)

    rheader = msg['header']
    completed = datetime.strptime(rheader['date'], ISO8601)
    started = rheader.get('started', None)
    if started is not None:
        started = datetime.strptime(started, ISO8601)
    result = dict(result_header=rheader, result_content=msg['content'],
                  started=started, completed=completed)
    if MongoDB is not None and isinstance(self.db, MongoDB):
        # wrap raw buffers for storage when the backend is MongoDB
        result['result_buffers'] = map(Binary, msg['buffers'])
    else:
        result['result_buffers'] = msg['buffers']
    self.db.update_record(msg_id, result)
|
|||
465 |
|
||||
466 | #--------------------- Task Queue Traffic ------------------------------ |
|
|||
467 |
|
||||
def save_task_request(self, idents, msg):
    """Save the submission of a task.

    idents[0] is the submitting client.  The task's msg_id is added to
    self.pending and a new record (queue 'task') is stored in the db.
    Messages that cannot be unpacked are logged and dropped.
    """
    client_id = idents[0]

    try:
        msg = self.session.unpack_message(msg, content=False)
    except:
        logger.error("task::client %r sent invalid task message: %s"%(
                client_id, msg), exc_info=True)
        return

    record = init_record(msg)
    if MongoDB is not None and isinstance(self.db, MongoDB):
        # wrap raw buffers for storage when the backend is MongoDB
        record['buffers'] = map(Binary, record['buffers'])
    record.update(client_uuid=client_id, queue='task')
    msg_id = msg['header']['msg_id']
    self.pending.add(msg_id)
    self.db.add_record(msg_id, record)
|
|||
487 |
|
||||
def save_task_result(self, idents, msg):
    """save the result of a completed task.

    idents[0] is the client the result is sent to.  If the result's
    parent msg_id is pending, it is moved to completed (and removed from
    the engine's task list when the engine named in the result header is
    registered), and the db record is updated with the result header,
    content, buffers, engine uuid and started/completed timestamps.
    Malformed messages, parentless results and unknown msg_ids are logged
    and dropped.
    """
    client_id = idents[0]
    try:
        msg = self.session.unpack_message(msg, content=False)
    except Exception:
        # log with traceback and drop the message, like the other queue
        # handlers (the old code re-raised and had an unreachable return)
        logger.error("task::invalid task result message send to %r: %s"%(
                client_id, msg), exc_info=True)
        return

    parent = msg['parent_header']
    if not parent:
        logger.warn("Task %r had no parent!"%msg)
        return
    msg_id = parent['msg_id']

    header = msg['header']
    engine_uuid = header.get('engine', None)
    eid = self.by_ident.get(engine_uuid, None)

    if msg_id not in self.pending:
        logger.debug("task::unknown task %s finished"%msg_id)
        return

    self.pending.remove(msg_id)
    self.all_completed.add(msg_id)
    if eid is not None:
        # per-engine bookkeeping only when the engine is registered
        self.completed[eid].append(msg_id)
        if msg_id in self.tasks[eid]:
            self.tasks[eid].remove(msg_id)
    completed = datetime.strptime(header['date'], ISO8601)
    started = header.get('started', None)
    if started is not None:
        started = datetime.strptime(started, ISO8601)
    result = {
        'result_header' : header,
        'result_content': msg['content'],
        'started' : started,
        'completed' : completed,
        'engine_uuid': engine_uuid
    }
    if MongoDB is not None and isinstance(self.db, MongoDB):
        # wrap raw buffers for storage when the backend is MongoDB
        result['result_buffers'] = map(Binary, msg['buffers'])
    else:
        result['result_buffers'] = msg['buffers']
    self.db.update_record(msg_id, result)
|
|||
536 |
|
||||
def save_task_destination(self, idents, msg):
    """Record which engine a task was handed to.

    The message content must hold 'msg_id' and 'engine_id' (the engine's
    queue identity).  The msg_id is appended to that engine's task list
    and the db record gets its engine_uuid.  Messages that cannot be
    unpacked, or that name an unregistered engine, are logged and dropped.
    """
    try:
        msg = self.session.unpack_message(msg, content=True)
    except Exception:
        logger.error("task::invalid task tracking message", exc_info=True)
        return
    content = msg['content']
    msg_id = content['msg_id']
    engine_uuid = content['engine_id']
    eid = self.by_ident.get(engine_uuid, None)
    if eid is None:
        # the engine is not (or no longer) registered; log and drop
        # instead of raising KeyError inside the stream callback
        logger.error("task::task %s arrived on unregistered engine %r"%(msg_id, engine_uuid))
        return

    logger.info("task::task %s arrived on %s"%(msg_id, eid))

    self.tasks[eid].append(msg_id)
    self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
|
|||
558 |
|
||||
def mia_task_request(self, idents, msg):
    """Handle a request for tasks that are missing in action.

    Not implemented: always raises NotImplementedError.
    """
    raise NotImplementedError
|
|||
564 |
|
||||
565 |
|
||||
566 |
|
||||
567 | #------------------------------------------------------------------------- |
|
|||
568 | # Registration requests |
|
|||
569 | #------------------------------------------------------------------------- |
|
|||
570 |
|
||||
def connection_request(self, client_id, msg):
    """Reply with connection addresses for clients.

    The 'connection_reply' is sent on the registrar stream.  Its content
    is status 'ok', the entries of self.client_addrs, and 'engines': the
    keytable with its keys converted to str.
    """
    logger.info("client::client %s connected"%client_id)
    content = dict(status='ok')
    content.update(self.client_addrs)
    # keytable keys are turned into strings for the reply
    content['engines'] = dict((str(eid), ident)
                              for eid, ident in self.keytable.iteritems())
    self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
|
|||
581 |
|
||||
def register_engine(self, reg, msg):
    """Register a new engine.

    First half of registration: pick a new engine id, check that the
    requested queue and heartbeat identities are not already in use
    (by registered engines or by registrations in progress), and send a
    'registration_reply' on the registrar stream.  On success the
    registration is either finished immediately (heart already beating)
    or parked in self.incoming_registrations with a timeout that purges
    it if the heart never shows up.

    Returns the engine id, or None when the request has no 'queue'.
    """
    request = msg['content']
    try:
        queue = request['queue']
    except KeyError:
        logger.error("registration::queue not specified")
        return
    heart = request.get('heartbeat', None)
    eid = self._new_id()

    logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))

    content = dict(id=eid,status='ok')
    content.update(self.engine_addrs)

    # find the first identity clash, as (message template, culprit)
    conflict = None
    if queue in self.by_ident:
        conflict = ("queue_id %r in use", queue)
    elif heart in self.hearts: # need to check unique hearts?
        conflict = ("heart_id %r in use", heart)
    else:
        for h, pack in self.incoming_registrations.iteritems():
            if heart == h:
                conflict = ("heart_id %r in use", heart)
                break
            elif queue == pack[1]:
                conflict = ("queue_id %r in use", queue)
                break

    if conflict is not None:
        template, culprit = conflict
        # raise-then-catch: presumably wrap_exception() reads the
        # exception currently being handled
        try:
            raise KeyError(template%culprit)
        except:
            content = wrap_exception()

    msg = self.session.send(self.registrar, "registration_reply",
            content=content,
            ident=reg)

    if content['status'] != 'ok':
        logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
    elif heart in self.heartbeat.hearts:
        # already beating
        self.incoming_registrations[heart] = (eid,queue,reg,None)
        self.finish_registration(heart)
    else:
        # give the heart registration_timeout to start beating
        purge = lambda : self._purge_stalled_registration(heart)
        dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
        dc.start()
        self.incoming_registrations[heart] = (eid,queue,reg,dc)
    return eid
|
|||
642 |
|
||||
def unregister_engine(self, ident, msg):
    """Unregister an engine that explicitly requested to leave.

    Also called by handle_heart_failure when an engine's heart stops.
    msg['content']['id'] is the engine id.  The engine is removed from
    ids, keytable, engines, hearts, by_ident, completed and queues, its
    queued msg_ids are removed from self.pending, and an
    'unregistration_notification' is sent if there is a notifier.
    Requests without an id, or for an engine that is not registered, are
    logged and ignored.
    """
    try:
        eid = msg['content']['id']
    except Exception:
        logger.error("registration::bad engine id for unregistration: %s"%ident)
        return
    logger.info("registration::unregister_engine(%s)"%eid)
    if eid not in self.engines:
        # e.g. a repeated request: nothing to remove, and indexing
        # self.engines below would raise KeyError
        logger.error("registration::bad engine id for unregistration: %s"%ident)
        return
    content=dict(id=eid, queue=self.engines[eid].queue)
    self.ids.remove(eid)
    self.keytable.pop(eid)
    ec = self.engines.pop(eid)
    self.hearts.pop(ec.heartbeat)
    self.by_ident.pop(ec.queue)
    self.completed.pop(eid)
    for msg_id in self.queues.pop(eid):
        # set.remove returns None, so there is nothing to keep from it
        self.pending.remove(msg_id)
    ############## TODO: HANDLE IT ################

    if self.notifier:
        self.session.send(self.notifier, "unregistration_notification", content=content)
|
|||
664 |
|
||||
def finish_registration(self, heart):
    """Second half of engine registration.

    Called once a beat has been received from the Engine's Heart.  Pops the
    pending ``(eid, queue, reg, purge)`` entry stored under ``heart``, stops
    the stalled-registration timer if one was scheduled, records the engine
    in the Hub's lookup tables and publishes a "registration_notification"
    when a notifier is configured.
    """
    try:
        eid, queue, reg, purge = self.incoming_registrations.pop(heart)
    except KeyError:
        logger.error("registration::tried to finish nonexistant registration")
        return
    logger.info("registration::finished registering engine %i:%r"%(eid,queue))
    if purge is not None:
        # the heart showed up in time, so the purge timer is no longer needed
        purge.stop()
    self.ids.add(eid)
    self.keytable[eid] = queue
    # the queue identity is also passed as the control identity
    connector = EngineConnector(eid, queue, reg, queue, heart)
    self.engines[eid] = connector
    self.by_ident[queue] = eid
    # fresh, empty per-engine bookkeeping lists
    for table in (self.queues, self.tasks, self.completed):
        table[eid] = list()
    self.hearts[heart] = eid
    if self.notifier:
        notice = dict(id=eid, queue=connector.queue)
        self.session.send(self.notifier, "registration_notification", content=notice)
|
|||
688 |
|
||||
def _purge_stalled_registration(self, heart):
    """Drop the pending registration stored under ``heart``, if any.

    Does nothing when ``heart`` has no entry in ``incoming_registrations``.
    """
    if heart not in self.incoming_registrations:
        return
    stalled = self.incoming_registrations.pop(heart)
    # the engine id is the first field of the pending-registration tuple
    logger.info("registration::purging stalled registration: %i"%stalled[0])
|
|||
695 |
|
||||
696 | #------------------------------------------------------------------------- |
|
|||
697 | # Client Requests |
|
|||
698 | #------------------------------------------------------------------------- |
|
|||
699 |
|
||||
def shutdown_request(self, client_id, msg):
    """Handle a shutdown request from a client.

    Sends an 'ok' "shutdown_reply" to ``client_id`` straight away and
    schedules ``self._shutdown`` on the Hub's loop through a DelayedCallback
    with a delay argument of 1000.

    NOTE: engines are not notified here.  An earlier draft (previously kept
    as commented-out code) connected an XREQ socket to the 'mux' client
    address and sent each engine a 'shutdown_request' with restart=False.
    """
    reply = {'status': 'ok'}
    self.session.send(self.clientele, 'shutdown_reply', content=reply, ident=client_id)

    def _stop_controller():
        # look the method up at call time, as the original lambda did
        self._shutdown()

    ioloop.DelayedCallback(_stop_controller, 1000, self.loop).start()
|
|||
711 |
|
||||
def _shutdown(self):
    """Log the shutdown, pause for a tenth of a second, then exit with status 0."""
    logger.info("controller::controller shutting down.")
    # short pause before exiting -- presumably to let the log message and
    # pending replies go out; TODO confirm
    time.sleep(0.1)
    sys.exit(0)
|
|||
716 |
|
||||
717 |
|
||||
def check_load(self, client_id, msg):
    """Report the number of outstanding jobs for one or more targets.

    client_id : identity of the requesting client; the reply is sent to it.
    msg : the request; ``msg['content']['targets']`` is passed through
        ``self._validate_targets``.

    On success a "load_reply" is sent whose content maps ``str(target)`` to
    ``len(self.queues[target]) + len(self.tasks[target])`` alongside
    ``status='ok'``.  If the targets are missing or rejected, a
    "controller_error" carrying the wrapped exception is sent instead.
    """
    content = msg['content']
    try:
        targets = self._validate_targets(content['targets'])
    except Exception:
        # Exception (not a bare except) so SystemExit/KeyboardInterrupt
        # still propagate.
        content = wrap_exception()
        self.session.send(self.clientele, "controller_error",
                          content=content, ident=client_id)
        return

    content = dict(status='ok')
    for t in targets:
        # str(t), not bytes(t): the two are the same object on Python 2, but
        # on Python 3 bytes(int) builds a zero-filled buffer of that length.
        content[str(t)] = len(self.queues[t])+len(self.tasks[t])
    self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
|
|||
734 |
|
||||
735 |
|
||||
def queue_status(self, client_id, msg):
    """Return the Queue status of one or more targets.

    if verbose: return the msg_ids
    else: return len of each type.
    keys: queue (pending MUX jobs)
        tasks (pending Task jobs)
        completed (finished jobs from both queues)

    client_id : identity of the requesting client; the reply is sent to it.
    msg : the request; reads ``targets`` and the optional ``verbose`` flag
        (default False) from ``msg['content']``.

    On success a "queue_reply" is sent whose content maps ``str(target)`` to
    a dict with the three keys above, alongside ``status='ok'``.  If the
    targets are missing or rejected by ``self._validate_targets``, a
    "controller_error" carrying the wrapped exception is sent instead.
    """
    content = msg['content']
    try:
        # The 'targets' lookup sits inside the try so that a request without
        # it is answered with controller_error, as check_load does.
        targets = self._validate_targets(content['targets'])
    except Exception:
        content = wrap_exception()
        self.session.send(self.clientele, "controller_error",
                          content=content, ident=client_id)
        return
    verbose = content.get('verbose', False)
    content = dict(status='ok')
    for t in targets:
        queue = self.queues[t]
        completed = self.completed[t]
        tasks = self.tasks[t]
        if not verbose:
            queue = len(queue)
            completed = len(completed)
            tasks = len(tasks)
        # str(t), not bytes(t): the two are the same object on Python 2, but
        # on Python 3 bytes(int) builds a zero-filled buffer of that length.
        content[str(t)] = {'queue': queue, 'completed': completed, 'tasks': tasks}
    # pending
    self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
|
|||
765 |
|
||||
def purge_results(self, client_id, msg):
    """Purge results from memory. This method is more valuable before we move
    to a DB based message storage mechanism.

    client_id : identity of the requesting client; the reply is sent to it.
    msg : the request; ``msg['content']`` may carry

        msg_ids : the string 'all' to drop every record whose ``completed``
            field is not None, or an iterable of msg_ids to drop one by one.
            The first id that is still pending or unknown stops the msg_id
            pass and turns the reply into a wrapped IndexError.
        engine_ids : engine ids whose completed records are dropped (matched
            on the engine's queue identity).  The first unknown engine stops
            this pass and turns the reply into a wrapped IndexError.

    A "purge_reply" is always sent, with either ``status='ok'`` or the
    wrapped error.
    """
    content = msg['content']
    msg_ids = content.get('msg_ids', [])
    reply = dict(status='ok')
    if msg_ids == 'all':
        self.db.drop_matching_records(dict(completed={'$ne':None}))
    else:
        for msg_id in msg_ids:
            if msg_id in self.all_completed:
                self.db.drop_record(msg_id)
                continue
            # raise-and-catch so wrap_exception() sees a live exception
            if msg_id in self.pending:
                try:
                    raise IndexError("msg pending: %r"%msg_id)
                except Exception:
                    reply = wrap_exception()
            else:
                try:
                    raise IndexError("No such msg: %r"%msg_id)
                except Exception:
                    reply = wrap_exception()
            break
    for eid in content.get('engine_ids', []):
        if eid not in self.engines:
            try:
                raise IndexError("No such engine: %i"%eid)
            except Exception:
                reply = wrap_exception()
            break
        # Reset rather than pop: the engine is still registered, and both
        # queue_status and unregister_engine index self.completed[eid].
        # Popping also made a second purge of the same engine raise KeyError
        # before any reply was sent.
        self.completed[eid] = list()
        uid = self.engines[eid].queue
        self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))

    self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
|
|||
803 |
|
||||
def resubmit_task(self, client_id, msg, buffers):
    """Resubmit a task.

    Not implemented yet: always raises NotImplementedError.
    """
    raise NotImplementedError
|
|||
807 |
|
||||
def get_results(self, client_id, msg):
    """Get the result of 1 or more messages.

    client_id : identity of the requesting client; the reply is sent to it.
    msg : the request; reads ``msg_ids`` and the optional ``status_only``
        flag (default False) from ``msg['content']``.  It is also passed as
        the ``parent`` of the reply.

    Sends a "result_reply" whose content lists the requested ids under
    'pending' and 'completed'.  Unless ``status_only`` is set, each completed
    id additionally maps to its 'result_content', 'header' and
    'result_header', and its result buffers are appended to the reply's
    buffers.  The first id that is neither pending nor completed stops the
    scan and replaces the content with a wrapped KeyError.
    """
    content = msg['content']
    # de-duplicate and order the ids
    msg_ids = sorted(set(content['msg_ids']))
    statusonly = content.get('status_only', False)
    pending = []
    completed = []
    buffers = []
    content = dict(status='ok', pending=pending, completed=completed)
    if not statusonly:
        # NOTE(review): 'results' stays empty; the per-message results are
        # stored directly under content[msg_id] below.
        content['results'] = {}
        # assumes find_records returns a mapping keyed by msg_id -- TODO confirm
        records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
    for msg_id in msg_ids:
        if msg_id in self.pending:
            pending.append(msg_id)
        elif msg_id in self.all_completed:
            completed.append(msg_id)
            if not statusonly:
                rec = records[msg_id]
                content[msg_id] = {'result_content': rec['result_content'],
                                   'header': rec['header'],
                                   'result_header': rec['result_header'],
                                   }
                buffers.extend(map(str, rec['result_buffers']))
        else:
            try:
                # %-formatting instead of '+': identical text for string ids,
                # and a non-string id no longer turns the reported error into
                # a TypeError.
                raise KeyError('No such message: %s' % (msg_id,))
            except Exception:
                content = wrap_exception()
            break
    self.session.send(self.clientele, "result_reply", content=content,
                      parent=msg, ident=client_id,
                      buffers=buffers)
|
|||
843 |
|
||||
844 |
|
||||
845 | #------------------------------------------------------------------------- |
|
47 | #------------------------------------------------------------------------- | |
846 | # Entry Point |
|
48 | # Entry Point | |
847 | #------------------------------------------------------------------------- |
|
49 | #------------------------------------------------------------------------- | |
@@ -875,16 +77,6 b' def make_argument_parser():' | |||||
875 | return parser |
|
77 | return parser | |
876 |
|
78 | |||
877 | def main(argv=None): |
|
79 | def main(argv=None): | |
878 | import time |
|
|||
879 | from multiprocessing import Process |
|
|||
880 |
|
||||
881 | from zmq.eventloop.zmqstream import ZMQStream |
|
|||
882 | from zmq.devices import ProcessMonitoredQueue |
|
|||
883 | from zmq.log import handlers |
|
|||
884 |
|
||||
885 | import streamsession as session |
|
|||
886 | import heartmonitor |
|
|||
887 | from scheduler import launch_scheduler |
|
|||
888 |
|
80 | |||
889 | parser = make_argument_parser() |
|
81 | parser = make_argument_parser() | |
890 |
|
82 | |||
@@ -1023,7 +215,7 b' def main(argv=None):' | |||||
1023 | 'notification': iface%nport |
|
215 | 'notification': iface%nport | |
1024 | } |
|
216 | } | |
1025 | signal_children(children) |
|
217 | signal_children(children) | |
1026 |
|
|
218 | hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs) | |
1027 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) |
|
219 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) | |
1028 | dc.start() |
|
220 | dc.start() | |
1029 | loop.start() |
|
221 | loop.start() |
General Comments 0
You need to be logged in to leave comments.
Login now