Show More
@@ -0,0 +1,148 b'' | |||||
|
1 | """A Task logger that presents our DB interface, | |||
|
2 | but exists entirely in memory and implemented with dicts. | |||
|
3 | ||||
|
4 | TaskRecords are dicts of the form: | |||
|
5 | { | |||
|
6 | 'msg_id' : str(uuid), | |||
|
7 | 'client_uuid' : str(uuid), | |||
|
8 | 'engine_uuid' : str(uuid) or None, | |||
|
9 | 'header' : dict(header), | |||
|
10 | 'content': dict(content), | |||
|
11 | 'buffers': list(buffers), | |||
|
12 | 'submitted': datetime, | |||
|
13 | 'started': datetime or None, | |||
|
14 | 'completed': datetime or None, | |||
|
15 | 'resubmitted': datetime or None, | |||
|
16 | 'result_header' : dict(header) or None, | |||
|
17 | 'result_content' : dict(content) or None, | |||
|
18 | 'result_buffers' : list(buffers) or None, | |||
|
19 | } | |||
|
20 | With this info, many of the special categories of tasks can be defined by query: | |||
|
21 | ||||
|
22 | pending: completed is None | |||
|
23 | client's outstanding: client_uuid = uuid && completed is None | |||
|
24 | MIA: arrived is None (and completed is None) | |||
|
25 | etc. | |||
|
26 | ||||
|
27 | EngineRecords are dicts of the form: | |||
|
28 | { | |||
|
29 | 'eid' : int(id), | |||
|
30 | 'uuid': str(uuid) | |||
|
31 | } | |||
|
32 | This may be extended, but is currently. | |||
|
33 | ||||
|
34 | We support a subset of mongodb operators: | |||
|
35 | $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists | |||
|
36 | """ | |||
|
37 | #----------------------------------------------------------------------------- | |||
|
38 | # Copyright (C) 2010 The IPython Development Team | |||
|
39 | # | |||
|
40 | # Distributed under the terms of the BSD License. The full license is in | |||
|
41 | # the file COPYING, distributed as part of this software. | |||
|
42 | #----------------------------------------------------------------------------- | |||
|
43 | ||||
|
44 | ||||
|
45 | from datetime import datetime | |||
|
46 | ||||
|
47 | filters = { | |||
|
48 | '$eq' : lambda a,b: a==b, | |||
|
49 | '$lt' : lambda a,b: a < b, | |||
|
50 | '$gt' : lambda a,b: b > a, | |||
|
51 | '$lte': lambda a,b: a <= b, | |||
|
52 | '$gte': lambda a,b: a >= b, | |||
|
53 | '$ne' : lambda a,b: not a==b, | |||
|
54 | '$in' : lambda a,b: a in b, | |||
|
55 | '$nin': lambda a,b: a not in b, | |||
|
56 | '$all' : lambda a,b: all([ a in bb for bb in b ]), | |||
|
57 | '$mod': lambda a,b: a%b[0] == b[1], | |||
|
58 | '$exists' : lambda a,b: (b and a is not None) or (a is None and not b) | |||
|
59 | } | |||
|
60 | ||||
|
61 | ||||
|
62 | class CompositeFilter(object): | |||
|
63 | """Composite filter for matching multiple properties.""" | |||
|
64 | ||||
|
65 | def __init__(self, dikt): | |||
|
66 | self.tests = [] | |||
|
67 | self.values = [] | |||
|
68 | for key, value in dikt.iteritems(): | |||
|
69 | self.tests.append(filters[key]) | |||
|
70 | self.values.append(value) | |||
|
71 | ||||
|
72 | def __call__(self, value): | |||
|
73 | for test,check in zip(self.tests, self.values): | |||
|
74 | if not test(value, check): | |||
|
75 | return False | |||
|
76 | return True | |||
|
77 | ||||
|
78 | class DictDB(object): | |||
|
79 | """Basic in-memory dict-based object for saving Task Records. | |||
|
80 | ||||
|
81 | This is the first object to present the DB interface | |||
|
82 | for logging tasks out of memory. | |||
|
83 | ||||
|
84 | The interface is based on MongoDB, so adding a MongoDB | |||
|
85 | backend should be straightforward. | |||
|
86 | """ | |||
|
87 | _records = None | |||
|
88 | ||||
|
89 | def __init__(self): | |||
|
90 | self._records = dict() | |||
|
91 | ||||
|
92 | def _match_one(self, rec, tests): | |||
|
93 | """Check if a specific record matches tests.""" | |||
|
94 | for key,test in tests.iteritems(): | |||
|
95 | if not test(rec.get(key, None)): | |||
|
96 | return False | |||
|
97 | return True | |||
|
98 | ||||
|
99 | def _match(self, check, id_only=True): | |||
|
100 | """Find all the matches for a check dict.""" | |||
|
101 | matches = {} | |||
|
102 | tests = {} | |||
|
103 | for k,v in check.iteritems(): | |||
|
104 | if isinstance(v, dict): | |||
|
105 | tests[k] = CompositeFilter(v) | |||
|
106 | else: | |||
|
107 | tests[k] = lambda o: o==v | |||
|
108 | ||||
|
109 | for msg_id, rec in self._records.iteritems(): | |||
|
110 | if self._match_one(rec, tests): | |||
|
111 | matches[msg_id] = rec | |||
|
112 | if id_only: | |||
|
113 | return matches.keys() | |||
|
114 | else: | |||
|
115 | return matches | |||
|
116 | ||||
|
117 | ||||
|
118 | def add_record(self, msg_id, rec): | |||
|
119 | """Add a new Task Record, by msg_id.""" | |||
|
120 | if self._records.has_key(msg_id): | |||
|
121 | raise KeyError("Already have msg_id %r"%(msg_id)) | |||
|
122 | self._records[msg_id] = rec | |||
|
123 | ||||
|
124 | def get_record(self, msg_id): | |||
|
125 | """Get a specific Task Record, by msg_id.""" | |||
|
126 | if not self._records.has_key(msg_id): | |||
|
127 | raise KeyError("No such msg_id %r"%(msg_id)) | |||
|
128 | return self._records[msg_id] | |||
|
129 | ||||
|
130 | def update_record(self, msg_id, rec): | |||
|
131 | """Update the data in an existing record.""" | |||
|
132 | self._records[msg_id].update(rec) | |||
|
133 | ||||
|
134 | def drop_matching_records(self, check): | |||
|
135 | """Remove a record from the DB.""" | |||
|
136 | matches = self._match(check, id_only=True) | |||
|
137 | for m in matches: | |||
|
138 | del self._records[m] | |||
|
139 | ||||
|
140 | def drop_record(self, msg_id): | |||
|
141 | """Remove a record from the DB.""" | |||
|
142 | del self._records[msg_id] | |||
|
143 | ||||
|
144 | ||||
|
145 | def find_records(self, check, id_only=False): | |||
|
146 | """Find records matching a query dict.""" | |||
|
147 | matches = self._match(check, id_only) | |||
|
148 | return matches No newline at end of file |
@@ -0,0 +1,56 b'' | |||||
|
1 | """A TaskRecord backend using mongodb""" | |||
|
2 | #----------------------------------------------------------------------------- | |||
|
3 | # Copyright (C) 2010 The IPython Development Team | |||
|
4 | # | |||
|
5 | # Distributed under the terms of the BSD License. The full license is in | |||
|
6 | # the file COPYING, distributed as part of this software. | |||
|
7 | #----------------------------------------------------------------------------- | |||
|
8 | ||||
|
9 | from datetime import datetime | |||
|
10 | ||||
|
11 | from pymongo import Connection | |||
|
12 | ||||
|
13 | #---------------------- | |||
|
14 | # MongoDB class | |||
|
15 | #---------------------- | |||
|
16 | class MongoDB(object): | |||
|
17 | """MongoDB TaskRecord backend.""" | |||
|
18 | def __init__(self, session_uuid, *args, **kwargs): | |||
|
19 | self._connection = Connection(*args, **kwargs) | |||
|
20 | self._db = self._connection[session_uuid] | |||
|
21 | self._records = self._db['task_records'] | |||
|
22 | self._table = {} | |||
|
23 | ||||
|
24 | ||||
|
25 | def add_record(self, msg_id, rec): | |||
|
26 | """Add a new Task Record, by msg_id.""" | |||
|
27 | # print rec | |||
|
28 | obj_id = self._records.insert(rec) | |||
|
29 | self._table[msg_id] = obj_id | |||
|
30 | ||||
|
31 | def get_record(self, msg_id): | |||
|
32 | """Get a specific Task Record, by msg_id.""" | |||
|
33 | return self._records.find_one(self._table[msg_id]) | |||
|
34 | ||||
|
35 | def update_record(self, msg_id, rec): | |||
|
36 | """Update the data in an existing record.""" | |||
|
37 | obj_id = self._table[msg_id] | |||
|
38 | self._records.update({'_id':obj_id}, rec) | |||
|
39 | ||||
|
40 | def drop_matching_records(self, check): | |||
|
41 | """Remove a record from the DB.""" | |||
|
42 | self._records.remove(check) | |||
|
43 | ||||
|
44 | def drop_record(self, msg_id): | |||
|
45 | """Remove a record from the DB.""" | |||
|
46 | obj_id = self._table.pop(msg_id) | |||
|
47 | self._records.remove(obj_id) | |||
|
48 | ||||
|
49 | def find_records(self, check, id_only=False): | |||
|
50 | """Find records matching a query dict.""" | |||
|
51 | matches = list(self._records.find(check)) | |||
|
52 | if id_only: | |||
|
53 | matches = [ rec['msg_id'] for rec in matches ] | |||
|
54 | return matches | |||
|
55 | ||||
|
56 |
@@ -27,10 +27,17 b' import uuid' | |||||
27 | from IPython.zmq.log import logger # a Logger object |
|
27 | from IPython.zmq.log import logger # a Logger object | |
28 | from IPython.zmq.entry_point import bind_port |
|
28 | from IPython.zmq.entry_point import bind_port | |
29 |
|
29 | |||
30 | from streamsession import Message, wrap_exception |
|
30 | from streamsession import Message, wrap_exception, ISO8601 | |
31 | from entry_point import (make_base_argument_parser, select_random_ports, split_ports, |
|
31 | from entry_point import (make_base_argument_parser, select_random_ports, split_ports, | |
32 | connect_logger, parse_url, signal_children, generate_exec_key) |
|
32 | connect_logger, parse_url, signal_children, generate_exec_key) | |
33 |
|
33 | from dictdb import DictDB | ||
|
34 | try: | |||
|
35 | from pymongo.binary import Binary | |||
|
36 | except ImportError: | |||
|
37 | MongoDB=None | |||
|
38 | else: | |||
|
39 | from mongodb import MongoDB | |||
|
40 | ||||
34 | #----------------------------------------------------------------------------- |
|
41 | #----------------------------------------------------------------------------- | |
35 | # Code |
|
42 | # Code | |
36 | #----------------------------------------------------------------------------- |
|
43 | #----------------------------------------------------------------------------- | |
@@ -65,6 +72,27 b' class ReverseDict(dict):' | |||||
65 | return value |
|
72 | return value | |
66 |
|
73 | |||
67 |
|
74 | |||
|
75 | def init_record(msg): | |||
|
76 | """return an empty TaskRecord dict, with all keys initialized with None.""" | |||
|
77 | header = msg['header'] | |||
|
78 | return { | |||
|
79 | 'msg_id' : header['msg_id'], | |||
|
80 | 'header' : header, | |||
|
81 | 'content': msg['content'], | |||
|
82 | 'buffers': msg['buffers'], | |||
|
83 | 'submitted': datetime.strptime(header['date'], ISO8601), | |||
|
84 | 'client_uuid' : None, | |||
|
85 | 'engine_uuid' : None, | |||
|
86 | 'started': None, | |||
|
87 | 'completed': None, | |||
|
88 | 'resubmitted': None, | |||
|
89 | 'result_header' : None, | |||
|
90 | 'result_content' : None, | |||
|
91 | 'result_buffers' : None, | |||
|
92 | 'queue' : None | |||
|
93 | } | |||
|
94 | ||||
|
95 | ||||
68 | class EngineConnector(object): |
|
96 | class EngineConnector(object): | |
69 | """A simple object for accessing the various zmq connections of an object. |
|
97 | """A simple object for accessing the various zmq connections of an object. | |
70 | Attributes are: |
|
98 | Attributes are: | |
@@ -159,7 +187,7 b' class Controller(object):' | |||||
159 | self.by_ident = {} |
|
187 | self.by_ident = {} | |
160 | self.clients = {} |
|
188 | self.clients = {} | |
161 | self.hearts = {} |
|
189 | self.hearts = {} | |
162 | self.mia = set() |
|
190 | # self.mia = set() | |
163 |
|
191 | |||
164 | # self.sockets = {} |
|
192 | # self.sockets = {} | |
165 | self.loop = loop |
|
193 | self.loop = loop | |
@@ -210,14 +238,14 b' class Controller(object):' | |||||
210 | 'unregistration_request' : self.unregister_engine, |
|
238 | 'unregistration_request' : self.unregister_engine, | |
211 | 'connection_request': self.connection_request, |
|
239 | 'connection_request': self.connection_request, | |
212 | } |
|
240 | } | |
213 | # |
|
241 | self.registration_timeout = max(5000, 2*self.heartbeat.period) | |
214 | # this is the stuff that will move to DB: |
|
242 | # this is the stuff that will move to DB: | |
215 | self.results = {} # completed results |
|
243 | # self.results = {} # completed results | |
216 |
self.pending = |
|
244 | self.pending = set() # pending messages, keyed by msg_id | |
217 | self.queues = {} # pending msg_ids keyed by engine_id |
|
245 | self.queues = {} # pending msg_ids keyed by engine_id | |
218 | self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id |
|
246 | self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id | |
219 | self.completed = {} # completed msg_ids keyed by engine_id |
|
247 | self.completed = {} # completed msg_ids keyed by engine_id | |
220 | self.registration_timeout = max(5000, 2*self.heartbeat.period) |
|
248 | self.all_completed = set() | |
221 |
|
249 | |||
222 | logger.info("controller::created controller") |
|
250 | logger.info("controller::created controller") | |
223 |
|
251 | |||
@@ -322,6 +350,7 b' class Controller(object):' | |||||
322 | if not idents: |
|
350 | if not idents: | |
323 | logger.error("Bad Client Message: %s"%msg) |
|
351 | logger.error("Bad Client Message: %s"%msg) | |
324 | return |
|
352 | return | |
|
353 | client_id = idents[0] | |||
325 | try: |
|
354 | try: | |
326 | msg = self.session.unpack_message(msg, content=True) |
|
355 | msg = self.session.unpack_message(msg, content=True) | |
327 | except: |
|
356 | except: | |
@@ -401,11 +430,15 b' class Controller(object):' | |||||
401 |
|
430 | |||
402 | header = msg['header'] |
|
431 | header = msg['header'] | |
403 | msg_id = header['msg_id'] |
|
432 | msg_id = header['msg_id'] | |
404 | info = dict(submit=datetime.now(), |
|
433 | record = init_record(msg) | |
405 | received=None, |
|
434 | record['engine_uuid'] = queue_id | |
406 | engine=(eid, queue_id)) |
|
435 | record['client_uuid'] = client_id | |
407 | self.pending[msg_id] = ( msg, info ) |
|
436 | record['queue'] = 'mux' | |
|
437 | if MongoDB is not None and isinstance(self.db, MongoDB): | |||
|
438 | record['buffers'] = map(Binary, record['buffers']) | |||
|
439 | self.pending.add(msg_id) | |||
408 | self.queues[eid].append(msg_id) |
|
440 | self.queues[eid].append(msg_id) | |
|
441 | self.db.add_record(msg_id, record) | |||
409 |
|
442 | |||
410 | def save_queue_result(self, idents, msg): |
|
443 | def save_queue_result(self, idents, msg): | |
411 | if len(idents) < 2: |
|
444 | if len(idents) < 2: | |
@@ -430,11 +463,25 b' class Controller(object):' | |||||
430 | if not parent: |
|
463 | if not parent: | |
431 | return |
|
464 | return | |
432 | msg_id = parent['msg_id'] |
|
465 | msg_id = parent['msg_id'] | |
433 | self.results[msg_id] = msg |
|
|||
434 | if msg_id in self.pending: |
|
466 | if msg_id in self.pending: | |
435 |
self.pending. |
|
467 | self.pending.remove(msg_id) | |
|
468 | self.all_completed.add(msg_id) | |||
436 | self.queues[eid].remove(msg_id) |
|
469 | self.queues[eid].remove(msg_id) | |
437 | self.completed[eid].append(msg_id) |
|
470 | self.completed[eid].append(msg_id) | |
|
471 | rheader = msg['header'] | |||
|
472 | completed = datetime.strptime(rheader['date'], ISO8601) | |||
|
473 | started = rheader.get('started', None) | |||
|
474 | if started is not None: | |||
|
475 | started = datetime.strptime(started, ISO8601) | |||
|
476 | result = { | |||
|
477 | 'result_header' : rheader, | |||
|
478 | 'result_content': msg['content'], | |||
|
479 | 'started' : started, | |||
|
480 | 'completed' : completed | |||
|
481 | } | |||
|
482 | if MongoDB is not None and isinstance(self.db, MongoDB): | |||
|
483 | result['result_buffers'] = map(Binary, msg['buffers']) | |||
|
484 | self.db.update_record(msg_id, result) | |||
438 | else: |
|
485 | else: | |
439 | logger.debug("queue:: unknown msg finished %s"%msg_id) |
|
486 | logger.debug("queue:: unknown msg finished %s"%msg_id) | |
440 |
|
487 | |||
@@ -445,28 +492,26 b' class Controller(object):' | |||||
445 | client_id = idents[0] |
|
492 | client_id = idents[0] | |
446 |
|
493 | |||
447 | try: |
|
494 | try: | |
448 | msg = self.session.unpack_message(msg, content=False) |
|
495 | msg = self.session.unpack_message(msg, content=False, copy=False) | |
449 | except: |
|
496 | except: | |
450 | logger.error("task::client %r sent invalid task message: %s"%( |
|
497 | logger.error("task::client %r sent invalid task message: %s"%( | |
451 | client_id, msg), exc_info=True) |
|
498 | client_id, msg), exc_info=True) | |
452 | return |
|
499 | return | |
453 |
|
500 | rec = init_record(msg) | ||
|
501 | if MongoDB is not None and isinstance(self.db, MongoDB): | |||
|
502 | record['buffers'] = map(Binary, record['buffers']) | |||
|
503 | rec['client_uuid'] = client_id | |||
|
504 | rec['queue'] = 'task' | |||
454 | header = msg['header'] |
|
505 | header = msg['header'] | |
455 | msg_id = header['msg_id'] |
|
506 | msg_id = header['msg_id'] | |
456 |
self. |
|
507 | self.pending.add(msg_id) | |
457 | info = dict(submit=datetime.now(), |
|
508 | self.db.add_record(msg_id, rec) | |
458 | received=None, |
|
|||
459 | engine=None) |
|
|||
460 | self.pending[msg_id] = (msg, info) |
|
|||
461 | if not self.tasks.has_key(client_id): |
|
|||
462 | self.tasks[client_id] = [] |
|
|||
463 | self.tasks[client_id].append(msg_id) |
|
|||
464 |
|
509 | |||
465 | def save_task_result(self, idents, msg): |
|
510 | def save_task_result(self, idents, msg): | |
466 | """save the result of a completed task.""" |
|
511 | """save the result of a completed task.""" | |
467 | client_id = idents[0] |
|
512 | client_id = idents[0] | |
468 | try: |
|
513 | try: | |
469 | msg = self.session.unpack_message(msg, content=False) |
|
514 | msg = self.session.unpack_message(msg, content=False, copy=False) | |
470 | except: |
|
515 | except: | |
471 | logger.error("task::invalid task result message send to %r: %s"%( |
|
516 | logger.error("task::invalid task result message send to %r: %s"%( | |
472 | client_id, msg)) |
|
517 | client_id, msg)) | |
@@ -478,19 +523,33 b' class Controller(object):' | |||||
478 | logger.warn("Task %r had no parent!"%msg) |
|
523 | logger.warn("Task %r had no parent!"%msg) | |
479 | return |
|
524 | return | |
480 | msg_id = parent['msg_id'] |
|
525 | msg_id = parent['msg_id'] | |
481 | self.results[msg_id] = msg |
|
|||
482 |
|
526 | |||
483 | header = msg['header'] |
|
527 | header = msg['header'] | |
484 | engine_uuid = header.get('engine', None) |
|
528 | engine_uuid = header.get('engine', None) | |
485 | eid = self.by_ident.get(engine_uuid, None) |
|
529 | eid = self.by_ident.get(engine_uuid, None) | |
486 |
|
530 | |||
487 | if msg_id in self.pending: |
|
531 | if msg_id in self.pending: | |
488 |
self.pending. |
|
532 | self.pending.remove(msg_id) | |
489 | if msg_id in self.mia: |
|
533 | self.all_completed.add(msg_id) | |
490 | self.mia.remove(msg_id) |
|
534 | if eid is not None: | |
491 | if eid is not None and msg_id in self.tasks[eid]: |
|
|||
492 | self.completed[eid].append(msg_id) |
|
535 | self.completed[eid].append(msg_id) | |
493 |
self.tasks[eid] |
|
536 | if msg_id in self.tasks[eid]: | |
|
537 | self.tasks[eid].remove(msg_id) | |||
|
538 | completed = datetime.strptime(header['date'], ISO8601) | |||
|
539 | started = header.get('started', None) | |||
|
540 | if started is not None: | |||
|
541 | started = datetime.strptime(started, ISO8601) | |||
|
542 | result = { | |||
|
543 | 'result_header' : header, | |||
|
544 | 'result_content': msg['content'], | |||
|
545 | 'started' : started, | |||
|
546 | 'completed' : completed, | |||
|
547 | 'engine_uuid': engine_uuid | |||
|
548 | } | |||
|
549 | if MongoDB is not None and isinstance(self.db, MongoDB): | |||
|
550 | result['result_buffers'] = map(Binary, msg['buffers']) | |||
|
551 | self.db.update_record(msg_id, result) | |||
|
552 | ||||
494 | else: |
|
553 | else: | |
495 | logger.debug("task::unknown task %s finished"%msg_id) |
|
554 | logger.debug("task::unknown task %s finished"%msg_id) | |
496 |
|
555 | |||
@@ -507,18 +566,20 b' class Controller(object):' | |||||
507 | eid = self.by_ident[engine_uuid] |
|
566 | eid = self.by_ident[engine_uuid] | |
508 |
|
567 | |||
509 | logger.info("task::task %s arrived on %s"%(msg_id, eid)) |
|
568 | logger.info("task::task %s arrived on %s"%(msg_id, eid)) | |
510 | if msg_id in self.mia: |
|
569 | # if msg_id in self.mia: | |
511 | self.mia.remove(msg_id) |
|
570 | # self.mia.remove(msg_id) | |
512 | else: |
|
571 | # else: | |
513 | logger.debug("task::task %s not listed as MIA?!"%(msg_id)) |
|
572 | # logger.debug("task::task %s not listed as MIA?!"%(msg_id)) | |
514 |
|
573 | |||
515 | self.tasks[eid].append(msg_id) |
|
574 | self.tasks[eid].append(msg_id) | |
516 | self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid)) |
|
575 | # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid)) | |
|
576 | self.db.update_record(msg_id, dict(engine_uuid=engine_uuid)) | |||
517 |
|
577 | |||
518 | def mia_task_request(self, idents, msg): |
|
578 | def mia_task_request(self, idents, msg): | |
|
579 | raise NotImplementedError | |||
519 | client_id = idents[0] |
|
580 | client_id = idents[0] | |
520 | content = dict(mia=self.mia,status='ok') |
|
581 | # content = dict(mia=self.mia,status='ok') | |
521 | self.session.send('mia_reply', content=content, idents=client_id) |
|
582 | # self.session.send('mia_reply', content=content, idents=client_id) | |
522 |
|
583 | |||
523 |
|
584 | |||
524 |
|
585 | |||
@@ -614,7 +675,7 b' class Controller(object):' | |||||
614 | self.by_ident.pop(ec.queue) |
|
675 | self.by_ident.pop(ec.queue) | |
615 | self.completed.pop(eid) |
|
676 | self.completed.pop(eid) | |
616 | for msg_id in self.queues.pop(eid): |
|
677 | for msg_id in self.queues.pop(eid): | |
617 |
msg = self.pending. |
|
678 | msg = self.pending.remove(msg_id) | |
618 | ############## TODO: HANDLE IT ################ |
|
679 | ############## TODO: HANDLE IT ################ | |
619 |
|
680 | |||
620 | if self.notifier: |
|
681 | if self.notifier: | |
@@ -710,11 +771,11 b' class Controller(object):' | |||||
710 | msg_ids = content.get('msg_ids', []) |
|
771 | msg_ids = content.get('msg_ids', []) | |
711 | reply = dict(status='ok') |
|
772 | reply = dict(status='ok') | |
712 | if msg_ids == 'all': |
|
773 | if msg_ids == 'all': | |
713 | self.results = {} |
|
774 | self.db.drop_matching_records(dict(completed={'$ne':None})) | |
714 | else: |
|
775 | else: | |
715 | for msg_id in msg_ids: |
|
776 | for msg_id in msg_ids: | |
716 |
if msg_id in self. |
|
777 | if msg_id in self.all_completed: | |
717 |
self. |
|
778 | self.db.drop_record(msg_id) | |
718 | else: |
|
779 | else: | |
719 | if msg_id in self.pending: |
|
780 | if msg_id in self.pending: | |
720 | try: |
|
781 | try: | |
@@ -736,10 +797,10 b' class Controller(object):' | |||||
736 | reply = wrap_exception() |
|
797 | reply = wrap_exception() | |
737 | break |
|
798 | break | |
738 | msg_ids = self.completed.pop(eid) |
|
799 | msg_ids = self.completed.pop(eid) | |
739 | for msg_id in msg_ids: |
|
800 | uid = self.engines[eid].queue | |
740 | self.results.pop(msg_id) |
|
801 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) | |
741 |
|
802 | |||
742 |
self.ses |
|
803 | self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id) | |
743 |
|
804 | |||
744 | def resubmit_task(self, client_id, msg, buffers): |
|
805 | def resubmit_task(self, client_id, msg, buffers): | |
745 | """Resubmit a task.""" |
|
806 | """Resubmit a task.""" | |
@@ -755,13 +816,15 b' class Controller(object):' | |||||
755 | content = dict(status='ok') |
|
816 | content = dict(status='ok') | |
756 | content['pending'] = pending |
|
817 | content['pending'] = pending | |
757 | content['completed'] = completed |
|
818 | content['completed'] = completed | |
|
819 | if not statusonly: | |||
|
820 | records = self.db.find_records(dict(msg_id={'$in':msg_ids})) | |||
758 | for msg_id in msg_ids: |
|
821 | for msg_id in msg_ids: | |
759 | if msg_id in self.pending: |
|
822 | if msg_id in self.pending: | |
760 | pending.append(msg_id) |
|
823 | pending.append(msg_id) | |
761 |
elif msg_id in self. |
|
824 | elif msg_id in self.all_completed: | |
762 | completed.append(msg_id) |
|
825 | completed.append(msg_id) | |
763 | if not statusonly: |
|
826 | if not statusonly: | |
764 |
content[msg_id] = |
|
827 | content[msg_id] = records[msg_id]['result_content'] | |
765 | else: |
|
828 | else: | |
766 | try: |
|
829 | try: | |
767 | raise KeyError('No such message: '+msg_id) |
|
830 | raise KeyError('No such message: '+msg_id) | |
@@ -799,6 +862,8 b' def make_argument_parser():' | |||||
799 | parser.add_argument('--scheduler', type=str, default='pure', |
|
862 | parser.add_argument('--scheduler', type=str, default='pure', | |
800 | choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], |
|
863 | choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], | |
801 | help='select the task scheduler [default: pure ZMQ]') |
|
864 | help='select the task scheduler [default: pure ZMQ]') | |
|
865 | parser.add_argument('--mongodb', action='store_true', | |||
|
866 | help='Use MongoDB task storage [default: in-memory]') | |||
802 |
|
867 | |||
803 | return parser |
|
868 | return parser | |
804 |
|
869 | |||
@@ -927,6 +992,11 b' def main(argv=None):' | |||||
927 | q.start() |
|
992 | q.start() | |
928 | children.append(q) |
|
993 | children.append(q) | |
929 |
|
994 | |||
|
995 | if args.mongodb: | |||
|
996 | from mongodb import MongoDB | |||
|
997 | db = MongoDB(thesession.session) | |||
|
998 | else: | |||
|
999 | db = DictDB() | |||
930 | time.sleep(.25) |
|
1000 | time.sleep(.25) | |
931 |
|
1001 | |||
932 | # build connection dicts |
|
1002 | # build connection dicts | |
@@ -946,7 +1016,7 b' def main(argv=None):' | |||||
946 | 'notification': iface%nport |
|
1016 | 'notification': iface%nport | |
947 | } |
|
1017 | } | |
948 | signal_children(children) |
|
1018 | signal_children(children) | |
949 |
con = Controller(loop, thesession, sub, reg, hmon, c, n, |
|
1019 | con = Controller(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs) | |
950 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) |
|
1020 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) | |
951 | loop.start() |
|
1021 | loop.start() | |
952 |
|
1022 |
General Comments 0
You need to be logged in to leave comments.
Login now