##// END OF EJS Templates
Clean code, retab and minor fix...
Clean code, retab and minor fix remove unused code, convert some tab to space, and correct some semicolon according to jslint jlint fixes

File last commit:

r5390:c82649ea
r7170:e0eb36bc
Show More
test_notification.py
150 lines | 4.4 KiB | text/x-python | PythonLexer
/ IPython / utils / tests / test_notification.py
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 # encoding: utf-8
"""This file contains unittests for the notification.py module."""
#-----------------------------------------------------------------------------
Matthias BUSSONNIER
update copyright to 2011/20xx-2011...
r5390 # Copyright (C) 2008-2011 The IPython Development Team
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963 #
# Distributed under the terms of the BSD License. The full license is
# in the file COPYING, distributed as part of this software.
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 #-----------------------------------------------------------------------------
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 #-----------------------------------------------------------------------------
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963 # Imports
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 #-----------------------------------------------------------------------------
Brian Granger
Semi-working refactored ipcluster....
r2302 import unittest
Brian Granger
Fixing misc testing related things.
r1960
Brian Granger
Work to address the review comments on Fernando's branch....
r2498 from IPython.utils.notification import shared_center
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963 #-----------------------------------------------------------------------------
# Support Classes
#-----------------------------------------------------------------------------
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 class Observer(object):
Brian Granger
Semi-working refactored ipcluster....
r2302
Bernardo B. Marques
remove all trailling spaces
r4872 def __init__(self, expected_ntype, expected_sender,
Brian Granger
Semi-working refactored ipcluster....
r2302 center=shared_center, *args, **kwargs):
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 super(Observer, self).__init__()
Brian Granger
Semi-working refactored ipcluster....
r2302 self.expected_ntype = expected_ntype
self.expected_sender = expected_sender
self.expected_args = args
self.expected_kwargs = kwargs
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 self.recieved = False
Bernardo B. Marques
remove all trailling spaces
r4872 center.add_observer(self.callback,
self.expected_ntype,
Brian Granger
Semi-working refactored ipcluster....
r2302 self.expected_sender)
Bernardo B. Marques
remove all trailling spaces
r4872
Brian Granger
Semi-working refactored ipcluster....
r2302 def callback(self, ntype, sender, *args, **kwargs):
assert(ntype == self.expected_ntype or
self.expected_ntype == None)
assert(sender == self.expected_sender or
self.expected_sender == None)
assert(args == self.expected_args)
assert(kwargs == self.expected_kwargs)
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 self.recieved = True
Bernardo B. Marques
remove all trailling spaces
r4872
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 def verify(self):
assert(self.recieved)
Bernardo B. Marques
remove all trailling spaces
r4872
Barry Wark
fixed None for type/sender
r1411 def reset(self):
self.recieved = False
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
class Notifier(object):
Brian Granger
Semi-working refactored ipcluster....
r2302
def __init__(self, ntype, **kwargs):
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 super(Notifier, self).__init__()
Brian Granger
Semi-working refactored ipcluster....
r2302 self.ntype = ntype
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 self.kwargs = kwargs
Brian Granger
Semi-working refactored ipcluster....
r2302
def post(self, center=shared_center):
center.post_notification(self.ntype, self,
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410 **self.kwargs)
Brian Granger
Semi-working refactored ipcluster....
r2302
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963 #-----------------------------------------------------------------------------
# Tests
#-----------------------------------------------------------------------------
Barry Wark
I.k.c.notification blueprint, test, implementation.
r1410
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 class NotificationTests(unittest.TestCase):
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 def tearDown(self):
Brian Granger
Semi-working refactored ipcluster....
r2302 shared_center.remove_all_observers()
Bernardo B. Marques
remove all trailling spaces
r4872
Barry Wark
fixed None for type/sender
r1411 def test_notification_delivered(self):
"""Test that notifications are delivered"""
Brian Granger
Semi-working refactored ipcluster....
r2302
expected_ntype = 'EXPECTED_TYPE'
sender = Notifier(expected_ntype)
observer = Observer(expected_ntype, sender)
Barry Wark
fixed None for type/sender
r1411 sender.post()
observer.verify()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 def test_type_specificity(self):
"""Test that observers are registered by type"""
Brian Granger
Semi-working refactored ipcluster....
r2302
expected_ntype = 1
unexpected_ntype = "UNEXPECTED_TYPE"
sender = Notifier(expected_ntype)
unexpected_sender = Notifier(unexpected_ntype)
observer = Observer(expected_ntype, sender)
Barry Wark
fixed None for type/sender
r1411 sender.post()
Brian Granger
Semi-working refactored ipcluster....
r2302 unexpected_sender.post()
Barry Wark
fixed None for type/sender
r1411 observer.verify()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 def test_sender_specificity(self):
"""Test that observers are registered by sender"""
Bernardo B. Marques
remove all trailling spaces
r4872
Brian Granger
Semi-working refactored ipcluster....
r2302 expected_ntype = "EXPECTED_TYPE"
sender1 = Notifier(expected_ntype)
sender2 = Notifier(expected_ntype)
observer = Observer(expected_ntype, sender1)
Barry Wark
fixed None for type/sender
r1411 sender1.post()
sender2.post()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 observer.verify()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 def test_remove_all_observers(self):
"""White-box test for remove_all_observers"""
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 for i in xrange(10):
Brian Granger
Semi-working refactored ipcluster....
r2302 Observer('TYPE', None, center=shared_center)
Bernardo B. Marques
remove all trailling spaces
r4872 self.assert_(len(shared_center.observers[('TYPE',None)]) >= 10,
Barry Wark
fixed None for type/sender
r1411 "observers registered")
Brian Granger
Semi-working refactored ipcluster....
r2302
shared_center.remove_all_observers()
self.assert_(len(shared_center.observers) == 0, "observers removed")
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963
Barry Wark
fixed None for type/sender
r1411 def test_any_sender(self):
Brian Granger
Semi-working refactored ipcluster....
r2302 expected_ntype = "EXPECTED_TYPE"
sender1 = Notifier(expected_ntype)
sender2 = Notifier(expected_ntype)
observer = Observer(expected_ntype, None)
Barry Wark
fixed None for type/sender
r1411 sender1.post()
observer.verify()
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 observer.reset()
sender2.post()
observer.verify()
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963
Barry Wark
fixed None for type/sender
r1411 def test_post_performance(self):
"""Test that post_notification, even with many registered irrelevant
observers is fast"""
Bernardo B. Marques
remove all trailling spaces
r4872
Barry Wark
fixed None for type/sender
r1411 for i in xrange(10):
Observer("UNRELATED_TYPE", None)
Brian Granger
Semi-working refactored ipcluster....
r2302
Barry Wark
fixed None for type/sender
r1411 o = Observer('EXPECTED_TYPE', None)
Brian Granger
Semi-working refactored ipcluster....
r2302 shared_center.post_notification('EXPECTED_TYPE', self)
Barry Wark
fixed None for type/sender
r1411 o.verify()
Brian Granger
Commiting fixes for running our test suite using trial and nose....
r1963
Brian Granger
Semi-working refactored ipcluster....
r2302