test_db.py
314 lines
| 11.6 KiB
| text/x-python
|
PythonLexer
MinRK
|
r4018 | """Tests for db backends | ||
Authors: | ||||
* Min RK | ||||
""" | ||||
MinRK
|
r3780 | |||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
MinRK
|
r4155 | from __future__ import division | ||
MinRK
|
r3780 | |||
MinRK
|
r6815 | import logging | ||
MinRK
|
r6496 | import os | ||
MinRK
|
r3780 | import tempfile | ||
import time | ||||
from datetime import datetime, timedelta | ||||
from unittest import TestCase | ||||
MinRK
|
r4006 | from IPython.parallel import error | ||
MinRK
|
r3780 | from IPython.parallel.controller.dictdb import DictDB | ||
from IPython.parallel.controller.sqlitedb import SQLiteDB | ||||
from IPython.parallel.controller.hub import init_record, empty_record | ||||
MinRK
|
r5147 | from IPython.testing import decorators as dec | ||
MinRK
|
r4006 | from IPython.zmq.session import Session | ||
MinRK
|
r3780 | #------------------------------------------------------------------------------- | ||
# TestCases | ||||
#------------------------------------------------------------------------------- | ||||
MinRK
|
r6506 | |||
def setup():
    """Pick a fresh temporary ``.db`` path for this module's tests.

    The path is stored in the module-level ``temp_db``; it is consumed by
    ``TestSQLiteBackend.create_db`` and removed again by ``teardown``.
    """
    global temp_db
    # Only the unique name is wanted.  Leaving the ``with`` block closes the
    # placeholder file (and thereby deletes it) deterministically, instead of
    # relying on the garbage collector to do so at some later point.
    with tempfile.NamedTemporaryFile(suffix='.db') as placeholder:
        temp_db = placeholder.name
MinRK
|
class TaskDBTest:
    """Backend-independent task-database tests, written as a mixin.

    Concrete suites combine this class with ``TestCase`` and override
    :meth:`create_db` to supply the backend under test.  ``setUp`` gives
    each test a new ``Session`` and a database holding 16 loaded records.
    """

    def setUp(self):
        """Create the session and database, then load 16 records."""
        self.session = Session()
        self.db = self.create_db()
        self.load_records(16)

    def create_db(self):
        """Return the database to test; must be overridden by subclasses."""
        raise NotImplementedError

    def load_records(self, n=1, buffer_size=100):
        """load n records for testing

        Every record is built from an ``apply_request`` message carrying one
        buffer of ``buffer_size`` random bytes.  Returns the msg_ids of the
        added records, in insertion order.
        """
        # pause 1/10 s so these timestamps differ from those of earlier calls
        time.sleep(0.1)
        loaded = []
        for _ in range(n):
            request = self.session.msg('apply_request', content=dict(a=5))
            request['buffers'] = [os.urandom(buffer_size)]
            record = init_record(request)
            ident = request['header']['msg_id']
            loaded.append(ident)
            self.db.add_record(ident, record)
        return loaded

    def test_add_record(self):
        """new records are appended after the existing history"""
        history_before = self.db.get_history()
        self.load_records(5)
        history_after = self.db.get_history()
        self.assertEqual(len(history_after), len(history_before) + 5)
        self.assertEqual(history_after[:-5], history_before)

    def test_drop_record(self):
        """a dropped record can no longer be fetched"""
        msg_id = self.load_records()[-1]
        # the record must be retrievable before it is dropped
        self.db.get_record(msg_id)
        self.db.drop_record(msg_id)
        self.assertRaises(KeyError, self.db.get_record, msg_id)

    def _round_to_millisecond(self, dt):
        """necessary because mongodb rounds microseconds

        Returns ``dt`` with the sub-millisecond part of its microseconds
        subtracted (i.e. truncated down to a whole millisecond).
        """
        sub_millisecond = dt.microsecond % 1000
        return dt - timedelta(microseconds=sub_millisecond)

    def test_update_record(self):
        """updated fields are stored and the rest of the record is unchanged"""
        stamp = self._round_to_millisecond(datetime.now())
        msg_id = self.db.get_history()[-1]
        original = self.db.get_record(msg_id)
        changes = {'stdout': 'hello there', 'completed': stamp}
        self.db.update_record(msg_id, changes)
        updated = self.db.get_record(msg_id)
        self.assertEqual(updated['stdout'], 'hello there')
        self.assertEqual(updated['completed'], stamp)
        # applying the same changes locally must reproduce the stored record
        original.update(changes)
        self.assertEqual(original, updated)

    # NOTE: disabled test kept for reference (it would also need ``uuid``).
    # def test_update_record_bad(self):
    #     """test updating nonexistent records"""
    #     msg_id = str(uuid.uuid4())
    #     data = {'stdout': 'hello there'}
    #     self.assertRaises(KeyError, self.db.update_record, msg_id, data)

    def test_find_records_dt(self):
        """test finding records by date"""
        hist = self.db.get_history()
        # use the submission time of the middle record as the split point
        pivot = self.db.get_record(hist[len(hist) // 2])['submitted']
        earlier = self.db.find_records({'submitted': {'$lt': pivot}})
        later = self.db.find_records({'submitted': {'$gte': pivot}})
        self.assertEqual(len(earlier) + len(later), len(hist))
        for rec in earlier:
            self.assertTrue(rec['submitted'] < pivot)
        for rec in later:
            self.assertTrue(rec['submitted'] >= pivot)
        exact = self.db.find_records({'submitted': pivot})
        for rec in exact:
            self.assertTrue(rec['submitted'] == pivot)

    def test_find_records_keys(self):
        """test extracting subset of record keys"""
        expected_keys = set(['msg_id', 'submitted', 'completed'])
        found = self.db.find_records(
            {'msg_id': {'$ne': ''}}, keys=['submitted', 'completed'])
        for rec in found:
            self.assertEqual(set(rec.keys()), expected_keys)

    def test_find_records_msg_id(self):
        """ensure msg_id is always in found records"""
        requested = (['submitted', 'completed'], ['submitted'], ['msg_id'])
        for keys in requested:
            found = self.db.find_records({'msg_id': {'$ne': ''}}, keys=keys)
            for rec in found:
                self.assertTrue('msg_id' in rec.keys())

    def test_find_records_in(self):
        """test finding records with '$in','$nin' operators"""
        hist = self.db.get_history()
        even, odd = hist[::2], hist[1::2]
        included = self.db.find_records({'msg_id': {'$in': even}})
        self.assertEqual(set(even), set([r['msg_id'] for r in included]))
        excluded = self.db.find_records({'msg_id': {'$nin': even}})
        self.assertEqual(set(odd), set([r['msg_id'] for r in excluded]))

    def test_get_history(self):
        """history is ordered by submission time, newest record last"""
        previous = datetime(1984, 1, 1)
        for msg_id in self.db.get_history():
            submitted = self.db.get_record(msg_id)['submitted']
            self.assertTrue(submitted >= previous)
            previous = submitted
        newest = self.load_records(1)[-1]
        self.assertEqual(self.db.get_history()[-1], newest)

    def test_datetime(self):
        """get/set timestamps with datetime objects"""
        msg_id = self.db.get_history()[-1]
        stored = self.db.get_record(msg_id)
        self.assertTrue(isinstance(stored['submitted'], datetime))
        self.db.update_record(msg_id, dict(completed=datetime.now()))
        stored = self.db.get_record(msg_id)
        self.assertTrue(isinstance(stored['completed'], datetime))

    def test_drop_matching(self):
        """records matched by a query are gone after drop_matching_records"""
        query = {'msg_id': {'$in': self.load_records(10)}}
        self.db.drop_matching_records(query)
        remaining = self.db.find_records(query)
        self.assertEqual(len(remaining), 0)

    def test_null(self):
        """test None comparison queries"""
        self.load_records(10)
        matches = self.db.find_records({'msg_id': None})
        self.assertEqual(len(matches), 0)

        matches = self.db.find_records({'msg_id': {'$ne': None}})
        self.assertTrue(len(matches) >= 10)

    def _tamper_with(self, rec):
        """Mutate a fetched record: drop a key, add a key, edit a nested key."""
        rec.pop('buffers')
        rec['garbage'] = 'hello'
        rec['header']['msg_id'] = 'fubar'

    def _assert_untampered(self, rec, msg_id):
        """Check that ``rec`` shows none of the edits made by _tamper_with."""
        self.assertTrue('buffers' in rec)
        self.assertFalse('garbage' in rec)
        self.assertEqual(rec['header']['msg_id'], msg_id)

    def test_pop_safe_get(self):
        """editing query results shouldn't affect record [get]"""
        msg_id = self.db.get_history()[-1]
        self._tamper_with(self.db.get_record(msg_id))
        self._assert_untampered(self.db.get_record(msg_id), msg_id)

    def test_pop_safe_find(self):
        """editing query results shouldn't affect record [find]"""
        msg_id = self.db.get_history()[-1]
        self._tamper_with(self.db.find_records({'msg_id': msg_id})[0])
        refetched = self.db.find_records({'msg_id': msg_id})[0]
        self._assert_untampered(refetched, msg_id)

    def test_pop_safe_find_keys(self):
        """editing query results shouldn't affect record [find+keys]"""
        msg_id = self.db.get_history()[-1]
        partial = self.db.find_records(
            {'msg_id': msg_id}, keys=['buffers', 'header'])[0]
        self._tamper_with(partial)
        refetched = self.db.find_records({'msg_id': msg_id})[0]
        self._assert_untampered(refetched, msg_id)
MinRK
|
r5147 | |||
MinRK
|
class TestDictBackend(TaskDBTest, TestCase):
    """Run the shared ``TaskDBTest`` tests against ``DictDB``.

    Adds tests for the ``record_limit`` / ``size_limit`` / ``cull_fraction``
    attributes of ``DictDB``.  The expected record counts below are the
    values this suite pins; see ``DictDB`` for the culling rules themselves.
    """

    def create_db(self):
        """Return a new, empty ``DictDB``."""
        return DictDB()

    def test_cull_count(self):
        """history length stays bounded once record_limit is exceeded"""
        self.db = self.create_db() # skip the load-records init from setUp
        self.db.record_limit = 20
        self.db.cull_fraction = 0.2
        self.load_records(20)
        self.assertEqual(len(self.db.get_history()), 20)
        self.load_records(1)
        # 0.2 * 20 = 4, 21 - 4 = 17
        self.assertEqual(len(self.db.get_history()), 17)
        self.load_records(3)
        self.assertEqual(len(self.db.get_history()), 20)
        self.load_records(1)
        self.assertEqual(len(self.db.get_history()), 17)
        # keep adding one record at a time: the count must stay in [17, 20]
        for _ in range(100):
            self.load_records(1)
            self.assertTrue(len(self.db.get_history()) >= 17)
            self.assertTrue(len(self.db.get_history()) <= 20)

    def test_cull_size(self):
        """records are culled once the total buffer size passes size_limit"""
        self.db = self.create_db() # skip the load-records init from setUp
        self.db.size_limit = 1000
        self.db.cull_fraction = 0.2
        self.load_records(100, buffer_size=10)
        self.assertEqual(len(self.db.get_history()), 100)
        # an empty buffer adds a record without adding any buffer bytes
        self.load_records(1, buffer_size=0)
        self.assertEqual(len(self.db.get_history()), 101)
        self.load_records(1, buffer_size=1)
        # 0.2 * 100 = 20, 101 - 20 = 81
        self.assertEqual(len(self.db.get_history()), 81)

    def test_cull_size_drop(self):
        """dropping records updates tracked buffer size"""
        self.db = self.create_db() # skip the load-records init from setUp
        self.db.size_limit = 1000
        self.db.cull_fraction = 0.2
        self.load_records(100, buffer_size=10)
        self.assertEqual(len(self.db.get_history()), 100)
        self.db.drop_record(self.db.get_history()[-1])
        self.assertEqual(len(self.db.get_history()), 99)
        # two 5-byte records bring the loaded buffer bytes back to 1000
        self.load_records(1, buffer_size=5)
        self.assertEqual(len(self.db.get_history()), 100)
        self.load_records(1, buffer_size=5)
        self.assertEqual(len(self.db.get_history()), 101)
        self.load_records(1, buffer_size=1)
        self.assertEqual(len(self.db.get_history()), 81)

    def test_cull_size_update(self):
        """updating records updates tracked buffer size"""
        self.db = self.create_db() # skip the load-records init from setUp
        self.db.size_limit = 1000
        self.db.cull_fraction = 0.2
        self.load_records(100, buffer_size=10)
        self.assertEqual(len(self.db.get_history()), 100)
        msg_id = self.db.get_history()[-1]
        # swap the record's 10-byte buffer for a 10-byte result buffer
        self.db.update_record(msg_id, dict(result_buffers = [os.urandom(10)], buffers=[]))
        self.assertEqual(len(self.db.get_history()), 100)
        # one byte more in the result buffer is expected to trigger a cull
        self.db.update_record(msg_id, dict(result_buffers = [os.urandom(11)], buffers=[]))
        self.assertEqual(len(self.db.get_history()), 79)
class TestSQLiteBackend(TaskDBTest, TestCase):
    """Run the shared ``TaskDBTest`` tests against ``SQLiteDB``."""

    @dec.skip_without('sqlite3')
    def create_db(self):
        """Return a ``SQLiteDB`` located at the module-level ``temp_db`` path.

        The database is given a 'test' logger whose level is set to
        CRITICAL.
        """
        db_dir, db_file = os.path.split(temp_db)
        quiet_log = logging.getLogger('test')
        quiet_log.setLevel(logging.CRITICAL)
        return SQLiteDB(location=db_dir, fname=db_file, log=quiet_log)

    def tearDown(self):
        """Close the database's underlying ``_db`` handle after each test."""
        connection = self.db._db
        connection.close()
MinRK
|
r6496 | |||
def teardown():
    """cleanup task db file after all tests have run

    Removal is best-effort: nothing is raised when the file is already
    gone or cannot be removed, or when ``temp_db`` was never set.
    """
    try:
        os.remove(temp_db)
    except (NameError, OSError):
        # NameError: ``temp_db`` was never assigned (setup did not run).
        # OSError: the file does not exist or could not be deleted.
        # Anything else (e.g. KeyboardInterrupt) is allowed to propagate.
        pass