##// END OF EJS Templates
use BaseIPythonApp.load_config, not Application.load_config
MinRK -
Show More
@@ -1,544 +1,540 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython cluster directory
4 The IPython cluster directory
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import os
20 import os
21 import logging
21 import logging
22 import re
22 import re
23 import shutil
23 import shutil
24 import sys
24 import sys
25
25
26 from subprocess import Popen, PIPE
26 from subprocess import Popen, PIPE
27
27
28 from IPython.config.loader import PyFileConfigLoader, Config
28 from IPython.config.loader import PyFileConfigLoader, Config
29 from IPython.config.configurable import Configurable
29 from IPython.config.configurable import Configurable
30 from IPython.config.application import Application
30 from IPython.config.application import Application
31 from IPython.core.crashhandler import CrashHandler
31 from IPython.core.crashhandler import CrashHandler
32 from IPython.core.newapplication import BaseIPythonApplication
32 from IPython.core.newapplication import BaseIPythonApplication
33 from IPython.core import release
33 from IPython.core import release
34 from IPython.utils.path import (
34 from IPython.utils.path import (
35 get_ipython_package_dir,
35 get_ipython_package_dir,
36 get_ipython_dir,
36 get_ipython_dir,
37 expand_path
37 expand_path
38 )
38 )
39 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict
39 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Module errors
42 # Module errors
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45 class ClusterDirError(Exception):
45 class ClusterDirError(Exception):
46 pass
46 pass
47
47
48
48
49 class PIDFileError(Exception):
49 class PIDFileError(Exception):
50 pass
50 pass
51
51
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Class for managing cluster directories
54 # Class for managing cluster directories
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 class ClusterDir(Configurable):
57 class ClusterDir(Configurable):
58 """An object to manage the cluster directory and its resources.
58 """An object to manage the cluster directory and its resources.
59
59
60 The cluster directory is used by :command:`ipengine`,
60 The cluster directory is used by :command:`ipengine`,
61 :command:`ipcontroller` and :command:`ipclsuter` to manage the
61 :command:`ipcontroller` and :command:`ipclsuter` to manage the
62 configuration, logging and security of these applications.
62 configuration, logging and security of these applications.
63
63
64 This object knows how to find, create and manage these directories. This
64 This object knows how to find, create and manage these directories. This
65 should be used by any code that want's to handle cluster directories.
65 should be used by any code that want's to handle cluster directories.
66 """
66 """
67
67
68 security_dir_name = Unicode('security')
68 security_dir_name = Unicode('security')
69 log_dir_name = Unicode('log')
69 log_dir_name = Unicode('log')
70 pid_dir_name = Unicode('pid')
70 pid_dir_name = Unicode('pid')
71 security_dir = Unicode(u'')
71 security_dir = Unicode(u'')
72 log_dir = Unicode(u'')
72 log_dir = Unicode(u'')
73 pid_dir = Unicode(u'')
73 pid_dir = Unicode(u'')
74
74
75 auto_create = Bool(False,
75 auto_create = Bool(False,
76 help="""Whether to automatically create the ClusterDirectory if it does
76 help="""Whether to automatically create the ClusterDirectory if it does
77 not exist""")
77 not exist""")
78 overwrite = Bool(False,
78 overwrite = Bool(False,
79 help="""Whether to overwrite existing config files""")
79 help="""Whether to overwrite existing config files""")
80 location = Unicode(u'', config=True,
80 location = Unicode(u'', config=True,
81 help="""Set the cluster dir. This overrides the logic used by the
81 help="""Set the cluster dir. This overrides the logic used by the
82 `profile` option.""",
82 `profile` option.""",
83 )
83 )
84 profile = Unicode(u'default', config=True,
84 profile = Unicode(u'default', config=True,
85 help="""The string name of the profile to be used. This determines the name
85 help="""The string name of the profile to be used. This determines the name
86 of the cluster dir as: cluster_<profile>. The default profile is named
86 of the cluster dir as: cluster_<profile>. The default profile is named
87 'default'. The cluster directory is resolve this way if the
87 'default'. The cluster directory is resolve this way if the
88 `cluster_dir` option is not used."""
88 `cluster_dir` option is not used."""
89 )
89 )
90
90
91 _location_isset = Bool(False) # flag for detecting multiply set location
91 _location_isset = Bool(False) # flag for detecting multiply set location
92 _new_dir = Bool(False) # flag for whether a new dir was created
92 _new_dir = Bool(False) # flag for whether a new dir was created
93
93
94 def __init__(self, **kwargs):
94 def __init__(self, **kwargs):
95 # make sure auto_create,overwrite are set *before* location
95 # make sure auto_create,overwrite are set *before* location
96 for name in ('auto_create', 'overwrite'):
96 for name in ('auto_create', 'overwrite'):
97 v = kwargs.pop(name, None)
97 v = kwargs.pop(name, None)
98 if v is not None:
98 if v is not None:
99 setattr(self, name, v)
99 setattr(self, name, v)
100 super(ClusterDir, self).__init__(**kwargs)
100 super(ClusterDir, self).__init__(**kwargs)
101 if not self.location:
101 if not self.location:
102 self._profile_changed('profile', 'default', self.profile)
102 self._profile_changed('profile', 'default', self.profile)
103
103
104 def _location_changed(self, name, old, new):
104 def _location_changed(self, name, old, new):
105 if self._location_isset:
105 if self._location_isset:
106 raise RuntimeError("Cannot set ClusterDir more than once.")
106 raise RuntimeError("Cannot set ClusterDir more than once.")
107 self._location_isset = True
107 self._location_isset = True
108 if not os.path.isdir(new):
108 if not os.path.isdir(new):
109 if self.auto_create:# or self.config.ClusterDir.auto_create:
109 if self.auto_create:# or self.config.ClusterDir.auto_create:
110 os.makedirs(new)
110 os.makedirs(new)
111 self._new_dir = True
111 self._new_dir = True
112 else:
112 else:
113 raise ClusterDirError('Directory not found: %s' % new)
113 raise ClusterDirError('Directory not found: %s' % new)
114
114
115 # ensure config files exist:
115 # ensure config files exist:
116 self.copy_all_config_files(overwrite=self.overwrite)
116 self.copy_all_config_files(overwrite=self.overwrite)
117 self.security_dir = os.path.join(new, self.security_dir_name)
117 self.security_dir = os.path.join(new, self.security_dir_name)
118 self.log_dir = os.path.join(new, self.log_dir_name)
118 self.log_dir = os.path.join(new, self.log_dir_name)
119 self.pid_dir = os.path.join(new, self.pid_dir_name)
119 self.pid_dir = os.path.join(new, self.pid_dir_name)
120 self.check_dirs()
120 self.check_dirs()
121
121
122 def _profile_changed(self, name, old, new):
122 def _profile_changed(self, name, old, new):
123 if self._location_isset:
123 if self._location_isset:
124 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
124 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
125 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
125 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
126
126
127 def _log_dir_changed(self, name, old, new):
127 def _log_dir_changed(self, name, old, new):
128 self.check_log_dir()
128 self.check_log_dir()
129
129
130 def check_log_dir(self):
130 def check_log_dir(self):
131 if not os.path.isdir(self.log_dir):
131 if not os.path.isdir(self.log_dir):
132 os.mkdir(self.log_dir)
132 os.mkdir(self.log_dir)
133
133
134 def _security_dir_changed(self, name, old, new):
134 def _security_dir_changed(self, name, old, new):
135 self.check_security_dir()
135 self.check_security_dir()
136
136
137 def check_security_dir(self):
137 def check_security_dir(self):
138 if not os.path.isdir(self.security_dir):
138 if not os.path.isdir(self.security_dir):
139 os.mkdir(self.security_dir, 0700)
139 os.mkdir(self.security_dir, 0700)
140 os.chmod(self.security_dir, 0700)
140 os.chmod(self.security_dir, 0700)
141
141
142 def _pid_dir_changed(self, name, old, new):
142 def _pid_dir_changed(self, name, old, new):
143 self.check_pid_dir()
143 self.check_pid_dir()
144
144
145 def check_pid_dir(self):
145 def check_pid_dir(self):
146 if not os.path.isdir(self.pid_dir):
146 if not os.path.isdir(self.pid_dir):
147 os.mkdir(self.pid_dir, 0700)
147 os.mkdir(self.pid_dir, 0700)
148 os.chmod(self.pid_dir, 0700)
148 os.chmod(self.pid_dir, 0700)
149
149
150 def check_dirs(self):
150 def check_dirs(self):
151 self.check_security_dir()
151 self.check_security_dir()
152 self.check_log_dir()
152 self.check_log_dir()
153 self.check_pid_dir()
153 self.check_pid_dir()
154
154
155 def copy_config_file(self, config_file, path=None, overwrite=False):
155 def copy_config_file(self, config_file, path=None, overwrite=False):
156 """Copy a default config file into the active cluster directory.
156 """Copy a default config file into the active cluster directory.
157
157
158 Default configuration files are kept in :mod:`IPython.config.default`.
158 Default configuration files are kept in :mod:`IPython.config.default`.
159 This function moves these from that location to the working cluster
159 This function moves these from that location to the working cluster
160 directory.
160 directory.
161 """
161 """
162 if path is None:
162 if path is None:
163 import IPython.config.default
163 import IPython.config.default
164 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
164 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
165 path = os.path.sep.join(path)
165 path = os.path.sep.join(path)
166 src = os.path.join(path, config_file)
166 src = os.path.join(path, config_file)
167 dst = os.path.join(self.location, config_file)
167 dst = os.path.join(self.location, config_file)
168 if not os.path.isfile(dst) or overwrite:
168 if not os.path.isfile(dst) or overwrite:
169 shutil.copy(src, dst)
169 shutil.copy(src, dst)
170
170
171 def copy_all_config_files(self, path=None, overwrite=False):
171 def copy_all_config_files(self, path=None, overwrite=False):
172 """Copy all config files into the active cluster directory."""
172 """Copy all config files into the active cluster directory."""
173 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
173 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
174 u'ipcluster_config.py']:
174 u'ipcluster_config.py']:
175 self.copy_config_file(f, path=path, overwrite=overwrite)
175 self.copy_config_file(f, path=path, overwrite=overwrite)
176
176
177 @classmethod
177 @classmethod
178 def create_cluster_dir(csl, cluster_dir):
178 def create_cluster_dir(csl, cluster_dir):
179 """Create a new cluster directory given a full path.
179 """Create a new cluster directory given a full path.
180
180
181 Parameters
181 Parameters
182 ----------
182 ----------
183 cluster_dir : str
183 cluster_dir : str
184 The full path to the cluster directory. If it does exist, it will
184 The full path to the cluster directory. If it does exist, it will
185 be used. If not, it will be created.
185 be used. If not, it will be created.
186 """
186 """
187 return ClusterDir(location=cluster_dir)
187 return ClusterDir(location=cluster_dir)
188
188
189 @classmethod
189 @classmethod
190 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
190 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
191 """Create a cluster dir by profile name and path.
191 """Create a cluster dir by profile name and path.
192
192
193 Parameters
193 Parameters
194 ----------
194 ----------
195 path : str
195 path : str
196 The path (directory) to put the cluster directory in.
196 The path (directory) to put the cluster directory in.
197 profile : str
197 profile : str
198 The name of the profile. The name of the cluster directory will
198 The name of the profile. The name of the cluster directory will
199 be "cluster_<profile>".
199 be "cluster_<profile>".
200 """
200 """
201 if not os.path.isdir(path):
201 if not os.path.isdir(path):
202 raise ClusterDirError('Directory not found: %s' % path)
202 raise ClusterDirError('Directory not found: %s' % path)
203 cluster_dir = os.path.join(path, u'cluster_' + profile)
203 cluster_dir = os.path.join(path, u'cluster_' + profile)
204 return ClusterDir(location=cluster_dir)
204 return ClusterDir(location=cluster_dir)
205
205
206 @classmethod
206 @classmethod
207 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
207 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
208 """Find an existing cluster dir by profile name, return its ClusterDir.
208 """Find an existing cluster dir by profile name, return its ClusterDir.
209
209
210 This searches through a sequence of paths for a cluster dir. If it
210 This searches through a sequence of paths for a cluster dir. If it
211 is not found, a :class:`ClusterDirError` exception will be raised.
211 is not found, a :class:`ClusterDirError` exception will be raised.
212
212
213 The search path algorithm is:
213 The search path algorithm is:
214 1. ``os.getcwd()``
214 1. ``os.getcwd()``
215 2. ``ipython_dir``
215 2. ``ipython_dir``
216 3. The directories found in the ":" separated
216 3. The directories found in the ":" separated
217 :env:`IPCLUSTER_DIR_PATH` environment variable.
217 :env:`IPCLUSTER_DIR_PATH` environment variable.
218
218
219 Parameters
219 Parameters
220 ----------
220 ----------
221 ipython_dir : unicode or str
221 ipython_dir : unicode or str
222 The IPython directory to use.
222 The IPython directory to use.
223 profile : unicode or str
223 profile : unicode or str
224 The name of the profile. The name of the cluster directory
224 The name of the profile. The name of the cluster directory
225 will be "cluster_<profile>".
225 will be "cluster_<profile>".
226 """
226 """
227 dirname = u'cluster_' + profile
227 dirname = u'cluster_' + profile
228 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
228 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
229 if cluster_dir_paths:
229 if cluster_dir_paths:
230 cluster_dir_paths = cluster_dir_paths.split(':')
230 cluster_dir_paths = cluster_dir_paths.split(':')
231 else:
231 else:
232 cluster_dir_paths = []
232 cluster_dir_paths = []
233 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
233 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
234 for p in paths:
234 for p in paths:
235 cluster_dir = os.path.join(p, dirname)
235 cluster_dir = os.path.join(p, dirname)
236 if os.path.isdir(cluster_dir):
236 if os.path.isdir(cluster_dir):
237 return ClusterDir(location=cluster_dir)
237 return ClusterDir(location=cluster_dir)
238 else:
238 else:
239 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
239 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
240
240
241 @classmethod
241 @classmethod
242 def find_cluster_dir(cls, cluster_dir):
242 def find_cluster_dir(cls, cluster_dir):
243 """Find/create a cluster dir and return its ClusterDir.
243 """Find/create a cluster dir and return its ClusterDir.
244
244
245 This will create the cluster directory if it doesn't exist.
245 This will create the cluster directory if it doesn't exist.
246
246
247 Parameters
247 Parameters
248 ----------
248 ----------
249 cluster_dir : unicode or str
249 cluster_dir : unicode or str
250 The path of the cluster directory. This is expanded using
250 The path of the cluster directory. This is expanded using
251 :func:`IPython.utils.genutils.expand_path`.
251 :func:`IPython.utils.genutils.expand_path`.
252 """
252 """
253 cluster_dir = expand_path(cluster_dir)
253 cluster_dir = expand_path(cluster_dir)
254 if not os.path.isdir(cluster_dir):
254 if not os.path.isdir(cluster_dir):
255 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
255 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
256 return ClusterDir(location=cluster_dir)
256 return ClusterDir(location=cluster_dir)
257
257
258
258
259 #-----------------------------------------------------------------------------
259 #-----------------------------------------------------------------------------
260 # Crash handler for this application
260 # Crash handler for this application
261 #-----------------------------------------------------------------------------
261 #-----------------------------------------------------------------------------
262
262
263
263
264 _message_template = """\
264 _message_template = """\
265 Oops, $self.app_name crashed. We do our best to make it stable, but...
265 Oops, $self.app_name crashed. We do our best to make it stable, but...
266
266
267 A crash report was automatically generated with the following information:
267 A crash report was automatically generated with the following information:
268 - A verbatim copy of the crash traceback.
268 - A verbatim copy of the crash traceback.
269 - Data on your current $self.app_name configuration.
269 - Data on your current $self.app_name configuration.
270
270
271 It was left in the file named:
271 It was left in the file named:
272 \t'$self.crash_report_fname'
272 \t'$self.crash_report_fname'
273 If you can email this file to the developers, the information in it will help
273 If you can email this file to the developers, the information in it will help
274 them in understanding and correcting the problem.
274 them in understanding and correcting the problem.
275
275
276 You can mail it to: $self.contact_name at $self.contact_email
276 You can mail it to: $self.contact_name at $self.contact_email
277 with the subject '$self.app_name Crash Report'.
277 with the subject '$self.app_name Crash Report'.
278
278
279 If you want to do it now, the following command will work (under Unix):
279 If you want to do it now, the following command will work (under Unix):
280 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
280 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
281
281
282 To ensure accurate tracking of this issue, please file a report about it at:
282 To ensure accurate tracking of this issue, please file a report about it at:
283 $self.bug_tracker
283 $self.bug_tracker
284 """
284 """
285
285
286 class ClusterDirCrashHandler(CrashHandler):
286 class ClusterDirCrashHandler(CrashHandler):
287 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
287 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
288
288
289 message_template = _message_template
289 message_template = _message_template
290
290
291 def __init__(self, app):
291 def __init__(self, app):
292 contact_name = release.authors['Min'][0]
292 contact_name = release.authors['Min'][0]
293 contact_email = release.authors['Min'][1]
293 contact_email = release.authors['Min'][1]
294 bug_tracker = 'http://github.com/ipython/ipython/issues'
294 bug_tracker = 'http://github.com/ipython/ipython/issues'
295 super(ClusterDirCrashHandler,self).__init__(
295 super(ClusterDirCrashHandler,self).__init__(
296 app, contact_name, contact_email, bug_tracker
296 app, contact_name, contact_email, bug_tracker
297 )
297 )
298
298
299
299
300 #-----------------------------------------------------------------------------
300 #-----------------------------------------------------------------------------
301 # Main application
301 # Main application
302 #-----------------------------------------------------------------------------
302 #-----------------------------------------------------------------------------
303 base_aliases = {
303 base_aliases = {
304 'profile' : "ClusterDir.profile",
304 'profile' : "ClusterDir.profile",
305 'cluster_dir' : 'ClusterDir.location',
305 'cluster_dir' : 'ClusterDir.location',
306 'auto_create' : 'ClusterDirApplication.auto_create',
307 'log_level' : 'ClusterApplication.log_level',
306 'log_level' : 'ClusterApplication.log_level',
308 'work_dir' : 'ClusterApplication.work_dir',
307 'work_dir' : 'ClusterApplication.work_dir',
309 'log_to_file' : 'ClusterApplication.log_to_file',
308 'log_to_file' : 'ClusterApplication.log_to_file',
310 'clean_logs' : 'ClusterApplication.clean_logs',
309 'clean_logs' : 'ClusterApplication.clean_logs',
311 'log_url' : 'ClusterApplication.log_url',
310 'log_url' : 'ClusterApplication.log_url',
311 'config' : 'ClusterApplication.config_file',
312 }
312 }
313
313
314 base_flags = {
314 base_flags = {
315 'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
315 'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
316 'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
316 'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
317 'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
317 'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
318 }
318 }
319 for k,v in base_flags.iteritems():
319 for k,v in base_flags.iteritems():
320 base_flags[k] = (Config(v[0]),v[1])
320 base_flags[k] = (Config(v[0]),v[1])
321
321
322 class ClusterApplication(BaseIPythonApplication):
322 class ClusterApplication(BaseIPythonApplication):
323 """An application that puts everything into a cluster directory.
323 """An application that puts everything into a cluster directory.
324
324
325 Instead of looking for things in the ipython_dir, this type of application
325 Instead of looking for things in the ipython_dir, this type of application
326 will use its own private directory called the "cluster directory"
326 will use its own private directory called the "cluster directory"
327 for things like config files, log files, etc.
327 for things like config files, log files, etc.
328
328
329 The cluster directory is resolved as follows:
329 The cluster directory is resolved as follows:
330
330
331 * If the ``cluster_dir`` option is given, it is used.
331 * If the ``cluster_dir`` option is given, it is used.
332 * If ``cluster_dir`` is not given, the application directory is
332 * If ``cluster_dir`` is not given, the application directory is
333 resolve using the profile name as ``cluster_<profile>``. The search
333 resolve using the profile name as ``cluster_<profile>``. The search
334 path for this directory is then i) cwd if it is found there
334 path for this directory is then i) cwd if it is found there
335 and ii) in ipython_dir otherwise.
335 and ii) in ipython_dir otherwise.
336
336
337 The config file for the application is to be put in the cluster
337 The config file for the application is to be put in the cluster
338 dir and named the value of the ``config_file_name`` class attribute.
338 dir and named the value of the ``config_file_name`` class attribute.
339 """
339 """
340
340
341 crash_handler_class = ClusterDirCrashHandler
341 crash_handler_class = ClusterDirCrashHandler
342 auto_create_cluster_dir = Bool(True, config=True,
342 auto_create_cluster_dir = Bool(True, config=True,
343 help="whether to create the cluster_dir if it doesn't exist")
343 help="whether to create the cluster_dir if it doesn't exist")
344 cluster_dir = Instance(ClusterDir)
344 cluster_dir = Instance(ClusterDir)
345 classes = [ClusterDir]
345 classes = [ClusterDir]
346
346
347 def _log_level_default(self):
347 def _log_level_default(self):
348 # temporarily override default_log_level to INFO
348 # temporarily override default_log_level to INFO
349 return logging.INFO
349 return logging.INFO
350
350
351 work_dir = Unicode(os.getcwdu(), config=True,
351 work_dir = Unicode(os.getcwdu(), config=True,
352 help='Set the working dir for the process.'
352 help='Set the working dir for the process.'
353 )
353 )
354 def _work_dir_changed(self, name, old, new):
354 def _work_dir_changed(self, name, old, new):
355 self.work_dir = unicode(expand_path(new))
355 self.work_dir = unicode(expand_path(new))
356
356
357 log_to_file = Bool(config=True,
357 log_to_file = Bool(config=True,
358 help="whether to log to a file")
358 help="whether to log to a file")
359
359
360 clean_logs = Bool(False, shortname='--clean-logs', config=True,
360 clean_logs = Bool(False, config=True,
361 help="whether to cleanup old logfiles before starting")
361 help="whether to cleanup old logfiles before starting")
362
362
363 log_url = Unicode('', shortname='--log-url', config=True,
363 log_url = Unicode('', config=True,
364 help="The ZMQ URL of the iplogger to aggregate logging.")
364 help="The ZMQ URL of the iplogger to aggregate logging.")
365
365
366 config_file = Unicode(u'', config=True,
366 config_file = Unicode(u'', config=True,
367 help="""Path to ipcontroller configuration file. The default is to use
367 help="""Path to ip<appname> configuration file. The default is to use
368 <appname>_config.py, as found by cluster-dir."""
368 <appname>_config.py, as found by cluster-dir."""
369 )
369 )
370 def _config_file_paths_default(self):
371 # don't include profile dir
372 return [ os.getcwdu(), self.ipython_dir ]
373
374 def _config_file_changed(self, name, old, new):
375 if os.pathsep in new:
376 path, new = new.rsplit(os.pathsep)
377 self.config_file_paths.insert(0, path)
378 self.config_file_name = new
379
380 config_file_name = Unicode('')
370
381
371 loop = Instance('zmq.eventloop.ioloop.IOLoop')
382 loop = Instance('zmq.eventloop.ioloop.IOLoop')
372 def _loop_default(self):
383 def _loop_default(self):
373 from zmq.eventloop.ioloop import IOLoop
384 from zmq.eventloop.ioloop import IOLoop
374 return IOLoop.instance()
385 return IOLoop.instance()
375
386
376 aliases = Dict(base_aliases)
387 aliases = Dict(base_aliases)
377 flags = Dict(base_flags)
388 flags = Dict(base_flags)
378
389
379 def init_clusterdir(self):
390 def init_clusterdir(self):
380 """This resolves the cluster directory.
391 """This resolves the cluster directory.
381
392
382 This tries to find the cluster directory and if successful, it will
393 This tries to find the cluster directory and if successful, it will
383 have done:
394 have done:
384 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
395 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
385 the application.
396 the application.
386 * Sets ``self.cluster_dir`` attribute of the application and config
397 * Sets ``self.cluster_dir`` attribute of the application and config
387 objects.
398 objects.
388
399
389 The algorithm used for this is as follows:
400 The algorithm used for this is as follows:
390 1. Try ``Global.cluster_dir``.
401 1. Try ``Global.cluster_dir``.
391 2. Try using ``Global.profile``.
402 2. Try using ``Global.profile``.
392 3. If both of these fail and ``self.auto_create_cluster_dir`` is
403 3. If both of these fail and ``self.auto_create_cluster_dir`` is
393 ``True``, then create the new cluster dir in the IPython directory.
404 ``True``, then create the new cluster dir in the IPython directory.
394 4. If all fails, then raise :class:`ClusterDirError`.
405 4. If all fails, then raise :class:`ClusterDirError`.
395 """
406 """
396 try:
407 try:
397 self.cluster_dir = ClusterDir(auto_create=self.auto_create_cluster_dir, config=self.config)
408 self.cluster_dir = ClusterDir(auto_create=self.auto_create_cluster_dir, config=self.config)
398 except ClusterDirError as e:
409 except ClusterDirError as e:
399 self.log.fatal("Error initializing cluster dir: %s"%e)
410 self.log.fatal("Error initializing cluster dir: %s"%e)
400 self.log.fatal("A cluster dir must be created before running this command.")
411 self.log.fatal("A cluster dir must be created before running this command.")
401 self.log.fatal("Do 'ipcluster create -h' or 'ipcluster list -h' for more "
412 self.log.fatal("Do 'ipcluster create -h' or 'ipcluster list -h' for more "
402 "information about creating and listing cluster dirs."
413 "information about creating and listing cluster dirs."
403 )
414 )
404 self.exit(1)
415 self.exit(1)
405
416
406 if self.cluster_dir._new_dir:
417 if self.cluster_dir._new_dir:
407 self.log.info('Creating new cluster dir: %s' % \
418 self.log.info('Creating new cluster dir: %s' % \
408 self.cluster_dir.location)
419 self.cluster_dir.location)
409 else:
420 else:
410 self.log.info('Using existing cluster dir: %s' % \
421 self.log.info('Using existing cluster dir: %s' % \
411 self.cluster_dir.location)
422 self.cluster_dir.location)
423
424 # insert after cwd:
425 self.config_file_paths.insert(1, self.cluster_dir.location)
412
426
413 def initialize(self, argv=None):
427 def initialize(self, argv=None):
414 """initialize the app"""
428 """initialize the app"""
415 self.init_crash_handler()
429 self.init_crash_handler()
416 self.parse_command_line(argv)
430 self.parse_command_line(argv)
417 cl_config = self.config
431 cl_config = self.config
418 self.init_clusterdir()
432 self.init_clusterdir()
419 if self.config_file:
433 self.load_config_file()
420 self.load_config_file(self.config_file)
421 elif self.default_config_file_name:
422 try:
423 self.load_config_file(self.default_config_file_name,
424 path=self.cluster_dir.location)
425 except IOError:
426 self.log.warn("Warning: Default config file not found")
427 # command-line should *override* config file, but command-line is necessary
434 # command-line should *override* config file, but command-line is necessary
428 # to determine clusterdir, etc.
435 # to determine clusterdir, etc.
429 self.update_config(cl_config)
436 self.update_config(cl_config)
430 self.to_work_dir()
437 self.to_work_dir()
431 self.reinit_logging()
438 self.reinit_logging()
432
439
433 def to_work_dir(self):
440 def to_work_dir(self):
434 wd = self.work_dir
441 wd = self.work_dir
435 if unicode(wd) != os.getcwdu():
442 if unicode(wd) != os.getcwdu():
436 os.chdir(wd)
443 os.chdir(wd)
437 self.log.info("Changing to working dir: %s" % wd)
444 self.log.info("Changing to working dir: %s" % wd)
438 # This is the working dir by now.
445 # This is the working dir by now.
439 sys.path.insert(0, '')
446 sys.path.insert(0, '')
440
447
441 def load_config_file(self, filename, path=None):
442 """Load a .py based config file by filename and path."""
443 # use config.application.Application.load_config
444 # instead of inflexible core.newapplication.BaseIPythonApplication.load_config
445 return Application.load_config_file(self, filename, path=path)
446 #
447 # def load_default_config_file(self):
448 # """Load a .py based config file by filename and path."""
449 # return BaseIPythonApplication.load_config_file(self)
450
451 # disable URL-logging
452 def reinit_logging(self):
448 def reinit_logging(self):
453 # Remove old log files
449 # Remove old log files
454 log_dir = self.cluster_dir.log_dir
450 log_dir = self.cluster_dir.log_dir
455 if self.clean_logs:
451 if self.clean_logs:
456 for f in os.listdir(log_dir):
452 for f in os.listdir(log_dir):
457 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
453 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
458 os.remove(os.path.join(log_dir, f))
454 os.remove(os.path.join(log_dir, f))
459 if self.log_to_file:
455 if self.log_to_file:
460 # Start logging to the new log file
456 # Start logging to the new log file
461 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
457 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
462 logfile = os.path.join(log_dir, log_filename)
458 logfile = os.path.join(log_dir, log_filename)
463 open_log_file = open(logfile, 'w')
459 open_log_file = open(logfile, 'w')
464 else:
460 else:
465 open_log_file = None
461 open_log_file = None
466 if open_log_file is not None:
462 if open_log_file is not None:
467 self.log.removeHandler(self._log_handler)
463 self.log.removeHandler(self._log_handler)
468 self._log_handler = logging.StreamHandler(open_log_file)
464 self._log_handler = logging.StreamHandler(open_log_file)
469 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
465 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
470 self._log_handler.setFormatter(self._log_formatter)
466 self._log_handler.setFormatter(self._log_formatter)
471 self.log.addHandler(self._log_handler)
467 self.log.addHandler(self._log_handler)
472
468
473 def write_pid_file(self, overwrite=False):
469 def write_pid_file(self, overwrite=False):
474 """Create a .pid file in the pid_dir with my pid.
470 """Create a .pid file in the pid_dir with my pid.
475
471
476 This must be called after pre_construct, which sets `self.pid_dir`.
472 This must be called after pre_construct, which sets `self.pid_dir`.
477 This raises :exc:`PIDFileError` if the pid file exists already.
473 This raises :exc:`PIDFileError` if the pid file exists already.
478 """
474 """
479 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
475 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
480 if os.path.isfile(pid_file):
476 if os.path.isfile(pid_file):
481 pid = self.get_pid_from_file()
477 pid = self.get_pid_from_file()
482 if not overwrite:
478 if not overwrite:
483 raise PIDFileError(
479 raise PIDFileError(
484 'The pid file [%s] already exists. \nThis could mean that this '
480 'The pid file [%s] already exists. \nThis could mean that this '
485 'server is already running with [pid=%s].' % (pid_file, pid)
481 'server is already running with [pid=%s].' % (pid_file, pid)
486 )
482 )
487 with open(pid_file, 'w') as f:
483 with open(pid_file, 'w') as f:
488 self.log.info("Creating pid file: %s" % pid_file)
484 self.log.info("Creating pid file: %s" % pid_file)
489 f.write(repr(os.getpid())+'\n')
485 f.write(repr(os.getpid())+'\n')
490
486
491 def remove_pid_file(self):
487 def remove_pid_file(self):
492 """Remove the pid file.
488 """Remove the pid file.
493
489
494 This should be called at shutdown by registering a callback with
490 This should be called at shutdown by registering a callback with
495 :func:`reactor.addSystemEventTrigger`. This needs to return
491 :func:`reactor.addSystemEventTrigger`. This needs to return
496 ``None``.
492 ``None``.
497 """
493 """
498 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
494 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
499 if os.path.isfile(pid_file):
495 if os.path.isfile(pid_file):
500 try:
496 try:
501 self.log.info("Removing pid file: %s" % pid_file)
497 self.log.info("Removing pid file: %s" % pid_file)
502 os.remove(pid_file)
498 os.remove(pid_file)
503 except:
499 except:
504 self.log.warn("Error removing the pid file: %s" % pid_file)
500 self.log.warn("Error removing the pid file: %s" % pid_file)
505
501
506 def get_pid_from_file(self):
502 def get_pid_from_file(self):
507 """Get the pid from the pid file.
503 """Get the pid from the pid file.
508
504
509 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
505 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
510 """
506 """
511 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
507 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
512 if os.path.isfile(pid_file):
508 if os.path.isfile(pid_file):
513 with open(pid_file, 'r') as f:
509 with open(pid_file, 'r') as f:
514 pid = int(f.read().strip())
510 pid = int(f.read().strip())
515 return pid
511 return pid
516 else:
512 else:
517 raise PIDFileError('pid file not found: %s' % pid_file)
513 raise PIDFileError('pid file not found: %s' % pid_file)
518
514
519 def check_pid(self, pid):
515 def check_pid(self, pid):
520 if os.name == 'nt':
516 if os.name == 'nt':
521 try:
517 try:
522 import ctypes
518 import ctypes
523 # returns 0 if no such process (of ours) exists
519 # returns 0 if no such process (of ours) exists
524 # positive int otherwise
520 # positive int otherwise
525 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
521 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
526 except Exception:
522 except Exception:
527 self.log.warn(
523 self.log.warn(
528 "Could not determine whether pid %i is running via `OpenProcess`. "
524 "Could not determine whether pid %i is running via `OpenProcess`. "
529 " Making the likely assumption that it is."%pid
525 " Making the likely assumption that it is."%pid
530 )
526 )
531 return True
527 return True
532 return bool(p)
528 return bool(p)
533 else:
529 else:
534 try:
530 try:
535 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
531 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
536 output,_ = p.communicate()
532 output,_ = p.communicate()
537 except OSError:
533 except OSError:
538 self.log.warn(
534 self.log.warn(
539 "Could not determine whether pid %i is running via `ps x`. "
535 "Could not determine whether pid %i is running via `ps x`. "
540 " Making the likely assumption that it is."%pid
536 " Making the likely assumption that it is."%pid
541 )
537 )
542 return True
538 return True
543 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
539 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
544 return pid in pids
540 return pid in pids
@@ -1,542 +1,540 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import errno
18 import errno
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import signal
22 import signal
23
23
24 from subprocess import check_call, CalledProcessError, PIPE
24 from subprocess import check_call, CalledProcessError, PIPE
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27
27
28 from IPython.config.application import Application, boolean_flag
28 from IPython.config.application import Application, boolean_flag
29 from IPython.config.loader import Config
29 from IPython.config.loader import Config
30 from IPython.core.newapplication import BaseIPythonApplication
30 from IPython.core.newapplication import BaseIPythonApplication
31 from IPython.utils.importstring import import_item
31 from IPython.utils.importstring import import_item
32 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
32 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
33
33
34 from IPython.parallel.apps.clusterdir import (
34 from IPython.parallel.apps.clusterdir import (
35 ClusterApplication, ClusterDirError, ClusterDir,
35 ClusterApplication, ClusterDirError, ClusterDir,
36 PIDFileError,
36 PIDFileError,
37 base_flags, base_aliases
37 base_flags, base_aliases
38 )
38 )
39
39
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Module level variables
42 # Module level variables
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45
45
46 default_config_file_name = u'ipcluster_config.py'
46 default_config_file_name = u'ipcluster_config.py'
47
47
48
48
49 _description = """Start an IPython cluster for parallel computing.
49 _description = """Start an IPython cluster for parallel computing.
50
50
51 An IPython cluster consists of 1 controller and 1 or more engines.
51 An IPython cluster consists of 1 controller and 1 or more engines.
52 This command automates the startup of these processes using a wide
52 This command automates the startup of these processes using a wide
53 range of startup methods (SSH, local processes, PBS, mpiexec,
53 range of startup methods (SSH, local processes, PBS, mpiexec,
54 Windows HPC Server 2008). To start a cluster with 4 engines on your
54 Windows HPC Server 2008). To start a cluster with 4 engines on your
55 local host simply do 'ipcluster start n=4'. For more complex usage
55 local host simply do 'ipcluster start n=4'. For more complex usage
56 you will typically do 'ipcluster create profile=mycluster', then edit
56 you will typically do 'ipcluster create profile=mycluster', then edit
57 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
57 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
58 """
58 """
59
59
60
60
61 # Exit codes for ipcluster
61 # Exit codes for ipcluster
62
62
63 # This will be the exit code if the ipcluster appears to be running because
63 # This will be the exit code if the ipcluster appears to be running because
64 # a .pid file exists
64 # a .pid file exists
65 ALREADY_STARTED = 10
65 ALREADY_STARTED = 10
66
66
67
67
68 # This will be the exit code if ipcluster stop is run, but there is not .pid
68 # This will be the exit code if ipcluster stop is run, but there is not .pid
69 # file to be found.
69 # file to be found.
70 ALREADY_STOPPED = 11
70 ALREADY_STOPPED = 11
71
71
72 # This will be the exit code if ipcluster engines is run, but there is not .pid
72 # This will be the exit code if ipcluster engines is run, but there is not .pid
73 # file to be found.
73 # file to be found.
74 NO_CLUSTER = 12
74 NO_CLUSTER = 12
75
75
76
76
77 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
78 # Main application
78 # Main application
79 #-----------------------------------------------------------------------------
79 #-----------------------------------------------------------------------------
80 start_help = """Start an IPython cluster for parallel computing
80 start_help = """Start an IPython cluster for parallel computing
81
81
82 Start an ipython cluster by its profile name or cluster
82 Start an ipython cluster by its profile name or cluster
83 directory. Cluster directories contain configuration, log and
83 directory. Cluster directories contain configuration, log and
84 security related files and are named using the convention
84 security related files and are named using the convention
85 'cluster_<profile>' and should be creating using the 'start'
85 'cluster_<profile>' and should be creating using the 'start'
86 subcommand of 'ipcluster'. If your cluster directory is in
86 subcommand of 'ipcluster'. If your cluster directory is in
87 the cwd or the ipython directory, you can simply refer to it
87 the cwd or the ipython directory, you can simply refer to it
88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
89 otherwise use the 'cluster_dir' option.
89 otherwise use the 'cluster_dir' option.
90 """
90 """
91 stop_help = """Stop a running IPython cluster
91 stop_help = """Stop a running IPython cluster
92
92
93 Stop a running ipython cluster by its profile name or cluster
93 Stop a running ipython cluster by its profile name or cluster
94 directory. Cluster directories are named using the convention
94 directory. Cluster directories are named using the convention
95 'cluster_<profile>'. If your cluster directory is in
95 'cluster_<profile>'. If your cluster directory is in
96 the cwd or the ipython directory, you can simply refer to it
96 the cwd or the ipython directory, you can simply refer to it
97 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
97 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
98 use the 'cluster_dir' option.
98 use the 'cluster_dir' option.
99 """
99 """
100 engines_help = """Start engines connected to an existing IPython cluster
100 engines_help = """Start engines connected to an existing IPython cluster
101
101
102 Start one or more engines to connect to an existing Cluster
102 Start one or more engines to connect to an existing Cluster
103 by profile name or cluster directory.
103 by profile name or cluster directory.
104 Cluster directories contain configuration, log and
104 Cluster directories contain configuration, log and
105 security related files and are named using the convention
105 security related files and are named using the convention
106 'cluster_<profile>' and should be creating using the 'start'
106 'cluster_<profile>' and should be creating using the 'start'
107 subcommand of 'ipcluster'. If your cluster directory is in
107 subcommand of 'ipcluster'. If your cluster directory is in
108 the cwd or the ipython directory, you can simply refer to it
108 the cwd or the ipython directory, you can simply refer to it
109 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
109 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
110 otherwise use the 'cluster_dir' option.
110 otherwise use the 'cluster_dir' option.
111 """
111 """
112 create_help = """Create an ipcluster profile by name
112 create_help = """Create an ipcluster profile by name
113
113
114 Create an ipython cluster directory by its profile name or
114 Create an ipython cluster directory by its profile name or
115 cluster directory path. Cluster directories contain
115 cluster directory path. Cluster directories contain
116 configuration, log and security related files and are named
116 configuration, log and security related files and are named
117 using the convention 'cluster_<profile>'. By default they are
117 using the convention 'cluster_<profile>'. By default they are
118 located in your ipython directory. Once created, you will
118 located in your ipython directory. Once created, you will
119 probably need to edit the configuration files in the cluster
119 probably need to edit the configuration files in the cluster
120 directory to configure your cluster. Most users will create a
120 directory to configure your cluster. Most users will create a
121 cluster directory by profile name,
121 cluster directory by profile name,
122 `ipcluster create profile=mycluster`, which will put the directory
122 `ipcluster create profile=mycluster`, which will put the directory
123 in `<ipython_dir>/cluster_mycluster`.
123 in `<ipython_dir>/cluster_mycluster`.
124 """
124 """
125 list_help = """List available cluster profiles
125 list_help = """List available cluster profiles
126
126
127 List all available clusters, by cluster directory, that can
127 List all available clusters, by cluster directory, that can
128 be found in the current working directly or in the ipython
128 be found in the current working directly or in the ipython
129 directory. Cluster directories are named using the convention
129 directory. Cluster directories are named using the convention
130 'cluster_<profile>'.
130 'cluster_<profile>'.
131 """
131 """
132
132
133
133
134 class IPClusterList(BaseIPythonApplication):
134 class IPClusterList(BaseIPythonApplication):
135 name = u'ipcluster-list'
135 name = u'ipcluster-list'
136 description = list_help
136 description = list_help
137
137
138 # empty aliases
138 # empty aliases
139 aliases=Dict()
139 aliases=Dict()
140 flags = Dict(base_flags)
140 flags = Dict(base_flags)
141
141
142 def _log_level_default(self):
142 def _log_level_default(self):
143 return 20
143 return 20
144
144
145 def list_cluster_dirs(self):
145 def list_cluster_dirs(self):
146 # Find the search paths
146 # Find the search paths
147 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
147 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
148 if cluster_dir_paths:
148 if cluster_dir_paths:
149 cluster_dir_paths = cluster_dir_paths.split(':')
149 cluster_dir_paths = cluster_dir_paths.split(':')
150 else:
150 else:
151 cluster_dir_paths = []
151 cluster_dir_paths = []
152
152
153 ipython_dir = self.ipython_dir
153 ipython_dir = self.ipython_dir
154
154
155 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
155 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
156 paths = list(set(paths))
156 paths = list(set(paths))
157
157
158 self.log.info('Searching for cluster dirs in paths: %r' % paths)
158 self.log.info('Searching for cluster dirs in paths: %r' % paths)
159 for path in paths:
159 for path in paths:
160 files = os.listdir(path)
160 files = os.listdir(path)
161 for f in files:
161 for f in files:
162 full_path = os.path.join(path, f)
162 full_path = os.path.join(path, f)
163 if os.path.isdir(full_path) and f.startswith('cluster_'):
163 if os.path.isdir(full_path) and f.startswith('cluster_'):
164 profile = full_path.split('_')[-1]
164 profile = full_path.split('_')[-1]
165 start_cmd = 'ipcluster start profile=%s n=4' % profile
165 start_cmd = 'ipcluster start profile=%s n=4' % profile
166 print start_cmd + " ==> " + full_path
166 print start_cmd + " ==> " + full_path
167
167
168 def start(self):
168 def start(self):
169 self.list_cluster_dirs()
169 self.list_cluster_dirs()
170
170
171 create_flags = {}
171 create_flags = {}
172 create_flags.update(base_flags)
172 create_flags.update(base_flags)
173 create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset',
173 create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset',
174 "reset config files to defaults", "leave existing config files"))
174 "reset config files to defaults", "leave existing config files"))
175
175
176 class IPClusterCreate(ClusterApplication):
176 class IPClusterCreate(ClusterApplication):
177 name = u'ipcluster'
177 name = u'ipcluster'
178 description = create_help
178 description = create_help
179 auto_create_cluster_dir = Bool(True,
179 auto_create_cluster_dir = Bool(True,
180 help="whether to create the cluster_dir if it doesn't exist")
180 help="whether to create the cluster_dir if it doesn't exist")
181 default_config_file_name = default_config_file_name
181 config_file_name = Unicode(default_config_file_name)
182
182
183 reset = Bool(False, config=True,
183 reset = Bool(False, config=True,
184 help="Whether to reset config files as part of 'create'."
184 help="Whether to reset config files as part of 'create'."
185 )
185 )
186
186
187 flags = Dict(create_flags)
187 flags = Dict(create_flags)
188
188
189 aliases = Dict(dict(profile='ClusterDir.profile'))
189 aliases = Dict(dict(profile='ClusterDir.profile'))
190
190
191 classes = [ClusterDir]
191 classes = [ClusterDir]
192
192
193 def init_clusterdir(self):
193 def init_clusterdir(self):
194 super(IPClusterCreate, self).init_clusterdir()
194 super(IPClusterCreate, self).init_clusterdir()
195 self.log.info('Copying default config files to cluster directory '
195 self.log.info('Copying default config files to cluster directory '
196 '[overwrite=%r]' % (self.reset,))
196 '[overwrite=%r]' % (self.reset,))
197 self.cluster_dir.copy_all_config_files(overwrite=self.reset)
197 self.cluster_dir.copy_all_config_files(overwrite=self.reset)
198
198
199 def initialize(self, argv=None):
199 def initialize(self, argv=None):
200 self.parse_command_line(argv)
200 self.parse_command_line(argv)
201 self.init_clusterdir()
201 self.init_clusterdir()
202
202
203 stop_aliases = dict(
203 stop_aliases = dict(
204 signal='IPClusterStop.signal',
204 signal='IPClusterStop.signal',
205 profile='ClusterDir.profile',
205 profile='ClusterDir.profile',
206 cluster_dir='ClusterDir.location',
206 cluster_dir='ClusterDir.location',
207 )
207 )
208
208
209 class IPClusterStop(ClusterApplication):
209 class IPClusterStop(ClusterApplication):
210 name = u'ipcluster'
210 name = u'ipcluster'
211 description = stop_help
211 description = stop_help
212 auto_create_cluster_dir = Bool(False)
212 auto_create_cluster_dir = Bool(False)
213 default_config_file_name = default_config_file_name
213 config_file_name = Unicode(default_config_file_name)
214
214
215 signal = Int(signal.SIGINT, config=True,
215 signal = Int(signal.SIGINT, config=True,
216 help="signal to use for stopping processes.")
216 help="signal to use for stopping processes.")
217
217
218 aliases = Dict(stop_aliases)
218 aliases = Dict(stop_aliases)
219
219
220 def init_clusterdir(self):
220 def init_clusterdir(self):
221 try:
221 try:
222 super(IPClusterStop, self).init_clusterdir()
222 super(IPClusterStop, self).init_clusterdir()
223 except ClusterDirError as e:
223 except ClusterDirError as e:
224 self.log.fatal("Failed ClusterDir init: %s"%e)
224 self.log.fatal("Failed ClusterDir init: %s"%e)
225 self.exit(1)
225 self.exit(1)
226
226
227 def start(self):
227 def start(self):
228 """Start the app for the stop subcommand."""
228 """Start the app for the stop subcommand."""
229 try:
229 try:
230 pid = self.get_pid_from_file()
230 pid = self.get_pid_from_file()
231 except PIDFileError:
231 except PIDFileError:
232 self.log.critical(
232 self.log.critical(
233 'Could not read pid file, cluster is probably not running.'
233 'Could not read pid file, cluster is probably not running.'
234 )
234 )
235 # Here I exit with a unusual exit status that other processes
235 # Here I exit with a unusual exit status that other processes
236 # can watch for to learn how I existed.
236 # can watch for to learn how I existed.
237 self.remove_pid_file()
237 self.remove_pid_file()
238 self.exit(ALREADY_STOPPED)
238 self.exit(ALREADY_STOPPED)
239
239
240 if not self.check_pid(pid):
240 if not self.check_pid(pid):
241 self.log.critical(
241 self.log.critical(
242 'Cluster [pid=%r] is not running.' % pid
242 'Cluster [pid=%r] is not running.' % pid
243 )
243 )
244 self.remove_pid_file()
244 self.remove_pid_file()
245 # Here I exit with a unusual exit status that other processes
245 # Here I exit with a unusual exit status that other processes
246 # can watch for to learn how I existed.
246 # can watch for to learn how I existed.
247 self.exit(ALREADY_STOPPED)
247 self.exit(ALREADY_STOPPED)
248
248
249 elif os.name=='posix':
249 elif os.name=='posix':
250 sig = self.signal
250 sig = self.signal
251 self.log.info(
251 self.log.info(
252 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
252 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
253 )
253 )
254 try:
254 try:
255 os.kill(pid, sig)
255 os.kill(pid, sig)
256 except OSError:
256 except OSError:
257 self.log.error("Stopping cluster failed, assuming already dead.",
257 self.log.error("Stopping cluster failed, assuming already dead.",
258 exc_info=True)
258 exc_info=True)
259 self.remove_pid_file()
259 self.remove_pid_file()
260 elif os.name=='nt':
260 elif os.name=='nt':
261 try:
261 try:
262 # kill the whole tree
262 # kill the whole tree
263 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
263 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
264 except (CalledProcessError, OSError):
264 except (CalledProcessError, OSError):
265 self.log.error("Stopping cluster failed, assuming already dead.",
265 self.log.error("Stopping cluster failed, assuming already dead.",
266 exc_info=True)
266 exc_info=True)
267 self.remove_pid_file()
267 self.remove_pid_file()
268
268
269 engine_aliases = {}
269 engine_aliases = {}
270 engine_aliases.update(base_aliases)
270 engine_aliases.update(base_aliases)
271 engine_aliases.update(dict(
271 engine_aliases.update(dict(
272 n='IPClusterEngines.n',
272 n='IPClusterEngines.n',
273 elauncher = 'IPClusterEngines.engine_launcher_class',
273 elauncher = 'IPClusterEngines.engine_launcher_class',
274 ))
274 ))
275 class IPClusterEngines(ClusterApplication):
275 class IPClusterEngines(ClusterApplication):
276
276
277 name = u'ipcluster'
277 name = u'ipcluster'
278 description = engines_help
278 description = engines_help
279 usage = None
279 usage = None
280 default_config_file_name = default_config_file_name
280 config_file_name = Unicode(default_config_file_name)
281 default_log_level = logging.INFO
281 default_log_level = logging.INFO
282 auto_create_cluster_dir = Bool(False)
282 auto_create_cluster_dir = Bool(False)
283 classes = List()
283 classes = List()
284 def _classes_default(self):
284 def _classes_default(self):
285 from IPython.parallel.apps import launcher
285 from IPython.parallel.apps import launcher
286 launchers = launcher.all_launchers
286 launchers = launcher.all_launchers
287 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
287 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
288 return [ClusterDir]+eslaunchers
288 return [ClusterDir]+eslaunchers
289
289
290 n = Int(2, config=True,
290 n = Int(2, config=True,
291 help="The number of engines to start.")
291 help="The number of engines to start.")
292
292
293 engine_launcher_class = Unicode('LocalEngineSetLauncher',
293 engine_launcher_class = Unicode('LocalEngineSetLauncher',
294 config=True,
294 config=True,
295 help="The class for launching a set of Engines."
295 help="The class for launching a set of Engines."
296 )
296 )
297 daemonize = Bool(False, config=True,
297 daemonize = Bool(False, config=True,
298 help='Daemonize the ipcluster program. This implies --log-to-file')
298 help='Daemonize the ipcluster program. This implies --log-to-file')
299
299
300 def _daemonize_changed(self, name, old, new):
300 def _daemonize_changed(self, name, old, new):
301 if new:
301 if new:
302 self.log_to_file = True
302 self.log_to_file = True
303
303
304 aliases = Dict(engine_aliases)
304 aliases = Dict(engine_aliases)
305 # flags = Dict(flags)
305 # flags = Dict(flags)
306 _stopping = False
306 _stopping = False
307
307
308 def initialize(self, argv=None):
308 def initialize(self, argv=None):
309 super(IPClusterEngines, self).initialize(argv)
309 super(IPClusterEngines, self).initialize(argv)
310 self.init_signal()
310 self.init_signal()
311 self.init_launchers()
311 self.init_launchers()
312
312
313 def init_launchers(self):
313 def init_launchers(self):
314 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
314 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
315 self.engine_launcher.on_stop(lambda r: self.loop.stop())
315 self.engine_launcher.on_stop(lambda r: self.loop.stop())
316
316
317 def init_signal(self):
317 def init_signal(self):
318 # Setup signals
318 # Setup signals
319 signal.signal(signal.SIGINT, self.sigint_handler)
319 signal.signal(signal.SIGINT, self.sigint_handler)
320
320
321 def build_launcher(self, clsname):
321 def build_launcher(self, clsname):
322 """import and instantiate a Launcher based on importstring"""
322 """import and instantiate a Launcher based on importstring"""
323 if '.' not in clsname:
323 if '.' not in clsname:
324 # not a module, presume it's the raw name in apps.launcher
324 # not a module, presume it's the raw name in apps.launcher
325 clsname = 'IPython.parallel.apps.launcher.'+clsname
325 clsname = 'IPython.parallel.apps.launcher.'+clsname
326 # print repr(clsname)
326 # print repr(clsname)
327 klass = import_item(clsname)
327 klass = import_item(clsname)
328
328
329 launcher = klass(
329 launcher = klass(
330 work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name
330 work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name
331 )
331 )
332 return launcher
332 return launcher
333
333
334 def start_engines(self):
334 def start_engines(self):
335 self.log.info("Starting %i engines"%self.n)
335 self.log.info("Starting %i engines"%self.n)
336 self.engine_launcher.start(
336 self.engine_launcher.start(
337 self.n,
337 self.n,
338 cluster_dir=self.cluster_dir.location
338 cluster_dir=self.cluster_dir.location
339 )
339 )
340
340
341 def stop_engines(self):
341 def stop_engines(self):
342 self.log.info("Stopping Engines...")
342 self.log.info("Stopping Engines...")
343 if self.engine_launcher.running:
343 if self.engine_launcher.running:
344 d = self.engine_launcher.stop()
344 d = self.engine_launcher.stop()
345 return d
345 return d
346 else:
346 else:
347 return None
347 return None
348
348
349 def stop_launchers(self, r=None):
349 def stop_launchers(self, r=None):
350 if not self._stopping:
350 if not self._stopping:
351 self._stopping = True
351 self._stopping = True
352 self.log.error("IPython cluster: stopping")
352 self.log.error("IPython cluster: stopping")
353 self.stop_engines()
353 self.stop_engines()
354 # Wait a few seconds to let things shut down.
354 # Wait a few seconds to let things shut down.
355 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
355 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
356 dc.start()
356 dc.start()
357
357
358 def sigint_handler(self, signum, frame):
358 def sigint_handler(self, signum, frame):
359 self.log.debug("SIGINT received, stopping launchers...")
359 self.log.debug("SIGINT received, stopping launchers...")
360 self.stop_launchers()
360 self.stop_launchers()
361
361
362 def start_logging(self):
362 def start_logging(self):
363 # Remove old log files of the controller and engine
363 # Remove old log files of the controller and engine
364 if self.clean_logs:
364 if self.clean_logs:
365 log_dir = self.cluster_dir.log_dir
365 log_dir = self.cluster_dir.log_dir
366 for f in os.listdir(log_dir):
366 for f in os.listdir(log_dir):
367 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
367 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
368 os.remove(os.path.join(log_dir, f))
368 os.remove(os.path.join(log_dir, f))
369 # This will remove old log files for ipcluster itself
369 # This will remove old log files for ipcluster itself
370 # super(IPClusterApp, self).start_logging()
370 # super(IPClusterApp, self).start_logging()
371
371
372 def start(self):
372 def start(self):
373 """Start the app for the engines subcommand."""
373 """Start the app for the engines subcommand."""
374 self.log.info("IPython cluster: started")
374 self.log.info("IPython cluster: started")
375 # First see if the cluster is already running
375 # First see if the cluster is already running
376
376
377 # Now log and daemonize
377 # Now log and daemonize
378 self.log.info(
378 self.log.info(
379 'Starting engines with [daemon=%r]' % self.daemonize
379 'Starting engines with [daemon=%r]' % self.daemonize
380 )
380 )
381 # TODO: Get daemonize working on Windows or as a Windows Server.
381 # TODO: Get daemonize working on Windows or as a Windows Server.
382 if self.daemonize:
382 if self.daemonize:
383 if os.name=='posix':
383 if os.name=='posix':
384 from twisted.scripts._twistd_unix import daemonize
384 from twisted.scripts._twistd_unix import daemonize
385 daemonize()
385 daemonize()
386
386
387 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
387 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
388 dc.start()
388 dc.start()
389 # Now write the new pid file AFTER our new forked pid is active.
389 # Now write the new pid file AFTER our new forked pid is active.
390 # self.write_pid_file()
390 # self.write_pid_file()
391 try:
391 try:
392 self.loop.start()
392 self.loop.start()
393 except KeyboardInterrupt:
393 except KeyboardInterrupt:
394 pass
394 pass
395 except zmq.ZMQError as e:
395 except zmq.ZMQError as e:
396 if e.errno == errno.EINTR:
396 if e.errno == errno.EINTR:
397 pass
397 pass
398 else:
398 else:
399 raise
399 raise
400
400
401 start_aliases = {}
401 start_aliases = {}
402 start_aliases.update(engine_aliases)
402 start_aliases.update(engine_aliases)
403 start_aliases.update(dict(
403 start_aliases.update(dict(
404 delay='IPClusterStart.delay',
404 delay='IPClusterStart.delay',
405 clean_logs='IPClusterStart.clean_logs',
405 clean_logs='IPClusterStart.clean_logs',
406 ))
406 ))
407
407
408 class IPClusterStart(IPClusterEngines):
408 class IPClusterStart(IPClusterEngines):
409
409
410 name = u'ipcluster'
410 name = u'ipcluster'
411 description = start_help
411 description = start_help
412 usage = None
413 default_config_file_name = default_config_file_name
414 default_log_level = logging.INFO
412 default_log_level = logging.INFO
415 auto_create_cluster_dir = Bool(True, config=True,
413 auto_create_cluster_dir = Bool(True, config=True,
416 help="whether to create the cluster_dir if it doesn't exist")
414 help="whether to create the cluster_dir if it doesn't exist")
417 classes = List()
415 classes = List()
418 def _classes_default(self,):
416 def _classes_default(self,):
419 from IPython.parallel.apps import launcher
417 from IPython.parallel.apps import launcher
420 return [ClusterDir]+launcher.all_launchers
418 return [ClusterDir]+launcher.all_launchers
421
419
422 clean_logs = Bool(True, config=True,
420 clean_logs = Bool(True, config=True,
423 help="whether to cleanup old logs before starting")
421 help="whether to cleanup old logs before starting")
424
422
425 delay = CFloat(1., config=True,
423 delay = CFloat(1., config=True,
426 help="delay (in s) between starting the controller and the engines")
424 help="delay (in s) between starting the controller and the engines")
427
425
428 controller_launcher_class = Unicode('LocalControllerLauncher',
426 controller_launcher_class = Unicode('LocalControllerLauncher',
429 config=True,
427 config=True,
430 help="The class for launching a Controller."
428 help="The class for launching a Controller."
431 )
429 )
432 reset = Bool(False, config=True,
430 reset = Bool(False, config=True,
433 help="Whether to reset config files as part of '--create'."
431 help="Whether to reset config files as part of '--create'."
434 )
432 )
435
433
436 # flags = Dict(flags)
434 # flags = Dict(flags)
437 aliases = Dict(start_aliases)
435 aliases = Dict(start_aliases)
438
436
439 def init_launchers(self):
437 def init_launchers(self):
440 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
438 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
441 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
439 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
442 self.controller_launcher.on_stop(self.stop_launchers)
440 self.controller_launcher.on_stop(self.stop_launchers)
443
441
444 def start_controller(self):
442 def start_controller(self):
445 self.controller_launcher.start(
443 self.controller_launcher.start(
446 cluster_dir=self.cluster_dir.location
444 cluster_dir=self.cluster_dir.location
447 )
445 )
448
446
449 def stop_controller(self):
447 def stop_controller(self):
450 # self.log.info("In stop_controller")
448 # self.log.info("In stop_controller")
451 if self.controller_launcher and self.controller_launcher.running:
449 if self.controller_launcher and self.controller_launcher.running:
452 return self.controller_launcher.stop()
450 return self.controller_launcher.stop()
453
451
454 def stop_launchers(self, r=None):
452 def stop_launchers(self, r=None):
455 if not self._stopping:
453 if not self._stopping:
456 self.stop_controller()
454 self.stop_controller()
457 super(IPClusterStart, self).stop_launchers()
455 super(IPClusterStart, self).stop_launchers()
458
456
459 def start(self):
457 def start(self):
460 """Start the app for the start subcommand."""
458 """Start the app for the start subcommand."""
461 # First see if the cluster is already running
459 # First see if the cluster is already running
462 try:
460 try:
463 pid = self.get_pid_from_file()
461 pid = self.get_pid_from_file()
464 except PIDFileError:
462 except PIDFileError:
465 pass
463 pass
466 else:
464 else:
467 if self.check_pid(pid):
465 if self.check_pid(pid):
468 self.log.critical(
466 self.log.critical(
469 'Cluster is already running with [pid=%s]. '
467 'Cluster is already running with [pid=%s]. '
470 'use "ipcluster stop" to stop the cluster.' % pid
468 'use "ipcluster stop" to stop the cluster.' % pid
471 )
469 )
472 # Here I exit with a unusual exit status that other processes
470 # Here I exit with a unusual exit status that other processes
473 # can watch for to learn how I existed.
471 # can watch for to learn how I existed.
474 self.exit(ALREADY_STARTED)
472 self.exit(ALREADY_STARTED)
475 else:
473 else:
476 self.remove_pid_file()
474 self.remove_pid_file()
477
475
478
476
479 # Now log and daemonize
477 # Now log and daemonize
480 self.log.info(
478 self.log.info(
481 'Starting ipcluster with [daemon=%r]' % self.daemonize
479 'Starting ipcluster with [daemon=%r]' % self.daemonize
482 )
480 )
483 # TODO: Get daemonize working on Windows or as a Windows Server.
481 # TODO: Get daemonize working on Windows or as a Windows Server.
484 if self.daemonize:
482 if self.daemonize:
485 if os.name=='posix':
483 if os.name=='posix':
486 from twisted.scripts._twistd_unix import daemonize
484 from twisted.scripts._twistd_unix import daemonize
487 daemonize()
485 daemonize()
488
486
489 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
487 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
490 dc.start()
488 dc.start()
491 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
489 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
492 dc.start()
490 dc.start()
493 # Now write the new pid file AFTER our new forked pid is active.
491 # Now write the new pid file AFTER our new forked pid is active.
494 self.write_pid_file()
492 self.write_pid_file()
495 try:
493 try:
496 self.loop.start()
494 self.loop.start()
497 except KeyboardInterrupt:
495 except KeyboardInterrupt:
498 pass
496 pass
499 except zmq.ZMQError as e:
497 except zmq.ZMQError as e:
500 if e.errno == errno.EINTR:
498 if e.errno == errno.EINTR:
501 pass
499 pass
502 else:
500 else:
503 raise
501 raise
504 finally:
502 finally:
505 self.remove_pid_file()
503 self.remove_pid_file()
506
504
507 base='IPython.parallel.apps.ipclusterapp.IPCluster'
505 base='IPython.parallel.apps.ipclusterapp.IPCluster'
508
506
509 class IPClusterApp(Application):
507 class IPClusterApp(Application):
510 name = u'ipcluster'
508 name = u'ipcluster'
511 description = _description
509 description = _description
512
510
513 subcommands = {'create' : (base+'Create', create_help),
511 subcommands = {'create' : (base+'Create', create_help),
514 'list' : (base+'List', list_help),
512 'list' : (base+'List', list_help),
515 'start' : (base+'Start', start_help),
513 'start' : (base+'Start', start_help),
516 'stop' : (base+'Stop', stop_help),
514 'stop' : (base+'Stop', stop_help),
517 'engines' : (base+'Engines', engines_help),
515 'engines' : (base+'Engines', engines_help),
518 }
516 }
519
517
520 # no aliases or flags for parent App
518 # no aliases or flags for parent App
521 aliases = Dict()
519 aliases = Dict()
522 flags = Dict()
520 flags = Dict()
523
521
524 def start(self):
522 def start(self):
525 if self.subapp is None:
523 if self.subapp is None:
526 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
524 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
527 print
525 print
528 self.print_subcommands()
526 self.print_subcommands()
529 self.exit(1)
527 self.exit(1)
530 else:
528 else:
531 return self.subapp.start()
529 return self.subapp.start()
532
530
533 def launch_new_instance():
531 def launch_new_instance():
534 """Create and run the IPython cluster."""
532 """Create and run the IPython cluster."""
535 app = IPClusterApp()
533 app = IPClusterApp()
536 app.initialize()
534 app.initialize()
537 app.start()
535 app.start()
538
536
539
537
540 if __name__ == '__main__':
538 if __name__ == '__main__':
541 launch_new_instance()
539 launch_new_instance()
542
540
@@ -1,405 +1,404 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import copy
20 import copy
21 import os
21 import os
22 import logging
22 import logging
23 import socket
23 import socket
24 import stat
24 import stat
25 import sys
25 import sys
26 import uuid
26 import uuid
27
27
28 from multiprocessing import Process
28 from multiprocessing import Process
29
29
30 import zmq
30 import zmq
31 from zmq.devices import ProcessMonitoredQueue
31 from zmq.devices import ProcessMonitoredQueue
32 from zmq.log.handlers import PUBHandler
32 from zmq.log.handlers import PUBHandler
33 from zmq.utils import jsonapi as json
33 from zmq.utils import jsonapi as json
34
34
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36
36
37 from IPython.parallel import factory
37 from IPython.parallel import factory
38
38
39 from IPython.parallel.apps.clusterdir import (
39 from IPython.parallel.apps.clusterdir import (
40 ClusterDir,
40 ClusterDir,
41 ClusterApplication,
41 ClusterApplication,
42 base_flags
42 base_flags
43 # ClusterDirConfigLoader
43 # ClusterDirConfigLoader
44 )
44 )
45 from IPython.utils.importstring import import_item
45 from IPython.utils.importstring import import_item
46 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
46 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
47
47
48 # from IPython.parallel.controller.controller import ControllerFactory
48 # from IPython.parallel.controller.controller import ControllerFactory
49 from IPython.parallel.streamsession import StreamSession
49 from IPython.parallel.streamsession import StreamSession
50 from IPython.parallel.controller.heartmonitor import HeartMonitor
50 from IPython.parallel.controller.heartmonitor import HeartMonitor
51 from IPython.parallel.controller.hub import Hub, HubFactory
51 from IPython.parallel.controller.hub import Hub, HubFactory
52 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
52 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
53 from IPython.parallel.controller.sqlitedb import SQLiteDB
53 from IPython.parallel.controller.sqlitedb import SQLiteDB
54
54
55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
56
56
57 # conditional import of MongoDB backend class
57 # conditional import of MongoDB backend class
58
58
59 try:
59 try:
60 from IPython.parallel.controller.mongodb import MongoDB
60 from IPython.parallel.controller.mongodb import MongoDB
61 except ImportError:
61 except ImportError:
62 maybe_mongo = []
62 maybe_mongo = []
63 else:
63 else:
64 maybe_mongo = [MongoDB]
64 maybe_mongo = [MongoDB]
65
65
66
66
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68 # Module level variables
68 # Module level variables
69 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
70
70
71
71
72 #: The default config file name for this application
72 #: The default config file name for this application
73 default_config_file_name = u'ipcontroller_config.py'
73 default_config_file_name = u'ipcontroller_config.py'
74
74
75
75
76 _description = """Start the IPython controller for parallel computing.
76 _description = """Start the IPython controller for parallel computing.
77
77
78 The IPython controller provides a gateway between the IPython engines and
78 The IPython controller provides a gateway between the IPython engines and
79 clients. The controller needs to be started before the engines and can be
79 clients. The controller needs to be started before the engines and can be
80 configured using command line options or using a cluster directory. Cluster
80 configured using command line options or using a cluster directory. Cluster
81 directories contain config, log and security files and are usually located in
81 directories contain config, log and security files and are usually located in
82 your ipython directory and named as "cluster_<profile>". See the `profile`
82 your ipython directory and named as "cluster_<profile>". See the `profile`
83 and `cluster_dir` options for details.
83 and `cluster_dir` options for details.
84 """
84 """
85
85
86
86
87
87
88
88
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90 # The main application
90 # The main application
91 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
92 flags = {}
92 flags = {}
93 flags.update(base_flags)
93 flags.update(base_flags)
94 flags.update({
94 flags.update({
95 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
95 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
96 'Use threads instead of processes for the schedulers'),
96 'Use threads instead of processes for the schedulers'),
97 'sqlitedb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'})},
97 'sqlitedb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'})},
98 'use the SQLiteDB backend'),
98 'use the SQLiteDB backend'),
99 'mongodb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'})},
99 'mongodb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'})},
100 'use the MongoDB backend'),
100 'use the MongoDB backend'),
101 'dictdb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.dictdb.DictDB'})},
101 'dictdb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.dictdb.DictDB'})},
102 'use the in-memory DictDB backend'),
102 'use the in-memory DictDB backend'),
103 'reuse' : ({'IPControllerApp' : Config({'reuse_files' : True})},
103 'reuse' : ({'IPControllerApp' : Config({'reuse_files' : True})},
104 'reuse existing json connection files')
104 'reuse existing json connection files')
105 })
105 })
106
106
107 flags.update()
107 flags.update()
108
108
109 class IPControllerApp(ClusterApplication):
109 class IPControllerApp(ClusterApplication):
110
110
111 name = u'ipcontroller'
111 name = u'ipcontroller'
112 description = _description
112 description = _description
113 # command_line_loader = IPControllerAppConfigLoader
113 config_file_name = Unicode(default_config_file_name)
114 default_config_file_name = default_config_file_name
115 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
114 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
116
115
117 auto_create_cluster_dir = Bool(True, config=True,
116 auto_create_cluster_dir = Bool(True, config=True,
118 help="Whether to create cluster_dir if it exists.")
117 help="Whether to create cluster_dir if it exists.")
119 reuse_files = Bool(False, config=True,
118 reuse_files = Bool(False, config=True,
120 help='Whether to reuse existing json connection files [default: False]'
119 help='Whether to reuse existing json connection files [default: False]'
121 )
120 )
122 secure = Bool(True, config=True,
121 secure = Bool(True, config=True,
123 help='Whether to use exec_keys for extra authentication [default: True]'
122 help='Whether to use exec_keys for extra authentication [default: True]'
124 )
123 )
125 ssh_server = Unicode(u'', config=True,
124 ssh_server = Unicode(u'', config=True,
126 help="""ssh url for clients to use when connecting to the Controller
125 help="""ssh url for clients to use when connecting to the Controller
127 processes. It should be of the form: [user@]server[:port]. The
126 processes. It should be of the form: [user@]server[:port]. The
128 Controller\'s listening addresses must be accessible from the ssh server""",
127 Controller\'s listening addresses must be accessible from the ssh server""",
129 )
128 )
130 location = Unicode(u'', config=True,
129 location = Unicode(u'', config=True,
131 help="""The external IP or domain name of the Controller, used for disambiguating
130 help="""The external IP or domain name of the Controller, used for disambiguating
132 engine and client connections.""",
131 engine and client connections.""",
133 )
132 )
134 import_statements = List([], config=True,
133 import_statements = List([], config=True,
135 help="import statements to be run at startup. Necessary in some environments"
134 help="import statements to be run at startup. Necessary in some environments"
136 )
135 )
137
136
138 use_threads = Bool(False, config=True,
137 use_threads = Bool(False, config=True,
139 help='Use threads instead of processes for the schedulers',
138 help='Use threads instead of processes for the schedulers',
140 )
139 )
141
140
142 # internal
141 # internal
143 children = List()
142 children = List()
144 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
143 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
145
144
146 def _use_threads_changed(self, name, old, new):
145 def _use_threads_changed(self, name, old, new):
147 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
146 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
148
147
149 aliases = Dict(dict(
148 aliases = Dict(dict(
150 config = 'IPControllerApp.config_file',
149 config = 'IPControllerApp.config_file',
151 # file = 'IPControllerApp.url_file',
150 # file = 'IPControllerApp.url_file',
152 log_level = 'IPControllerApp.log_level',
151 log_level = 'IPControllerApp.log_level',
153 log_url = 'IPControllerApp.log_url',
152 log_url = 'IPControllerApp.log_url',
154 reuse_files = 'IPControllerApp.reuse_files',
153 reuse_files = 'IPControllerApp.reuse_files',
155 secure = 'IPControllerApp.secure',
154 secure = 'IPControllerApp.secure',
156 ssh = 'IPControllerApp.ssh_server',
155 ssh = 'IPControllerApp.ssh_server',
157 use_threads = 'IPControllerApp.use_threads',
156 use_threads = 'IPControllerApp.use_threads',
158 import_statements = 'IPControllerApp.import_statements',
157 import_statements = 'IPControllerApp.import_statements',
159 location = 'IPControllerApp.location',
158 location = 'IPControllerApp.location',
160
159
161 ident = 'StreamSession.session',
160 ident = 'StreamSession.session',
162 user = 'StreamSession.username',
161 user = 'StreamSession.username',
163 exec_key = 'StreamSession.keyfile',
162 exec_key = 'StreamSession.keyfile',
164
163
165 url = 'HubFactory.url',
164 url = 'HubFactory.url',
166 ip = 'HubFactory.ip',
165 ip = 'HubFactory.ip',
167 transport = 'HubFactory.transport',
166 transport = 'HubFactory.transport',
168 port = 'HubFactory.regport',
167 port = 'HubFactory.regport',
169
168
170 ping = 'HeartMonitor.period',
169 ping = 'HeartMonitor.period',
171
170
172 scheme = 'TaskScheduler.scheme_name',
171 scheme = 'TaskScheduler.scheme_name',
173 hwm = 'TaskScheduler.hwm',
172 hwm = 'TaskScheduler.hwm',
174
173
175
174
176 profile = "ClusterDir.profile",
175 profile = "ClusterDir.profile",
177 cluster_dir = 'ClusterDir.location',
176 cluster_dir = 'ClusterDir.location',
178
177
179 ))
178 ))
180 flags = Dict(flags)
179 flags = Dict(flags)
181
180
182
181
183 def save_connection_dict(self, fname, cdict):
182 def save_connection_dict(self, fname, cdict):
184 """save a connection dict to json file."""
183 """save a connection dict to json file."""
185 c = self.config
184 c = self.config
186 url = cdict['url']
185 url = cdict['url']
187 location = cdict['location']
186 location = cdict['location']
188 if not location:
187 if not location:
189 try:
188 try:
190 proto,ip,port = split_url(url)
189 proto,ip,port = split_url(url)
191 except AssertionError:
190 except AssertionError:
192 pass
191 pass
193 else:
192 else:
194 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
193 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
195 cdict['location'] = location
194 cdict['location'] = location
196 fname = os.path.join(self.cluster_dir.security_dir, fname)
195 fname = os.path.join(self.cluster_dir.security_dir, fname)
197 with open(fname, 'w') as f:
196 with open(fname, 'w') as f:
198 f.write(json.dumps(cdict, indent=2))
197 f.write(json.dumps(cdict, indent=2))
199 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
198 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
200
199
201 def load_config_from_json(self):
200 def load_config_from_json(self):
202 """load config from existing json connector files."""
201 """load config from existing json connector files."""
203 c = self.config
202 c = self.config
204 # load from engine config
203 # load from engine config
205 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
204 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
206 cfg = json.loads(f.read())
205 cfg = json.loads(f.read())
207 key = c.StreamSession.key = cfg['exec_key']
206 key = c.StreamSession.key = cfg['exec_key']
208 xport,addr = cfg['url'].split('://')
207 xport,addr = cfg['url'].split('://')
209 c.HubFactory.engine_transport = xport
208 c.HubFactory.engine_transport = xport
210 ip,ports = addr.split(':')
209 ip,ports = addr.split(':')
211 c.HubFactory.engine_ip = ip
210 c.HubFactory.engine_ip = ip
212 c.HubFactory.regport = int(ports)
211 c.HubFactory.regport = int(ports)
213 self.location = cfg['location']
212 self.location = cfg['location']
214
213
215 # load client config
214 # load client config
216 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
215 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
217 cfg = json.loads(f.read())
216 cfg = json.loads(f.read())
218 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
217 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
219 xport,addr = cfg['url'].split('://')
218 xport,addr = cfg['url'].split('://')
220 c.HubFactory.client_transport = xport
219 c.HubFactory.client_transport = xport
221 ip,ports = addr.split(':')
220 ip,ports = addr.split(':')
222 c.HubFactory.client_ip = ip
221 c.HubFactory.client_ip = ip
223 self.ssh_server = cfg['ssh']
222 self.ssh_server = cfg['ssh']
224 assert int(ports) == c.HubFactory.regport, "regport mismatch"
223 assert int(ports) == c.HubFactory.regport, "regport mismatch"
225
224
226 def init_hub(self):
225 def init_hub(self):
227 c = self.config
226 c = self.config
228
227
229 self.do_import_statements()
228 self.do_import_statements()
230 reusing = self.reuse_files
229 reusing = self.reuse_files
231 if reusing:
230 if reusing:
232 try:
231 try:
233 self.load_config_from_json()
232 self.load_config_from_json()
234 except (AssertionError,IOError):
233 except (AssertionError,IOError):
235 reusing=False
234 reusing=False
236 # check again, because reusing may have failed:
235 # check again, because reusing may have failed:
237 if reusing:
236 if reusing:
238 pass
237 pass
239 elif self.secure:
238 elif self.secure:
240 key = str(uuid.uuid4())
239 key = str(uuid.uuid4())
241 # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
240 # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
242 # with open(keyfile, 'w') as f:
241 # with open(keyfile, 'w') as f:
243 # f.write(key)
242 # f.write(key)
244 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
243 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
245 c.StreamSession.key = key
244 c.StreamSession.key = key
246 else:
245 else:
247 key = c.StreamSession.key = ''
246 key = c.StreamSession.key = ''
248
247
249 try:
248 try:
250 self.factory = HubFactory(config=c, log=self.log)
249 self.factory = HubFactory(config=c, log=self.log)
251 # self.start_logging()
250 # self.start_logging()
252 self.factory.init_hub()
251 self.factory.init_hub()
253 except:
252 except:
254 self.log.error("Couldn't construct the Controller", exc_info=True)
253 self.log.error("Couldn't construct the Controller", exc_info=True)
255 self.exit(1)
254 self.exit(1)
256
255
257 if not reusing:
256 if not reusing:
258 # save to new json config files
257 # save to new json config files
259 f = self.factory
258 f = self.factory
260 cdict = {'exec_key' : key,
259 cdict = {'exec_key' : key,
261 'ssh' : self.ssh_server,
260 'ssh' : self.ssh_server,
262 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
261 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
263 'location' : self.location
262 'location' : self.location
264 }
263 }
265 self.save_connection_dict('ipcontroller-client.json', cdict)
264 self.save_connection_dict('ipcontroller-client.json', cdict)
266 edict = cdict
265 edict = cdict
267 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
266 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
268 self.save_connection_dict('ipcontroller-engine.json', edict)
267 self.save_connection_dict('ipcontroller-engine.json', edict)
269
268
270 #
269 #
271 def init_schedulers(self):
270 def init_schedulers(self):
272 children = self.children
271 children = self.children
273 mq = import_item(str(self.mq_class))
272 mq = import_item(str(self.mq_class))
274
273
275 hub = self.factory
274 hub = self.factory
276 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
275 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
277 # IOPub relay (in a Process)
276 # IOPub relay (in a Process)
278 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
277 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
279 q.bind_in(hub.client_info['iopub'])
278 q.bind_in(hub.client_info['iopub'])
280 q.bind_out(hub.engine_info['iopub'])
279 q.bind_out(hub.engine_info['iopub'])
281 q.setsockopt_out(zmq.SUBSCRIBE, '')
280 q.setsockopt_out(zmq.SUBSCRIBE, '')
282 q.connect_mon(hub.monitor_url)
281 q.connect_mon(hub.monitor_url)
283 q.daemon=True
282 q.daemon=True
284 children.append(q)
283 children.append(q)
285
284
286 # Multiplexer Queue (in a Process)
285 # Multiplexer Queue (in a Process)
287 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
286 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
288 q.bind_in(hub.client_info['mux'])
287 q.bind_in(hub.client_info['mux'])
289 q.setsockopt_in(zmq.IDENTITY, 'mux')
288 q.setsockopt_in(zmq.IDENTITY, 'mux')
290 q.bind_out(hub.engine_info['mux'])
289 q.bind_out(hub.engine_info['mux'])
291 q.connect_mon(hub.monitor_url)
290 q.connect_mon(hub.monitor_url)
292 q.daemon=True
291 q.daemon=True
293 children.append(q)
292 children.append(q)
294
293
295 # Control Queue (in a Process)
294 # Control Queue (in a Process)
296 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
295 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
297 q.bind_in(hub.client_info['control'])
296 q.bind_in(hub.client_info['control'])
298 q.setsockopt_in(zmq.IDENTITY, 'control')
297 q.setsockopt_in(zmq.IDENTITY, 'control')
299 q.bind_out(hub.engine_info['control'])
298 q.bind_out(hub.engine_info['control'])
300 q.connect_mon(hub.monitor_url)
299 q.connect_mon(hub.monitor_url)
301 q.daemon=True
300 q.daemon=True
302 children.append(q)
301 children.append(q)
303 try:
302 try:
304 scheme = self.config.TaskScheduler.scheme_name
303 scheme = self.config.TaskScheduler.scheme_name
305 except AttributeError:
304 except AttributeError:
306 scheme = TaskScheduler.scheme_name.get_default_value()
305 scheme = TaskScheduler.scheme_name.get_default_value()
307 # Task Queue (in a Process)
306 # Task Queue (in a Process)
308 if scheme == 'pure':
307 if scheme == 'pure':
309 self.log.warn("task::using pure XREQ Task scheduler")
308 self.log.warn("task::using pure XREQ Task scheduler")
310 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
309 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
311 # q.setsockopt_out(zmq.HWM, hub.hwm)
310 # q.setsockopt_out(zmq.HWM, hub.hwm)
312 q.bind_in(hub.client_info['task'][1])
311 q.bind_in(hub.client_info['task'][1])
313 q.setsockopt_in(zmq.IDENTITY, 'task')
312 q.setsockopt_in(zmq.IDENTITY, 'task')
314 q.bind_out(hub.engine_info['task'])
313 q.bind_out(hub.engine_info['task'])
315 q.connect_mon(hub.monitor_url)
314 q.connect_mon(hub.monitor_url)
316 q.daemon=True
315 q.daemon=True
317 children.append(q)
316 children.append(q)
318 elif scheme == 'none':
317 elif scheme == 'none':
319 self.log.warn("task::using no Task scheduler")
318 self.log.warn("task::using no Task scheduler")
320
319
321 else:
320 else:
322 self.log.info("task::using Python %s Task scheduler"%scheme)
321 self.log.info("task::using Python %s Task scheduler"%scheme)
323 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
322 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
324 hub.monitor_url, hub.client_info['notification'])
323 hub.monitor_url, hub.client_info['notification'])
325 kwargs = dict(logname='scheduler', loglevel=self.log_level,
324 kwargs = dict(logname='scheduler', loglevel=self.log_level,
326 log_url = self.log_url, config=dict(self.config))
325 log_url = self.log_url, config=dict(self.config))
327 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
326 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
328 q.daemon=True
327 q.daemon=True
329 children.append(q)
328 children.append(q)
330
329
331
330
332 def save_urls(self):
331 def save_urls(self):
333 """save the registration urls to files."""
332 """save the registration urls to files."""
334 c = self.config
333 c = self.config
335
334
336 sec_dir = self.cluster_dir.security_dir
335 sec_dir = self.cluster_dir.security_dir
337 cf = self.factory
336 cf = self.factory
338
337
339 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
338 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
340 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
339 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
341
340
342 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
341 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
343 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
342 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
344
343
345
344
346 def do_import_statements(self):
345 def do_import_statements(self):
347 statements = self.import_statements
346 statements = self.import_statements
348 for s in statements:
347 for s in statements:
349 try:
348 try:
350 self.log.msg("Executing statement: '%s'" % s)
349 self.log.msg("Executing statement: '%s'" % s)
351 exec s in globals(), locals()
350 exec s in globals(), locals()
352 except:
351 except:
353 self.log.msg("Error running statement: %s" % s)
352 self.log.msg("Error running statement: %s" % s)
354
353
355 def forward_logging(self):
354 def forward_logging(self):
356 if self.log_url:
355 if self.log_url:
357 self.log.info("Forwarding logging to %s"%self.log_url)
356 self.log.info("Forwarding logging to %s"%self.log_url)
358 context = zmq.Context.instance()
357 context = zmq.Context.instance()
359 lsock = context.socket(zmq.PUB)
358 lsock = context.socket(zmq.PUB)
360 lsock.connect(self.log_url)
359 lsock.connect(self.log_url)
361 handler = PUBHandler(lsock)
360 handler = PUBHandler(lsock)
362 self.log.removeHandler(self._log_handler)
361 self.log.removeHandler(self._log_handler)
363 handler.root_topic = 'controller'
362 handler.root_topic = 'controller'
364 handler.setLevel(self.log_level)
363 handler.setLevel(self.log_level)
365 self.log.addHandler(handler)
364 self.log.addHandler(handler)
366 self._log_handler = handler
365 self._log_handler = handler
367 # #
366 # #
368
367
369 def initialize(self, argv=None):
368 def initialize(self, argv=None):
370 super(IPControllerApp, self).initialize(argv)
369 super(IPControllerApp, self).initialize(argv)
371 self.forward_logging()
370 self.forward_logging()
372 self.init_hub()
371 self.init_hub()
373 self.init_schedulers()
372 self.init_schedulers()
374
373
375 def start(self):
374 def start(self):
376 # Start the subprocesses:
375 # Start the subprocesses:
377 self.factory.start()
376 self.factory.start()
378 child_procs = []
377 child_procs = []
379 for child in self.children:
378 for child in self.children:
380 child.start()
379 child.start()
381 if isinstance(child, ProcessMonitoredQueue):
380 if isinstance(child, ProcessMonitoredQueue):
382 child_procs.append(child.launcher)
381 child_procs.append(child.launcher)
383 elif isinstance(child, Process):
382 elif isinstance(child, Process):
384 child_procs.append(child)
383 child_procs.append(child)
385 if child_procs:
384 if child_procs:
386 signal_children(child_procs)
385 signal_children(child_procs)
387
386
388 self.write_pid_file(overwrite=True)
387 self.write_pid_file(overwrite=True)
389
388
390 try:
389 try:
391 self.factory.loop.start()
390 self.factory.loop.start()
392 except KeyboardInterrupt:
391 except KeyboardInterrupt:
393 self.log.critical("Interrupted, Exiting...\n")
392 self.log.critical("Interrupted, Exiting...\n")
394
393
395
394
396
395
397 def launch_new_instance():
396 def launch_new_instance():
398 """Create and run the IPython controller"""
397 """Create and run the IPython controller"""
399 app = IPControllerApp()
398 app = IPControllerApp()
400 app.initialize()
399 app.initialize()
401 app.start()
400 app.start()
402
401
403
402
404 if __name__ == '__main__':
403 if __name__ == '__main__':
405 launch_new_instance()
404 launch_new_instance()
@@ -1,277 +1,277 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import json
18 import json
19 import os
19 import os
20 import sys
20 import sys
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24
24
25 from IPython.parallel.apps.clusterdir import (
25 from IPython.parallel.apps.clusterdir import (
26 ClusterApplication,
26 ClusterApplication,
27 ClusterDir,
27 ClusterDir,
28 # ClusterDirConfigLoader
28 # ClusterDirConfigLoader
29 )
29 )
30 from IPython.zmq.log import EnginePUBHandler
30 from IPython.zmq.log import EnginePUBHandler
31
31
32 from IPython.config.configurable import Configurable
32 from IPython.config.configurable import Configurable
33 from IPython.parallel.streamsession import StreamSession
33 from IPython.parallel.streamsession import StreamSession
34 from IPython.parallel.engine.engine import EngineFactory
34 from IPython.parallel.engine.engine import EngineFactory
35 from IPython.parallel.engine.streamkernel import Kernel
35 from IPython.parallel.engine.streamkernel import Kernel
36 from IPython.parallel.util import disambiguate_url
36 from IPython.parallel.util import disambiguate_url
37
37
38 from IPython.utils.importstring import import_item
38 from IPython.utils.importstring import import_item
39 from IPython.utils.traitlets import Bool, Unicode, Dict, List
39 from IPython.utils.traitlets import Bool, Unicode, Dict, List
40
40
41
41
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43 # Module level variables
43 # Module level variables
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45
45
46 #: The default config file name for this application
46 #: The default config file name for this application
47 default_config_file_name = u'ipengine_config.py'
47 default_config_file_name = u'ipengine_config.py'
48
48
49 _description = """Start an IPython engine for parallel computing.
49 _description = """Start an IPython engine for parallel computing.
50
50
51 IPython engines run in parallel and perform computations on behalf of a client
51 IPython engines run in parallel and perform computations on behalf of a client
52 and controller. A controller needs to be started before the engines. The
52 and controller. A controller needs to be started before the engines. The
53 engine can be configured using command line options or using a cluster
53 engine can be configured using command line options or using a cluster
54 directory. Cluster directories contain config, log and security files and are
54 directory. Cluster directories contain config, log and security files and are
55 usually located in your ipython directory and named as "cluster_<profile>".
55 usually located in your ipython directory and named as "cluster_<profile>".
56 See the `profile` and `cluster_dir` options for details.
56 See the `profile` and `cluster_dir` options for details.
57 """
57 """
58
58
59
59
60 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
61 # MPI configuration
61 # MPI configuration
62 #-----------------------------------------------------------------------------
62 #-----------------------------------------------------------------------------
63
63
64 mpi4py_init = """from mpi4py import MPI as mpi
64 mpi4py_init = """from mpi4py import MPI as mpi
65 mpi.size = mpi.COMM_WORLD.Get_size()
65 mpi.size = mpi.COMM_WORLD.Get_size()
66 mpi.rank = mpi.COMM_WORLD.Get_rank()
66 mpi.rank = mpi.COMM_WORLD.Get_rank()
67 """
67 """
68
68
69
69
70 pytrilinos_init = """from PyTrilinos import Epetra
70 pytrilinos_init = """from PyTrilinos import Epetra
71 class SimpleStruct:
71 class SimpleStruct:
72 pass
72 pass
73 mpi = SimpleStruct()
73 mpi = SimpleStruct()
74 mpi.rank = 0
74 mpi.rank = 0
75 mpi.size = 0
75 mpi.size = 0
76 """
76 """
77
77
78 class MPI(Configurable):
78 class MPI(Configurable):
79 """Configurable for MPI initialization"""
79 """Configurable for MPI initialization"""
80 use = Unicode('', config=True,
80 use = Unicode('', config=True,
81 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
81 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
82 )
82 )
83
83
84 def _on_use_changed(self, old, new):
84 def _on_use_changed(self, old, new):
85 # load default init script if it's not set
85 # load default init script if it's not set
86 if not self.init_script:
86 if not self.init_script:
87 self.init_script = self.default_inits.get(new, '')
87 self.init_script = self.default_inits.get(new, '')
88
88
89 init_script = Unicode('', config=True,
89 init_script = Unicode('', config=True,
90 help="Initialization code for MPI")
90 help="Initialization code for MPI")
91
91
92 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
92 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
93 config=True)
93 config=True)
94
94
95
95
96 #-----------------------------------------------------------------------------
96 #-----------------------------------------------------------------------------
97 # Main application
97 # Main application
98 #-----------------------------------------------------------------------------
98 #-----------------------------------------------------------------------------
99
99
100
100
101 class IPEngineApp(ClusterApplication):
101 class IPEngineApp(ClusterApplication):
102
102
103 app_name = Unicode(u'ipengine')
103 app_name = Unicode(u'ipengine')
104 description = Unicode(_description)
104 description = Unicode(_description)
105 default_config_file_name = default_config_file_name
105 config_file_name = Unicode(default_config_file_name)
106 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
106 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
107
107
108 auto_create_cluster_dir = Bool(False,
108 auto_create_cluster_dir = Bool(False,
109 help="whether to create the cluster_dir if it doesn't exist")
109 help="whether to create the cluster_dir if it doesn't exist")
110
110
111 startup_script = Unicode(u'', config=True,
111 startup_script = Unicode(u'', config=True,
112 help='specify a script to be run at startup')
112 help='specify a script to be run at startup')
113 startup_command = Unicode('', config=True,
113 startup_command = Unicode('', config=True,
114 help='specify a command to be run at startup')
114 help='specify a command to be run at startup')
115
115
116 url_file = Unicode(u'', config=True,
116 url_file = Unicode(u'', config=True,
117 help="""The full location of the file containing the connection information for
117 help="""The full location of the file containing the connection information for
118 the controller. If this is not given, the file must be in the
118 the controller. If this is not given, the file must be in the
119 security directory of the cluster directory. This location is
119 security directory of the cluster directory. This location is
120 resolved using the `profile` or `cluster_dir` options.""",
120 resolved using the `profile` or `cluster_dir` options.""",
121 )
121 )
122
122
123 url_file_name = Unicode(u'ipcontroller-engine.json')
123 url_file_name = Unicode(u'ipcontroller-engine.json')
124 log_url = Unicode('', config=True,
124 log_url = Unicode('', config=True,
125 help="""The URL for the iploggerapp instance, for forwarding
125 help="""The URL for the iploggerapp instance, for forwarding
126 logging to a central location.""")
126 logging to a central location.""")
127
127
128 aliases = Dict(dict(
128 aliases = Dict(dict(
129 config = 'IPEngineApp.config_file',
129 config = 'IPEngineApp.config_file',
130 file = 'IPEngineApp.url_file',
130 file = 'IPEngineApp.url_file',
131 c = 'IPEngineApp.startup_command',
131 c = 'IPEngineApp.startup_command',
132 s = 'IPEngineApp.startup_script',
132 s = 'IPEngineApp.startup_script',
133
133
134 ident = 'StreamSession.session',
134 ident = 'StreamSession.session',
135 user = 'StreamSession.username',
135 user = 'StreamSession.username',
136 exec_key = 'StreamSession.keyfile',
136 exec_key = 'StreamSession.keyfile',
137
137
138 url = 'EngineFactory.url',
138 url = 'EngineFactory.url',
139 ip = 'EngineFactory.ip',
139 ip = 'EngineFactory.ip',
140 transport = 'EngineFactory.transport',
140 transport = 'EngineFactory.transport',
141 port = 'EngineFactory.regport',
141 port = 'EngineFactory.regport',
142 location = 'EngineFactory.location',
142 location = 'EngineFactory.location',
143
143
144 timeout = 'EngineFactory.timeout',
144 timeout = 'EngineFactory.timeout',
145
145
146 profile = "ClusterDir.profile",
146 profile = "ClusterDir.profile",
147 cluster_dir = 'ClusterDir.location',
147 cluster_dir = 'ClusterDir.location',
148
148
149 mpi = 'MPI.use',
149 mpi = 'MPI.use',
150
150
151 log_level = 'IPEngineApp.log_level',
151 log_level = 'IPEngineApp.log_level',
152 log_url = 'IPEngineApp.log_url'
152 log_url = 'IPEngineApp.log_url'
153 ))
153 ))
154
154
155 # def find_key_file(self):
155 # def find_key_file(self):
156 # """Set the key file.
156 # """Set the key file.
157 #
157 #
158 # Here we don't try to actually see if it exists for is valid as that
158 # Here we don't try to actually see if it exists for is valid as that
159 # is hadled by the connection logic.
159 # is hadled by the connection logic.
160 # """
160 # """
161 # config = self.master_config
161 # config = self.master_config
162 # # Find the actual controller key file
162 # # Find the actual controller key file
163 # if not config.Global.key_file:
163 # if not config.Global.key_file:
164 # try_this = os.path.join(
164 # try_this = os.path.join(
165 # config.Global.cluster_dir,
165 # config.Global.cluster_dir,
166 # config.Global.security_dir,
166 # config.Global.security_dir,
167 # config.Global.key_file_name
167 # config.Global.key_file_name
168 # )
168 # )
169 # config.Global.key_file = try_this
169 # config.Global.key_file = try_this
170
170
171 def find_url_file(self):
171 def find_url_file(self):
172 """Set the key file.
172 """Set the key file.
173
173
174 Here we don't try to actually see if it exists for is valid as that
174 Here we don't try to actually see if it exists for is valid as that
175 is hadled by the connection logic.
175 is hadled by the connection logic.
176 """
176 """
177 config = self.config
177 config = self.config
178 # Find the actual controller key file
178 # Find the actual controller key file
179 if not self.url_file:
179 if not self.url_file:
180 self.url_file = os.path.join(
180 self.url_file = os.path.join(
181 self.cluster_dir.security_dir,
181 self.cluster_dir.security_dir,
182 self.url_file_name
182 self.url_file_name
183 )
183 )
184 def init_engine(self):
184 def init_engine(self):
185 # This is the working dir by now.
185 # This is the working dir by now.
186 sys.path.insert(0, '')
186 sys.path.insert(0, '')
187 config = self.config
187 config = self.config
188 # print config
188 # print config
189 self.find_url_file()
189 self.find_url_file()
190
190
191 # if os.path.exists(config.Global.key_file) and config.Global.secure:
191 # if os.path.exists(config.Global.key_file) and config.Global.secure:
192 # config.SessionFactory.exec_key = config.Global.key_file
192 # config.SessionFactory.exec_key = config.Global.key_file
193 if os.path.exists(self.url_file):
193 if os.path.exists(self.url_file):
194 with open(self.url_file) as f:
194 with open(self.url_file) as f:
195 d = json.loads(f.read())
195 d = json.loads(f.read())
196 for k,v in d.iteritems():
196 for k,v in d.iteritems():
197 if isinstance(v, unicode):
197 if isinstance(v, unicode):
198 d[k] = v.encode()
198 d[k] = v.encode()
199 if d['exec_key']:
199 if d['exec_key']:
200 config.StreamSession.key = d['exec_key']
200 config.StreamSession.key = d['exec_key']
201 d['url'] = disambiguate_url(d['url'], d['location'])
201 d['url'] = disambiguate_url(d['url'], d['location'])
202 config.EngineFactory.url = d['url']
202 config.EngineFactory.url = d['url']
203 config.EngineFactory.location = d['location']
203 config.EngineFactory.location = d['location']
204
204
205 try:
205 try:
206 exec_lines = config.Kernel.exec_lines
206 exec_lines = config.Kernel.exec_lines
207 except AttributeError:
207 except AttributeError:
208 config.Kernel.exec_lines = []
208 config.Kernel.exec_lines = []
209 exec_lines = config.Kernel.exec_lines
209 exec_lines = config.Kernel.exec_lines
210
210
211 if self.startup_script:
211 if self.startup_script:
212 enc = sys.getfilesystemencoding() or 'utf8'
212 enc = sys.getfilesystemencoding() or 'utf8'
213 cmd="execfile(%r)"%self.startup_script.encode(enc)
213 cmd="execfile(%r)"%self.startup_script.encode(enc)
214 exec_lines.append(cmd)
214 exec_lines.append(cmd)
215 if self.startup_command:
215 if self.startup_command:
216 exec_lines.append(self.startup_command)
216 exec_lines.append(self.startup_command)
217
217
218 # Create the underlying shell class and Engine
218 # Create the underlying shell class and Engine
219 # shell_class = import_item(self.master_config.Global.shell_class)
219 # shell_class = import_item(self.master_config.Global.shell_class)
220 # print self.config
220 # print self.config
221 try:
221 try:
222 self.engine = EngineFactory(config=config, log=self.log)
222 self.engine = EngineFactory(config=config, log=self.log)
223 except:
223 except:
224 self.log.error("Couldn't start the Engine", exc_info=True)
224 self.log.error("Couldn't start the Engine", exc_info=True)
225 self.exit(1)
225 self.exit(1)
226
226
227 def forward_logging(self):
227 def forward_logging(self):
228 if self.log_url:
228 if self.log_url:
229 self.log.info("Forwarding logging to %s"%self.log_url)
229 self.log.info("Forwarding logging to %s"%self.log_url)
230 context = self.engine.context
230 context = self.engine.context
231 lsock = context.socket(zmq.PUB)
231 lsock = context.socket(zmq.PUB)
232 lsock.connect(self.log_url)
232 lsock.connect(self.log_url)
233 self.log.removeHandler(self._log_handler)
233 self.log.removeHandler(self._log_handler)
234 handler = EnginePUBHandler(self.engine, lsock)
234 handler = EnginePUBHandler(self.engine, lsock)
235 handler.setLevel(self.log_level)
235 handler.setLevel(self.log_level)
236 self.log.addHandler(handler)
236 self.log.addHandler(handler)
237 self._log_handler = handler
237 self._log_handler = handler
238 #
238 #
239 def init_mpi(self):
239 def init_mpi(self):
240 global mpi
240 global mpi
241 self.mpi = MPI(config=self.config)
241 self.mpi = MPI(config=self.config)
242
242
243 mpi_import_statement = self.mpi.init_script
243 mpi_import_statement = self.mpi.init_script
244 if mpi_import_statement:
244 if mpi_import_statement:
245 try:
245 try:
246 self.log.info("Initializing MPI:")
246 self.log.info("Initializing MPI:")
247 self.log.info(mpi_import_statement)
247 self.log.info(mpi_import_statement)
248 exec mpi_import_statement in globals()
248 exec mpi_import_statement in globals()
249 except:
249 except:
250 mpi = None
250 mpi = None
251 else:
251 else:
252 mpi = None
252 mpi = None
253
253
254 def initialize(self, argv=None):
254 def initialize(self, argv=None):
255 super(IPEngineApp, self).initialize(argv)
255 super(IPEngineApp, self).initialize(argv)
256 self.init_mpi()
256 self.init_mpi()
257 self.init_engine()
257 self.init_engine()
258 self.forward_logging()
258 self.forward_logging()
259
259
260 def start(self):
260 def start(self):
261 self.engine.start()
261 self.engine.start()
262 try:
262 try:
263 self.engine.loop.start()
263 self.engine.loop.start()
264 except KeyboardInterrupt:
264 except KeyboardInterrupt:
265 self.log.critical("Engine Interrupted, shutting down...\n")
265 self.log.critical("Engine Interrupted, shutting down...\n")
266
266
267
267
268 def launch_new_instance():
268 def launch_new_instance():
269 """Create and run the IPython engine"""
269 """Create and run the IPython engine"""
270 app = IPEngineApp()
270 app = IPEngineApp()
271 app.initialize()
271 app.initialize()
272 app.start()
272 app.start()
273
273
274
274
275 if __name__ == '__main__':
275 if __name__ == '__main__':
276 launch_new_instance()
276 launch_new_instance()
277
277
@@ -1,97 +1,97 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 A simple IPython logger application
4 A simple IPython logger application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
8 # Copyright (C) 2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import sys
19 import sys
20
20
21 import zmq
21 import zmq
22
22
23 from IPython.utils.traitlets import Bool, Dict
23 from IPython.utils.traitlets import Bool, Dict, Unicode
24
24
25 from IPython.parallel.apps.clusterdir import (
25 from IPython.parallel.apps.clusterdir import (
26 ClusterApplication,
26 ClusterApplication,
27 ClusterDir,
27 ClusterDir,
28 base_aliases
28 base_aliases
29 )
29 )
30 from IPython.parallel.apps.logwatcher import LogWatcher
30 from IPython.parallel.apps.logwatcher import LogWatcher
31
31
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33 # Module level variables
33 # Module level variables
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35
35
36 #: The default config file name for this application
36 #: The default config file name for this application
37 default_config_file_name = u'iplogger_config.py'
37 default_config_file_name = u'iplogger_config.py'
38
38
39 _description = """Start an IPython logger for parallel computing.
39 _description = """Start an IPython logger for parallel computing.
40
40
41 IPython controllers and engines (and your own processes) can broadcast log messages
41 IPython controllers and engines (and your own processes) can broadcast log messages
42 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
42 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
43 logger can be configured using command line options or using a cluster
43 logger can be configured using command line options or using a cluster
44 directory. Cluster directories contain config, log and security files and are
44 directory. Cluster directories contain config, log and security files and are
45 usually located in your ipython directory and named as "cluster_<profile>".
45 usually located in your ipython directory and named as "cluster_<profile>".
46 See the `profile` and `cluster_dir` options for details.
46 See the `profile` and `cluster_dir` options for details.
47 """
47 """
48
48
49
49
50 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
51 # Main application
51 # Main application
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53 aliases = {}
53 aliases = {}
54 aliases.update(base_aliases)
54 aliases.update(base_aliases)
55 aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics'))
55 aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics'))
56
56
57 class IPLoggerApp(ClusterApplication):
57 class IPLoggerApp(ClusterApplication):
58
58
59 name = u'iploggerz'
59 name = u'iploggerz'
60 description = _description
60 description = _description
61 default_config_file_name = default_config_file_name
61 config_file_name = Unicode(default_config_file_name)
62 auto_create_cluster_dir = Bool(False)
62 auto_create_cluster_dir = Bool(False)
63
63
64 classes = [LogWatcher, ClusterDir]
64 classes = [LogWatcher, ClusterDir]
65 aliases = Dict(aliases)
65 aliases = Dict(aliases)
66
66
67 def initialize(self, argv=None):
67 def initialize(self, argv=None):
68 super(IPLoggerApp, self).initialize(argv)
68 super(IPLoggerApp, self).initialize(argv)
69 self.init_watcher()
69 self.init_watcher()
70
70
71 def init_watcher(self):
71 def init_watcher(self):
72 try:
72 try:
73 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
73 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
74 except:
74 except:
75 self.log.error("Couldn't start the LogWatcher", exc_info=True)
75 self.log.error("Couldn't start the LogWatcher", exc_info=True)
76 self.exit(1)
76 self.exit(1)
77 self.log.info("Listening for log messages on %r"%self.watcher.url)
77 self.log.info("Listening for log messages on %r"%self.watcher.url)
78
78
79
79
80 def start(self):
80 def start(self):
81 self.watcher.start()
81 self.watcher.start()
82 try:
82 try:
83 self.watcher.loop.start()
83 self.watcher.loop.start()
84 except KeyboardInterrupt:
84 except KeyboardInterrupt:
85 self.log.critical("Logging Interrupted, shutting down...\n")
85 self.log.critical("Logging Interrupted, shutting down...\n")
86
86
87
87
88 def launch_new_instance():
88 def launch_new_instance():
89 """Create and run the IPython LogWatcher"""
89 """Create and run the IPython LogWatcher"""
90 app = IPLoggerApp()
90 app = IPLoggerApp()
91 app.initialize()
91 app.initialize()
92 app.start()
92 app.start()
93
93
94
94
95 if __name__ == '__main__':
95 if __name__ == '__main__':
96 launch_new_instance()
96 launch_new_instance()
97
97
General Comments 0
You need to be logged in to leave comments. Login now