##// END OF EJS Templates
Acknowledge T. Kluyver's work in credits.
Acknowledge T. Kluyver's work in credits.

File last commit:

r2498:3eae1372
r3078:8ea3b505
Show More
test_notification.py
150 lines | 4.5 KiB | text/x-python | PythonLexer
/ IPython / utils / tests / test_notification.py
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 # encoding: utf-8
"""This file contains unittests for the notification.py module."""
#-----------------------------------------------------------------------------
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963 # Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is
# in the file COPYING, distributed as part of this software.
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 #-----------------------------------------------------------------------------
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 #-----------------------------------------------------------------------------
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963 # Imports
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 #-----------------------------------------------------------------------------
Brian Granger
Semi-working refactored ipcluster....
r2302 import unittest
Brian Granger
Fixing misc testing related things.
r1960
Brian Granger
Work to address the review comments on Fernando's branch....
r2498 from IPython.utils.notification import shared_center
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963 #-----------------------------------------------------------------------------
# Support Classes
#-----------------------------------------------------------------------------
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 class Observer(object):
Brian Granger
Semi-working refactored ipcluster....
r2302
def __init__(self, expected_ntype, expected_sender,
center=shared_center, *args, **kwargs):
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 super(Observer, self).__init__()
Brian Granger
Semi-working refactored ipcluster....
r2302 self.expected_ntype = expected_ntype
self.expected_sender = expected_sender
self.expected_args = args
self.expected_kwargs = kwargs
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 self.recieved = False
Barry Wark
fixed None for type/sender
r1411 center.add_observer(self.callback,
Brian Granger
Semi-working refactored ipcluster....
r2302 self.expected_ntype,
self.expected_sender)
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
Brian Granger
Semi-working refactored ipcluster....
r2302 def callback(self, ntype, sender, *args, **kwargs):
assert(ntype == self.expected_ntype or
self.expected_ntype == None)
assert(sender == self.expected_sender or
self.expected_sender == None)
assert(args == self.expected_args)
assert(kwargs == self.expected_kwargs)
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 self.recieved = True
def verify(self):
assert(self.recieved)
Barry Wark
fixed None for type/sender
r1411 def reset(self):
self.recieved = False
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
class Notifier(object):
Brian Granger
Semi-working refactored ipcluster....
r2302
def __init__(self, ntype, **kwargs):
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 super(Notifier, self).__init__()
Brian Granger
Semi-working refactored ipcluster....
r2302 self.ntype = ntype
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 self.kwargs = kwargs
Brian Granger
Semi-working refactored ipcluster....
r2302
def post(self, center=shared_center):
center.post_notification(self.ntype, self,
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 **self.kwargs)
Brian Granger
Semi-working refactored ipcluster....
r2302
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963 #-----------------------------------------------------------------------------
# Tests
#-----------------------------------------------------------------------------
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 class NotificationTests(unittest.TestCase):
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 def tearDown(self):
Brian Granger
Semi-working refactored ipcluster....
r2302 shared_center.remove_all_observers()
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
Barry Wark
fixed None for type/sender
r1411 def test_notification_delivered(self):
"""Test that notifications are delivered"""
Brian Granger
Semi-working refactored ipcluster....
r2302
expected_ntype = 'EXPECTED_TYPE'
sender = Notifier(expected_ntype)
observer = Observer(expected_ntype, sender)
Barry Wark
fixed None for type/sender
r1411 sender.post()
observer.verify()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 def test_type_specificity(self):
"""Test that observers are registered by type"""
Brian Granger
Semi-working refactored ipcluster....
r2302
expected_ntype = 1
unexpected_ntype = "UNEXPECTED_TYPE"
sender = Notifier(expected_ntype)
unexpected_sender = Notifier(unexpected_ntype)
observer = Observer(expected_ntype, sender)
Barry Wark
fixed None for type/sender
r1411 sender.post()
Brian Granger
Semi-working refactored ipcluster....
r2302 unexpected_sender.post()
Barry Wark
fixed None for type/sender
r1411 observer.verify()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 def test_sender_specificity(self):
"""Test that observers are registered by sender"""
Brian Granger
Semi-working refactored ipcluster....
r2302 expected_ntype = "EXPECTED_TYPE"
sender1 = Notifier(expected_ntype)
sender2 = Notifier(expected_ntype)
observer = Observer(expected_ntype, sender1)
Barry Wark
fixed None for type/sender
r1411 sender1.post()
sender2.post()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 observer.verify()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 def test_remove_all_observers(self):
"""White-box test for remove_all_observers"""
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 for i in xrange(10):
Brian Granger
Semi-working refactored ipcluster....
r2302 Observer('TYPE', None, center=shared_center)
self.assert_(len(shared_center.observers[('TYPE',None)]) >= 10,
Barry Wark
fixed None for type/sender
r1411 "observers registered")
Brian Granger
Semi-working refactored ipcluster....
r2302
shared_center.remove_all_observers()
self.assert_(len(shared_center.observers) == 0, "observers removed")
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963
Barry Wark
fixed None for type/sender
r1411 def test_any_sender(self):
Brian Granger
Semi-working refactored ipcluster....
r2302 expected_ntype = "EXPECTED_TYPE"
sender1 = Notifier(expected_ntype)
sender2 = Notifier(expected_ntype)
observer = Observer(expected_ntype, None)
Barry Wark
fixed None for type/sender
r1411 sender1.post()
observer.verify()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 observer.reset()
sender2.post()
observer.verify()
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963
Barry Wark
fixed None for type/sender
r1411 def test_post_performance(self):
"""Test that post_notification, even with many registered irrelevant
observers is fast"""
for i in xrange(10):
Observer("UNRELATED_TYPE", None)
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 o = Observer('EXPECTED_TYPE', None)
Brian Granger
Semi-working refactored ipcluster....
r2302 shared_center.post_notification('EXPECTED_TYPE', self)
Barry Wark
fixed None for type/sender
r1411 o.verify()
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963
Brian Granger
Semi-working refactored ipcluster....
r2302