##// END OF EJS Templates
Progress towards getting the test suite in shape again....
Progress towards getting the test suite in shape again. Work all over the place to get more tests to pass.

File last commit:

r2302:c7310047
r2392:80df84f7
Show More
test_notification.py
154 lines | 4.5 KiB | text/x-python | PythonLexer
/ IPython / utils / tests / test_notification.py
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 # encoding: utf-8
"""This file contains unittests for the notification.py module."""
#-----------------------------------------------------------------------------
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963 # Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is
# in the file COPYING, distributed as part of this software.
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 #-----------------------------------------------------------------------------
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 #-----------------------------------------------------------------------------
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963 # Imports
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 #-----------------------------------------------------------------------------
Brian Granger
Semi-working refactored ipcluster....
r2302 import unittest
Brian Granger
Fixing misc testing related things.
r1960
Brian Granger
Semi-working refactored ipcluster....
r2302 from IPython.utils.notification import (
NotificationCenter,
NotificationError,
shared_center
)
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963 #-----------------------------------------------------------------------------
# Support Classes
#-----------------------------------------------------------------------------
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 class Observer(object):
Brian Granger
Semi-working refactored ipcluster....
r2302
def __init__(self, expected_ntype, expected_sender,
center=shared_center, *args, **kwargs):
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 super(Observer, self).__init__()
Brian Granger
Semi-working refactored ipcluster....
r2302 self.expected_ntype = expected_ntype
self.expected_sender = expected_sender
self.expected_args = args
self.expected_kwargs = kwargs
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 self.recieved = False
Barry Wark
fixed None for type/sender
r1411 center.add_observer(self.callback,
Brian Granger
Semi-working refactored ipcluster....
r2302 self.expected_ntype,
self.expected_sender)
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
Brian Granger
Semi-working refactored ipcluster....
r2302 def callback(self, ntype, sender, *args, **kwargs):
assert(ntype == self.expected_ntype or
self.expected_ntype == None)
assert(sender == self.expected_sender or
self.expected_sender == None)
assert(args == self.expected_args)
assert(kwargs == self.expected_kwargs)
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 self.recieved = True
def verify(self):
assert(self.recieved)
Barry Wark
fixed None for type/sender
r1411 def reset(self):
self.recieved = False
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
class Notifier(object):
Brian Granger
Semi-working refactored ipcluster....
r2302
def __init__(self, ntype, **kwargs):
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 super(Notifier, self).__init__()
Brian Granger
Semi-working refactored ipcluster....
r2302 self.ntype = ntype
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 self.kwargs = kwargs
Brian Granger
Semi-working refactored ipcluster....
r2302
def post(self, center=shared_center):
center.post_notification(self.ntype, self,
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 **self.kwargs)
Brian Granger
Semi-working refactored ipcluster....
r2302
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963 #-----------------------------------------------------------------------------
# Tests
#-----------------------------------------------------------------------------
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 class NotificationTests(unittest.TestCase):
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 def tearDown(self):
Brian Granger
Semi-working refactored ipcluster....
r2302 shared_center.remove_all_observers()
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
Barry Wark
fixed None for type/sender
r1411 def test_notification_delivered(self):
"""Test that notifications are delivered"""
Brian Granger
Semi-working refactored ipcluster....
r2302
expected_ntype = 'EXPECTED_TYPE'
sender = Notifier(expected_ntype)
observer = Observer(expected_ntype, sender)
Barry Wark
fixed None for type/sender
r1411 sender.post()
observer.verify()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 def test_type_specificity(self):
"""Test that observers are registered by type"""
Brian Granger
Semi-working refactored ipcluster....
r2302
expected_ntype = 1
unexpected_ntype = "UNEXPECTED_TYPE"
sender = Notifier(expected_ntype)
unexpected_sender = Notifier(unexpected_ntype)
observer = Observer(expected_ntype, sender)
Barry Wark
fixed None for type/sender
r1411 sender.post()
Brian Granger
Semi-working refactored ipcluster....
r2302 unexpected_sender.post()
Barry Wark
fixed None for type/sender
r1411 observer.verify()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 def test_sender_specificity(self):
"""Test that observers are registered by sender"""
Brian Granger
Semi-working refactored ipcluster....
r2302 expected_ntype = "EXPECTED_TYPE"
sender1 = Notifier(expected_ntype)
sender2 = Notifier(expected_ntype)
observer = Observer(expected_ntype, sender1)
Barry Wark
fixed None for type/sender
r1411 sender1.post()
sender2.post()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 observer.verify()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 def test_remove_all_observers(self):
"""White-box test for remove_all_observers"""
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 for i in xrange(10):
Brian Granger
Semi-working refactored ipcluster....
r2302 Observer('TYPE', None, center=shared_center)
self.assert_(len(shared_center.observers[('TYPE',None)]) >= 10,
Barry Wark
fixed None for type/sender
r1411 "observers registered")
Brian Granger
Semi-working refactored ipcluster....
r2302
shared_center.remove_all_observers()
self.assert_(len(shared_center.observers) == 0, "observers removed")
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963
Barry Wark
fixed None for type/sender
r1411 def test_any_sender(self):
Brian Granger
Semi-working refactored ipcluster....
r2302 expected_ntype = "EXPECTED_TYPE"
sender1 = Notifier(expected_ntype)
sender2 = Notifier(expected_ntype)
observer = Observer(expected_ntype, None)
Barry Wark
fixed None for type/sender
r1411 sender1.post()
observer.verify()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 observer.reset()
sender2.post()
observer.verify()
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963
Barry Wark
fixed None for type/sender
r1411 def test_post_performance(self):
"""Test that post_notification, even with many registered irrelevant
observers is fast"""
for i in xrange(10):
Observer("UNRELATED_TYPE", None)
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 o = Observer('EXPECTED_TYPE', None)
Brian Granger
Semi-working refactored ipcluster....
r2302 shared_center.post_notification('EXPECTED_TYPE', self)
Barry Wark
fixed None for type/sender
r1411 o.verify()
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963
Brian Granger
Semi-working refactored ipcluster....
r2302