##// END OF EJS Templates
Started DB backend with mongoDB support.
MinRK -
Show More
@@ -0,0 +1,148 b''
1 """A Task logger that presents our DB interface,
2 but exists entirely in memory and is implemented with dicts.
3
4 TaskRecords are dicts of the form:
5 {
6 'msg_id' : str(uuid),
7 'client_uuid' : str(uuid),
8 'engine_uuid' : str(uuid) or None,
9 'header' : dict(header),
10 'content': dict(content),
11 'buffers': list(buffers),
12 'submitted': datetime,
13 'started': datetime or None,
14 'completed': datetime or None,
15 'resubmitted': datetime or None,
16 'result_header' : dict(header) or None,
17 'result_content' : dict(content) or None,
18 'result_buffers' : list(buffers) or None,
19 }
20 With this info, many of the special categories of tasks can be defined by query:
21
22 pending: completed is None
23 client's outstanding: client_uuid = uuid && completed is None
24 MIA: arrived is None (and completed is None)
25 etc.
26
27 EngineRecords are dicts of the form:
28 {
29 'eid' : int(id),
30 'uuid': str(uuid)
31 }
32 This may be extended, but currently holds only these two fields.
33
34 We support a subset of mongodb operators:
35 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
36 """
37 #-----------------------------------------------------------------------------
38 # Copyright (C) 2010 The IPython Development Team
39 #
40 # Distributed under the terms of the BSD License. The full license is in
41 # the file COPYING, distributed as part of this software.
42 #-----------------------------------------------------------------------------
43
44
45 from datetime import datetime
46
# Query operators, modelled on MongoDB's.  Each entry is called as
# ``test(a, b)``: ``a`` is the value stored in the record (None when the key
# is missing) and ``b`` is the operand supplied in the query dict.
filters = {
    '$eq': lambda a, b: a == b,
    '$lt': lambda a, b: a < b,
    # record value must be strictly greater than the operand
    # (the operands were previously swapped, making $gt an alias of $lt)
    '$gt': lambda a, b: a > b,
    '$lte': lambda a, b: a <= b,
    '$gte': lambda a, b: a >= b,
    '$ne': lambda a, b: not a == b,
    '$in': lambda a, b: a in b,
    '$nin': lambda a, b: a not in b,
    # the record's container must hold every element of the operand list;
    # a missing (None) value never matches
    '$all': lambda a, b: a is not None and all(item in a for item in b),
    # operand is a (divisor, remainder) pair
    '$mod': lambda a, b: a % b[0] == b[1],
    # operand is a flag: truthy -> value must be set, falsy -> must be None
    '$exists': lambda a, b: (b and a is not None) or (a is None and not b),
}
60
61
class CompositeFilter(object):
    """Composite filter that applies several operator tests to one value.

    Built from a dict mapping operator names (keys of the module-level
    ``filters`` table, e.g. ``'$lt'``) to their operands.  Calling the
    instance with a value returns True only if every operator test passes.

    Raises KeyError at construction time if an operator name is not present
    in ``filters``.
    """

    def __init__(self, dikt):
        self.tests = []
        self.values = []
        # items() instead of the Python-2-only iteritems(), so this works on
        # both Python 2 and Python 3
        for key, value in dikt.items():
            self.tests.append(filters[key])
            self.values.append(value)

    def __call__(self, value):
        """Return True if ``value`` passes every test (True when there are none)."""
        return all(test(value, check)
                   for test, check in zip(self.tests, self.values))
77
class DictDB(object):
    """Basic in-memory dict-based object for saving Task Records.

    This is the first object to present the DB interface
    for logging tasks out of memory.

    The interface is based on MongoDB, so adding a MongoDB
    backend should be straightforward.

    Records are stored in ``self._records``, a dict keyed by msg_id.
    """
    _records = None

    def __init__(self):
        self._records = dict()

    def _match_one(self, rec, tests):
        """Check if a specific record matches tests.

        ``tests`` maps record keys to one-argument callables; a key missing
        from the record is tested as None.
        """
        for key, test in tests.items():
            if not test(rec.get(key, None)):
                return False
        return True

    def _match(self, check, id_only=True):
        """Find all the matches for a check dict.

        A dict value in ``check`` is treated as a set of operators and wrapped
        in a CompositeFilter; any other value is compared for equality.

        Returns a list of msg_ids if ``id_only`` is true, otherwise a dict of
        matching records keyed by msg_id.
        """
        tests = {}
        for k, v in check.items():
            if isinstance(v, dict):
                tests[k] = CompositeFilter(v)
            else:
                # Bind v as a default argument: a plain closure would look v
                # up when called, so every test would compare against the
                # last value of the loop.
                tests[k] = lambda o, v=v: o == v

        matches = {}
        for msg_id, rec in self._records.items():
            if self._match_one(rec, tests):
                matches[msg_id] = rec
        if id_only:
            return list(matches)
        return matches

    def add_record(self, msg_id, rec):
        """Add a new Task Record, by msg_id.

        Raises KeyError if a record with this msg_id already exists.
        """
        if msg_id in self._records:
            raise KeyError("Already have msg_id %r"%(msg_id))
        self._records[msg_id] = rec

    def get_record(self, msg_id):
        """Get a specific Task Record, by msg_id.

        Raises KeyError if there is no such record.
        """
        if msg_id not in self._records:
            raise KeyError("No such msg_id %r"%(msg_id))
        return self._records[msg_id]

    def update_record(self, msg_id, rec):
        """Update the data in an existing record.

        The keys of ``rec`` are merged into the stored record.
        Raises KeyError if there is no record for msg_id.
        """
        self._records[msg_id].update(rec)

    def drop_matching_records(self, check):
        """Remove every record matching the query dict ``check``."""
        for msg_id in self._match(check, id_only=True):
            del self._records[msg_id]

    def drop_record(self, msg_id):
        """Remove a record from the DB.

        Raises KeyError if there is no record for msg_id.
        """
        del self._records[msg_id]

    def find_records(self, check, id_only=False):
        """Find records matching a query dict.

        Returns a dict of records keyed by msg_id, or only the list of
        msg_ids if ``id_only`` is true.
        """
        return self._match(check, id_only)
@@ -0,0 +1,56 b''
1 """A TaskRecord backend using mongodb"""
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
8
9 from datetime import datetime
10
11 from pymongo import Connection
12
13 #----------------------
14 # MongoDB class
15 #----------------------
class MongoDB(object):
    """MongoDB TaskRecord backend.

    Records live in the 'task_records' collection of a database named after
    the session uuid.  ``self._table`` maps each msg_id to the ``_id`` that
    MongoDB assigned to its record on insert.
    """
    def __init__(self, session_uuid, *args, **kwargs):
        # extra args are handed straight to pymongo's Connection
        self._connection = Connection(*args, **kwargs)
        self._db = self._connection[session_uuid]
        self._records = self._db['task_records']
        self._table = {}

    def add_record(self, msg_id, rec):
        """Add a new Task Record, by msg_id."""
        obj_id = self._records.insert(rec)
        self._table[msg_id] = obj_id

    def get_record(self, msg_id):
        """Get a specific Task Record, by msg_id.

        Raises KeyError if msg_id was never added through this object.
        """
        return self._records.find_one(self._table[msg_id])

    def update_record(self, msg_id, rec):
        """Update the data in an existing record.

        Only the keys present in ``rec`` are changed.  Raises KeyError if
        msg_id was never added through this object.
        """
        obj_id = self._table[msg_id]
        # Without $set, the update document would REPLACE the stored record,
        # discarding msg_id, header, content, ... of the original.
        self._records.update({'_id': obj_id}, {'$set': rec})

    def drop_matching_records(self, check):
        """Remove every record matching the query dict ``check``."""
        # NOTE(review): self._table keeps the msg_id -> _id entries of the
        # removed records; get_record() on them will then return None.
        self._records.remove(check)

    def drop_record(self, msg_id):
        """Remove a record from the DB.

        Raises KeyError if msg_id was never added through this object.
        """
        obj_id = self._table.pop(msg_id)
        self._records.remove(obj_id)

    def find_records(self, check, id_only=False):
        """Find records matching a query dict.

        Returns a list of matching records, or a list of their msg_ids if
        ``id_only`` is true.
        """
        # NOTE(review): DictDB.find_records returns a dict keyed by msg_id
        # when id_only is false, whereas this returns a list -- confirm which
        # shape callers rely on.
        matches = list(self._records.find(check))
        if id_only:
            matches = [rec['msg_id'] for rec in matches]
        return matches
55
56
@@ -27,10 +27,17 b' import uuid'
27 from IPython.zmq.log import logger # a Logger object
27 from IPython.zmq.log import logger # a Logger object
28 from IPython.zmq.entry_point import bind_port
28 from IPython.zmq.entry_point import bind_port
29
29
30 from streamsession import Message, wrap_exception
30 from streamsession import Message, wrap_exception, ISO8601
31 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
31 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
32 connect_logger, parse_url, signal_children, generate_exec_key)
32 connect_logger, parse_url, signal_children, generate_exec_key)
33
33 from dictdb import DictDB
34 try:
35 from pymongo.binary import Binary
36 except ImportError:
37 MongoDB=None
38 else:
39 from mongodb import MongoDB
40
34 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
35 # Code
42 # Code
36 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
@@ -65,6 +72,27 b' class ReverseDict(dict):'
65 return value
72 return value
66
73
67
74
def init_record(msg):
    """Build the initial TaskRecord dict for a newly submitted message.

    'msg_id', 'header', 'content', 'buffers' and 'submitted' are filled in
    from ``msg`` ('submitted' is the header's 'date' parsed with the ISO8601
    format); every remaining field starts out as None.
    """
    header = msg['header']
    record = {
        'msg_id': header['msg_id'],
        'header': header,
        'content': msg['content'],
        'buffers': msg['buffers'],
        'submitted': datetime.strptime(header['date'], ISO8601),
    }
    # fields that are only known later in the task's life
    for unset in ('client_uuid', 'engine_uuid', 'started', 'completed',
                  'resubmitted', 'result_header', 'result_content',
                  'result_buffers', 'queue'):
        record[unset] = None
    return record
94
95
68 class EngineConnector(object):
96 class EngineConnector(object):
69 """A simple object for accessing the various zmq connections of an object.
97 """A simple object for accessing the various zmq connections of an object.
70 Attributes are:
98 Attributes are:
@@ -159,7 +187,7 b' class Controller(object):'
159 self.by_ident = {}
187 self.by_ident = {}
160 self.clients = {}
188 self.clients = {}
161 self.hearts = {}
189 self.hearts = {}
162 self.mia = set()
190 # self.mia = set()
163
191
164 # self.sockets = {}
192 # self.sockets = {}
165 self.loop = loop
193 self.loop = loop
@@ -210,14 +238,14 b' class Controller(object):'
210 'unregistration_request' : self.unregister_engine,
238 'unregistration_request' : self.unregister_engine,
211 'connection_request': self.connection_request,
239 'connection_request': self.connection_request,
212 }
240 }
213 #
241 self.registration_timeout = max(5000, 2*self.heartbeat.period)
214 # this is the stuff that will move to DB:
242 # this is the stuff that will move to DB:
215 self.results = {} # completed results
243 # self.results = {} # completed results
216 self.pending = {} # pending messages, keyed by msg_id
244 self.pending = set() # pending messages, keyed by msg_id
217 self.queues = {} # pending msg_ids keyed by engine_id
245 self.queues = {} # pending msg_ids keyed by engine_id
218 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
246 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
219 self.completed = {} # completed msg_ids keyed by engine_id
247 self.completed = {} # completed msg_ids keyed by engine_id
220 self.registration_timeout = max(5000, 2*self.heartbeat.period)
248 self.all_completed = set()
221
249
222 logger.info("controller::created controller")
250 logger.info("controller::created controller")
223
251
@@ -322,6 +350,7 b' class Controller(object):'
322 if not idents:
350 if not idents:
323 logger.error("Bad Client Message: %s"%msg)
351 logger.error("Bad Client Message: %s"%msg)
324 return
352 return
353 client_id = idents[0]
325 try:
354 try:
326 msg = self.session.unpack_message(msg, content=True)
355 msg = self.session.unpack_message(msg, content=True)
327 except:
356 except:
@@ -401,11 +430,15 b' class Controller(object):'
401
430
402 header = msg['header']
431 header = msg['header']
403 msg_id = header['msg_id']
432 msg_id = header['msg_id']
404 info = dict(submit=datetime.now(),
433 record = init_record(msg)
405 received=None,
434 record['engine_uuid'] = queue_id
406 engine=(eid, queue_id))
435 record['client_uuid'] = client_id
407 self.pending[msg_id] = ( msg, info )
436 record['queue'] = 'mux'
437 if MongoDB is not None and isinstance(self.db, MongoDB):
438 record['buffers'] = map(Binary, record['buffers'])
439 self.pending.add(msg_id)
408 self.queues[eid].append(msg_id)
440 self.queues[eid].append(msg_id)
441 self.db.add_record(msg_id, record)
409
442
410 def save_queue_result(self, idents, msg):
443 def save_queue_result(self, idents, msg):
411 if len(idents) < 2:
444 if len(idents) < 2:
@@ -430,11 +463,25 b' class Controller(object):'
430 if not parent:
463 if not parent:
431 return
464 return
432 msg_id = parent['msg_id']
465 msg_id = parent['msg_id']
433 self.results[msg_id] = msg
434 if msg_id in self.pending:
466 if msg_id in self.pending:
435 self.pending.pop(msg_id)
467 self.pending.remove(msg_id)
468 self.all_completed.add(msg_id)
436 self.queues[eid].remove(msg_id)
469 self.queues[eid].remove(msg_id)
437 self.completed[eid].append(msg_id)
470 self.completed[eid].append(msg_id)
471 rheader = msg['header']
472 completed = datetime.strptime(rheader['date'], ISO8601)
473 started = rheader.get('started', None)
474 if started is not None:
475 started = datetime.strptime(started, ISO8601)
476 result = {
477 'result_header' : rheader,
478 'result_content': msg['content'],
479 'started' : started,
480 'completed' : completed
481 }
482 if MongoDB is not None and isinstance(self.db, MongoDB):
483 result['result_buffers'] = map(Binary, msg['buffers'])
484 self.db.update_record(msg_id, result)
438 else:
485 else:
439 logger.debug("queue:: unknown msg finished %s"%msg_id)
486 logger.debug("queue:: unknown msg finished %s"%msg_id)
440
487
@@ -445,28 +492,26 b' class Controller(object):'
445 client_id = idents[0]
492 client_id = idents[0]
446
493
447 try:
494 try:
448 msg = self.session.unpack_message(msg, content=False)
495 msg = self.session.unpack_message(msg, content=False, copy=False)
449 except:
496 except:
450 logger.error("task::client %r sent invalid task message: %s"%(
497 logger.error("task::client %r sent invalid task message: %s"%(
451 client_id, msg), exc_info=True)
498 client_id, msg), exc_info=True)
452 return
499 return
453
500 rec = init_record(msg)
501 if MongoDB is not None and isinstance(self.db, MongoDB):
502 record['buffers'] = map(Binary, record['buffers'])
503 rec['client_uuid'] = client_id
504 rec['queue'] = 'task'
454 header = msg['header']
505 header = msg['header']
455 msg_id = header['msg_id']
506 msg_id = header['msg_id']
456 self.mia.add(msg_id)
507 self.pending.add(msg_id)
457 info = dict(submit=datetime.now(),
508 self.db.add_record(msg_id, rec)
458 received=None,
459 engine=None)
460 self.pending[msg_id] = (msg, info)
461 if not self.tasks.has_key(client_id):
462 self.tasks[client_id] = []
463 self.tasks[client_id].append(msg_id)
464
509
465 def save_task_result(self, idents, msg):
510 def save_task_result(self, idents, msg):
466 """save the result of a completed task."""
511 """save the result of a completed task."""
467 client_id = idents[0]
512 client_id = idents[0]
468 try:
513 try:
469 msg = self.session.unpack_message(msg, content=False)
514 msg = self.session.unpack_message(msg, content=False, copy=False)
470 except:
515 except:
471 logger.error("task::invalid task result message send to %r: %s"%(
516 logger.error("task::invalid task result message send to %r: %s"%(
472 client_id, msg))
517 client_id, msg))
@@ -478,19 +523,33 b' class Controller(object):'
478 logger.warn("Task %r had no parent!"%msg)
523 logger.warn("Task %r had no parent!"%msg)
479 return
524 return
480 msg_id = parent['msg_id']
525 msg_id = parent['msg_id']
481 self.results[msg_id] = msg
482
526
483 header = msg['header']
527 header = msg['header']
484 engine_uuid = header.get('engine', None)
528 engine_uuid = header.get('engine', None)
485 eid = self.by_ident.get(engine_uuid, None)
529 eid = self.by_ident.get(engine_uuid, None)
486
530
487 if msg_id in self.pending:
531 if msg_id in self.pending:
488 self.pending.pop(msg_id)
532 self.pending.remove(msg_id)
489 if msg_id in self.mia:
533 self.all_completed.add(msg_id)
490 self.mia.remove(msg_id)
534 if eid is not None:
491 if eid is not None and msg_id in self.tasks[eid]:
492 self.completed[eid].append(msg_id)
535 self.completed[eid].append(msg_id)
493 self.tasks[eid].remove(msg_id)
536 if msg_id in self.tasks[eid]:
537 self.tasks[eid].remove(msg_id)
538 completed = datetime.strptime(header['date'], ISO8601)
539 started = header.get('started', None)
540 if started is not None:
541 started = datetime.strptime(started, ISO8601)
542 result = {
543 'result_header' : header,
544 'result_content': msg['content'],
545 'started' : started,
546 'completed' : completed,
547 'engine_uuid': engine_uuid
548 }
549 if MongoDB is not None and isinstance(self.db, MongoDB):
550 result['result_buffers'] = map(Binary, msg['buffers'])
551 self.db.update_record(msg_id, result)
552
494 else:
553 else:
495 logger.debug("task::unknown task %s finished"%msg_id)
554 logger.debug("task::unknown task %s finished"%msg_id)
496
555
@@ -507,18 +566,20 b' class Controller(object):'
507 eid = self.by_ident[engine_uuid]
566 eid = self.by_ident[engine_uuid]
508
567
509 logger.info("task::task %s arrived on %s"%(msg_id, eid))
568 logger.info("task::task %s arrived on %s"%(msg_id, eid))
510 if msg_id in self.mia:
569 # if msg_id in self.mia:
511 self.mia.remove(msg_id)
570 # self.mia.remove(msg_id)
512 else:
571 # else:
513 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
572 # logger.debug("task::task %s not listed as MIA?!"%(msg_id))
514
573
515 self.tasks[eid].append(msg_id)
574 self.tasks[eid].append(msg_id)
516 self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
575 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
576 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
517
577
518 def mia_task_request(self, idents, msg):
578 def mia_task_request(self, idents, msg):
579 raise NotImplementedError
519 client_id = idents[0]
580 client_id = idents[0]
520 content = dict(mia=self.mia,status='ok')
581 # content = dict(mia=self.mia,status='ok')
521 self.session.send('mia_reply', content=content, idents=client_id)
582 # self.session.send('mia_reply', content=content, idents=client_id)
522
583
523
584
524
585
@@ -614,7 +675,7 b' class Controller(object):'
614 self.by_ident.pop(ec.queue)
675 self.by_ident.pop(ec.queue)
615 self.completed.pop(eid)
676 self.completed.pop(eid)
616 for msg_id in self.queues.pop(eid):
677 for msg_id in self.queues.pop(eid):
617 msg = self.pending.pop(msg_id)
678 msg = self.pending.remove(msg_id)
618 ############## TODO: HANDLE IT ################
679 ############## TODO: HANDLE IT ################
619
680
620 if self.notifier:
681 if self.notifier:
@@ -710,11 +771,11 b' class Controller(object):'
710 msg_ids = content.get('msg_ids', [])
771 msg_ids = content.get('msg_ids', [])
711 reply = dict(status='ok')
772 reply = dict(status='ok')
712 if msg_ids == 'all':
773 if msg_ids == 'all':
713 self.results = {}
774 self.db.drop_matching_records(dict(completed={'$ne':None}))
714 else:
775 else:
715 for msg_id in msg_ids:
776 for msg_id in msg_ids:
716 if msg_id in self.results:
777 if msg_id in self.all_completed:
717 self.results.pop(msg_id)
778 self.db.drop_record(msg_id)
718 else:
779 else:
719 if msg_id in self.pending:
780 if msg_id in self.pending:
720 try:
781 try:
@@ -736,10 +797,10 b' class Controller(object):'
736 reply = wrap_exception()
797 reply = wrap_exception()
737 break
798 break
738 msg_ids = self.completed.pop(eid)
799 msg_ids = self.completed.pop(eid)
739 for msg_id in msg_ids:
800 uid = self.engines[eid].queue
740 self.results.pop(msg_id)
801 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
741
802
742 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
803 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
743
804
744 def resubmit_task(self, client_id, msg, buffers):
805 def resubmit_task(self, client_id, msg, buffers):
745 """Resubmit a task."""
806 """Resubmit a task."""
@@ -755,13 +816,15 b' class Controller(object):'
755 content = dict(status='ok')
816 content = dict(status='ok')
756 content['pending'] = pending
817 content['pending'] = pending
757 content['completed'] = completed
818 content['completed'] = completed
819 if not statusonly:
820 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
758 for msg_id in msg_ids:
821 for msg_id in msg_ids:
759 if msg_id in self.pending:
822 if msg_id in self.pending:
760 pending.append(msg_id)
823 pending.append(msg_id)
761 elif msg_id in self.results:
824 elif msg_id in self.all_completed:
762 completed.append(msg_id)
825 completed.append(msg_id)
763 if not statusonly:
826 if not statusonly:
764 content[msg_id] = self.results[msg_id]['content']
827 content[msg_id] = records[msg_id]['result_content']
765 else:
828 else:
766 try:
829 try:
767 raise KeyError('No such message: '+msg_id)
830 raise KeyError('No such message: '+msg_id)
@@ -799,6 +862,8 b' def make_argument_parser():'
799 parser.add_argument('--scheduler', type=str, default='pure',
862 parser.add_argument('--scheduler', type=str, default='pure',
800 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
863 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
801 help='select the task scheduler [default: pure ZMQ]')
864 help='select the task scheduler [default: pure ZMQ]')
865 parser.add_argument('--mongodb', action='store_true',
866 help='Use MongoDB task storage [default: in-memory]')
802
867
803 return parser
868 return parser
804
869
@@ -927,6 +992,11 b' def main(argv=None):'
927 q.start()
992 q.start()
928 children.append(q)
993 children.append(q)
929
994
995 if args.mongodb:
996 from mongodb import MongoDB
997 db = MongoDB(thesession.session)
998 else:
999 db = DictDB()
930 time.sleep(.25)
1000 time.sleep(.25)
931
1001
932 # build connection dicts
1002 # build connection dicts
@@ -946,7 +1016,7 b' def main(argv=None):'
946 'notification': iface%nport
1016 'notification': iface%nport
947 }
1017 }
948 signal_children(children)
1018 signal_children(children)
949 con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
1019 con = Controller(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
950 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
1020 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
951 loop.start()
1021 loop.start()
952
1022
General Comments 0
You need to be logged in to leave comments. Login now