Show More
@@ -0,0 +1,37 b'' | |||||
|
1 | """Tests for mongodb backend""" | |||
|
2 | ||||
|
3 | #------------------------------------------------------------------------------- | |||
|
4 | # Copyright (C) 2011 The IPython Development Team | |||
|
5 | # | |||
|
6 | # Distributed under the terms of the BSD License. The full license is in | |||
|
7 | # the file COPYING, distributed as part of this software. | |||
|
8 | #------------------------------------------------------------------------------- | |||
|
9 | ||||
|
10 | #------------------------------------------------------------------------------- | |||
|
11 | # Imports | |||
|
12 | #------------------------------------------------------------------------------- | |||
|
13 | ||||
|
14 | from nose import SkipTest | |||
|
15 | ||||
|
16 | from pymongo import Connection | |||
|
17 | from IPython.parallel.controller.mongodb import MongoDB | |||
|
18 | ||||
|
19 | from . import test_db | |||
|
20 | ||||
|
21 | try: | |||
|
22 | c = Connection() | |||
|
23 | except Exception: | |||
|
24 | c=None | |||
|
25 | ||||
|
26 | class TestMongoBackend(test_db.TestDictBackend): | |||
|
27 | """MongoDB backend tests""" | |||
|
28 | ||||
|
29 | def create_db(self): | |||
|
30 | try: | |||
|
31 | return MongoDB(database='iptestdb', _connection=c) | |||
|
32 | except Exception: | |||
|
33 | raise SkipTest("Couldn't connect to mongodb") | |||
|
34 | ||||
|
35 | def teardown(self): | |||
|
36 | if c is not None: | |||
|
37 | c.drop_database('iptestdb') |
@@ -146,7 +146,7 b' class DictDB(BaseDB):' | |||||
146 | """Remove a record from the DB.""" |
|
146 | """Remove a record from the DB.""" | |
147 | matches = self._match(check) |
|
147 | matches = self._match(check) | |
148 | for m in matches: |
|
148 | for m in matches: | |
149 | del self._records[m] |
|
149 | del self._records[m['msg_id']] | |
150 |
|
150 | |||
151 | def drop_record(self, msg_id): |
|
151 | def drop_record(self, msg_id): | |
152 | """Remove a record from the DB.""" |
|
152 | """Remove a record from the DB.""" |
@@ -1066,36 +1066,34 b' class Hub(LoggingFactory):' | |||||
1066 | except Exception: |
|
1066 | except Exception: | |
1067 | reply = error.wrap_exception() |
|
1067 | reply = error.wrap_exception() | |
1068 | else: |
|
1068 | else: | |
1069 | for msg_id in msg_ids: |
|
1069 | pending = filter(lambda m: m in self.pending, msg_ids) | |
1070 | if msg_id in self.all_completed: |
|
1070 | if pending: | |
1071 | self.db.drop_record(msg_id) |
|
1071 | try: | |
1072 | else: |
|
1072 | raise IndexError("msg pending: %r"%pending[0]) | |
1073 | if msg_id in self.pending: |
|
1073 | except: | |
1074 | try: |
|
1074 | reply = error.wrap_exception() | |
1075 | raise IndexError("msg pending: %r"%msg_id) |
|
1075 | else: | |
1076 |
|
|
1076 | try: | |
1077 | reply = error.wrap_exception() |
|
1077 | self.db.drop_matching_records(dict(msg_id={'$in':msg_ids})) | |
1078 |
|
|
1078 | except Exception: | |
|
1079 | reply = error.wrap_exception() | |||
|
1080 | ||||
|
1081 | if reply['status'] == 'ok': | |||
|
1082 | eids = content.get('engine_ids', []) | |||
|
1083 | for eid in eids: | |||
|
1084 | if eid not in self.engines: | |||
1079 | try: |
|
1085 | try: | |
1080 |
raise IndexError("No such |
|
1086 | raise IndexError("No such engine: %i"%eid) | |
1081 | except: |
|
1087 | except: | |
1082 | reply = error.wrap_exception() |
|
1088 | reply = error.wrap_exception() | |
1083 | break |
|
1089 | break | |
1084 | eids = content.get('engine_ids', []) |
|
1090 | msg_ids = self.completed.pop(eid) | |
1085 | for eid in eids: |
|
1091 | uid = self.engines[eid].queue | |
1086 | if eid not in self.engines: |
|
|||
1087 | try: |
|
1092 | try: | |
1088 | raise IndexError("No such engine: %i"%eid) |
|
1093 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) | |
1089 | except: |
|
1094 | except Exception: | |
1090 | reply = error.wrap_exception() |
|
1095 | reply = error.wrap_exception() | |
1091 | break |
|
1096 | break | |
1092 | msg_ids = self.completed.pop(eid) |
|
|||
1093 | uid = self.engines[eid].queue |
|
|||
1094 | try: |
|
|||
1095 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) |
|
|||
1096 | except Exception: |
|
|||
1097 | reply = error.wrap_exception() |
|
|||
1098 | break |
|
|||
1099 |
|
1097 | |||
1100 | self.session.send(self.query, 'purge_reply', content=reply, ident=client_id) |
|
1098 | self.session.send(self.query, 'purge_reply', content=reply, ident=client_id) | |
1101 |
|
1099 |
@@ -6,12 +6,10 b'' | |||||
6 | # the file COPYING, distributed as part of this software. |
|
6 | # the file COPYING, distributed as part of this software. | |
7 | #----------------------------------------------------------------------------- |
|
7 | #----------------------------------------------------------------------------- | |
8 |
|
8 | |||
9 | from datetime import datetime |
|
|||
10 |
|
||||
11 | from pymongo import Connection |
|
9 | from pymongo import Connection | |
12 | from pymongo.binary import Binary |
|
10 | from pymongo.binary import Binary | |
13 |
|
11 | |||
14 | from IPython.utils.traitlets import Dict, List, CUnicode |
|
12 | from IPython.utils.traitlets import Dict, List, CUnicode, CStr, Instance | |
15 |
|
13 | |||
16 | from .dictdb import BaseDB |
|
14 | from .dictdb import BaseDB | |
17 |
|
15 | |||
@@ -25,15 +23,20 b' class MongoDB(BaseDB):' | |||||
25 | connection_args = List(config=True) # args passed to pymongo.Connection |
|
23 | connection_args = List(config=True) # args passed to pymongo.Connection | |
26 | connection_kwargs = Dict(config=True) # kwargs passed to pymongo.Connection |
|
24 | connection_kwargs = Dict(config=True) # kwargs passed to pymongo.Connection | |
27 | database = CUnicode(config=True) # name of the mongodb database |
|
25 | database = CUnicode(config=True) # name of the mongodb database | |
28 | _table = Dict() |
|
26 | ||
|
27 | _connection = Instance(Connection) # pymongo connection | |||
29 |
|
28 | |||
30 | def __init__(self, **kwargs): |
|
29 | def __init__(self, **kwargs): | |
31 | super(MongoDB, self).__init__(**kwargs) |
|
30 | super(MongoDB, self).__init__(**kwargs) | |
32 | self._connection = Connection(*self.connection_args, **self.connection_kwargs) |
|
31 | if self._connection is None: | |
|
32 | self._connection = Connection(*self.connection_args, **self.connection_kwargs) | |||
33 | if not self.database: |
|
33 | if not self.database: | |
34 | self.database = self.session |
|
34 | self.database = self.session | |
35 | self._db = self._connection[self.database] |
|
35 | self._db = self._connection[self.database] | |
36 | self._records = self._db['task_records'] |
|
36 | self._records = self._db['task_records'] | |
|
37 | self._records.ensure_index('msg_id', unique=True) | |||
|
38 | self._records.ensure_index('submitted') # for sorting history | |||
|
39 | # for rec in self._records.find | |||
37 |
|
40 | |||
38 | def _binary_buffers(self, rec): |
|
41 | def _binary_buffers(self, rec): | |
39 | for key in ('buffers', 'result_buffers'): |
|
42 | for key in ('buffers', 'result_buffers'): | |
@@ -45,18 +48,21 b' class MongoDB(BaseDB):' | |||||
45 | """Add a new Task Record, by msg_id.""" |
|
48 | """Add a new Task Record, by msg_id.""" | |
46 | # print rec |
|
49 | # print rec | |
47 | rec = self._binary_buffers(rec) |
|
50 | rec = self._binary_buffers(rec) | |
48 |
|
|
51 | self._records.insert(rec) | |
49 | self._table[msg_id] = obj_id |
|
|||
50 |
|
52 | |||
51 | def get_record(self, msg_id): |
|
53 | def get_record(self, msg_id): | |
52 | """Get a specific Task Record, by msg_id.""" |
|
54 | """Get a specific Task Record, by msg_id.""" | |
53 |
r |
|
55 | r = self._records.find_one({'msg_id': msg_id}) | |
|
56 | if not r: | |||
|
57 | # r will be '' if nothing is found | |||
|
58 | raise KeyError(msg_id) | |||
|
59 | return r | |||
54 |
|
60 | |||
55 | def update_record(self, msg_id, rec): |
|
61 | def update_record(self, msg_id, rec): | |
56 | """Update the data in an existing record.""" |
|
62 | """Update the data in an existing record.""" | |
57 | rec = self._binary_buffers(rec) |
|
63 | rec = self._binary_buffers(rec) | |
58 | obj_id = self._table[msg_id] |
|
64 | ||
59 |
self._records.update({'_id': |
|
65 | self._records.update({'msg_id':msg_id}, {'$set': rec}) | |
60 |
|
66 | |||
61 | def drop_matching_records(self, check): |
|
67 | def drop_matching_records(self, check): | |
62 | """Remove a record from the DB.""" |
|
68 | """Remove a record from the DB.""" | |
@@ -64,8 +70,7 b' class MongoDB(BaseDB):' | |||||
64 |
|
70 | |||
65 | def drop_record(self, msg_id): |
|
71 | def drop_record(self, msg_id): | |
66 | """Remove a record from the DB.""" |
|
72 | """Remove a record from the DB.""" | |
67 | obj_id = self._table.pop(msg_id) |
|
73 | self._records.remove({'msg_id':msg_id}) | |
68 | self._records.remove(obj_id) |
|
|||
69 |
|
74 | |||
70 | def find_records(self, check, keys=None): |
|
75 | def find_records(self, check, keys=None): | |
71 | """Find records matching a query dict, optionally extracting subset of keys. |
|
76 | """Find records matching a query dict, optionally extracting subset of keys. |
@@ -27,16 +27,20 b' operators = {' | |||||
27 | '$lt' : "<", |
|
27 | '$lt' : "<", | |
28 | '$gt' : ">", |
|
28 | '$gt' : ">", | |
29 | # null is handled weird with ==,!= |
|
29 | # null is handled weird with ==,!= | |
30 |
'$eq' : " |
|
30 | '$eq' : "=", | |
31 |
'$ne' : " |
|
31 | '$ne' : "!=", | |
32 | '$lte': "<=", |
|
32 | '$lte': "<=", | |
33 | '$gte': ">=", |
|
33 | '$gte': ">=", | |
34 |
'$in' : (' |
|
34 | '$in' : ('=', ' OR '), | |
35 |
'$nin': (' |
|
35 | '$nin': ('!=', ' AND '), | |
36 | # '$all': None, |
|
36 | # '$all': None, | |
37 | # '$mod': None, |
|
37 | # '$mod': None, | |
38 | # '$exists' : None |
|
38 | # '$exists' : None | |
39 | } |
|
39 | } | |
|
40 | null_operators = { | |||
|
41 | '=' : "IS NULL", | |||
|
42 | '!=' : "IS NOT NULL", | |||
|
43 | } | |||
40 |
|
44 | |||
41 | def _adapt_datetime(dt): |
|
45 | def _adapt_datetime(dt): | |
42 | return dt.strftime(ISO8601) |
|
46 | return dt.strftime(ISO8601) | |
@@ -205,17 +209,27 b' class SQLiteDB(BaseDB):' | |||||
205 | raise KeyError("Unsupported operator: %r"%test) |
|
209 | raise KeyError("Unsupported operator: %r"%test) | |
206 | if isinstance(op, tuple): |
|
210 | if isinstance(op, tuple): | |
207 | op, join = op |
|
211 | op, join = op | |
208 | expr = "%s %s ?"%(name, op) |
|
212 | ||
209 | if isinstance(value, (tuple,list)): |
|
213 | if value is None and op in null_operators: | |
210 |
expr = |
|
214 | expr = "%s %s"%null_operators[op] | |
211 | args.extend(value) |
|
|||
212 | else: |
|
215 | else: | |
213 | args.append(value) |
|
216 | expr = "%s %s ?"%(name, op) | |
|
217 | if isinstance(value, (tuple,list)): | |||
|
218 | if op in null_operators and any([v is None for v in value]): | |||
|
219 | # equality tests don't work with NULL | |||
|
220 | raise ValueError("Cannot use %r test with NULL values on SQLite backend"%test) | |||
|
221 | expr = '( %s )'%( join.join([expr]*len(value)) ) | |||
|
222 | args.extend(value) | |||
|
223 | else: | |||
|
224 | args.append(value) | |||
214 | expressions.append(expr) |
|
225 | expressions.append(expr) | |
215 | else: |
|
226 | else: | |
216 | # it's an equality check |
|
227 | # it's an equality check | |
217 | expressions.append("%s IS ?"%name) |
|
228 | if sub_check is None: | |
218 |
|
|
229 | expressions.append("%s IS NULL") | |
|
230 | else: | |||
|
231 | expressions.append("%s = ?"%name) | |||
|
232 | args.append(sub_check) | |||
219 |
|
233 | |||
220 | expr = " AND ".join(expressions) |
|
234 | expr = " AND ".join(expressions) | |
221 | return expr, args |
|
235 | return expr, args |
@@ -235,3 +235,10 b' class TestClient(ClusterTestCase):' | |||||
235 | def test_resubmit_badkey(self): |
|
235 | def test_resubmit_badkey(self): | |
236 | """ensure KeyError on resubmit of nonexistant task""" |
|
236 | """ensure KeyError on resubmit of nonexistant task""" | |
237 | self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid']) |
|
237 | self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid']) | |
|
238 | ||||
|
239 | def test_purge_results(self): | |||
|
240 | hist = self.client.hub_history() | |||
|
241 | self.client.purge_results(hist) | |||
|
242 | newhist = self.client.hub_history() | |||
|
243 | self.assertTrue(len(newhist) == 0) | |||
|
244 |
@@ -15,10 +15,7 b'' | |||||
15 | import tempfile |
|
15 | import tempfile | |
16 | import time |
|
16 | import time | |
17 |
|
17 | |||
18 | import uuid |
|
|||
19 |
|
||||
20 | from datetime import datetime, timedelta |
|
18 | from datetime import datetime, timedelta | |
21 | from random import choice, randint |
|
|||
22 | from unittest import TestCase |
|
19 | from unittest import TestCase | |
23 |
|
20 | |||
24 | from nose import SkipTest |
|
21 | from nose import SkipTest | |
@@ -157,6 +154,13 b' class TestDictBackend(TestCase):' | |||||
157 | self.db.update_record(msg_id, dict(completed=datetime.now())) |
|
154 | self.db.update_record(msg_id, dict(completed=datetime.now())) | |
158 | rec = self.db.get_record(msg_id) |
|
155 | rec = self.db.get_record(msg_id) | |
159 | self.assertTrue(isinstance(rec['completed'], datetime)) |
|
156 | self.assertTrue(isinstance(rec['completed'], datetime)) | |
|
157 | ||||
|
158 | def test_drop_matching(self): | |||
|
159 | msg_ids = self.load_records(10) | |||
|
160 | query = {'msg_id' : {'$in':msg_ids}} | |||
|
161 | self.db.drop_matching_records(query) | |||
|
162 | recs = self.db.find_records(query) | |||
|
163 | self.assertTrue(len(recs)==0) | |||
160 |
|
164 | |||
161 | class TestSQLiteBackend(TestDictBackend): |
|
165 | class TestSQLiteBackend(TestDictBackend): | |
162 | def create_db(self): |
|
166 | def create_db(self): | |
@@ -164,19 +168,3 b' class TestSQLiteBackend(TestDictBackend):' | |||||
164 |
|
168 | |||
165 | def tearDown(self): |
|
169 | def tearDown(self): | |
166 | self.db._db.close() |
|
170 | self.db._db.close() | |
167 |
|
||||
168 | # optional MongoDB test |
|
|||
169 | try: |
|
|||
170 | from IPython.parallel.controller.mongodb import MongoDB |
|
|||
171 | except ImportError: |
|
|||
172 | pass |
|
|||
173 | else: |
|
|||
174 | class TestMongoBackend(TestDictBackend): |
|
|||
175 | def create_db(self): |
|
|||
176 | try: |
|
|||
177 | return MongoDB(database='iptestdb') |
|
|||
178 | except Exception: |
|
|||
179 | raise SkipTest("Couldn't connect to mongodb instance") |
|
|||
180 |
|
||||
181 | def tearDown(self): |
|
|||
182 | self.db._connection.drop_database('iptestdb') |
|
@@ -199,6 +199,7 b' def make_exclude():' | |||||
199 |
|
199 | |||
200 | if not have['pymongo']: |
|
200 | if not have['pymongo']: | |
201 | exclusions.append(ipjoin('parallel', 'controller', 'mongodb')) |
|
201 | exclusions.append(ipjoin('parallel', 'controller', 'mongodb')) | |
|
202 | exclusions.append(ipjoin('parallel', 'tests', 'test_mongodb')) | |||
202 |
|
203 | |||
203 | if not have['matplotlib']: |
|
204 | if not have['matplotlib']: | |
204 | exclusions.extend([ipjoin('lib', 'pylabtools'), |
|
205 | exclusions.extend([ipjoin('lib', 'pylabtools'), |
General Comments 0
You need to be logged in to leave comments.
Login now