test_mongodb.py
52 lines
| 1.3 KiB
| text/x-python
|
PythonLexer
MinRK
|
"""Tests for mongodb backend

Authors:

* Min RK
"""
MinRK
|
r3875 | |||
#-------------------------------------------------------------------------------
#  Copyright (C) 2011  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
Michał Górny
|
r9539 | import os | ||
MinRK
|
r7533 | from unittest import TestCase | ||
MinRK
|
r3875 | from nose import SkipTest | ||
from pymongo import Connection | ||||
from IPython.parallel.controller.mongodb import MongoDB | ||||
from . import test_db | ||||
Michał Górny
|
# Keyword arguments passed to the pymongo ``Connection`` below.  The host and
# port are only set when the DB_IP / DB_PORT environment variables exist;
# otherwise the dict stays empty and Connection is called with no arguments.
conn_kwargs = {}
try:
    conn_kwargs['host'] = os.environ['DB_IP']
except KeyError:
    # DB_IP not set: leave 'host' out of the kwargs.
    pass
try:
    # A non-integer DB_PORT still raises ValueError here (not swallowed).
    conn_kwargs['port'] = int(os.environ['DB_PORT'])
except KeyError:
    # DB_PORT not set: leave 'port' out of the kwargs.
    pass
MinRK
|
# Shared module-level connection used by the tests.  It stays None when the
# connection attempt fails for any reason, so importing this module never
# errors out here; teardown() checks for None before touching the database.
c = None
try:
    c = Connection(**conn_kwargs)
except Exception:
    # Best-effort: any failure simply leaves ``c`` as None.
    pass
MinRK
|
class TestMongoBackend(test_db.TaskDBTest, TestCase):
    """MongoDB backend tests

    Combines the ``test_db.TaskDBTest`` mixin with ``unittest.TestCase`` and
    supplies the MongoDB-backed database via :meth:`create_db`.
    """

    def create_db(self):
        """Return a ``MongoDB`` backend bound to the 'iptestdb' database.

        The backend is built on the module-level connection ``c``.

        Raises
        ------
        SkipTest
            If constructing the backend raises any exception.
        """
        try:
            backend = MongoDB(database='iptestdb', _connection=c)
        except Exception:
            # Any construction failure is reported as a skipped test.
            raise SkipTest("Couldn't connect to mongodb")
        return backend
def teardown(self):
    """Drop the 'iptestdb' test database if a connection was established.

    Does nothing when the module-level connection ``c`` is None.

    NOTE(review): the argument is named ``self`` but is never used; presumably
    nose invokes this as a module-level fixture and passes the module object
    -- confirm against the original file's indentation.
    """
    if c is None:
        # No connection was made at import time, so there is nothing to clean.
        return
    c.drop_database('iptestdb')