##// END OF EJS Templates
further fix/tests for record edits clobbering DictDB
MinRK -
Show More
@@ -1,216 +1,216
1 1 """A Task logger that presents our DB interface,
2 2 but exists entirely in memory and implemented with dicts.
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7
8 8
9 9 TaskRecords are dicts of the form:
10 10 {
11 11 'msg_id' : str(uuid),
12 12 'client_uuid' : str(uuid),
13 13 'engine_uuid' : str(uuid) or None,
14 14 'header' : dict(header),
15 15 'content': dict(content),
16 16 'buffers': list(buffers),
17 17 'submitted': datetime,
18 18 'started': datetime or None,
19 19 'completed': datetime or None,
20 20 'resubmitted': datetime or None,
21 21 'result_header' : dict(header) or None,
22 22 'result_content' : dict(content) or None,
23 23 'result_buffers' : list(buffers) or None,
24 24 }
25 25 With this info, many of the special categories of tasks can be defined by query:
26 26
27 27 pending: completed is None
28 28 client's outstanding: client_uuid = uuid && completed is None
29 29 MIA: arrived is None (and completed is None)
30 30 etc.
31 31
32 32 EngineRecords are dicts of the form:
33 33 {
34 34 'eid' : int(id),
35 35 'uuid': str(uuid)
36 36 }
37 37 This may be extended, but is currently.
38 38
39 39 We support a subset of mongodb operators:
40 40 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
41 41 """
42 42 #-----------------------------------------------------------------------------
43 43 # Copyright (C) 2010-2011 The IPython Development Team
44 44 #
45 45 # Distributed under the terms of the BSD License. The full license is in
46 46 # the file COPYING, distributed as part of this software.
47 47 #-----------------------------------------------------------------------------
48 48
49 from copy import copy
49 from copy import deepcopy as copy
50 50 from datetime import datetime
51 51
52 52 from IPython.config.configurable import LoggingConfigurable
53 53
54 54 from IPython.utils.traitlets import Dict, Unicode, Instance
55 55
56 56 filters = {
57 57 '$lt' : lambda a,b: a < b,
58 58 '$gt' : lambda a,b: b > a,
59 59 '$eq' : lambda a,b: a == b,
60 60 '$ne' : lambda a,b: a != b,
61 61 '$lte': lambda a,b: a <= b,
62 62 '$gte': lambda a,b: a >= b,
63 63 '$in' : lambda a,b: a in b,
64 64 '$nin': lambda a,b: a not in b,
65 65 '$all': lambda a,b: all([ a in bb for bb in b ]),
66 66 '$mod': lambda a,b: a%b[0] == b[1],
67 67 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
68 68 }
69 69
70 70
71 71 class CompositeFilter(object):
72 72 """Composite filter for matching multiple properties."""
73 73
74 74 def __init__(self, dikt):
75 75 self.tests = []
76 76 self.values = []
77 77 for key, value in dikt.iteritems():
78 78 self.tests.append(filters[key])
79 79 self.values.append(value)
80 80
81 81 def __call__(self, value):
82 82 for test,check in zip(self.tests, self.values):
83 83 if not test(value, check):
84 84 return False
85 85 return True
86 86
87 87 class BaseDB(LoggingConfigurable):
88 88 """Empty Parent class so traitlets work on DB."""
89 89 # base configurable traits:
90 90 session = Unicode("")
91 91
92 92 class DictDB(BaseDB):
93 93 """Basic in-memory dict-based object for saving Task Records.
94 94
95 95 This is the first object to present the DB interface
96 96 for logging tasks out of memory.
97 97
98 98 The interface is based on MongoDB, so adding a MongoDB
99 99 backend should be straightforward.
100 100 """
101 101
102 102 _records = Dict()
103 103
104 104 def _match_one(self, rec, tests):
105 105 """Check if a specific record matches tests."""
106 106 for key,test in tests.iteritems():
107 107 if not test(rec.get(key, None)):
108 108 return False
109 109 return True
110 110
111 111 def _match(self, check):
112 112 """Find all the matches for a check dict."""
113 113 matches = []
114 114 tests = {}
115 115 for k,v in check.iteritems():
116 116 if isinstance(v, dict):
117 117 tests[k] = CompositeFilter(v)
118 118 else:
119 119 tests[k] = lambda o: o==v
120 120
121 121 for rec in self._records.itervalues():
122 122 if self._match_one(rec, tests):
123 123 matches.append(copy(rec))
124 124 return matches
125 125
126 126 def _extract_subdict(self, rec, keys):
127 127 """extract subdict of keys"""
128 128 d = {}
129 129 d['msg_id'] = rec['msg_id']
130 130 for key in keys:
131 131 d[key] = rec[key]
132 return d
132 return copy(d)
133 133
134 134 def add_record(self, msg_id, rec):
135 135 """Add a new Task Record, by msg_id."""
136 136 if self._records.has_key(msg_id):
137 137 raise KeyError("Already have msg_id %r"%(msg_id))
138 138 self._records[msg_id] = rec
139 139
140 140 def get_record(self, msg_id):
141 141 """Get a specific Task Record, by msg_id."""
142 142 if not msg_id in self._records:
143 143 raise KeyError("No such msg_id %r"%(msg_id))
144 144 return copy(self._records[msg_id])
145 145
146 146 def update_record(self, msg_id, rec):
147 147 """Update the data in an existing record."""
148 148 self._records[msg_id].update(rec)
149 149
150 150 def drop_matching_records(self, check):
151 151 """Remove a record from the DB."""
152 152 matches = self._match(check)
153 153 for m in matches:
154 154 del self._records[m['msg_id']]
155 155
156 156 def drop_record(self, msg_id):
157 157 """Remove a record from the DB."""
158 158 del self._records[msg_id]
159 159
160 160
161 161 def find_records(self, check, keys=None):
162 162 """Find records matching a query dict, optionally extracting subset of keys.
163 163
164 164 Returns dict keyed by msg_id of matching records.
165 165
166 166 Parameters
167 167 ----------
168 168
169 169 check: dict
170 170 mongodb-style query argument
171 171 keys: list of strs [optional]
172 172 if specified, the subset of keys to extract. msg_id will *always* be
173 173 included.
174 174 """
175 175 matches = self._match(check)
176 176 if keys:
177 177 return [ self._extract_subdict(rec, keys) for rec in matches ]
178 178 else:
179 179 return matches
180 180
181 181
182 182 def get_history(self):
183 183 """get all msg_ids, ordered by time submitted."""
184 184 msg_ids = self._records.keys()
185 185 return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
186 186
187 187 class NoDB(DictDB):
188 188 """A blackhole db backend that actually stores no information.
189 189
190 190 Provides the full DB interface, but raises KeyErrors on any
191 191 method that tries to access the records. This can be used to
192 192 minimize the memory footprint of the Hub when its record-keeping
193 193 functionality is not required.
194 194 """
195 195
196 196 def add_record(self, msg_id, record):
197 197 pass
198 198
199 199 def get_record(self, msg_id):
200 200 raise KeyError("NoDB does not support record access")
201 201
202 202 def update_record(self, msg_id, record):
203 203 pass
204 204
205 205 def drop_matching_records(self, check):
206 206 pass
207 207
208 208 def drop_record(self, msg_id):
209 209 pass
210 210
211 211 def find_records(self, check, keys=None):
212 212 raise KeyError("NoDB does not store information")
213 213
214 214 def get_history(self):
215 215 raise KeyError("NoDB does not store information")
216 216
@@ -1,243 +1,249
1 1 """Tests for db backends
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import logging
22 22 import os
23 23 import tempfile
24 24 import time
25 25
26 26 from datetime import datetime, timedelta
27 27 from unittest import TestCase
28 28
29 29 from IPython.parallel import error
30 30 from IPython.parallel.controller.dictdb import DictDB
31 31 from IPython.parallel.controller.sqlitedb import SQLiteDB
32 32 from IPython.parallel.controller.hub import init_record, empty_record
33 33
34 34 from IPython.testing import decorators as dec
35 35 from IPython.zmq.session import Session
36 36
37 37
38 38 #-------------------------------------------------------------------------------
39 39 # TestCases
40 40 #-------------------------------------------------------------------------------
41 41
42 42
43 43 def setup():
44 44 global temp_db
45 45 temp_db = tempfile.NamedTemporaryFile(suffix='.db').name
46 46
47 47
48 48 class TestDictBackend(TestCase):
49 49 def setUp(self):
50 50 self.session = Session()
51 51 self.db = self.create_db()
52 52 self.load_records(16)
53 53
54 54 def create_db(self):
55 55 return DictDB()
56 56
57 57 def load_records(self, n=1):
58 58 """load n records for testing"""
59 59 #sleep 1/10 s, to ensure timestamp is different to previous calls
60 60 time.sleep(0.1)
61 61 msg_ids = []
62 62 for i in range(n):
63 63 msg = self.session.msg('apply_request', content=dict(a=5))
64 64 msg['buffers'] = []
65 65 rec = init_record(msg)
66 66 msg_id = msg['header']['msg_id']
67 67 msg_ids.append(msg_id)
68 68 self.db.add_record(msg_id, rec)
69 69 return msg_ids
70 70
71 71 def test_add_record(self):
72 72 before = self.db.get_history()
73 73 self.load_records(5)
74 74 after = self.db.get_history()
75 75 self.assertEquals(len(after), len(before)+5)
76 76 self.assertEquals(after[:-5],before)
77 77
78 78 def test_drop_record(self):
79 79 msg_id = self.load_records()[-1]
80 80 rec = self.db.get_record(msg_id)
81 81 self.db.drop_record(msg_id)
82 82 self.assertRaises(KeyError,self.db.get_record, msg_id)
83 83
84 84 def _round_to_millisecond(self, dt):
85 85 """necessary because mongodb rounds microseconds"""
86 86 micro = dt.microsecond
87 87 extra = int(str(micro)[-3:])
88 88 return dt - timedelta(microseconds=extra)
89 89
90 90 def test_update_record(self):
91 91 now = self._round_to_millisecond(datetime.now())
92 92 #
93 93 msg_id = self.db.get_history()[-1]
94 94 rec1 = self.db.get_record(msg_id)
95 95 data = {'stdout': 'hello there', 'completed' : now}
96 96 self.db.update_record(msg_id, data)
97 97 rec2 = self.db.get_record(msg_id)
98 98 self.assertEquals(rec2['stdout'], 'hello there')
99 99 self.assertEquals(rec2['completed'], now)
100 100 rec1.update(data)
101 101 self.assertEquals(rec1, rec2)
102 102
103 103 # def test_update_record_bad(self):
104 104 # """test updating nonexistant records"""
105 105 # msg_id = str(uuid.uuid4())
106 106 # data = {'stdout': 'hello there'}
107 107 # self.assertRaises(KeyError, self.db.update_record, msg_id, data)
108 108
109 109 def test_find_records_dt(self):
110 110 """test finding records by date"""
111 111 hist = self.db.get_history()
112 112 middle = self.db.get_record(hist[len(hist)//2])
113 113 tic = middle['submitted']
114 114 before = self.db.find_records({'submitted' : {'$lt' : tic}})
115 115 after = self.db.find_records({'submitted' : {'$gte' : tic}})
116 116 self.assertEquals(len(before)+len(after),len(hist))
117 117 for b in before:
118 118 self.assertTrue(b['submitted'] < tic)
119 119 for a in after:
120 120 self.assertTrue(a['submitted'] >= tic)
121 121 same = self.db.find_records({'submitted' : tic})
122 122 for s in same:
123 123 self.assertTrue(s['submitted'] == tic)
124 124
125 125 def test_find_records_keys(self):
126 126 """test extracting subset of record keys"""
127 127 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
128 128 for rec in found:
129 129 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
130 130
131 131 def test_find_records_msg_id(self):
132 132 """ensure msg_id is always in found records"""
133 133 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
134 134 for rec in found:
135 135 self.assertTrue('msg_id' in rec.keys())
136 136 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted'])
137 137 for rec in found:
138 138 self.assertTrue('msg_id' in rec.keys())
139 139 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['msg_id'])
140 140 for rec in found:
141 141 self.assertTrue('msg_id' in rec.keys())
142 142
143 143 def test_find_records_in(self):
144 144 """test finding records with '$in','$nin' operators"""
145 145 hist = self.db.get_history()
146 146 even = hist[::2]
147 147 odd = hist[1::2]
148 148 recs = self.db.find_records({ 'msg_id' : {'$in' : even}})
149 149 found = [ r['msg_id'] for r in recs ]
150 150 self.assertEquals(set(even), set(found))
151 151 recs = self.db.find_records({ 'msg_id' : {'$nin' : even}})
152 152 found = [ r['msg_id'] for r in recs ]
153 153 self.assertEquals(set(odd), set(found))
154 154
155 155 def test_get_history(self):
156 156 msg_ids = self.db.get_history()
157 157 latest = datetime(1984,1,1)
158 158 for msg_id in msg_ids:
159 159 rec = self.db.get_record(msg_id)
160 160 newt = rec['submitted']
161 161 self.assertTrue(newt >= latest)
162 162 latest = newt
163 163 msg_id = self.load_records(1)[-1]
164 164 self.assertEquals(self.db.get_history()[-1],msg_id)
165 165
166 166 def test_datetime(self):
167 167 """get/set timestamps with datetime objects"""
168 168 msg_id = self.db.get_history()[-1]
169 169 rec = self.db.get_record(msg_id)
170 170 self.assertTrue(isinstance(rec['submitted'], datetime))
171 171 self.db.update_record(msg_id, dict(completed=datetime.now()))
172 172 rec = self.db.get_record(msg_id)
173 173 self.assertTrue(isinstance(rec['completed'], datetime))
174 174
175 175 def test_drop_matching(self):
176 176 msg_ids = self.load_records(10)
177 177 query = {'msg_id' : {'$in':msg_ids}}
178 178 self.db.drop_matching_records(query)
179 179 recs = self.db.find_records(query)
180 180 self.assertEquals(len(recs), 0)
181 181
182 182 def test_null(self):
183 183 """test None comparison queries"""
184 184 msg_ids = self.load_records(10)
185 185
186 186 query = {'msg_id' : None}
187 187 recs = self.db.find_records(query)
188 188 self.assertEquals(len(recs), 0)
189 189
190 190 query = {'msg_id' : {'$ne' : None}}
191 191 recs = self.db.find_records(query)
192 192 self.assertTrue(len(recs) >= 10)
193 193
194 194 def test_pop_safe_get(self):
195 195 """editing query results shouldn't affect record [get]"""
196 196 msg_id = self.db.get_history()[-1]
197 197 rec = self.db.get_record(msg_id)
198 198 rec.pop('buffers')
199 199 rec['garbage'] = 'hello'
200 rec['header']['msg_id'] = 'fubar'
200 201 rec2 = self.db.get_record(msg_id)
201 202 self.assertTrue('buffers' in rec2)
202 203 self.assertFalse('garbage' in rec2)
204 self.assertEquals(rec2['header']['msg_id'], msg_id)
203 205
204 206 def test_pop_safe_find(self):
205 207 """editing query results shouldn't affect record [find]"""
206 208 msg_id = self.db.get_history()[-1]
207 209 rec = self.db.find_records({'msg_id' : msg_id})[0]
208 210 rec.pop('buffers')
209 211 rec['garbage'] = 'hello'
212 rec['header']['msg_id'] = 'fubar'
210 213 rec2 = self.db.find_records({'msg_id' : msg_id})[0]
211 214 self.assertTrue('buffers' in rec2)
212 215 self.assertFalse('garbage' in rec2)
216 self.assertEquals(rec2['header']['msg_id'], msg_id)
213 217
214 218 def test_pop_safe_find_keys(self):
215 219 """editing query results shouldn't affect record [find+keys]"""
216 220 msg_id = self.db.get_history()[-1]
217 rec = self.db.find_records({'msg_id' : msg_id}, keys=['buffers'])[0]
221 rec = self.db.find_records({'msg_id' : msg_id}, keys=['buffers', 'header'])[0]
218 222 rec.pop('buffers')
219 223 rec['garbage'] = 'hello'
224 rec['header']['msg_id'] = 'fubar'
220 225 rec2 = self.db.find_records({'msg_id' : msg_id})[0]
221 226 self.assertTrue('buffers' in rec2)
222 227 self.assertFalse('garbage' in rec2)
228 self.assertEquals(rec2['header']['msg_id'], msg_id)
223 229
224 230
225 231 class TestSQLiteBackend(TestDictBackend):
226 232
227 233 @dec.skip_without('sqlite3')
228 234 def create_db(self):
229 235 location, fname = os.path.split(temp_db)
230 236 log = logging.getLogger('test')
231 237 log.setLevel(logging.CRITICAL)
232 238 return SQLiteDB(location=location, fname=fname, log=log)
233 239
234 240 def tearDown(self):
235 241 self.db._db.close()
236 242
237 243
238 244 def teardown():
239 245 """cleanup task db file after all tests have run"""
240 246 try:
241 247 os.remove(temp_db)
242 248 except:
243 249 pass
General Comments 0
You need to be logged in to leave comments. Login now