##// END OF EJS Templates
add size-limiting to the DictDB backend
MinRK -
Show More
@@ -1,220 +1,306 b''
1 1 """A Task logger that presents our DB interface,
2 2 but exists entirely in memory and implemented with dicts.
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7
8 8
9 9 TaskRecords are dicts of the form:
10 10 {
11 11 'msg_id' : str(uuid),
12 12 'client_uuid' : str(uuid),
13 13 'engine_uuid' : str(uuid) or None,
14 14 'header' : dict(header),
15 15 'content': dict(content),
16 16 'buffers': list(buffers),
17 17 'submitted': datetime,
18 18 'started': datetime or None,
19 19 'completed': datetime or None,
20 20 'resubmitted': datetime or None,
21 21 'result_header' : dict(header) or None,
22 22 'result_content' : dict(content) or None,
23 23 'result_buffers' : list(buffers) or None,
24 24 }
25 25 With this info, many of the special categories of tasks can be defined by query:
26 26
27 27 pending: completed is None
28 28 client's outstanding: client_uuid = uuid && completed is None
29 29 MIA: arrived is None (and completed is None)
30 30 etc.
31 31
32 32 EngineRecords are dicts of the form:
33 33 {
34 34 'eid' : int(id),
35 35 'uuid': str(uuid)
36 36 }
37 37 This may be extended, but is currently.
38 38
39 39 We support a subset of mongodb operators:
40 40 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
41 41 """
42 42 #-----------------------------------------------------------------------------
43 43 # Copyright (C) 2010-2011 The IPython Development Team
44 44 #
45 45 # Distributed under the terms of the BSD License. The full license is in
46 46 # the file COPYING, distributed as part of this software.
47 47 #-----------------------------------------------------------------------------
48 48
49 49 from copy import deepcopy as copy
50 50 from datetime import datetime
51 51
52 52 from IPython.config.configurable import LoggingConfigurable
53 53
54 from IPython.utils.traitlets import Dict, Unicode, Instance
54 from IPython.utils.traitlets import Dict, Unicode, Integer, Float
55 55
56 56 filters = {
57 57 '$lt' : lambda a,b: a < b,
58 58 '$gt' : lambda a,b: b > a,
59 59 '$eq' : lambda a,b: a == b,
60 60 '$ne' : lambda a,b: a != b,
61 61 '$lte': lambda a,b: a <= b,
62 62 '$gte': lambda a,b: a >= b,
63 63 '$in' : lambda a,b: a in b,
64 64 '$nin': lambda a,b: a not in b,
65 65 '$all': lambda a,b: all([ a in bb for bb in b ]),
66 66 '$mod': lambda a,b: a%b[0] == b[1],
67 67 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
68 68 }
69 69
70 70
71 71 class CompositeFilter(object):
72 72 """Composite filter for matching multiple properties."""
73 73
74 74 def __init__(self, dikt):
75 75 self.tests = []
76 76 self.values = []
77 77 for key, value in dikt.iteritems():
78 78 self.tests.append(filters[key])
79 79 self.values.append(value)
80 80
81 81 def __call__(self, value):
82 82 for test,check in zip(self.tests, self.values):
83 83 if not test(value, check):
84 84 return False
85 85 return True
86 86
87 87 class BaseDB(LoggingConfigurable):
88 88 """Empty Parent class so traitlets work on DB."""
89 89 # base configurable traits:
90 90 session = Unicode("")
91 91
92 92 class DictDB(BaseDB):
93 93 """Basic in-memory dict-based object for saving Task Records.
94 94
95 95 This is the first object to present the DB interface
96 96 for logging tasks out of memory.
97 97
98 98 The interface is based on MongoDB, so adding a MongoDB
99 99 backend should be straightforward.
100 100 """
101 101
102 102 _records = Dict()
103 _culled_ids = set() # set of ids which have been culled
104 _buffer_bytes = Integer(0) # running total of the bytes in the DB
105
106 size_limit = Integer(1024*1024, config=True,
107 help="""The maximum total size (in bytes) of the buffers stored in the db
108
109 When the db exceeds this size, the oldest records will be culled until
110 the total size is under size_limit * (1-cull_fraction).
111 """
112 )
113 record_limit = Integer(1024, config=True,
114 help="""The maximum number of records in the db
115
116 When the history exceeds this size, the first record_limit * cull_fraction
117 records will be culled.
118 """
119 )
120 cull_fraction = Float(0.1, config=True,
121 help="""The fraction by which the db should culled when one of the limits is exceeded
122
123 In general, the db size will spend most of its time with a size in the range:
124
125 [limit * (1-cull_fraction), limit]
126
127 for each of size_limit and record_limit.
128 """
129 )
103 130
104 131 def _match_one(self, rec, tests):
105 132 """Check if a specific record matches tests."""
106 133 for key,test in tests.iteritems():
107 134 if not test(rec.get(key, None)):
108 135 return False
109 136 return True
110 137
111 138 def _match(self, check):
112 139 """Find all the matches for a check dict."""
113 140 matches = []
114 141 tests = {}
115 142 for k,v in check.iteritems():
116 143 if isinstance(v, dict):
117 144 tests[k] = CompositeFilter(v)
118 145 else:
119 146 tests[k] = lambda o: o==v
120 147
121 148 for rec in self._records.itervalues():
122 149 if self._match_one(rec, tests):
123 150 matches.append(copy(rec))
124 151 return matches
125 152
126 153 def _extract_subdict(self, rec, keys):
127 154 """extract subdict of keys"""
128 155 d = {}
129 156 d['msg_id'] = rec['msg_id']
130 157 for key in keys:
131 158 d[key] = rec[key]
132 159 return copy(d)
160
161 # methods for monitoring size / culling history
162
163 def _add_bytes(self, rec):
164 for key in ('buffers', 'result_buffers'):
165 for buf in rec.get(key) or []:
166 self._buffer_bytes += len(buf)
167
168 self._maybe_cull()
169
170 def _drop_bytes(self, rec):
171 for key in ('buffers', 'result_buffers'):
172 for buf in rec.get(key) or []:
173 self._buffer_bytes -= len(buf)
174
175 def _cull_oldest(self, n=1):
176 """cull the oldest N records"""
177 for msg_id in self.get_history()[:n]:
178 self.log.debug("Culling record: %r", msg_id)
179 self._culled_ids.add(msg_id)
180 self.drop_record(msg_id)
181
182 def _maybe_cull(self):
183 # cull by count:
184 if len(self._records) > self.record_limit:
185 to_cull = int(self.cull_fraction * self.record_limit)
186 self.log.info("%i records exceeds limit of %i, culling oldest %i",
187 len(self._records), self.record_limit, to_cull
188 )
189 self._cull_oldest(to_cull)
190
191 # cull by size:
192 if self._buffer_bytes > self.size_limit:
193 limit = self.size_limit * (1 - self.cull_fraction)
194
195 before = self._buffer_bytes
196 before_count = len(self._records)
197 culled = 0
198 while self._buffer_bytes > limit:
199 self._cull_oldest(1)
200 culled += 1
201
202 self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.",
203 before_count, before, self.size_limit, culled
204 )
205
206 # public API methods:
133 207
134 208 def add_record(self, msg_id, rec):
135 209 """Add a new Task Record, by msg_id."""
136 210 if self._records.has_key(msg_id):
137 211 raise KeyError("Already have msg_id %r"%(msg_id))
138 212 self._records[msg_id] = rec
213 self._add_bytes(rec)
214 self._maybe_cull()
139 215
140 216 def get_record(self, msg_id):
141 217 """Get a specific Task Record, by msg_id."""
218 if msg_id in self._culled_ids:
219 raise KeyError("Record %r has been culled for size" % msg_id)
142 220 if not msg_id in self._records:
143 221 raise KeyError("No such msg_id %r"%(msg_id))
144 222 return copy(self._records[msg_id])
145 223
146 224 def update_record(self, msg_id, rec):
147 225 """Update the data in an existing record."""
148 self._records[msg_id].update(rec)
226 if msg_id in self._culled_ids:
227 raise KeyError("Record %r has been culled for size" % msg_id)
228 _rec = self._records[msg_id]
229 self._drop_bytes(_rec)
230 _rec.update(rec)
231 self._add_bytes(_rec)
149 232
150 233 def drop_matching_records(self, check):
151 234 """Remove a record from the DB."""
152 235 matches = self._match(check)
153 for m in matches:
154 del self._records[m['msg_id']]
236 for rec in matches:
237 self._drop_bytes(rec)
238 del self._records[rec['msg_id']]
155 239
156 240 def drop_record(self, msg_id):
157 241 """Remove a record from the DB."""
242 rec = self._records[msg_id]
243 self._drop_bytes(rec)
158 244 del self._records[msg_id]
159 245
160
161 246 def find_records(self, check, keys=None):
162 247 """Find records matching a query dict, optionally extracting subset of keys.
163 248
164 249 Returns dict keyed by msg_id of matching records.
165 250
166 251 Parameters
167 252 ----------
168 253
169 254 check: dict
170 255 mongodb-style query argument
171 256 keys: list of strs [optional]
172 257 if specified, the subset of keys to extract. msg_id will *always* be
173 258 included.
174 259 """
175 260 matches = self._match(check)
176 261 if keys:
177 262 return [ self._extract_subdict(rec, keys) for rec in matches ]
178 263 else:
179 264 return matches
180 265
181
182 266 def get_history(self):
183 267 """get all msg_ids, ordered by time submitted."""
184 268 msg_ids = self._records.keys()
185 269 return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
186 270
271
187 272 NODATA = KeyError("NoDB backend doesn't store any data. "
188 273 "Start the Controller with a DB backend to enable resubmission / result persistence."
189 274 )
190 275
191 class NoDB(DictDB):
276
277 class NoDB(BaseDB):
192 278 """A blackhole db backend that actually stores no information.
193 279
194 280 Provides the full DB interface, but raises KeyErrors on any
195 281 method that tries to access the records. This can be used to
196 282 minimize the memory footprint of the Hub when its record-keeping
197 283 functionality is not required.
198 284 """
199 285
200 286 def add_record(self, msg_id, record):
201 287 pass
202 288
203 289 def get_record(self, msg_id):
204 290 raise NODATA
205 291
206 292 def update_record(self, msg_id, record):
207 293 pass
208 294
209 295 def drop_matching_records(self, check):
210 296 pass
211 297
212 298 def drop_record(self, msg_id):
213 299 pass
214 300
215 301 def find_records(self, check, keys=None):
216 302 raise NODATA
217 303
218 304 def get_history(self):
219 305 raise NODATA
220 306
@@ -1,249 +1,314 b''
1 1 """Tests for db backends
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import logging
22 22 import os
23 23 import tempfile
24 24 import time
25 25
26 26 from datetime import datetime, timedelta
27 27 from unittest import TestCase
28 28
29 29 from IPython.parallel import error
30 30 from IPython.parallel.controller.dictdb import DictDB
31 31 from IPython.parallel.controller.sqlitedb import SQLiteDB
32 32 from IPython.parallel.controller.hub import init_record, empty_record
33 33
34 34 from IPython.testing import decorators as dec
35 35 from IPython.zmq.session import Session
36 36
37 37
38 38 #-------------------------------------------------------------------------------
39 39 # TestCases
40 40 #-------------------------------------------------------------------------------
41 41
42 42
43 43 def setup():
44 44 global temp_db
45 45 temp_db = tempfile.NamedTemporaryFile(suffix='.db').name
46 46
47 47
48 class TestDictBackend(TestCase):
48 class TaskDBTest:
49 49 def setUp(self):
50 50 self.session = Session()
51 51 self.db = self.create_db()
52 52 self.load_records(16)
53 53
54 54 def create_db(self):
55 return DictDB()
55 raise NotImplementedError
56 56
57 def load_records(self, n=1):
57 def load_records(self, n=1, buffer_size=100):
58 58 """load n records for testing"""
59 59 #sleep 1/10 s, to ensure timestamp is different to previous calls
60 60 time.sleep(0.1)
61 61 msg_ids = []
62 62 for i in range(n):
63 63 msg = self.session.msg('apply_request', content=dict(a=5))
64 msg['buffers'] = []
64 msg['buffers'] = [os.urandom(buffer_size)]
65 65 rec = init_record(msg)
66 66 msg_id = msg['header']['msg_id']
67 67 msg_ids.append(msg_id)
68 68 self.db.add_record(msg_id, rec)
69 69 return msg_ids
70 70
71 71 def test_add_record(self):
72 72 before = self.db.get_history()
73 73 self.load_records(5)
74 74 after = self.db.get_history()
75 75 self.assertEquals(len(after), len(before)+5)
76 76 self.assertEquals(after[:-5],before)
77 77
78 78 def test_drop_record(self):
79 79 msg_id = self.load_records()[-1]
80 80 rec = self.db.get_record(msg_id)
81 81 self.db.drop_record(msg_id)
82 82 self.assertRaises(KeyError,self.db.get_record, msg_id)
83 83
84 84 def _round_to_millisecond(self, dt):
85 85 """necessary because mongodb rounds microseconds"""
86 86 micro = dt.microsecond
87 87 extra = int(str(micro)[-3:])
88 88 return dt - timedelta(microseconds=extra)
89 89
90 90 def test_update_record(self):
91 91 now = self._round_to_millisecond(datetime.now())
92 92 #
93 93 msg_id = self.db.get_history()[-1]
94 94 rec1 = self.db.get_record(msg_id)
95 95 data = {'stdout': 'hello there', 'completed' : now}
96 96 self.db.update_record(msg_id, data)
97 97 rec2 = self.db.get_record(msg_id)
98 98 self.assertEquals(rec2['stdout'], 'hello there')
99 99 self.assertEquals(rec2['completed'], now)
100 100 rec1.update(data)
101 101 self.assertEquals(rec1, rec2)
102 102
103 103 # def test_update_record_bad(self):
104 104 # """test updating nonexistant records"""
105 105 # msg_id = str(uuid.uuid4())
106 106 # data = {'stdout': 'hello there'}
107 107 # self.assertRaises(KeyError, self.db.update_record, msg_id, data)
108 108
109 109 def test_find_records_dt(self):
110 110 """test finding records by date"""
111 111 hist = self.db.get_history()
112 112 middle = self.db.get_record(hist[len(hist)//2])
113 113 tic = middle['submitted']
114 114 before = self.db.find_records({'submitted' : {'$lt' : tic}})
115 115 after = self.db.find_records({'submitted' : {'$gte' : tic}})
116 116 self.assertEquals(len(before)+len(after),len(hist))
117 117 for b in before:
118 118 self.assertTrue(b['submitted'] < tic)
119 119 for a in after:
120 120 self.assertTrue(a['submitted'] >= tic)
121 121 same = self.db.find_records({'submitted' : tic})
122 122 for s in same:
123 123 self.assertTrue(s['submitted'] == tic)
124 124
125 125 def test_find_records_keys(self):
126 126 """test extracting subset of record keys"""
127 127 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
128 128 for rec in found:
129 129 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
130 130
131 131 def test_find_records_msg_id(self):
132 132 """ensure msg_id is always in found records"""
133 133 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
134 134 for rec in found:
135 135 self.assertTrue('msg_id' in rec.keys())
136 136 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted'])
137 137 for rec in found:
138 138 self.assertTrue('msg_id' in rec.keys())
139 139 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['msg_id'])
140 140 for rec in found:
141 141 self.assertTrue('msg_id' in rec.keys())
142 142
143 143 def test_find_records_in(self):
144 144 """test finding records with '$in','$nin' operators"""
145 145 hist = self.db.get_history()
146 146 even = hist[::2]
147 147 odd = hist[1::2]
148 148 recs = self.db.find_records({ 'msg_id' : {'$in' : even}})
149 149 found = [ r['msg_id'] for r in recs ]
150 150 self.assertEquals(set(even), set(found))
151 151 recs = self.db.find_records({ 'msg_id' : {'$nin' : even}})
152 152 found = [ r['msg_id'] for r in recs ]
153 153 self.assertEquals(set(odd), set(found))
154 154
155 155 def test_get_history(self):
156 156 msg_ids = self.db.get_history()
157 157 latest = datetime(1984,1,1)
158 158 for msg_id in msg_ids:
159 159 rec = self.db.get_record(msg_id)
160 160 newt = rec['submitted']
161 161 self.assertTrue(newt >= latest)
162 162 latest = newt
163 163 msg_id = self.load_records(1)[-1]
164 164 self.assertEquals(self.db.get_history()[-1],msg_id)
165 165
166 166 def test_datetime(self):
167 167 """get/set timestamps with datetime objects"""
168 168 msg_id = self.db.get_history()[-1]
169 169 rec = self.db.get_record(msg_id)
170 170 self.assertTrue(isinstance(rec['submitted'], datetime))
171 171 self.db.update_record(msg_id, dict(completed=datetime.now()))
172 172 rec = self.db.get_record(msg_id)
173 173 self.assertTrue(isinstance(rec['completed'], datetime))
174 174
175 175 def test_drop_matching(self):
176 176 msg_ids = self.load_records(10)
177 177 query = {'msg_id' : {'$in':msg_ids}}
178 178 self.db.drop_matching_records(query)
179 179 recs = self.db.find_records(query)
180 180 self.assertEquals(len(recs), 0)
181 181
182 182 def test_null(self):
183 183 """test None comparison queries"""
184 184 msg_ids = self.load_records(10)
185 185
186 186 query = {'msg_id' : None}
187 187 recs = self.db.find_records(query)
188 188 self.assertEquals(len(recs), 0)
189 189
190 190 query = {'msg_id' : {'$ne' : None}}
191 191 recs = self.db.find_records(query)
192 192 self.assertTrue(len(recs) >= 10)
193 193
194 194 def test_pop_safe_get(self):
195 195 """editing query results shouldn't affect record [get]"""
196 196 msg_id = self.db.get_history()[-1]
197 197 rec = self.db.get_record(msg_id)
198 198 rec.pop('buffers')
199 199 rec['garbage'] = 'hello'
200 200 rec['header']['msg_id'] = 'fubar'
201 201 rec2 = self.db.get_record(msg_id)
202 202 self.assertTrue('buffers' in rec2)
203 203 self.assertFalse('garbage' in rec2)
204 204 self.assertEquals(rec2['header']['msg_id'], msg_id)
205 205
206 206 def test_pop_safe_find(self):
207 207 """editing query results shouldn't affect record [find]"""
208 208 msg_id = self.db.get_history()[-1]
209 209 rec = self.db.find_records({'msg_id' : msg_id})[0]
210 210 rec.pop('buffers')
211 211 rec['garbage'] = 'hello'
212 212 rec['header']['msg_id'] = 'fubar'
213 213 rec2 = self.db.find_records({'msg_id' : msg_id})[0]
214 214 self.assertTrue('buffers' in rec2)
215 215 self.assertFalse('garbage' in rec2)
216 216 self.assertEquals(rec2['header']['msg_id'], msg_id)
217 217
218 218 def test_pop_safe_find_keys(self):
219 219 """editing query results shouldn't affect record [find+keys]"""
220 220 msg_id = self.db.get_history()[-1]
221 221 rec = self.db.find_records({'msg_id' : msg_id}, keys=['buffers', 'header'])[0]
222 222 rec.pop('buffers')
223 223 rec['garbage'] = 'hello'
224 224 rec['header']['msg_id'] = 'fubar'
225 225 rec2 = self.db.find_records({'msg_id' : msg_id})[0]
226 226 self.assertTrue('buffers' in rec2)
227 227 self.assertFalse('garbage' in rec2)
228 228 self.assertEquals(rec2['header']['msg_id'], msg_id)
229 229
230 230
231 class TestSQLiteBackend(TestDictBackend):
231 class TestDictBackend(TaskDBTest, TestCase):
232
233 def create_db(self):
234 return DictDB()
235
236 def test_cull_count(self):
237 self.db = self.create_db() # skip the load-records init from setUp
238 self.db.record_limit = 20
239 self.db.cull_fraction = 0.2
240 self.load_records(20)
241 self.assertEquals(len(self.db.get_history()), 20)
242 self.load_records(1)
243 # 0.2 * 20 = 4, 21 - 4 = 17
244 self.assertEquals(len(self.db.get_history()), 17)
245 self.load_records(3)
246 self.assertEquals(len(self.db.get_history()), 20)
247 self.load_records(1)
248 self.assertEquals(len(self.db.get_history()), 17)
249
250 for i in range(100):
251 self.load_records(1)
252 self.assertTrue(len(self.db.get_history()) >= 17)
253 self.assertTrue(len(self.db.get_history()) <= 20)
254
255 def test_cull_size(self):
256 self.db = self.create_db() # skip the load-records init from setUp
257 self.db.size_limit = 1000
258 self.db.cull_fraction = 0.2
259 self.load_records(100, buffer_size=10)
260 self.assertEquals(len(self.db.get_history()), 100)
261 self.load_records(1, buffer_size=0)
262 self.assertEquals(len(self.db.get_history()), 101)
263 self.load_records(1, buffer_size=1)
264 # 0.2 * 100 = 20, 101 - 20 = 81
265 self.assertEquals(len(self.db.get_history()), 81)
266
267 def test_cull_size_drop(self):
268 """dropping records updates tracked buffer size"""
269 self.db = self.create_db() # skip the load-records init from setUp
270 self.db.size_limit = 1000
271 self.db.cull_fraction = 0.2
272 self.load_records(100, buffer_size=10)
273 self.assertEquals(len(self.db.get_history()), 100)
274 self.db.drop_record(self.db.get_history()[-1])
275 self.assertEquals(len(self.db.get_history()), 99)
276 self.load_records(1, buffer_size=5)
277 self.assertEquals(len(self.db.get_history()), 100)
278 self.load_records(1, buffer_size=5)
279 self.assertEquals(len(self.db.get_history()), 101)
280 self.load_records(1, buffer_size=1)
281 self.assertEquals(len(self.db.get_history()), 81)
282
283 def test_cull_size_update(self):
284 """updating records updates tracked buffer size"""
285 self.db = self.create_db() # skip the load-records init from setUp
286 self.db.size_limit = 1000
287 self.db.cull_fraction = 0.2
288 self.load_records(100, buffer_size=10)
289 self.assertEquals(len(self.db.get_history()), 100)
290 msg_id = self.db.get_history()[-1]
291 self.db.update_record(msg_id, dict(result_buffers = [os.urandom(10)], buffers=[]))
292 self.assertEquals(len(self.db.get_history()), 100)
293 self.db.update_record(msg_id, dict(result_buffers = [os.urandom(11)], buffers=[]))
294 self.assertEquals(len(self.db.get_history()), 79)
295
296 class TestSQLiteBackend(TaskDBTest, TestCase):
232 297
233 298 @dec.skip_without('sqlite3')
234 299 def create_db(self):
235 300 location, fname = os.path.split(temp_db)
236 301 log = logging.getLogger('test')
237 302 log.setLevel(logging.CRITICAL)
238 303 return SQLiteDB(location=location, fname=fname, log=log)
239 304
240 305 def tearDown(self):
241 306 self.db._db.close()
242 307
243 308
244 309 def teardown():
245 310 """cleanup task db file after all tests have run"""
246 311 try:
247 312 os.remove(temp_db)
248 313 except:
249 314 pass
@@ -1,42 +1,44 b''
1 1 """Tests for mongodb backend
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 from unittest import TestCase
20
19 21 from nose import SkipTest
20 22
21 23 from pymongo import Connection
22 24 from IPython.parallel.controller.mongodb import MongoDB
23 25
24 26 from . import test_db
25 27
26 28 try:
27 29 c = Connection()
28 30 except Exception:
29 31 c=None
30 32
31 class TestMongoBackend(test_db.TestDictBackend):
33 class TestMongoBackend(test_db.TaskDBTest, TestCase):
32 34 """MongoDB backend tests"""
33 35
34 36 def create_db(self):
35 37 try:
36 38 return MongoDB(database='iptestdb', _connection=c)
37 39 except Exception:
38 40 raise SkipTest("Couldn't connect to mongodb")
39 41
40 42 def teardown(self):
41 43 if c is not None:
42 44 c.drop_database('iptestdb')
General Comments 0
You need to be logged in to leave comments. Login now