test_notification.py
154 lines
| 4.5 KiB
| text/x-python
|
PythonLexer
Barry Wark
|
r1410 | # encoding: utf-8 | ||
"""This file contains unittests for the notification.py module.""" | ||||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r1963 | # Copyright (C) 2008-2009 The IPython Development Team | ||
# | ||||
# Distributed under the terms of the BSD License. The full license is | ||||
# in the file COPYING, distributed as part of this software. | ||||
Barry Wark
|
r1410 | #----------------------------------------------------------------------------- | ||
Brian Granger
|
r1963 | |||
Barry Wark
|
r1410 | #----------------------------------------------------------------------------- | ||
Brian Granger
|
r1963 | # Imports | ||
Barry Wark
|
r1410 | #----------------------------------------------------------------------------- | ||
Brian Granger
|
r2302 | import unittest | ||
Brian Granger
|
r1960 | |||
Brian Granger
|
r2302 | from IPython.utils.notification import ( | ||
NotificationCenter, | ||||
NotificationError, | ||||
shared_center | ||||
) | ||||
Barry Wark
|
r1410 | |||
Brian Granger
|
r1963 | #----------------------------------------------------------------------------- | ||
# Support Classes | ||||
#----------------------------------------------------------------------------- | ||||
Barry Wark
|
r1410 | |||
Brian Granger
|
r2302 | |||
Barry Wark
|
r1410 | class Observer(object): | ||
Brian Granger
|
r2302 | |||
def __init__(self, expected_ntype, expected_sender, | ||||
center=shared_center, *args, **kwargs): | ||||
Barry Wark
|
r1410 | super(Observer, self).__init__() | ||
Brian Granger
|
r2302 | self.expected_ntype = expected_ntype | ||
self.expected_sender = expected_sender | ||||
self.expected_args = args | ||||
self.expected_kwargs = kwargs | ||||
Barry Wark
|
r1410 | self.recieved = False | ||
Barry Wark
|
r1411 | center.add_observer(self.callback, | ||
Brian Granger
|
r2302 | self.expected_ntype, | ||
self.expected_sender) | ||||
Barry Wark
|
r1410 | |||
Brian Granger
|
r2302 | def callback(self, ntype, sender, *args, **kwargs): | ||
assert(ntype == self.expected_ntype or | ||||
self.expected_ntype == None) | ||||
assert(sender == self.expected_sender or | ||||
self.expected_sender == None) | ||||
assert(args == self.expected_args) | ||||
assert(kwargs == self.expected_kwargs) | ||||
Barry Wark
|
r1410 | self.recieved = True | ||
def verify(self): | ||||
assert(self.recieved) | ||||
Barry Wark
|
r1411 | def reset(self): | ||
self.recieved = False | ||||
Barry Wark
|
r1410 | |||
class Notifier(object): | ||||
Brian Granger
|
r2302 | |||
def __init__(self, ntype, **kwargs): | ||||
Barry Wark
|
r1410 | super(Notifier, self).__init__() | ||
Brian Granger
|
r2302 | self.ntype = ntype | ||
Barry Wark
|
r1410 | self.kwargs = kwargs | ||
Brian Granger
|
r2302 | |||
def post(self, center=shared_center): | ||||
center.post_notification(self.ntype, self, | ||||
Barry Wark
|
r1410 | **self.kwargs) | ||
Brian Granger
|
r2302 | |||
Brian Granger
|
r1963 | #----------------------------------------------------------------------------- | ||
# Tests | ||||
#----------------------------------------------------------------------------- | ||||
Barry Wark
|
r1410 | |||
Brian Granger
|
r2302 | |||
Barry Wark
|
r1411 | class NotificationTests(unittest.TestCase): | ||
Brian Granger
|
r2302 | |||
Barry Wark
|
r1411 | def tearDown(self): | ||
Brian Granger
|
r2302 | shared_center.remove_all_observers() | ||
Barry Wark
|
r1410 | |||
Barry Wark
|
r1411 | def test_notification_delivered(self): | ||
"""Test that notifications are delivered""" | ||||
Brian Granger
|
r2302 | |||
expected_ntype = 'EXPECTED_TYPE' | ||||
sender = Notifier(expected_ntype) | ||||
observer = Observer(expected_ntype, sender) | ||||
Barry Wark
|
r1411 | sender.post() | ||
observer.verify() | ||||
Brian Granger
|
r2302 | |||
Barry Wark
|
r1411 | def test_type_specificity(self): | ||
"""Test that observers are registered by type""" | ||||
Brian Granger
|
r2302 | |||
expected_ntype = 1 | ||||
unexpected_ntype = "UNEXPECTED_TYPE" | ||||
sender = Notifier(expected_ntype) | ||||
unexpected_sender = Notifier(unexpected_ntype) | ||||
observer = Observer(expected_ntype, sender) | ||||
Barry Wark
|
r1411 | sender.post() | ||
Brian Granger
|
r2302 | unexpected_sender.post() | ||
Barry Wark
|
r1411 | observer.verify() | ||
Brian Granger
|
r2302 | |||
Barry Wark
|
r1411 | def test_sender_specificity(self): | ||
"""Test that observers are registered by sender""" | ||||
Brian Granger
|
r2302 | expected_ntype = "EXPECTED_TYPE" | ||
sender1 = Notifier(expected_ntype) | ||||
sender2 = Notifier(expected_ntype) | ||||
observer = Observer(expected_ntype, sender1) | ||||
Barry Wark
|
r1411 | sender1.post() | ||
sender2.post() | ||||
Brian Granger
|
r2302 | |||
Barry Wark
|
r1411 | observer.verify() | ||
Brian Granger
|
r2302 | |||
Barry Wark
|
r1411 | def test_remove_all_observers(self): | ||
"""White-box test for remove_all_observers""" | ||||
Brian Granger
|
r2302 | |||
Barry Wark
|
r1411 | for i in xrange(10): | ||
Brian Granger
|
r2302 | Observer('TYPE', None, center=shared_center) | ||
self.assert_(len(shared_center.observers[('TYPE',None)]) >= 10, | ||||
Barry Wark
|
r1411 | "observers registered") | ||
Brian Granger
|
r2302 | |||
shared_center.remove_all_observers() | ||||
self.assert_(len(shared_center.observers) == 0, "observers removed") | ||||
Brian Granger
|
r1963 | |||
Barry Wark
|
r1411 | def test_any_sender(self): | ||
Brian Granger
|
r2302 | expected_ntype = "EXPECTED_TYPE" | ||
sender1 = Notifier(expected_ntype) | ||||
sender2 = Notifier(expected_ntype) | ||||
observer = Observer(expected_ntype, None) | ||||
Barry Wark
|
r1411 | sender1.post() | ||
observer.verify() | ||||
Brian Granger
|
r2302 | |||
Barry Wark
|
r1411 | observer.reset() | ||
sender2.post() | ||||
observer.verify() | ||||
Brian Granger
|
r1963 | |||
Barry Wark
|
r1411 | def test_post_performance(self): | ||
"""Test that post_notification, even with many registered irrelevant | ||||
observers is fast""" | ||||
for i in xrange(10): | ||||
Observer("UNRELATED_TYPE", None) | ||||
Brian Granger
|
r2302 | |||
Barry Wark
|
r1411 | o = Observer('EXPECTED_TYPE', None) | ||
Brian Granger
|
r2302 | shared_center.post_notification('EXPECTED_TYPE', self) | ||
Barry Wark
|
r1411 | o.verify() | ||
Brian Granger
|
r1963 | |||
Brian Granger
|
r2302 | |||