##// END OF EJS Templates
Semi-working refactored ipcluster....
Brian Granger -
Show More
@@ -271,6 +271,7 b' class CommandLineConfigLoader(ConfigLoader):'
271 class NoConfigDefault(object): pass
271 class NoConfigDefault(object): pass
272 NoConfigDefault = NoConfigDefault()
272 NoConfigDefault = NoConfigDefault()
273
273
274
274 class ArgParseConfigLoader(CommandLineConfigLoader):
275 class ArgParseConfigLoader(CommandLineConfigLoader):
275
276
276 # arguments = [(('-f','--file'),dict(type=str,dest='file'))]
277 # arguments = [(('-f','--file'),dict(type=str,dest='file'))]
@@ -58,18 +58,29 b' class ClusterDir(Component):'
58 os.chmod(new, 0777)
58 os.chmod(new, 0777)
59 self.security_dir = os.path.join(new, self.security_dir_name)
59 self.security_dir = os.path.join(new, self.security_dir_name)
60 self.log_dir = os.path.join(new, self.log_dir_name)
60 self.log_dir = os.path.join(new, self.log_dir_name)
61 self.check_dirs()
61
62
62 def _log_dir_changed(self, name, old, new):
63 def _log_dir_changed(self, name, old, new):
63 if not os.path.isdir(new):
64 self.check_log_dir()
64 os.mkdir(new, 0777)
65
66 def check_log_dir(self):
67 if not os.path.isdir(self.log_dir):
68 os.mkdir(self.log_dir, 0777)
65 else:
69 else:
66 os.chmod(new, 0777)
70 os.chmod(self.log_dir, 0777)
67
71
68 def _security_dir_changed(self, name, old, new):
72 def _security_dir_changed(self, name, old, new):
69 if not os.path.isdir(new):
73 self.check_security_dir()
70 os.mkdir(new, 0700)
74
75 def check_security_dir(self):
76 if not os.path.isdir(self.security_dir):
77 os.mkdir(self.security_dir, 0700)
71 else:
78 else:
72 os.chmod(new, 0700)
79 os.chmod(self.security_dir, 0700)
80
81 def check_dirs(self):
82 self.check_security_dir()
83 self.check_log_dir()
73
84
74 def load_config_file(self, filename):
85 def load_config_file(self, filename):
75 """Load a config file from the top level of the cluster dir.
86 """Load a config file from the top level of the cluster dir.
@@ -83,7 +94,7 b' class ClusterDir(Component):'
83 loader = PyFileConfigLoader(filename, self.location)
94 loader = PyFileConfigLoader(filename, self.location)
84 return loader.load_config()
95 return loader.load_config()
85
96
86 def copy_config_file(self, config_file, path=None):
97 def copy_config_file(self, config_file, path=None, overwrite=False):
87 """Copy a default config file into the active cluster directory.
98 """Copy a default config file into the active cluster directory.
88
99
89 Default configuration files are kept in :mod:`IPython.config.default`.
100 Default configuration files are kept in :mod:`IPython.config.default`.
@@ -96,12 +107,14 b' class ClusterDir(Component):'
96 path = os.path.sep.join(path)
107 path = os.path.sep.join(path)
97 src = os.path.join(path, config_file)
108 src = os.path.join(path, config_file)
98 dst = os.path.join(self.location, config_file)
109 dst = os.path.join(self.location, config_file)
110 if not os.path.isfile(dst) or overwrite:
99 shutil.copy(src, dst)
111 shutil.copy(src, dst)
100
112
101 def copy_all_config_files(self):
113 def copy_all_config_files(self, path=None, overwrite=False):
102 """Copy all config files into the active cluster directory."""
114 """Copy all config files into the active cluster directory."""
103 for f in ['ipcontroller_config.py', 'ipengine_config.py']:
115 for f in ['ipcontroller_config.py', 'ipengine_config.py',
104 self.copy_config_file(f)
116 'ipcluster_config.py']:
117 self.copy_config_file(f, path=path, overwrite=overwrite)
105
118
106 @classmethod
119 @classmethod
107 def find_cluster_dir_by_profile(cls, path, profile='default'):
120 def find_cluster_dir_by_profile(cls, path, profile='default'):
@@ -245,7 +258,6 b' class ApplicationWithClusterDir(Application):'
245 # priority, this will always end up in the master_config.
258 # priority, this will always end up in the master_config.
246 self.default_config.Global.cluster_dir = self.cluster_dir
259 self.default_config.Global.cluster_dir = self.cluster_dir
247 self.command_line_config.Global.cluster_dir = self.cluster_dir
260 self.command_line_config.Global.cluster_dir = self.cluster_dir
248 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
249
261
250 # Set the search path to the cluster directory
262 # Set the search path to the cluster directory
251 self.config_file_paths = (self.cluster_dir,)
263 self.config_file_paths = (self.cluster_dir,)
@@ -20,14 +20,21 b' import cPickle as pickle'
20
20
21 from twisted.python import log, failure
21 from twisted.python import log, failure
22 from twisted.internet import defer
22 from twisted.internet import defer
23 from twisted.internet.defer import inlineCallbacks, returnValue
23
24
24 from IPython.kernel.fcutil import find_furl
25 from IPython.kernel.fcutil import find_furl
25 from IPython.kernel.enginefc import IFCEngine
26 from IPython.kernel.enginefc import IFCEngine
27 from IPython.kernel.twistedutil import sleep_deferred
26
28
27 #-------------------------------------------------------------------------------
29 #-------------------------------------------------------------------------------
28 # The ClientConnector class
30 # The ClientConnector class
29 #-------------------------------------------------------------------------------
31 #-------------------------------------------------------------------------------
30
32
33
34 class EngineConnectorError(Exception):
35 pass
36
37
31 class EngineConnector(object):
38 class EngineConnector(object):
32 """Manage an engines connection to a controller.
39 """Manage an engines connection to a controller.
33
40
@@ -39,7 +46,8 b' class EngineConnector(object):'
39 def __init__(self, tub):
46 def __init__(self, tub):
40 self.tub = tub
47 self.tub = tub
41
48
42 def connect_to_controller(self, engine_service, furl_or_file):
49 def connect_to_controller(self, engine_service, furl_or_file,
50 delay=0.1, max_tries=10):
43 """
51 """
44 Make a connection to a controller specified by a furl.
52 Make a connection to a controller specified by a furl.
45
53
@@ -53,29 +61,60 b' class EngineConnector(object):'
53 `register_engine` method of the controller to actually register the
61 `register_engine` method of the controller to actually register the
54 engine.
62 engine.
55
63
56 :Parameters:
64 This method will try to connect to the controller multiple times with
65 a delay in between. Each time the FURL file is read anew.
66
67 Parameters
68 __________
57 engine_service : IEngineBase
69 engine_service : IEngineBase
58 An instance of an `IEngineBase` implementer
70 An instance of an `IEngineBase` implementer
59 furl_or_file : str
71 furl_or_file : str
60 A furl or a filename containing a furl
72 A furl or a filename containing a furl
73 delay : float
74 The intial time to wait between connection attempts. Subsequent
75 attempts have increasing delays.
76 max_tries : int
77 The maximum number of connection attempts.
61 """
78 """
62 if not self.tub.running:
79 if not self.tub.running:
63 self.tub.startService()
80 self.tub.startService()
64 self.engine_service = engine_service
81 self.engine_service = engine_service
65 self.engine_reference = IFCEngine(self.engine_service)
82 self.engine_reference = IFCEngine(self.engine_service)
83
84 d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0)
85 return d
86
87 @inlineCallbacks
88 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
89 """Try to connect to the controller with retry logic."""
90 if attempt < max_tries:
91 log.msg("Attempting to connect to controller [%r]: %s" % \
92 (attempt, furl_or_file))
66 try:
93 try:
67 self.furl = find_furl(furl_or_file)
94 self.furl = find_furl(furl_or_file)
68 except ValueError:
95 # Uncomment this to see the FURL being tried.
69 return defer.fail(failure.Failure())
96 # log.msg("FURL: %s" % self.furl)
97 rr = yield self.tub.getReference(self.furl)
98 except:
99 if attempt==max_tries-1:
100 # This will propagate the exception all the way to the top
101 # where it can be handled.
102 raise
70 else:
103 else:
71 d = self.tub.getReference(self.furl)
104 yield sleep_deferred(delay)
72 d.addCallbacks(self._register, self._log_failure)
105 yield self._try_to_connect(
73 return d
106 furl_or_file, 1.5*delay, max_tries, attempt+1
74
107 )
75 def _log_failure(self, reason):
108 else:
76 log.err('EngineConnector: engine registration failed:')
109 result = yield self._register(rr)
77 log.err(reason)
110 returnValue(result)
78 return reason
111 else:
112 raise EngineConnectorError(
113 'Could not connect to controller, max_tries (%r) exceeded. '
114 'This usually means that i) the controller was not started, '
115 'or ii) a firewall was blocking the engine from connecting '
116 'to the controller.' % max_tries
117 )
79
118
80 def _register(self, rr):
119 def _register(self, rr):
81 self.remote_ref = rr
120 self.remote_ref = rr
@@ -83,7 +122,7 b' class EngineConnector(object):'
83 desired_id = self.engine_service.id
122 desired_id = self.engine_service.id
84 d = self.remote_ref.callRemote('register_engine', self.engine_reference,
123 d = self.remote_ref.callRemote('register_engine', self.engine_reference,
85 desired_id, os.getpid(), pickle.dumps(self.engine_service.properties,2))
124 desired_id, os.getpid(), pickle.dumps(self.engine_service.properties,2))
86 return d.addCallbacks(self._reference_sent, self._log_failure)
125 return d.addCallback(self._reference_sent)
87
126
88 def _reference_sent(self, registration_dict):
127 def _reference_sent(self, registration_dict):
89 self.engine_service.id = registration_dict['id']
128 self.engine_service.id = registration_dict['id']
@@ -15,6 +15,8 b' Foolscap related utilities.'
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
19
18 import os
20 import os
19 import tempfile
21 import tempfile
20
22
@@ -85,10 +87,11 b' def find_furl(furl_or_file):'
85 if is_valid(furl_or_file):
87 if is_valid(furl_or_file):
86 return furl_or_file
88 return furl_or_file
87 if os.path.isfile(furl_or_file):
89 if os.path.isfile(furl_or_file):
88 furl = open(furl_or_file, 'r').read().strip()
90 with open(furl_or_file, 'r') as f:
91 furl = f.read().strip()
89 if is_valid(furl):
92 if is_valid(furl):
90 return furl
93 return furl
91 raise ValueError("not a FURL or a file containing a FURL: %s" % furl_or_file)
94 raise ValueError("Not a FURL or a file containing a FURL: %s" % furl_or_file)
92
95
93
96
94 def get_temp_furlfile(filename):
97 def get_temp_furlfile(filename):
@@ -213,6 +213,7 b' class IPControllerApp(ApplicationWithClusterDir):'
213 self.security_dir = config.Global.security_dir = sdir
213 self.security_dir = config.Global.security_dir = sdir
214 ldir = self.cluster_dir_obj.log_dir
214 ldir = self.cluster_dir_obj.log_dir
215 self.log_dir = config.Global.log_dir = ldir
215 self.log_dir = config.Global.log_dir = ldir
216 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
216 self.log.info("Log directory set to: %s" % self.log_dir)
217 self.log.info("Log directory set to: %s" % self.log_dir)
217 self.log.info("Security directory set to: %s" % self.security_dir)
218 self.log.info("Security directory set to: %s" % self.security_dir)
218
219
@@ -266,3 +267,8 b' def launch_new_instance():'
266 """Create and run the IPython controller"""
267 """Create and run the IPython controller"""
267 app = IPControllerApp()
268 app = IPControllerApp()
268 app.start()
269 app.start()
270
271
272 if __name__ == '__main__':
273 launch_new_instance()
274
@@ -133,29 +133,27 b' class IPEngineApp(ApplicationWithClusterDir):'
133 self.security_dir = config.Global.security_dir = sdir
133 self.security_dir = config.Global.security_dir = sdir
134 ldir = self.cluster_dir_obj.log_dir
134 ldir = self.cluster_dir_obj.log_dir
135 self.log_dir = config.Global.log_dir = ldir
135 self.log_dir = config.Global.log_dir = ldir
136 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
136 self.log.info("Log directory set to: %s" % self.log_dir)
137 self.log.info("Log directory set to: %s" % self.log_dir)
137 self.log.info("Security directory set to: %s" % self.security_dir)
138 self.log.info("Security directory set to: %s" % self.security_dir)
138
139
139 self.find_cont_furl_file()
140 self.find_cont_furl_file()
140
141
141 def find_cont_furl_file(self):
142 def find_cont_furl_file(self):
143 """Set the furl file.
144
145 Here we don't try to actually see if it exists for is valid as that
146 is hadled by the connection logic.
147 """
142 config = self.master_config
148 config = self.master_config
143 # Find the actual controller FURL file
149 # Find the actual controller FURL file
144 if os.path.isfile(config.Global.furl_file):
150 if not config.Global.furl_file:
145 return
146 else:
147 # We should know what the app dir is
148 try_this = os.path.join(
151 try_this = os.path.join(
149 config.Global.cluster_dir,
152 config.Global.cluster_dir,
150 config.Global.security_dir,
153 config.Global.security_dir,
151 config.Global.furl_file_name
154 config.Global.furl_file_name
152 )
155 )
153 if os.path.isfile(try_this):
154 config.Global.furl_file = try_this
156 config.Global.furl_file = try_this
155 return
156 else:
157 self.log.critical("Could not find a valid controller FURL file.")
158 self.abort()
159
157
160 def construct(self):
158 def construct(self):
161 # I am a little hesitant to put these into InteractiveShell itself.
159 # I am a little hesitant to put these into InteractiveShell itself.
@@ -194,11 +192,11 b' class IPEngineApp(ApplicationWithClusterDir):'
194 )
192 )
195
193
196 def handle_error(f):
194 def handle_error(f):
197 # If this print statement is replaced by a log.err(f) I get
195 log.msg('Error connecting to controller. This usually means that '
198 # an unhandled error, which makes no sense. I shouldn't have
196 'i) the controller was not started, ii) a firewall was blocking '
199 # to use a print statement here. My only thought is that
197 'the engine from connecting to the controller or iii) the engine '
200 # at the beginning of the process the logging is still starting up
198 ' was not pointed at the right FURL file:')
201 print "Error connecting to controller:", f.getErrorMessage()
199 log.msg(f.getErrorMessage())
202 reactor.callLater(0.1, reactor.stop)
200 reactor.callLater(0.1, reactor.stop)
203
201
204 d.addErrback(handle_error)
202 d.addErrback(handle_error)
@@ -243,3 +241,8 b' def launch_new_instance():'
243 """Create and run the IPython controller"""
241 """Create and run the IPython controller"""
244 app = IPEngineApp()
242 app = IPEngineApp()
245 app.start()
243 app.start()
244
245
246 if __name__ == '__main__':
247 launch_new_instance()
248
@@ -1,22 +1,18 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """ipcluster script"""
4 #-----------------------------------------------------------------------------
5
5 # Copyright (C) 2008-2009 The IPython Development Team
6 __docformat__ = "restructuredtext en"
7
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
10 #
6 #
11 # Distributed under the terms of the BSD License. The full license is in
7 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
8 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
14
10
15 #-------------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
16 # Imports
12 # Imports
17 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
18
15
19 if __name__ == '__main__':
16 from IPython.kernel.ipclusterapp import launch_new_instance
20 from IPython.kernel.scripts import ipcluster
21 ipcluster.main()
22
17
18 launch_new_instance()
@@ -4,7 +4,7 b''
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
@@ -25,7 +25,7 b' from twisted.internet import reactor, defer'
25 from twisted.internet.protocol import ProcessProtocol
25 from twisted.internet.protocol import ProcessProtocol
26 from twisted.internet.error import ProcessDone, ProcessTerminated
26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.utils import getProcessOutput
27 from twisted.internet.utils import getProcessOutput
28 from twisted.python import failure, log
28 from twisted.python import log
29
29
30 from IPython.external import argparse
30 from IPython.external import argparse
31 from IPython.external import Itpl
31 from IPython.external import Itpl
@@ -49,10 +49,8 b' get_log_dir()'
49 get_security_dir()
49 get_security_dir()
50
50
51 from IPython.kernel.config import config_manager as kernel_config_manager
51 from IPython.kernel.config import config_manager as kernel_config_manager
52 from IPython.kernel.error import SecurityError, FileTimeoutError
53 from IPython.kernel.fcutil import have_crypto
54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
52 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 from IPython.kernel.util import printer
53
56
54
57 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
58 # General process handling code
56 # General process handling code
@@ -140,7 +138,7 b' class ProcessLauncher(object):'
140 )
138 )
141 return self.start_deferred
139 return self.start_deferred
142 else:
140 else:
143 s = 'the process has already been started and has state: %r' % \
141 s = 'The process has already been started and has state: %r' % \
144 self.state
142 self.state
145 return defer.fail(ProcessStateError(s))
143 return defer.fail(ProcessStateError(s))
146
144
@@ -247,3 +247,10 b' def wait_for_file(filename, delay=0.1, max_tries=10):'
247
247
248 _test_for_file(filename)
248 _test_for_file(filename)
249 return d
249 return d
250
251
252 def sleep_deferred(seconds):
253 """Sleep without blocking the event loop."""
254 d = defer.Deferred()
255 reactor.callLater(seconds, d.callback, seconds)
256 return d
@@ -1,42 +1,52 b''
1 #!/usr/bin/env python
1 # encoding: utf-8
2 # encoding: utf-8
2
3 """
3 """The IPython Core Notification Center.
4 The IPython Core Notification Center.
4
5
5 See docs/source/development/notification_blueprint.txt for an overview of the
6 See docs/source/development/notification_blueprint.txt for an overview of the
6 notification module.
7 notification module.
7 """
8
8
9 __docformat__ = "restructuredtext en"
9 Authors:
10
11 * Barry Wark
12 * Brian Granger
13 """
10
14
11 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008 The IPython Development Team
16 # Copyright (C) 2008-2009 The IPython Development Team
13 #
17 #
14 # Distributed under the terms of the BSD License. The full license is in
18 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
19 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
17
21
18 # Tell nose to skip the testing of this module
22 #-----------------------------------------------------------------------------
19 __test__ = {}
23 # Code
24 #-----------------------------------------------------------------------------
25
26
27 class NotificationError(Exception):
28 pass
29
20
30
21 class NotificationCenter(object):
31 class NotificationCenter(object):
22 """Synchronous notification center
32 """Synchronous notification center.
23
33
24 Examples
34 Examples
25 --------
35 --------
26 >>> import IPython.kernel.core.notification as notification
36 Here is a simple example of how to use this::
27 >>> def callback(theType, theSender, args={}):
28 ... print theType,theSender,args
29 ...
30 >>> notification.sharedCenter.add_observer(callback, 'NOTIFICATION_TYPE', None)
31 >>> notification.sharedCenter.post_notification('NOTIFICATION_TYPE', object()) # doctest:+ELLIPSIS
32 NOTIFICATION_TYPE ...
33
37
38 import IPython.kernel.core.notification as notification
39 def callback(ntype, theSender, args={}):
40 print ntype,theSender,args
41
42 notification.sharedCenter.add_observer(callback, 'NOTIFICATION_TYPE', None)
43 notification.sharedCenter.post_notification('NOTIFICATION_TYPE', object()) # doctest:+ELLIPSIS
44 NOTIFICATION_TYPE ...
34 """
45 """
35 def __init__(self):
46 def __init__(self):
36 super(NotificationCenter, self).__init__()
47 super(NotificationCenter, self).__init__()
37 self._init_observers()
48 self._init_observers()
38
49
39
40 def _init_observers(self):
50 def _init_observers(self):
41 """Initialize observer storage"""
51 """Initialize observer storage"""
42
52
@@ -44,76 +54,84 b' class NotificationCenter(object):'
44 self.registered_senders = set() #set of senders that are observed
54 self.registered_senders = set() #set of senders that are observed
45 self.observers = {} #map (type,sender) => callback (callable)
55 self.observers = {} #map (type,sender) => callback (callable)
46
56
57 def post_notification(self, ntype, sender, *args, **kwargs):
58 """Post notification to all registered observers.
47
59
48 def post_notification(self, theType, sender, **kwargs):
60 The registered callback will be called as::
49 """Post notification (type,sender,**kwargs) to all registered
50 observers.
51
61
52 Implementation notes:
62 callback(ntype, sender, *args, **kwargs)
53
63
64 Parameters
65 ----------
66 ntype : hashable
67 The notification type.
68 sender : hashable
69 The object sending the notification.
70 *args : tuple
71 The positional arguments to be passed to the callback.
72 **kwargs : dict
73 The keyword argument to be passed to the callback.
74
75 Notes
76 -----
54 * If no registered observers, performance is O(1).
77 * If no registered observers, performance is O(1).
55 * Notificaiton order is undefined.
78 * Notificaiton order is undefined.
56 * Notifications are posted synchronously.
79 * Notifications are posted synchronously.
57 """
80 """
58
81
59 if(theType==None or sender==None):
82 if(ntype==None or sender==None):
60 raise Exception("NotificationCenter.post_notification requires \
83 raise NotificationError(
61 type and sender.")
84 "Notification type and sender are required.")
62
85
63 # If there are no registered observers for the type/sender pair
86 # If there are no registered observers for the type/sender pair
64 if((theType not in self.registered_types and
87 if((ntype not in self.registered_types and
65 None not in self.registered_types) or
88 None not in self.registered_types) or
66 (sender not in self.registered_senders and
89 (sender not in self.registered_senders and
67 None not in self.registered_senders)):
90 None not in self.registered_senders)):
68 return
91 return
69
92
70 for o in self._observers_for_notification(theType, sender):
93 for o in self._observers_for_notification(ntype, sender):
71 o(theType, sender, args=kwargs)
94 o(ntype, sender, *args, **kwargs)
72
95
73
96 def _observers_for_notification(self, ntype, sender):
74 def _observers_for_notification(self, theType, sender):
75 """Find all registered observers that should recieve notification"""
97 """Find all registered observers that should recieve notification"""
76
98
77 keys = (
99 keys = (
78 (theType,sender),
100 (ntype,sender),
79 (theType, None),
101 (ntype, None),
80 (None, sender),
102 (None, sender),
81 (None,None)
103 (None,None)
82 )
104 )
83
105
84
85 obs = set()
106 obs = set()
86 for k in keys:
107 for k in keys:
87 obs.update(self.observers.get(k, set()))
108 obs.update(self.observers.get(k, set()))
88
109
89 return obs
110 return obs
90
111
91
112 def add_observer(self, callback, ntype, sender):
92 def add_observer(self, callback, theType, sender):
93 """Add an observer callback to this notification center.
113 """Add an observer callback to this notification center.
94
114
95 The given callback will be called upon posting of notifications of
115 The given callback will be called upon posting of notifications of
96 the given type/sender and will receive any additional kwargs passed
116 the given type/sender and will receive any additional arguments passed
97 to post_notification.
117 to post_notification.
98
118
99 Parameters
119 Parameters
100 ----------
120 ----------
101 observerCallback : callable
121 callback : callable
102 Callable. Must take at least two arguments::
122 The callable that will be called by :meth:`post_notification`
103 observerCallback(type, sender, args={})
123 as ``callback(ntype, sender, *args, **kwargs)
104
124 ntype : hashable
105 theType : hashable
106 The notification type. If None, all notifications from sender
125 The notification type. If None, all notifications from sender
107 will be posted.
126 will be posted.
108
109 sender : hashable
127 sender : hashable
110 The notification sender. If None, all notifications of theType
128 The notification sender. If None, all notifications of ntype
111 will be posted.
129 will be posted.
112 """
130 """
113 assert(callback != None)
131 assert(callback != None)
114 self.registered_types.add(theType)
132 self.registered_types.add(ntype)
115 self.registered_senders.add(sender)
133 self.registered_senders.add(sender)
116 self.observers.setdefault((theType,sender), set()).add(callback)
134 self.observers.setdefault((ntype,sender), set()).add(callback)
117
135
118 def remove_all_observers(self):
136 def remove_all_observers(self):
119 """Removes all observers from this notification center"""
137 """Removes all observers from this notification center"""
@@ -122,4 +140,4 b' class NotificationCenter(object):'
122
140
123
141
124
142
125 sharedCenter = NotificationCenter()
143 shared_center = NotificationCenter()
@@ -13,104 +13,102 b''
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 # Tell nose to skip this module
16 import unittest
17 __test__ = {}
18
17
19 from twisted.trial import unittest
18 from IPython.utils.notification import (
20 import IPython.kernel.core.notification as notification
19 NotificationCenter,
20 NotificationError,
21 shared_center
22 )
21
23
22 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
23 # Support Classes
25 # Support Classes
24 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
25
27
28
26 class Observer(object):
29 class Observer(object):
27 """docstring for Observer"""
30
28 def __init__(self, expectedType, expectedSender,
31 def __init__(self, expected_ntype, expected_sender,
29 center=notification.sharedCenter, **kwargs):
32 center=shared_center, *args, **kwargs):
30 super(Observer, self).__init__()
33 super(Observer, self).__init__()
31 self.expectedType = expectedType
34 self.expected_ntype = expected_ntype
32 self.expectedSender = expectedSender
35 self.expected_sender = expected_sender
33 self.expectedKwArgs = kwargs
36 self.expected_args = args
37 self.expected_kwargs = kwargs
34 self.recieved = False
38 self.recieved = False
35 center.add_observer(self.callback,
39 center.add_observer(self.callback,
36 self.expectedType,
40 self.expected_ntype,
37 self.expectedSender)
41 self.expected_sender)
38
42
39 def callback(self, theType, sender, args={}):
43 def callback(self, ntype, sender, *args, **kwargs):
40 """callback"""
44 assert(ntype == self.expected_ntype or
41
45 self.expected_ntype == None)
42 assert(theType == self.expectedType or
46 assert(sender == self.expected_sender or
43 self.expectedType == None)
47 self.expected_sender == None)
44 assert(sender == self.expectedSender or
48 assert(args == self.expected_args)
45 self.expectedSender == None)
49 assert(kwargs == self.expected_kwargs)
46 assert(args == self.expectedKwArgs)
47 self.recieved = True
50 self.recieved = True
48
51
49 def verify(self):
52 def verify(self):
50 """verify"""
51
52 assert(self.recieved)
53 assert(self.recieved)
53
54
54 def reset(self):
55 def reset(self):
55 """reset"""
56
57 self.recieved = False
56 self.recieved = False
58
57
59
58
60 class Notifier(object):
59 class Notifier(object):
61 """docstring for Notifier"""
60
62 def __init__(self, theType, **kwargs):
61 def __init__(self, ntype, **kwargs):
63 super(Notifier, self).__init__()
62 super(Notifier, self).__init__()
64 self.theType = theType
63 self.ntype = ntype
65 self.kwargs = kwargs
64 self.kwargs = kwargs
66
65
67 def post(self, center=notification.sharedCenter):
66 def post(self, center=shared_center):
68 """fire"""
69
67
70 center.post_notification(self.theType, self,
68 center.post_notification(self.ntype, self,
71 **self.kwargs)
69 **self.kwargs)
72
70
71
73 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
74 # Tests
73 # Tests
75 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
76
75
76
77 class NotificationTests(unittest.TestCase):
77 class NotificationTests(unittest.TestCase):
78 """docstring for NotificationTests"""
79
78
80 def tearDown(self):
79 def tearDown(self):
81 notification.sharedCenter.remove_all_observers()
80 shared_center.remove_all_observers()
82
81
83 def test_notification_delivered(self):
82 def test_notification_delivered(self):
84 """Test that notifications are delivered"""
83 """Test that notifications are delivered"""
85 expectedType = 'EXPECTED_TYPE'
86 sender = Notifier(expectedType)
87 observer = Observer(expectedType, sender)
88
84
89 sender.post()
85 expected_ntype = 'EXPECTED_TYPE'
86 sender = Notifier(expected_ntype)
87 observer = Observer(expected_ntype, sender)
90
88
89 sender.post()
91 observer.verify()
90 observer.verify()
92
91
93 def test_type_specificity(self):
92 def test_type_specificity(self):
94 """Test that observers are registered by type"""
93 """Test that observers are registered by type"""
95
94
96 expectedType = 1
95 expected_ntype = 1
97 unexpectedType = "UNEXPECTED_TYPE"
96 unexpected_ntype = "UNEXPECTED_TYPE"
98 sender = Notifier(expectedType)
97 sender = Notifier(expected_ntype)
99 unexpectedSender = Notifier(unexpectedType)
98 unexpected_sender = Notifier(unexpected_ntype)
100 observer = Observer(expectedType, sender)
99 observer = Observer(expected_ntype, sender)
101
100
102 sender.post()
101 sender.post()
103 unexpectedSender.post()
102 unexpected_sender.post()
104
105 observer.verify()
103 observer.verify()
106
104
107 def test_sender_specificity(self):
105 def test_sender_specificity(self):
108 """Test that observers are registered by sender"""
106 """Test that observers are registered by sender"""
109
107
110 expectedType = "EXPECTED_TYPE"
108 expected_ntype = "EXPECTED_TYPE"
111 sender1 = Notifier(expectedType)
109 sender1 = Notifier(expected_ntype)
112 sender2 = Notifier(expectedType)
110 sender2 = Notifier(expected_ntype)
113 observer = Observer(expectedType, sender1)
111 observer = Observer(expected_ntype, sender1)
114
112
115 sender1.post()
113 sender1.post()
116 sender2.post()
114 sender2.post()
@@ -121,23 +119,19 b' class NotificationTests(unittest.TestCase):'
121 """White-box test for remove_all_observers"""
119 """White-box test for remove_all_observers"""
122
120
123 for i in xrange(10):
121 for i in xrange(10):
124 Observer('TYPE', None, center=notification.sharedCenter)
122 Observer('TYPE', None, center=shared_center)
125
123
126 self.assert_(len(notification.sharedCenter.observers[('TYPE',None)]) >= 10,
124 self.assert_(len(shared_center.observers[('TYPE',None)]) >= 10,
127 "observers registered")
125 "observers registered")
128
126
129 notification.sharedCenter.remove_all_observers()
127 shared_center.remove_all_observers()
130
128 self.assert_(len(shared_center.observers) == 0, "observers removed")
131 self.assert_(len(notification.sharedCenter.observers) == 0, "observers removed")
132
129
133 def test_any_sender(self):
130 def test_any_sender(self):
134 """test_any_sender"""
131 expected_ntype = "EXPECTED_TYPE"
135
132 sender1 = Notifier(expected_ntype)
136 expectedType = "EXPECTED_TYPE"
133 sender2 = Notifier(expected_ntype)
137 sender1 = Notifier(expectedType)
134 observer = Observer(expected_ntype, None)
138 sender2 = Notifier(expectedType)
139 observer = Observer(expectedType, None)
140
141
135
142 sender1.post()
136 sender1.post()
143 observer.verify()
137 observer.verify()
@@ -154,8 +148,7 b' class NotificationTests(unittest.TestCase):'
154 Observer("UNRELATED_TYPE", None)
148 Observer("UNRELATED_TYPE", None)
155
149
156 o = Observer('EXPECTED_TYPE', None)
150 o = Observer('EXPECTED_TYPE', None)
157
151 shared_center.post_notification('EXPECTED_TYPE', self)
158 notification.sharedCenter.post_notification('EXPECTED_TYPE', self)
159
160 o.verify()
152 o.verify()
161
153
154
@@ -171,7 +171,7 b" if 'setuptools' in sys.modules:"
171 'pycolor = IPython.utils.PyColorize:main',
171 'pycolor = IPython.utils.PyColorize:main',
172 'ipcontroller = IPython.kernel.ipcontrollerapp:launch_new_instance',
172 'ipcontroller = IPython.kernel.ipcontrollerapp:launch_new_instance',
173 'ipengine = IPython.kernel.ipengineapp:launch_new_instance',
173 'ipengine = IPython.kernel.ipengineapp:launch_new_instance',
174 'ipcluster = IPython.kernel.scripts.ipcluster:main',
174 'ipcluster = IPython.kernel.ipclusterapp:launch_new_instance',
175 'ipythonx = IPython.frontend.wx.ipythonx:main',
175 'ipythonx = IPython.frontend.wx.ipythonx:main',
176 'iptest = IPython.testing.iptest:main',
176 'iptest = IPython.testing.iptest:main',
177 'irunner = IPython.lib.irunner:main'
177 'irunner = IPython.lib.irunner:main'
General Comments 0
You need to be logged in to leave comments. Login now