##// END OF EJS Templates
ipcluster implemented with new subcommands
MinRK -
Show More
@@ -1,492 +1,525 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython cluster directory
4 The IPython cluster directory
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import os
20 import os
21 import logging
21 import logging
22 import re
22 import re
23 import shutil
23 import shutil
24 import sys
24 import sys
25
25
26 from subprocess import Popen, PIPE
26 from subprocess import Popen, PIPE
27
27
28 from IPython.config.loader import PyFileConfigLoader, Config
28 from IPython.config.loader import PyFileConfigLoader, Config
29 from IPython.config.configurable import Configurable
29 from IPython.config.configurable import Configurable
30 from IPython.config.application import Application
30 from IPython.config.application import Application
31 from IPython.core.crashhandler import CrashHandler
31 from IPython.core.crashhandler import CrashHandler
32 from IPython.core.newapplication import BaseIPythonApplication
32 from IPython.core.newapplication import BaseIPythonApplication
33 from IPython.core import release
33 from IPython.core import release
34 from IPython.utils.path import (
34 from IPython.utils.path import (
35 get_ipython_package_dir,
35 get_ipython_package_dir,
36 get_ipython_dir,
36 get_ipython_dir,
37 expand_path
37 expand_path
38 )
38 )
39 from IPython.utils.traitlets import Unicode, Bool, CStr, Instance, Dict
39 from IPython.utils.traitlets import Unicode, Bool, CStr, Instance, Dict
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Module errors
42 # Module errors
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
class ClusterDirError(Exception):
    """Raised when a cluster directory cannot be found or used."""
47
47
48
48
class PIDFileError(Exception):
    """Raised for problems with an application's pid file."""
51
51
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Class for managing cluster directories
54 # Class for managing cluster directories
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 class ClusterDir(Configurable):
57 class ClusterDir(Configurable):
58 """An object to manage the cluster directory and its resources.
58 """An object to manage the cluster directory and its resources.
59
59
60 The cluster directory is used by :command:`ipengine`,
60 The cluster directory is used by :command:`ipengine`,
61 :command:`ipcontroller` and :command:`ipclsuter` to manage the
61 :command:`ipcontroller` and :command:`ipclsuter` to manage the
62 configuration, logging and security of these applications.
62 configuration, logging and security of these applications.
63
63
64 This object knows how to find, create and manage these directories. This
64 This object knows how to find, create and manage these directories. This
65 should be used by any code that want's to handle cluster directories.
65 should be used by any code that want's to handle cluster directories.
66 """
66 """
67
67
68 security_dir_name = Unicode('security')
68 security_dir_name = Unicode('security')
69 log_dir_name = Unicode('log')
69 log_dir_name = Unicode('log')
70 pid_dir_name = Unicode('pid')
70 pid_dir_name = Unicode('pid')
71 security_dir = Unicode(u'')
71 security_dir = Unicode(u'')
72 log_dir = Unicode(u'')
72 log_dir = Unicode(u'')
73 pid_dir = Unicode(u'')
73 pid_dir = Unicode(u'')
74
74
75 location = Unicode(u'', config=True,
75 location = Unicode(u'', config=True,
76 help="""Set the cluster dir. This overrides the logic used by the
76 help="""Set the cluster dir. This overrides the logic used by the
77 `profile` option.""",
77 `profile` option.""",
78 )
78 )
79 profile = Unicode(u'default',
79 profile = Unicode(u'default',
80 help="""The string name of the profile to be used. This determines the name
80 help="""The string name of the profile to be used. This determines the name
81 of the cluster dir as: cluster_<profile>. The default profile is named
81 of the cluster dir as: cluster_<profile>. The default profile is named
82 'default'. The cluster directory is resolve this way if the
82 'default'. The cluster directory is resolve this way if the
83 `cluster_dir` option is not used.""", config=True
83 `cluster_dir` option is not used.""", config=True
84 )
84 )
85 auto_create = Bool(False,
86 help="""Whether to automatically create the ClusterDirectory if it does
87 not exist""")
88 overwrite = Bool(False,
89 help="""Whether to overwrite existing config files""")
85
90
86 _location_isset = Bool(False) # flag for detecting multiply set location
91 _location_isset = Bool(False) # flag for detecting multiply set location
87 _new_dir = Bool(False) # flag for whether a new dir was created
92 _new_dir = Bool(False) # flag for whether a new dir was created
88
93
89 def __init__(self, **kwargs):
94 def __init__(self, **kwargs):
90 super(ClusterDir, self).__init__(**kwargs)
95 super(ClusterDir, self).__init__(**kwargs)
91 if not self.location:
96 if not self.location:
92 self._profile_changed('profile', 'default', self.profile)
97 self._profile_changed('profile', 'default', self.profile)
93
98
94 def _location_changed(self, name, old, new):
99 def _location_changed(self, name, old, new):
95 if self._location_isset:
100 if self._location_isset:
96 raise RuntimeError("Cannot set ClusterDir more than once.")
101 raise RuntimeError("Cannot set ClusterDir more than once.")
97 self._location_isset = True
102 self._location_isset = True
98 if not os.path.isdir(new):
103 if not os.path.isdir(new):
104 if self.auto_create:
99 os.makedirs(new)
105 os.makedirs(new)
100 self._new_dir = True
106 self._new_dir = True
107 else:
108 raise ClusterDirError('Directory not found: %s' % new)
109
101 # ensure config files exist:
110 # ensure config files exist:
102 self.copy_all_config_files(overwrite=False)
111 self.copy_all_config_files(overwrite=self.overwrite)
103 self.security_dir = os.path.join(new, self.security_dir_name)
112 self.security_dir = os.path.join(new, self.security_dir_name)
104 self.log_dir = os.path.join(new, self.log_dir_name)
113 self.log_dir = os.path.join(new, self.log_dir_name)
105 self.pid_dir = os.path.join(new, self.pid_dir_name)
114 self.pid_dir = os.path.join(new, self.pid_dir_name)
106 self.check_dirs()
115 self.check_dirs()
107
116
108 def _profile_changed(self, name, old, new):
117 def _profile_changed(self, name, old, new):
109 if self._location_isset:
118 if self._location_isset:
110 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
119 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
111 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
120 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
112
121
113 def _log_dir_changed(self, name, old, new):
122 def _log_dir_changed(self, name, old, new):
114 self.check_log_dir()
123 self.check_log_dir()
115
124
116 def check_log_dir(self):
125 def check_log_dir(self):
117 if not os.path.isdir(self.log_dir):
126 if not os.path.isdir(self.log_dir):
118 os.mkdir(self.log_dir)
127 os.mkdir(self.log_dir)
119
128
120 def _security_dir_changed(self, name, old, new):
129 def _security_dir_changed(self, name, old, new):
121 self.check_security_dir()
130 self.check_security_dir()
122
131
123 def check_security_dir(self):
132 def check_security_dir(self):
124 if not os.path.isdir(self.security_dir):
133 if not os.path.isdir(self.security_dir):
125 os.mkdir(self.security_dir, 0700)
134 os.mkdir(self.security_dir, 0700)
126 os.chmod(self.security_dir, 0700)
135 os.chmod(self.security_dir, 0700)
127
136
128 def _pid_dir_changed(self, name, old, new):
137 def _pid_dir_changed(self, name, old, new):
129 self.check_pid_dir()
138 self.check_pid_dir()
130
139
131 def check_pid_dir(self):
140 def check_pid_dir(self):
132 if not os.path.isdir(self.pid_dir):
141 if not os.path.isdir(self.pid_dir):
133 os.mkdir(self.pid_dir, 0700)
142 os.mkdir(self.pid_dir, 0700)
134 os.chmod(self.pid_dir, 0700)
143 os.chmod(self.pid_dir, 0700)
135
144
136 def check_dirs(self):
145 def check_dirs(self):
137 self.check_security_dir()
146 self.check_security_dir()
138 self.check_log_dir()
147 self.check_log_dir()
139 self.check_pid_dir()
148 self.check_pid_dir()
140
149
141 def copy_config_file(self, config_file, path=None, overwrite=False):
150 def copy_config_file(self, config_file, path=None, overwrite=False):
142 """Copy a default config file into the active cluster directory.
151 """Copy a default config file into the active cluster directory.
143
152
144 Default configuration files are kept in :mod:`IPython.config.default`.
153 Default configuration files are kept in :mod:`IPython.config.default`.
145 This function moves these from that location to the working cluster
154 This function moves these from that location to the working cluster
146 directory.
155 directory.
147 """
156 """
148 if path is None:
157 if path is None:
149 import IPython.config.default
158 import IPython.config.default
150 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
159 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
151 path = os.path.sep.join(path)
160 path = os.path.sep.join(path)
152 src = os.path.join(path, config_file)
161 src = os.path.join(path, config_file)
153 dst = os.path.join(self.location, config_file)
162 dst = os.path.join(self.location, config_file)
154 if not os.path.isfile(dst) or overwrite:
163 if not os.path.isfile(dst) or overwrite:
155 shutil.copy(src, dst)
164 shutil.copy(src, dst)
156
165
157 def copy_all_config_files(self, path=None, overwrite=False):
166 def copy_all_config_files(self, path=None, overwrite=False):
158 """Copy all config files into the active cluster directory."""
167 """Copy all config files into the active cluster directory."""
159 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
168 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
160 u'ipcluster_config.py']:
169 u'ipcluster_config.py']:
161 self.copy_config_file(f, path=path, overwrite=overwrite)
170 self.copy_config_file(f, path=path, overwrite=overwrite)
162
171
163 @classmethod
172 @classmethod
164 def create_cluster_dir(csl, cluster_dir):
173 def create_cluster_dir(csl, cluster_dir):
165 """Create a new cluster directory given a full path.
174 """Create a new cluster directory given a full path.
166
175
167 Parameters
176 Parameters
168 ----------
177 ----------
169 cluster_dir : str
178 cluster_dir : str
170 The full path to the cluster directory. If it does exist, it will
179 The full path to the cluster directory. If it does exist, it will
171 be used. If not, it will be created.
180 be used. If not, it will be created.
172 """
181 """
173 return ClusterDir(location=cluster_dir)
182 return ClusterDir(location=cluster_dir)
174
183
175 @classmethod
184 @classmethod
176 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
185 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
177 """Create a cluster dir by profile name and path.
186 """Create a cluster dir by profile name and path.
178
187
179 Parameters
188 Parameters
180 ----------
189 ----------
181 path : str
190 path : str
182 The path (directory) to put the cluster directory in.
191 The path (directory) to put the cluster directory in.
183 profile : str
192 profile : str
184 The name of the profile. The name of the cluster directory will
193 The name of the profile. The name of the cluster directory will
185 be "cluster_<profile>".
194 be "cluster_<profile>".
186 """
195 """
187 if not os.path.isdir(path):
196 if not os.path.isdir(path):
188 raise ClusterDirError('Directory not found: %s' % path)
197 raise ClusterDirError('Directory not found: %s' % path)
189 cluster_dir = os.path.join(path, u'cluster_' + profile)
198 cluster_dir = os.path.join(path, u'cluster_' + profile)
190 return ClusterDir(location=cluster_dir)
199 return ClusterDir(location=cluster_dir)
191
200
192 @classmethod
201 @classmethod
193 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
202 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
194 """Find an existing cluster dir by profile name, return its ClusterDir.
203 """Find an existing cluster dir by profile name, return its ClusterDir.
195
204
196 This searches through a sequence of paths for a cluster dir. If it
205 This searches through a sequence of paths for a cluster dir. If it
197 is not found, a :class:`ClusterDirError` exception will be raised.
206 is not found, a :class:`ClusterDirError` exception will be raised.
198
207
199 The search path algorithm is:
208 The search path algorithm is:
200 1. ``os.getcwd()``
209 1. ``os.getcwd()``
201 2. ``ipython_dir``
210 2. ``ipython_dir``
202 3. The directories found in the ":" separated
211 3. The directories found in the ":" separated
203 :env:`IPCLUSTER_DIR_PATH` environment variable.
212 :env:`IPCLUSTER_DIR_PATH` environment variable.
204
213
205 Parameters
214 Parameters
206 ----------
215 ----------
207 ipython_dir : unicode or str
216 ipython_dir : unicode or str
208 The IPython directory to use.
217 The IPython directory to use.
209 profile : unicode or str
218 profile : unicode or str
210 The name of the profile. The name of the cluster directory
219 The name of the profile. The name of the cluster directory
211 will be "cluster_<profile>".
220 will be "cluster_<profile>".
212 """
221 """
213 dirname = u'cluster_' + profile
222 dirname = u'cluster_' + profile
214 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
223 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
215 if cluster_dir_paths:
224 if cluster_dir_paths:
216 cluster_dir_paths = cluster_dir_paths.split(':')
225 cluster_dir_paths = cluster_dir_paths.split(':')
217 else:
226 else:
218 cluster_dir_paths = []
227 cluster_dir_paths = []
219 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
228 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
220 for p in paths:
229 for p in paths:
221 cluster_dir = os.path.join(p, dirname)
230 cluster_dir = os.path.join(p, dirname)
222 if os.path.isdir(cluster_dir):
231 if os.path.isdir(cluster_dir):
223 return ClusterDir(location=cluster_dir)
232 return ClusterDir(location=cluster_dir)
224 else:
233 else:
225 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
234 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
226
235
227 @classmethod
236 @classmethod
228 def find_cluster_dir(cls, cluster_dir):
237 def find_cluster_dir(cls, cluster_dir):
229 """Find/create a cluster dir and return its ClusterDir.
238 """Find/create a cluster dir and return its ClusterDir.
230
239
231 This will create the cluster directory if it doesn't exist.
240 This will create the cluster directory if it doesn't exist.
232
241
233 Parameters
242 Parameters
234 ----------
243 ----------
235 cluster_dir : unicode or str
244 cluster_dir : unicode or str
236 The path of the cluster directory. This is expanded using
245 The path of the cluster directory. This is expanded using
237 :func:`IPython.utils.genutils.expand_path`.
246 :func:`IPython.utils.genutils.expand_path`.
238 """
247 """
239 cluster_dir = expand_path(cluster_dir)
248 cluster_dir = expand_path(cluster_dir)
240 if not os.path.isdir(cluster_dir):
249 if not os.path.isdir(cluster_dir):
241 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
250 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
242 return ClusterDir(location=cluster_dir)
251 return ClusterDir(location=cluster_dir)
243
252
244
253
245 #-----------------------------------------------------------------------------
254 #-----------------------------------------------------------------------------
246 # Crash handler for this application
255 # Crash handler for this application
247 #-----------------------------------------------------------------------------
256 #-----------------------------------------------------------------------------
248
257
249
258
250 _message_template = """\
259 _message_template = """\
251 Oops, $self.app_name crashed. We do our best to make it stable, but...
260 Oops, $self.app_name crashed. We do our best to make it stable, but...
252
261
253 A crash report was automatically generated with the following information:
262 A crash report was automatically generated with the following information:
254 - A verbatim copy of the crash traceback.
263 - A verbatim copy of the crash traceback.
255 - Data on your current $self.app_name configuration.
264 - Data on your current $self.app_name configuration.
256
265
257 It was left in the file named:
266 It was left in the file named:
258 \t'$self.crash_report_fname'
267 \t'$self.crash_report_fname'
259 If you can email this file to the developers, the information in it will help
268 If you can email this file to the developers, the information in it will help
260 them in understanding and correcting the problem.
269 them in understanding and correcting the problem.
261
270
262 You can mail it to: $self.contact_name at $self.contact_email
271 You can mail it to: $self.contact_name at $self.contact_email
263 with the subject '$self.app_name Crash Report'.
272 with the subject '$self.app_name Crash Report'.
264
273
265 If you want to do it now, the following command will work (under Unix):
274 If you want to do it now, the following command will work (under Unix):
266 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
275 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
267
276
268 To ensure accurate tracking of this issue, please file a report about it at:
277 To ensure accurate tracking of this issue, please file a report about it at:
269 $self.bug_tracker
278 $self.bug_tracker
270 """
279 """
271
280
class ClusterDirCrashHandler(CrashHandler):
    """sys.excepthook for IPython itself, leaves a detailed report on disk."""

    message_template = _message_template

    def __init__(self, app):
        """Configure the handler for ``app`` with the 'Min' author contact."""
        author = release.authors['Min']
        tracker_url = 'http://github.com/ipython/ipython/issues'
        # author is indexed as (name, email)
        super(ClusterDirCrashHandler, self).__init__(
            app, author[0], author[1], tracker_url
        )
284
293
285
294
286 #-----------------------------------------------------------------------------
295 #-----------------------------------------------------------------------------
287 # Main application
296 # Main application
288 #-----------------------------------------------------------------------------
297 #-----------------------------------------------------------------------------
# Command-line aliases shared by the cluster applications: each key is the
# option name, each value the ``Class.trait`` it sets.
base_aliases = {
    'profile' : "ClusterDir.profile",
    'cluster_dir' : 'ClusterDir.location',
    # The application class is ClusterApplication and its trait is
    # auto_create_cluster_dir; the previous target
    # 'ClusterDirApplication.auto_create' matched no class/trait in this file.
    'auto_create' : 'ClusterApplication.auto_create_cluster_dir',
    'log_level' : 'ClusterApplication.log_level',
    'work_dir' : 'ClusterApplication.work_dir',
    'log_to_file' : 'ClusterApplication.log_to_file',
    'clean_logs' : 'ClusterApplication.clean_logs',
    'log_url' : 'ClusterApplication.log_url',
}

# Command-line flags: name -> (config fragment to apply, help text).
base_flags = {
    'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
    'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
    'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
}
# Wrap each plain dict in a Config object. Iterate over a snapshot so the
# dict is not being assigned into while it is being iterated.
for k,v in list(base_flags.items()):
    base_flags[k] = (Config(v[0]),v[1])
306
316
307 class ClusterDirApplication(BaseIPythonApplication):
317 class ClusterApplication(BaseIPythonApplication):
308 """An application that puts everything into a cluster directory.
318 """An application that puts everything into a cluster directory.
309
319
310 Instead of looking for things in the ipython_dir, this type of application
320 Instead of looking for things in the ipython_dir, this type of application
311 will use its own private directory called the "cluster directory"
321 will use its own private directory called the "cluster directory"
312 for things like config files, log files, etc.
322 for things like config files, log files, etc.
313
323
314 The cluster directory is resolved as follows:
324 The cluster directory is resolved as follows:
315
325
316 * If the ``--cluster-dir`` option is given, it is used.
326 * If the ``--cluster-dir`` option is given, it is used.
317 * If ``--cluster-dir`` is not given, the application directory is
327 * If ``--cluster-dir`` is not given, the application directory is
318 resolve using the profile name as ``cluster_<profile>``. The search
328 resolve using the profile name as ``cluster_<profile>``. The search
319 path for this directory is then i) cwd if it is found there
329 path for this directory is then i) cwd if it is found there
320 and ii) in ipython_dir otherwise.
330 and ii) in ipython_dir otherwise.
321
331
322 The config file for the application is to be put in the cluster
332 The config file for the application is to be put in the cluster
323 dir and named the value of the ``config_file_name`` class attribute.
333 dir and named the value of the ``config_file_name`` class attribute.
324 """
334 """
325
335
326 crash_handler_class = ClusterDirCrashHandler
336 crash_handler_class = ClusterDirCrashHandler
327 auto_create_cluster_dir = Bool(True, config=True,
337 auto_create_cluster_dir = Bool(True, config=True,
328 help="whether to create the cluster_dir if it doesn't exist")
338 help="whether to create the cluster_dir if it doesn't exist")
329 # temporarily override default_log_level to INFO
330 default_log_level = logging.INFO
331 cluster_dir = Instance(ClusterDir)
339 cluster_dir = Instance(ClusterDir)
340 classes = [ClusterDir]
341
342 def _log_level_default(self):
343 # temporarily override default_log_level to INFO
344 return logging.INFO
332
345
333 work_dir = Unicode(os.getcwdu(), config=True,
346 work_dir = Unicode(os.getcwdu(), config=True,
334 help='Set the working dir for the process.'
347 help='Set the working dir for the process.'
335 )
348 )
def _work_dir_changed(self, name, old, new):
    """Store the expanded, unicode form of a newly assigned ``work_dir``."""
    expanded = expand_path(new)
    self.work_dir = unicode(expanded)
338
351
339 log_to_file = Bool(config=True,
352 log_to_file = Bool(config=True,
340 help="whether to log to a file")
353 help="whether to log to a file")
341
354
342 clean_logs = Bool(True, shortname='--clean-logs', config=True,
355 clean_logs = Bool(False, shortname='--clean-logs', config=True,
343 help="whether to cleanup old logfiles before starting")
356 help="whether to cleanup old logfiles before starting")
344
357
345 log_url = CStr('', shortname='--log-url', config=True,
358 log_url = CStr('', shortname='--log-url', config=True,
346 help="The ZMQ URL of the iplooger to aggregate logging.")
359 help="The ZMQ URL of the iplooger to aggregate logging.")
347
360
348 config_file = Unicode(u'', config=True,
361 config_file = Unicode(u'', config=True,
349 help="""Path to ipcontroller configuration file. The default is to use
362 help="""Path to ipcontroller configuration file. The default is to use
350 <appname>_config.py, as found by cluster-dir."""
363 <appname>_config.py, as found by cluster-dir."""
351 )
364 )
352
365
366 loop = Instance('zmq.eventloop.ioloop.IOLoop')
def _loop_default(self):
    """Return ``IOLoop.instance()`` as the default value of ``loop``."""
    # imported here rather than at module level, as in the original
    from zmq.eventloop import ioloop
    return ioloop.IOLoop.instance()
370
353 aliases = Dict(base_aliases)
371 aliases = Dict(base_aliases)
354 flags = Dict(base_flags)
372 flags = Dict(base_flags)
355
373
def init_clusterdir(self):
    """Build the :class:`ClusterDir` for this application and report it.

    A :class:`ClusterDir` is constructed from ``self.config``, with
    ``auto_create`` taken from ``self.auto_create_cluster_dir``, and stored
    as ``self.cluster_dir``. An info message then records whether the
    directory was newly created or already existed.
    """
    cluster_dir = ClusterDir(config=self.config, auto_create=self.auto_create_cluster_dir)
    self.cluster_dir = cluster_dir
    if cluster_dir._new_dir:
        message = 'Creating new cluster dir: %s'
    else:
        message = 'Using existing cluster dir: %s'
    self.log.info(message % cluster_dir.location)
380
398
def initialize(self, argv=None):
    """Initialize the app: command line, cluster dir, config file, logging.

    The command line is parsed first because it is needed to locate the
    cluster directory; its config is re-applied after the config file is
    loaded so that command-line values override file values.
    """
    self.parse_command_line(argv)
    command_line_config = self.config
    self.init_clusterdir()

    if not self.config_file:
        # no explicit file: use the default one inside the cluster dir
        self.load_config_file(self.default_config_file_name, path=self.cluster_dir.location)
    else:
        self.load_config_file(self.config_file)

    # command-line should *override* config file
    self.update_config(command_line_config)
    self.reinit_logging()

    self.to_work_dir()
414
def to_work_dir(self):
    """Change into ``self.work_dir`` unless it is already the current dir."""
    target = self.work_dir
    if unicode(target) == os.getcwdu():
        return
    os.chdir(target)
    self.log.info("Changing to working dir: %s" % target)
386
420
def load_config_file(self, filename, path=None):
    """Load a .py based config file by filename and path."""
    # Call Application.load_config_file directly instead of the less
    # flexible BaseIPythonApplication version.
    loader = Application.load_config_file
    return loader(self, filename, path=path)
390 #
427 #
391 # def load_default_config_file(self):
428 # def load_default_config_file(self):
392 # """Load a .py based config file by filename and path."""
429 # """Load a .py based config file by filename and path."""
393 # return BaseIPythonApplication.load_config_file(self)
430 # return BaseIPythonApplication.load_config_file(self)
394
431
395 # disable URL-logging
432 # disable URL-logging
396 # def init_logging(self):
def reinit_logging(self):
    """Clean up old log files and, if requested, redirect logging to a file.

    When ``self.clean_logs`` is true, entries in the cluster dir's log
    directory matching ``<name>-<digits>.(log|err|out)`` are removed.
    When ``self.log_to_file`` is true, the current log handler is replaced
    by one writing to ``<name>-<pid>.log`` in that directory.
    """
    log_dir = self.cluster_dir.log_dir

    # Remove old log files
    if self.clean_logs:
        for entry in os.listdir(log_dir):
            if re.match(r'%s-\d+\.(log|err|out)'%self.name, entry):
                os.remove(os.path.join(log_dir, entry))

    if not self.log_to_file:
        # nothing to redirect; keep the existing handler
        return

    # Start logging to the new log file
    logfile = os.path.join(log_dir, self.name + u'-' + str(os.getpid()) + u'.log')
    stream = open(logfile, 'w')
    self.log.removeHandler(self._log_handler)
    self._log_handler = logging.StreamHandler(stream)
    self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
    self._log_handler.setFormatter(self._log_formatter)
    self.log.addHandler(self._log_handler)
420
453
def write_pid_file(self, overwrite=False):
    """Create a .pid file in the cluster's pid_dir holding this process's pid.

    The file is ``<name>.pid`` inside ``self.cluster_dir.pid_dir``.

    If the file already exists and `overwrite` is false,
    :exc:`PIDFileError` is raised.  With `overwrite` true the old file is
    replaced without being parsed, so a corrupt pid file cannot block
    the rewrite.
    """
    pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
    if os.path.isfile(pid_file) and not overwrite:
        # The old pid is only needed for the error message.
        pid = self.get_pid_from_file()
        raise PIDFileError(
            'The pid file [%s] already exists. \nThis could mean that this '
            'server is already running with [pid=%s].' % (pid_file, pid)
        )
    with open(pid_file, 'w') as f:
        self.log.info("Creating pid file: %s" % pid_file)
        f.write(repr(os.getpid())+'\n')
438
471
def remove_pid_file(self):
    """Remove the pid file, if it exists.

    This is meant to be usable as a shutdown callback: ordinary failures
    while removing the file are logged as a warning instead of being
    raised, and the method always returns ``None``.
    """
    pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
    if os.path.isfile(pid_file):
        try:
            self.log.info("Removing pid file: %s" % pid_file)
            os.remove(pid_file)
        except Exception:
            # Best effort only, but let KeyboardInterrupt/SystemExit through.
            self.log.warn("Error removing the pid file: %s" % pid_file)
453
486
def get_pid_from_file(self):
    """Return the integer pid stored in this application's pid file.

    The file is ``<name>.pid`` inside ``self.cluster_dir.pid_dir``.
    A :exc:`PIDFileError` is raised when that file does not exist.
    """
    pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
    if not os.path.isfile(pid_file):
        raise PIDFileError('pid file not found: %s' % pid_file)
    with open(pid_file, 'r') as f:
        contents = f.read()
    return int(contents.strip())
466
499
def check_pid(self, pid):
    """Return whether a process with the given pid appears to be running.

    On Windows the process is probed with ``OpenProcess``; elsewhere the
    pid is looked up in the output of ``ps x``.  If the probe itself cannot
    be carried out, a warning is logged and ``True`` is returned.
    """
    if os.name == 'nt':
        try:
            import ctypes
            kernel32 = ctypes.windll.kernel32
            # returns 0 if no such process (of ours) exists
            # positive int otherwise
            handle = kernel32.OpenProcess(1, 0, pid)
        except Exception:
            self.log.warn(
                "Could not determine whether pid %i is running via `OpenProcess`. "
                " Making the likely assumption that it is."%pid
            )
            return True
        if handle:
            # Release the handle so repeated checks do not leak it.
            kernel32.CloseHandle(handle)
        return bool(handle)

    try:
        p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
        output,_ = p.communicate()
    except OSError:
        self.log.warn(
            "Could not determine whether pid %i is running via `ps x`. "
            " Making the likely assumption that it is."%pid
        )
        return True
    if isinstance(output, bytes):
        # The pipe yields bytes; decode so the text pattern below applies.
        output = output.decode('utf-8', 'replace')
    # The first number on each line of `ps x` output is taken as a pid.
    pids = set(int(m) for m in re.findall(r'^\W*\d+', output, re.MULTILINE))
    return pid in pids
This diff has been collapsed as it changes many lines, (630 lines changed) Show them Hide them
@@ -1,550 +1,542 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import errno
18 import errno
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import signal
22 import signal
23
23
24 from subprocess import check_call, CalledProcessError, PIPE
24 from subprocess import check_call, CalledProcessError, PIPE
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27
27
28 from IPython.config.application import Application, boolean_flag
28 from IPython.config.loader import Config
29 from IPython.config.loader import Config
30 from IPython.core.newapplication import BaseIPythonApplication
29 from IPython.utils.importstring import import_item
31 from IPython.utils.importstring import import_item
30 from IPython.utils.traitlets import Int, CStr, CUnicode, Str, Bool, CFloat, Dict, List
32 from IPython.utils.traitlets import Int, CStr, CUnicode, Str, Bool, CFloat, Dict, List
31
33
32 from IPython.parallel.apps.clusterdir import (
34 from IPython.parallel.apps.clusterdir import (
33 ClusterDirApplication, ClusterDirError,
35 ClusterApplication, ClusterDirError, ClusterDir,
34 PIDFileError,
36 PIDFileError,
35 base_flags,
37 base_flags, base_aliases
36 )
38 )
37
39
38
40
39 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
40 # Module level variables
42 # Module level variables
41 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
42
44
43
45
44 default_config_file_name = u'ipcluster_config.py'
46 default_config_file_name = u'ipcluster_config.py'
45
47
46
48
47 _description = """\
49 _description = """\
48 Start an IPython cluster for parallel computing.\n\n
50 Start an IPython cluster for parallel computing.\n\n
49
51
50 An IPython cluster consists of 1 controller and 1 or more engines.
52 An IPython cluster consists of 1 controller and 1 or more engines.
51 This command automates the startup of these processes using a wide
53 This command automates the startup of these processes using a wide
52 range of startup methods (SSH, local processes, PBS, mpiexec,
54 range of startup methods (SSH, local processes, PBS, mpiexec,
53 Windows HPC Server 2008). To start a cluster with 4 engines on your
55 Windows HPC Server 2008). To start a cluster with 4 engines on your
54 local host simply do 'ipcluster start n=4'. For more complex usage
56 local host simply do 'ipcluster start n=4'. For more complex usage
55 you will typically do 'ipcluster --create profile=mycluster', then edit
57 you will typically do 'ipcluster create profile=mycluster', then edit
56 configuration files, followed by 'ipcluster --start -p mycluster -n 4'.
58 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
57 """
59 """
58
60
59
61
60 # Exit codes for ipcluster
62 # Exit codes for ipcluster
61
63
62 # This will be the exit code if the ipcluster appears to be running because
64 # This will be the exit code if the ipcluster appears to be running because
63 # a .pid file exists
65 # a .pid file exists
64 ALREADY_STARTED = 10
66 ALREADY_STARTED = 10
65
67
66
68
67 # This will be the exit code if ipcluster stop is run, but there is not .pid
69 # This will be the exit code if ipcluster stop is run, but there is not .pid
68 # file to be found.
70 # file to be found.
69 ALREADY_STOPPED = 11
71 ALREADY_STOPPED = 11
70
72
71 # This will be the exit code if ipcluster engines is run, but there is not .pid
73 # This will be the exit code if ipcluster engines is run, but there is not .pid
72 # file to be found.
74 # file to be found.
73 NO_CLUSTER = 12
75 NO_CLUSTER = 12
74
76
75
77
76 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
77 # Main application
79 # Main application
78 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
# Help texts shown as the description of each ipcluster subcommand.
start_help = """
Start an ipython cluster by its profile name or cluster
directory. Cluster directories contain configuration, log and
security related files and are named using the convention
'cluster_<profile>' and should be created using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster start n=4 profile=<profile>',
otherwise use the 'cluster_dir' option.
"""
stop_help = """
Stop a running ipython cluster by its profile name or cluster
directory. Cluster directories are named using the convention
'cluster_<profile>'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster stop profile=<profile>', otherwise
use the 'cluster_dir' option.
"""
engines_help = """
Start one or more engines to connect to an existing Cluster
by profile name or cluster directory.
Cluster directories contain configuration, log and
security related files and are named using the convention
'cluster_<profile>' and should be created using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster engines n=4 profile=<profile>',
otherwise use the 'cluster_dir' option.
"""
create_help = """
Create an ipython cluster directory by its profile name or
cluster directory path. Cluster directories contain
configuration, log and security related files and are named
using the convention 'cluster_<profile>'. By default they are
located in your ipython directory. Once created, you will
probably need to edit the configuration files in the cluster
directory to configure your cluster. Most users will create a
cluster directory by profile name,
`ipcluster create profile=mycluster`, which will put the directory
in `<ipython_dir>/cluster_mycluster`.
"""
list_help = """List all available clusters, by cluster directory, that can
be found in the current working directory or in the ipython
directory. Cluster directories are named using the convention
'cluster_<profile>'.
"""
121
122 flags = {}
123 flags.update(base_flags)
124 flags.update({
125 'start' : ({ 'IPClusterApp': Config({'subcommand' : 'start'})} , start_help),
126 'stop' : ({ 'IPClusterApp': Config({'subcommand' : 'stop'})} , stop_help),
127 'create' : ({ 'IPClusterApp': Config({'subcommand' : 'create'})} , create_help),
128 'engines' : ({ 'IPClusterApp': Config({'subcommand' : 'engines'})} , engines_help),
129 'list' : ({ 'IPClusterApp': Config({'subcommand' : 'list'})} , list_help),
130
131 })
132
133 class IPClusterApp(ClusterDirApplication):
134
135 name = u'ipcluster'
136 description = _description
137 usage = None
138 default_config_file_name = default_config_file_name
139 default_log_level = logging.INFO
140 auto_create_cluster_dir = False
141 classes = List()
142 def _classes_default(self,):
143 from IPython.parallel.apps import launcher
144 return launcher.all_launchers
145
146 n = Int(0, config=True,
147 help="The number of engines to start.")
148 signal = Int(signal.SIGINT, config=True,
149 help="signal to use for stopping. [default: SIGINT]")
150 delay = CFloat(1., config=True,
151 help="delay (in s) between starting the controller and the engines")
152
153 subcommand = Str('', config=True,
154 help="""ipcluster has a variety of subcommands. The general way of
155 running ipcluster is 'ipcluster --<cmd> [options]'."""
156 )
157
127
158 controller_launcher_class = Str('IPython.parallel.apps.launcher.LocalControllerLauncher',
159 config=True,
160 help="The class for launching a Controller."
161 )
162 engine_launcher_class = Str('IPython.parallel.apps.launcher.LocalEngineSetLauncher',
163 config=True,
164 help="The class for launching Engines."
165 )
166 reset = Bool(False, config=True,
167 help="Whether to reset config files as part of '--create'."
168 )
169 daemonize = Bool(False, config=True,
170 help='Daemonize the ipcluster program. This implies --log-to-file')
171
128
172 def _daemonize_changed(self, name, old, new):
129 class IPClusterList(BaseIPythonApplication):
173 if new:
130 name = u'ipcluster-list'
174 self.log_to_file = True
131 description = list_help
175
132
176 def _n_changed(self, name, old, new):
133 # empty aliases
177 # propagate n all over the place...
134 aliases=Dict()
178 # TODO make this clean
135 flags = Dict(base_flags)
179 # ensure all classes are covered.
180 self.config.LocalEngineSetLauncher.n=new
181 self.config.MPIExecEngineSetLauncher.n=new
182 self.config.SSHEngineSetLauncher.n=new
183 self.config.PBSEngineSetLauncher.n=new
184 self.config.SGEEngineSetLauncher.n=new
185 self.config.WinHPEngineSetLauncher.n=new
186
187 aliases = Dict(dict(
188 n='IPClusterApp.n',
189 signal = 'IPClusterApp.signal',
190 delay = 'IPClusterApp.delay',
191 clauncher = 'IPClusterApp.controller_launcher_class',
192 elauncher = 'IPClusterApp.engine_launcher_class',
193 ))
194 flags = Dict(flags)
195
136
196 def init_clusterdir(self):
137 def _log_level_default(self):
197 subcommand = self.subcommand
138 return 20
198 if subcommand=='list':
199 self.list_cluster_dirs()
200 self.exit(0)
201 if subcommand=='create':
202 reset = self.reset_config
203 self.auto_create_cluster_dir = True
204 super(IPClusterApp, self).init_clusterdir()
205 self.log.info('Copying default config files to cluster directory '
206 '[overwrite=%r]' % (reset,))
207 self.cluster_dir.copy_all_config_files(overwrite=reset)
208 elif subcommand=='start' or subcommand=='stop':
209 self.auto_create_cluster_dir = True
210 try:
211 super(IPClusterApp, self).init_clusterdir()
212 except ClusterDirError:
213 raise ClusterDirError(
214 "Could not find a cluster directory. A cluster dir must "
215 "be created before running 'ipcluster start'. Do "
216 "'ipcluster create -h' or 'ipcluster list -h' for more "
217 "information about creating and listing cluster dirs."
218 )
219 elif subcommand=='engines':
220 self.auto_create_cluster_dir = False
221 try:
222 super(IPClusterApp, self).init_clusterdir()
223 except ClusterDirError:
224 raise ClusterDirError(
225 "Could not find a cluster directory. A cluster dir must "
226 "be created before running 'ipcluster start'. Do "
227 "'ipcluster create -h' or 'ipcluster list -h' for more "
228 "information about creating and listing cluster dirs."
229 )
230
139
def list_cluster_dirs(self):
    """Print a start command for every cluster directory that can be found.

    The search covers the current working directory, ``self.ipython_dir``
    and any directories named in the ``IPCLUSTER_DIR_PATH`` environment
    variable (entries separated by ``os.pathsep``).  A cluster directory
    is any sub-directory whose name starts with ``cluster_``; the rest of
    the name is reported as the profile.
    """
    prefix = 'cluster_'
    # Find the search paths
    env_paths = os.environ.get('IPCLUSTER_DIR_PATH', '')
    cluster_dir_paths = [p for p in env_paths.split(os.pathsep) if p]
    paths = list(set([os.getcwd(), self.ipython_dir] + cluster_dir_paths))

    self.log.info('Searching for cluster dirs in paths: %r' % paths)
    for path in paths:
        if not os.path.isdir(path):
            # A stale search-path entry should not abort the whole listing.
            continue
        for f in os.listdir(path):
            full_path = os.path.join(path, f)
            if os.path.isdir(full_path) and f.startswith(prefix):
                # Everything after the prefix is the profile name, so
                # profiles that contain underscores are kept intact.
                profile = f[len(prefix):]
                start_cmd = 'ipcluster start profile=%s n=4' % profile
                print(start_cmd + " ==> " + full_path)
255
162
256 def init_launchers(self):
163 def start(self):
257 config = self.config
164 self.list_cluster_dirs()
258 subcmd = self.subcommand
259 if subcmd =='start':
260 self.start_logging()
261 self.loop = ioloop.IOLoop.instance()
262 # reactor.callWhenRunning(self.start_launchers)
263 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
264 dc.start()
265 if subcmd == 'engines':
266 self.start_logging()
267 self.loop = ioloop.IOLoop.instance()
268 # reactor.callWhenRunning(self.start_launchers)
269 engine_only = lambda : self.start_launchers(controller=False)
270 dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
271 dc.start()
272
165
273 def start_launchers(self, controller=True):
166 create_flags = {}
274 config = self.config
167 create_flags.update(base_flags)
168 create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset',
169 "reset config files to defaults", "leave existing config files"))
275
170
276 # Create the launchers. In both bases, we set the work_dir of
171 class IPClusterCreate(ClusterApplication):
277 # the launcher to the cluster_dir. This is where the launcher's
172 name = u'ipcluster'
278 # subprocesses will be launched. It is not where the controller
173 description = create_help
279 # and engine will be launched.
174 auto_create_cluster_dir = Bool(True,
280 if controller:
175 help="whether to create the cluster_dir if it doesn't exist")
281 clsname = self.controller_launcher_class
176 default_config_file_name = default_config_file_name
282 if '.' not in clsname:
177
283 clsname = 'IPython.parallel.apps.launcher.'+clsname
178 reset = Bool(False, config=True,
284 cl_class = import_item(clsname)
179 help="Whether to reset config files as part of 'create'."
285 self.controller_launcher = cl_class(
286 work_dir=self.cluster_dir.location, config=config,
287 logname=self.log.name
288 )
180 )
289 # Setup the observing of stopping. If the controller dies, shut
290 # everything down as that will be completely fatal for the engines.
291 self.controller_launcher.on_stop(self.stop_launchers)
292 # But, we don't monitor the stopping of engines. An engine dying
293 # is just fine and in principle a user could start a new engine.
294 # Also, if we did monitor engine stopping, it is difficult to
295 # know what to do when only some engines die. Currently, the
296 # observing of engine stopping is inconsistent. Some launchers
297 # might trigger on a single engine stopping, other wait until
298 # all stop. TODO: think more about how to handle this.
299 else:
300 self.controller_launcher = None
301
181
302 clsname = self.engine_launcher_class
182 flags = Dict(create_flags)
303 if '.' not in clsname:
183
304 # not a module, presume it's the raw name in apps.launcher
184 aliases = Dict(dict(profile='ClusterDir.profile'))
305 clsname = 'IPython.parallel.apps.launcher.'+clsname
185
306 print repr(clsname)
186 classes = [ClusterDir]
307 el_class = import_item(clsname)
187
188 def init_clusterdir(self):
189 super(IPClusterCreate, self).init_clusterdir()
190 self.log.info('Copying default config files to cluster directory '
191 '[overwrite=%r]' % (self.reset,))
192 self.cluster_dir.copy_all_config_files(overwrite=self.reset)
193
194 def initialize(self, argv=None):
195 self.parse_command_line(argv)
196 self.init_clusterdir()
308
197
309 self.engine_launcher = el_class(
198 stop_aliases = dict(
310 work_dir=self.cluster_dir.location, config=config, logname=self.log.name
199 signal='IPClusterStop.signal',
200 profile='ClusterDir.profile',
201 cluster_dir='ClusterDir.location',
311 )
202 )
312
203
313 # Setup signals
204 class IPClusterStop(ClusterApplication):
314 signal.signal(signal.SIGINT, self.sigint_handler)
205 name = u'ipcluster'
206 description = stop_help
207 auto_create_cluster_dir = Bool(False,
208 help="whether to create the cluster_dir if it doesn't exist")
209 default_config_file_name = default_config_file_name
315
210
316 # Start the controller and engines
211 signal = Int(signal.SIGINT, config=True,
317 self._stopping = False # Make sure stop_launchers is not called 2x.
212 help="signal to use for stopping processes.")
318 if controller:
319 self.start_controller()
320 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay*controller, self.loop)
321 dc.start()
322 self.startup_message()
323
213
324 def startup_message(self, r=None):
214 aliases = Dict(stop_aliases)
325 self.log.info("IPython cluster: started")
326 return r
327
215
328 def start_controller(self, r=None):
216 def start(self):
329 # self.log.info("In start_controller")
217 """Start the app for the stop subcommand."""
330 config = self.config
218 try:
331 d = self.controller_launcher.start(
219 pid = self.get_pid_from_file()
332 cluster_dir=self.cluster_dir.location
220 except PIDFileError:
221 self.log.critical(
222 'Could not read pid file, cluster is probably not running.'
333 )
223 )
334 return d
224 # Here I exit with a unusual exit status that other processes
225 # can watch for to learn how I existed.
226 self.remove_pid_file()
227 self.exit(ALREADY_STOPPED)
228
229 if not self.check_pid(pid):
230 self.log.critical(
231 'Cluster [pid=%r] is not running.' % pid
232 )
233 self.remove_pid_file()
234 # Here I exit with a unusual exit status that other processes
235 # can watch for to learn how I existed.
236 self.exit(ALREADY_STOPPED)
237
238 elif os.name=='posix':
239 sig = self.signal
240 self.log.info(
241 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
242 )
243 try:
244 os.kill(pid, sig)
245 except OSError:
246 self.log.error("Stopping cluster failed, assuming already dead.",
247 exc_info=True)
248 self.remove_pid_file()
249 elif os.name=='nt':
250 try:
251 # kill the whole tree
252 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
253 except (CalledProcessError, OSError):
254 self.log.error("Stopping cluster failed, assuming already dead.",
255 exc_info=True)
256 self.remove_pid_file()
257
258 engine_aliases = {}
259 engine_aliases.update(base_aliases)
260 engine_aliases.update(dict(
261 n='IPClusterEngines.n',
262 elauncher = 'IPClusterEngines.engine_launcher_class',
263 ))
264 class IPClusterEngines(ClusterApplication):
335
265
336 def start_engines(self, r=None):
266 name = u'ipcluster'
337 # self.log.info("In start_engines")
267 description = engines_help
338 config = self.config
268 usage = None
269 default_config_file_name = default_config_file_name
270 default_log_level = logging.INFO
271 auto_create_cluster_dir = Bool(False)
272 classes = List()
273 def _classes_default(self):
274 from IPython.parallel.apps import launcher
275 launchers = launcher.all_launchers
276 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
277 return [ClusterDir]+eslaunchers
278
279 n = Int(2, config=True,
280 help="The number of engines to start.")
281
282 engine_launcher_class = Str('LocalEngineSetLauncher',
283 config=True,
284 help="The class for launching a set of Engines."
285 )
286 daemonize = Bool(False, config=True,
287 help='Daemonize the ipcluster program. This implies --log-to-file')
288
289 def _daemonize_changed(self, name, old, new):
290 if new:
291 self.log_to_file = True
292
293 aliases = Dict(engine_aliases)
294 # flags = Dict(flags)
295 _stopping = False
296
297 def initialize(self, argv=None):
298 super(IPClusterEngines, self).initialize(argv)
299 self.init_signal()
300 self.init_launchers()
301
302 def init_launchers(self):
303 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
304 self.engine_launcher.on_stop(lambda r: self.loop.stop())
305
306 def init_signal(self):
307 # Setup signals
308 signal.signal(signal.SIGINT, self.sigint_handler)
339
309
340 d = self.engine_launcher.start(
def build_launcher(self, clsname):
    """Import the launcher class named by `clsname` and return an instance.

    A name without a dot is looked up in ``IPython.parallel.apps.launcher``;
    a dotted name is imported exactly as given.  The launcher is created
    with the cluster directory location as its ``work_dir``, this
    application's config, and the name of its logger.
    """
    if '.' in clsname:
        import_path = clsname
    else:
        # not a module, presume it's the raw name in apps.launcher
        import_path = 'IPython.parallel.apps.launcher.'+clsname
    launcher_class = import_item(import_path)
    return launcher_class(
        work_dir=self.cluster_dir.location,
        config=self.config,
        logname=self.log.name,
    )
322
def start_engines(self):
    """Log the engine count and start ``self.n`` engines via the launcher.

    The launcher is given the cluster directory location as ``cluster_dir``.
    """
    message = "Starting %i engines"%self.n
    self.log.info(message)
    cluster_location = self.cluster_dir.location
    self.engine_launcher.start(self.n, cluster_dir=cluster_location)
344 return d
345
346 def stop_controller(self, r=None):
347 # self.log.info("In stop_controller")
348 if self.controller_launcher and self.controller_launcher.running:
349 return self.controller_launcher.stop()
350
329
def stop_engines(self):
    """Ask the engine launcher to stop, if it is currently running.

    Returns whatever the launcher's ``stop()`` returns, or ``None`` when
    the launcher is not running.
    """
    self.log.info("Stopping Engines...")
    if not self.engine_launcher.running:
        return None
    return self.engine_launcher.stop()
359
337
360 def log_err(self, f):
361 self.log.error(f.getTraceback())
362 return None
363
def stop_launchers(self, r=None):
    """Stop the engines once, then schedule the event loop to stop.

    `r` is accepted so the method can be used as a callback; it is not
    used.  Calls made after the first one do nothing.
    """
    if self._stopping:
        return
    self._stopping = True
    self.log.error("IPython cluster: stopping")
    self.stop_engines()
    # Wait a few seconds to let things shut down.
    shutdown_timer = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
    shutdown_timer.start()
380
346
def sigint_handler(self, signum, frame):
    """Signal handler that begins launcher shutdown.

    `signum` and `frame` are the arguments passed to a signal handler;
    neither is used here.
    """
    notice = "SIGINT received, stopping launchers..."
    self.log.debug(notice)
    self.stop_launchers()
383
350
def start_logging(self):
    """Delete old controller and engine log files when ``clean_logs`` is set.

    Only files in the cluster's log directory whose names match the
    ``ipenginez-<digits>`` / ``ipcontrollerz-<digits>`` pattern are
    removed; log files of ipcluster itself are not touched here.
    """
    if not self.clean_logs:
        return
    log_dir = self.cluster_dir.log_dir
    stale = [
        entry for entry in os.listdir(log_dir)
        if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',entry)
    ]
    for entry in stale:
        os.remove(os.path.join(log_dir, entry))
393
360
def start(self):
    """Run the ``engines`` subcommand: launch the engines and spin the loop.

    The engine launch is scheduled as a zero-delay callback so it fires as
    soon as the event loop is running. The call blocks in ``loop.start()``
    until the loop stops; a KeyboardInterrupt or an EINTR-flavoured
    ``zmq.ZMQError`` is treated as a normal shutdown, any other ZMQError
    propagates.
    """
    log = self.log
    log.info("IPython cluster: started")

    # Announce the mode, then detach from the terminal if requested.
    log.info(
        'Starting engines with [daemon=%r]' % self.daemonize
    )
    # TODO: Get daemonize working on Windows or as a Windows Server.
    if self.daemonize and os.name == 'posix':
        from twisted.scripts._twistd_unix import daemonize
        daemonize()

    engines_cb = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
    engines_cb.start()
    # No pid file is written for the engines-only subcommand.
    # self.write_pid_file()
    try:
        self.loop.start()
    except KeyboardInterrupt:
        pass
    except zmq.ZMQError as e:
        # An interrupted system call is just another way of being stopped.
        if e.errno != errno.EINTR:
            raise
# Command-line aliases for ``ipcluster start``: everything the engines
# subcommand accepts, plus the two start-specific options.
start_aliases = dict(engine_aliases)
start_aliases.update(
    delay='IPClusterStart.delay',
    clean_logs='IPClusterStart.clean_logs',
)
410
396
411 def start_app_start(self):
class IPClusterStart(IPClusterEngines):
    """Application behind ``ipcluster start``.

    Extends the engines application with a controller launcher: ``start``
    schedules the controller immediately and the engines ``delay`` seconds
    later, and maintains a pid file for the lifetime of the event loop.
    """

    name = u'ipcluster'
    description = start_help
    usage = None
    default_config_file_name = default_config_file_name
    default_log_level = logging.INFO
    auto_create_cluster_dir = Bool(
        True, config=True,
        help="whether to create the cluster_dir if it doesn't exist",
    )
    classes = List()

    def _classes_default(self,):
        # Imported lazily, at the point the default is requested.
        from IPython.parallel.apps import launcher
        return [ClusterDir] + launcher.all_launchers

    clean_logs = Bool(
        True, config=True,
        help="whether to cleanup old logs before starting",
    )

    delay = CFloat(
        1., config=True,
        help="delay (in s) between starting the controller and the engines",
    )

    controller_launcher_class = Str(
        'LocalControllerLauncher', config=True,
        help="The class for launching a Controller.",
    )
    reset = Bool(
        False, config=True,
        help="Whether to reset config files as part of '--create'.",
    )

    # flags = Dict(flags)
    aliases = Dict(start_aliases)

    def init_clusterdir(self):
        """Locate the cluster dir, replacing the error with a 'start' hint."""
        try:
            super(IPClusterStart, self).init_clusterdir()
        except ClusterDirError:
            hint = (
                "Could not find a cluster directory. A cluster dir must "
                "be created before running 'ipcluster start'.  Do "
                "'ipcluster create -h' or 'ipcluster list -h' for more "
                "information about creating and listing cluster dirs."
            )
            raise ClusterDirError(hint)

    def init_launchers(self):
        """Build both launchers; a controller stop tears everything down."""
        self.controller_launcher = self.build_launcher(self.controller_launcher_class)
        self.engine_launcher = self.build_launcher(self.engine_launcher_class)
        self.controller_launcher.on_stop(self.stop_launchers)

    def start_controller(self):
        """Start the controller launcher against this cluster directory."""
        self.controller_launcher.start(
            cluster_dir=self.cluster_dir.location
        )

    def stop_controller(self):
        """Stop the controller launcher if there is one and it is running.

        Returns whatever the launcher's ``stop()`` returns, or None when
        there was nothing to stop.
        """
        if not (self.controller_launcher and self.controller_launcher.running):
            return None
        return self.controller_launcher.stop()

    def stop_launchers(self, r=None):
        """Stop the controller, then delegate to the parent for the rest.

        Does nothing once a shutdown is already in progress
        (``self._stopping``). ``r`` is accepted but not used.
        """
        if self._stopping:
            return
        self.stop_controller()
        super(IPClusterStart, self).stop_launchers()

    def start(self):
        """Run the ``start`` subcommand: controller, then engines, then loop.

        Exits with ``ALREADY_STARTED`` when the pid file points at a live
        process. A stale pid file is removed. The pid file is written
        before the loop starts and removed again when the loop ends, however
        it ends.
        """
        # Is a cluster already running out of this directory?
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            pass
        else:
            if not self.check_pid(pid):
                # Stale pid file left over from a dead process.
                self.remove_pid_file()
            else:
                self.log.critical(
                    'Cluster is already running with [pid=%s]. '
                    'use "ipcluster stop" to stop the cluster.' % pid
                )
                # Distinctive exit status, so that other processes can
                # tell why we exited.
                self.exit(ALREADY_STARTED)

        # Announce the mode, then detach from the terminal if requested.
        self.log.info(
            'Starting ipcluster with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize and os.name == 'posix':
            from twisted.scripts._twistd_unix import daemonize
            daemonize()

        controller_cb = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
        controller_cb.start()
        engines_cb = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
        engines_cb.start()
        # The pid file is written only now, AFTER any daemonizing fork, so
        # that it records the pid that is actually alive.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # An interrupted system call is just another way of being stopped.
            if e.errno != errno.EINTR:
                raise
        finally:
            self.remove_pid_file()
455
506
456 def start_app_engines(self):
507 base='IPython.parallel.apps.ipclusterapp.IPCluster'
457 """Start the app for the start subcommand."""
458 config = self.config
459 # First see if the cluster is already running
460
461 # Now log and daemonize
462 self.log.info(
463 'Starting engines with [daemon=%r]' % self.daemonize
464 )
465 # TODO: Get daemonize working on Windows or as a Windows Server.
466 if self.daemonize:
467 if os.name=='posix':
468 from twisted.scripts._twistd_unix import daemonize
469 daemonize()
470
508
471 # Now write the new pid file AFTER our new forked pid is active.
509 class IPClusterApp(Application):
472 # self.write_pid_file()
510 name = u'ipcluster'
473 try:
511 description = _description
474 self.loop.start()
475 except KeyboardInterrupt:
476 pass
477 except zmq.ZMQError as e:
478 if e.errno == errno.EINTR:
479 pass
480 else:
481 raise
482 # self.remove_pid_file()
483
484 def start_app_stop(self):
485 """Start the app for the stop subcommand."""
486 config = self.config
487 try:
488 pid = self.get_pid_from_file()
489 except PIDFileError:
490 self.log.critical(
491 'Could not read pid file, cluster is probably not running.'
492 )
493 # Here I exit with a unusual exit status that other processes
494 # can watch for to learn how I existed.
495 self.remove_pid_file()
496 self.exit(ALREADY_STOPPED)
497
512
498 if not self.check_pid(pid):
513 subcommands = {'create' : (base+'Create', create_help),
499 self.log.critical(
514 'list' : (base+'List', list_help),
500 'Cluster [pid=%r] is not running.' % pid
515 'start' : (base+'Start', start_help),
501 )
516 'stop' : (base+'Stop', stop_help),
502 self.remove_pid_file()
517 'engines' : (base+'Engines', engines_help),
503 # Here I exit with a unusual exit status that other processes
518 }
504 # can watch for to learn how I existed.
505 self.exit(ALREADY_STOPPED)
506
519
507 elif os.name=='posix':
520 # no aliases or flags for parent App
508 sig = self.signal
521 aliases = Dict()
509 self.log.info(
522 flags = Dict()
510 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
511 )
512 try:
513 os.kill(pid, sig)
514 except OSError:
515 self.log.error("Stopping cluster failed, assuming already dead.",
516 exc_info=True)
517 self.remove_pid_file()
518 elif os.name=='nt':
519 try:
520 # kill the whole tree
521 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
522 except (CalledProcessError, OSError):
523 self.log.error("Stopping cluster failed, assuming already dead.",
524 exc_info=True)
525 self.remove_pid_file()
526
523
524 def start(self):
525 if self.subapp is None:
526 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
527 print
528 self.print_subcommands()
529 self.exit(1)
530 else:
531 return self.subapp.start()
527
532
def launch_new_instance():
    """Instantiate the ipcluster application, initialize it and start it."""
    cluster_app = IPClusterApp()
    cluster_app.initialize()
    cluster_app.start()
546
538
547
539
548 if __name__ == '__main__':
540 if __name__ == '__main__':
549 launch_new_instance()
541 launch_new_instance()
550
542
@@ -1,408 +1,400 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import copy
20 import copy
21 import os
21 import os
22 import logging
22 import logging
23 import socket
23 import socket
24 import stat
24 import stat
25 import sys
25 import sys
26 import uuid
26 import uuid
27
27
28 from multiprocessing import Process
28 from multiprocessing import Process
29
29
30 import zmq
30 import zmq
31 from zmq.devices import ProcessMonitoredQueue
31 from zmq.devices import ProcessMonitoredQueue
32 from zmq.log.handlers import PUBHandler
32 from zmq.log.handlers import PUBHandler
33 from zmq.utils import jsonapi as json
33 from zmq.utils import jsonapi as json
34
34
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36
36
37 from IPython.parallel import factory
37 from IPython.parallel import factory
38
38
39 from IPython.parallel.apps.clusterdir import (
39 from IPython.parallel.apps.clusterdir import (
40 ClusterDir,
40 ClusterDir,
41 ClusterDirApplication,
41 ClusterApplication,
42 base_flags
42 base_flags
43 # ClusterDirConfigLoader
43 # ClusterDirConfigLoader
44 )
44 )
45 from IPython.utils.importstring import import_item
45 from IPython.utils.importstring import import_item
46 from IPython.utils.traitlets import Instance, Unicode, Bool, List, CStr, Dict
46 from IPython.utils.traitlets import Instance, Unicode, Bool, List, CStr, Dict
47
47
48 # from IPython.parallel.controller.controller import ControllerFactory
48 # from IPython.parallel.controller.controller import ControllerFactory
49 from IPython.parallel.streamsession import StreamSession
49 from IPython.parallel.streamsession import StreamSession
50 from IPython.parallel.controller.heartmonitor import HeartMonitor
50 from IPython.parallel.controller.heartmonitor import HeartMonitor
51 from IPython.parallel.controller.hub import Hub, HubFactory
51 from IPython.parallel.controller.hub import Hub, HubFactory
52 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
52 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
53 from IPython.parallel.controller.sqlitedb import SQLiteDB
53 from IPython.parallel.controller.sqlitedb import SQLiteDB
54
54
55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
56
56
57 # conditional import of MongoDB backend class
57 # conditional import of MongoDB backend class
58
58
59 try:
59 try:
60 from IPython.parallel.controller.mongodb import MongoDB
60 from IPython.parallel.controller.mongodb import MongoDB
61 except ImportError:
61 except ImportError:
62 maybe_mongo = []
62 maybe_mongo = []
63 else:
63 else:
64 maybe_mongo = [MongoDB]
64 maybe_mongo = [MongoDB]
65
65
66
66
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68 # Module level variables
68 # Module level variables
69 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
70
70
71
71
72 #: The default config file name for this application
72 #: The default config file name for this application
73 default_config_file_name = u'ipcontroller_config.py'
73 default_config_file_name = u'ipcontroller_config.py'
74
74
75
75
76 _description = """Start the IPython controller for parallel computing.
76 _description = """Start the IPython controller for parallel computing.
77
77
78 The IPython controller provides a gateway between the IPython engines and
78 The IPython controller provides a gateway between the IPython engines and
79 clients. The controller needs to be started before the engines and can be
79 clients. The controller needs to be started before the engines and can be
80 configured using command line options or using a cluster directory. Cluster
80 configured using command line options or using a cluster directory. Cluster
81 directories contain config, log and security files and are usually located in
81 directories contain config, log and security files and are usually located in
82 your ipython directory and named as "cluster_<profile>". See the --profile
82 your ipython directory and named as "cluster_<profile>". See the --profile
83 and --cluster-dir options for details.
83 and --cluster-dir options for details.
84 """
84 """
85
85
86
86
87
87
88
88
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90 # The main application
90 # The main application
91 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
# Command-line flags for ipcontroller: the shared cluster flags plus the
# controller-specific switches. Each value is a
# (config-fragment, help-text) pair.
flags = {}
flags.update(base_flags)
flags.update({
    'usethreads' : ( {'IPControllerApp' : {'usethreads' : True}},
                    'Use threads instead of processes for the schedulers'),
    'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
                    'use the SQLiteDB backend'),
    'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
                    'use the MongoDB backend'),
    'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
                    'use the in-memory DictDB backend'),
})
# (A stray argument-less ``flags.update()`` used to follow here; it was a
# no-op and has been dropped.)
106
106
107 class IPControllerApp(ClusterDirApplication):
107 class IPControllerApp(ClusterApplication):
108
108
109 name = u'ipcontroller'
109 name = u'ipcontroller'
110 description = _description
110 description = _description
111 # command_line_loader = IPControllerAppConfigLoader
111 # command_line_loader = IPControllerAppConfigLoader
112 default_config_file_name = default_config_file_name
112 default_config_file_name = default_config_file_name
113 auto_create_cluster_dir = True
113 auto_create_cluster_dir = True
114 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
114 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
115
115
116 reuse_files = Bool(False, config=True,
116 reuse_files = Bool(False, config=True,
117 help='Whether to reuse existing json connection files [default: False]'
117 help='Whether to reuse existing json connection files [default: False]'
118 )
118 )
119 secure = Bool(True, config=True,
119 secure = Bool(True, config=True,
120 help='Whether to use exec_keys for extra authentication [default: True]'
120 help='Whether to use exec_keys for extra authentication [default: True]'
121 )
121 )
122 ssh_server = Unicode(u'', config=True,
122 ssh_server = Unicode(u'', config=True,
123 help="""ssh url for clients to use when connecting to the Controller
123 help="""ssh url for clients to use when connecting to the Controller
124 processes. It should be of the form: [user@]server[:port]. The
124 processes. It should be of the form: [user@]server[:port]. The
125 Controller\'s listening addresses must be accessible from the ssh server""",
125 Controller\'s listening addresses must be accessible from the ssh server""",
126 )
126 )
127 location = Unicode(u'', config=True,
127 location = Unicode(u'', config=True,
128 help="""The external IP or domain name of the Controller, used for disambiguating
128 help="""The external IP or domain name of the Controller, used for disambiguating
129 engine and client connections.""",
129 engine and client connections.""",
130 )
130 )
131 import_statements = List([], config=True,
131 import_statements = List([], config=True,
132 help="import statements to be run at startup. Necessary in some environments"
132 help="import statements to be run at startup. Necessary in some environments"
133 )
133 )
134
134
135 usethreads = Bool(False, config=True,
135 usethreads = Bool(False, config=True,
136 help='Use threads instead of processes for the schedulers',
136 help='Use threads instead of processes for the schedulers',
137 )
137 )
138
138
139 # internal
139 # internal
140 children = List()
140 children = List()
141 mq_class = CStr('zmq.devices.ProcessMonitoredQueue')
141 mq_class = CStr('zmq.devices.ProcessMonitoredQueue')
142
142
def _usethreads_changed(self, name, old, new):
    """Keep ``mq_class`` in step with the new value of ``usethreads``.

    A truthy ``new`` selects the thread-based monitored queue, a falsy one
    the process-based queue. ``name`` and ``old`` are unused.
    """
    flavor = 'Thread' if new else 'Process'
    self.mq_class = 'zmq.devices.%sMonitoredQueue' % flavor
145
145
146 aliases = Dict(dict(
146 aliases = Dict(dict(
147 config = 'IPControllerApp.config_file',
147 config = 'IPControllerApp.config_file',
148 # file = 'IPControllerApp.url_file',
148 # file = 'IPControllerApp.url_file',
149 log_level = 'IPControllerApp.log_level',
149 log_level = 'IPControllerApp.log_level',
150 reuse_files = 'IPControllerApp.reuse_files',
150 reuse_files = 'IPControllerApp.reuse_files',
151 secure = 'IPControllerApp.secure',
151 secure = 'IPControllerApp.secure',
152 ssh = 'IPControllerApp.ssh_server',
152 ssh = 'IPControllerApp.ssh_server',
153 usethreads = 'IPControllerApp.usethreads',
153 usethreads = 'IPControllerApp.usethreads',
154 import_statements = 'IPControllerApp.import_statements',
154 import_statements = 'IPControllerApp.import_statements',
155 location = 'IPControllerApp.location',
155 location = 'IPControllerApp.location',
156
156
157 ident = 'StreamSession.session',
157 ident = 'StreamSession.session',
158 user = 'StreamSession.username',
158 user = 'StreamSession.username',
159 exec_key = 'StreamSession.keyfile',
159 exec_key = 'StreamSession.keyfile',
160
160
161 url = 'HubFactory.url',
161 url = 'HubFactory.url',
162 ip = 'HubFactory.ip',
162 ip = 'HubFactory.ip',
163 transport = 'HubFactory.transport',
163 transport = 'HubFactory.transport',
164 port = 'HubFactory.regport',
164 port = 'HubFactory.regport',
165
165
166 ping = 'HeartMonitor.period',
166 ping = 'HeartMonitor.period',
167
167
168 scheme = 'TaskScheduler.scheme_name',
168 scheme = 'TaskScheduler.scheme_name',
169 hwm = 'TaskScheduler.hwm',
169 hwm = 'TaskScheduler.hwm',
170
170
171
171
172 profile = "ClusterDir.profile",
172 profile = "ClusterDir.profile",
173 cluster_dir = 'ClusterDir.location',
173 cluster_dir = 'ClusterDir.location',
174
174
175 ))
175 ))
176 flags = Dict(flags)
176 flags = Dict(flags)
177
177
178
178
def save_connection_dict(self, fname, cdict):
    """Save a connection dict as JSON in the cluster's security directory.

    Parameters
    ----------
    fname : str
        File name, relative to ``self.cluster_dir.security_dir``.
    cdict : dict
        Connection info; must contain ``'url'`` and ``'location'``. When
        ``'location'`` is empty and the url parses, it is filled in (in
        place, so the caller sees it too) with an address of this host.

    The file is created readable and writable by the owner only. It is
    opened with those permissions from the start rather than being
    chmod'ed after the write, so its contents (callers put the exec key in
    it) are never exposed through default permissions in between.
    """
    url = cdict['url']
    location = cdict['location']
    if not location:
        try:
            # Called purely as a validity check; the parts are not needed.
            split_url(url)
        except AssertionError:
            pass
        else:
            location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
        cdict['location'] = location
    fname = os.path.join(self.cluster_dir.security_dir, fname)
    owner_only = stat.S_IRUSR | stat.S_IWUSR
    fd = os.open(fname, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, owner_only)
    with os.fdopen(fd, 'w') as f:
        f.write(json.dumps(cdict, indent=2))
    # The mode given to os.open only applies to newly created files, so
    # tighten a pre-existing file as well.
    os.chmod(fname, owner_only)
196
196
def load_config_from_json(self):
    """Populate config from the existing JSON connection files.

    Reads ``ipcontroller-engine.json`` and then ``ipcontroller-client.json``
    from the security directory, copying the exec key, transports, IPs and
    registration port into ``self.config`` and the location / ssh server
    onto ``self``.

    Raises
    ------
    AssertionError
        If the two files disagree on the exec key or the registration port.
    IOError
        If either file cannot be opened.
    """
    config = self.config

    # --- engine connection file ---
    engine_path = os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')
    with open(engine_path) as f:
        engine_cfg = json.loads(f.read())
    key = engine_cfg['exec_key']
    config.StreamSession.key = key
    transport, address = engine_cfg['url'].split('://')
    config.HubFactory.engine_transport = transport
    host, port = address.split(':')
    config.HubFactory.engine_ip = host
    config.HubFactory.regport = int(port)
    self.location = engine_cfg['location']

    # --- client connection file ---
    client_path = os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')
    with open(client_path) as f:
        client_cfg = json.loads(f.read())
    assert key == client_cfg['exec_key'], "exec_key mismatch between engine and client keys"
    transport, address = client_cfg['url'].split('://')
    config.HubFactory.client_transport = transport
    host, port = address.split(':')
    config.HubFactory.client_ip = host
    self.ssh_server = client_cfg['ssh']
    assert int(port) == config.HubFactory.regport, "regport mismatch"
221
221
def init_hub(self):
    """Construct the Hub factory and (re)write the JSON connection files.

    Steps:

    1. Put the working directory first on ``sys.path`` and run the
       configured import statements.
    2. If ``reuse_files`` is set, try to load the previous connection
       settings; on a missing or inconsistent file fall back to a fresh
       setup.
    3. For a fresh setup, generate a new exec key (``secure``) or use an
       empty one.
    4. Build ``HubFactory`` and initialise the hub; on failure the error is
       logged and the application exits with status 1.
    5. For a fresh setup, write ``ipcontroller-client.json`` and
       ``ipcontroller-engine.json``.

    The engine file is written with the *engine* transport and IP. It used
    to repeat the client's, which disagreed with both ``save_urls`` and
    ``load_config_from_json``, both of which treat the engine url as the
    engine-side address.
    """
    # This is the working dir by now.
    sys.path.insert(0, '')
    c = self.config

    self.do_import_statements()
    reusing = self.reuse_files
    if reusing:
        try:
            self.load_config_from_json()
        except (AssertionError,IOError):
            reusing=False
    # check again, because reusing may have failed:
    if reusing:
        pass
    elif self.secure:
        key = str(uuid.uuid4())
        # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
        # with open(keyfile, 'w') as f:
        #     f.write(key)
        # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
        c.StreamSession.key = key
    else:
        key = c.StreamSession.key = ''

    try:
        self.factory = HubFactory(config=c, log=self.log)
        # self.start_logging()
        self.factory.init_hub()
    except Exception:
        # Exception rather than a bare except, so that Ctrl-C and
        # SystemExit are not reported as construction failures.
        self.log.error("Couldn't construct the Controller", exc_info=True)
        self.exit(1)

    if not reusing:
        # save to new json config files
        f = self.factory
        cdict = {'exec_key' : key,
                'ssh' : self.ssh_server,
                'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
                'location' : self.location
                }
        self.save_connection_dict('ipcontroller-client.json', cdict)
        # Copy AFTER saving the client file, so that a location resolved
        # by save_connection_dict carries over, without aliasing cdict.
        edict = dict(cdict)
        edict['url']="%s://%s:%s"%(f.engine_transport, f.engine_ip, f.regport)
        self.save_connection_dict('ipcontroller-engine.json', edict)
267
267
268 #
268 #
def init_schedulers(self):
    """Configure the relay/queue devices and append them to ``self.children``.

    Four channels are set up, each bridging a client-side address to an
    engine-side address taken from the hub factory: iopub, mux, control and
    task. The first three always use the monitored-queue class named by
    ``self.mq_class``. The task channel depends on the configured
    ``TaskScheduler.scheme_name``:

    * ``'pure'`` -- another monitored queue;
    * ``'none'`` -- no task scheduler at all;
    * anything else -- a ``Process`` running ``launch_scheduler``.

    Nothing is started here: every device is only configured, flagged as a
    daemon and appended.
    """
    children = self.children
    queue_cls = import_item(self.mq_class)

    hub = self.factory

    def adopt(device):
        # Flag as daemon and append to the children list.
        device.daemon = True
        children.append(device)

    # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url

    # IOPub relay
    iopub_relay = queue_cls(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A', 'iopub')
    iopub_relay.bind_in(hub.client_info['iopub'])
    iopub_relay.bind_out(hub.engine_info['iopub'])
    iopub_relay.setsockopt_out(zmq.SUBSCRIBE, '')
    iopub_relay.connect_mon(hub.monitor_url)
    adopt(iopub_relay)

    # Multiplexer queue
    mux_queue = queue_cls(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
    mux_queue.bind_in(hub.client_info['mux'])
    mux_queue.setsockopt_in(zmq.IDENTITY, 'mux')
    mux_queue.bind_out(hub.engine_info['mux'])
    mux_queue.connect_mon(hub.monitor_url)
    adopt(mux_queue)

    # Control queue
    control_queue = queue_cls(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
    control_queue.bind_in(hub.client_info['control'])
    control_queue.setsockopt_in(zmq.IDENTITY, 'control')
    control_queue.bind_out(hub.engine_info['control'])
    control_queue.connect_mon(hub.monitor_url)
    adopt(control_queue)

    # Scheme from config if it was set there, else the trait's default.
    try:
        scheme = self.config.TaskScheduler.scheme_name
    except AttributeError:
        scheme = TaskScheduler.scheme_name.get_default_value()

    # Task queue
    if scheme == 'none':
        self.log.warn("task::using no Task scheduler")
    elif scheme == 'pure':
        self.log.warn("task::using pure XREQ Task scheduler")
        task_queue = queue_cls(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
        # task_queue.setsockopt_out(zmq.HWM, hub.hwm)
        task_queue.bind_in(hub.client_info['task'][1])
        task_queue.setsockopt_in(zmq.IDENTITY, 'task')
        task_queue.bind_out(hub.engine_info['task'])
        task_queue.connect_mon(hub.monitor_url)
        adopt(task_queue)
    else:
        self.log.info("task::using Python %s Task scheduler"%scheme)
        scheduler_args = (
            hub.client_info['task'][1],
            hub.engine_info['task'],
            hub.monitor_url,
            hub.client_info['notification'],
        )
        scheduler_kwargs = {
            'logname': self.log.name,
            'loglevel': self.log_level,
            'config': dict(self.config),
        }
        scheduler = Process(target=launch_scheduler, args=scheduler_args,
                            kwargs=scheduler_kwargs)
        adopt(scheduler)
328
328
329
329
330 def save_urls(self):
330 def save_urls(self):
331 """save the registration urls to files."""
331 """save the registration urls to files."""
332 c = self.config
332 c = self.config
333
333
334 sec_dir = self.cluster_dir.security_dir
334 sec_dir = self.cluster_dir.security_dir
335 cf = self.factory
335 cf = self.factory
336
336
337 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
337 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
338 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
338 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
339
339
340 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
340 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
341 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
341 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
342
342
343
343
344 def do_import_statements(self):
344 def do_import_statements(self):
345 statements = self.import_statements
345 statements = self.import_statements
346 for s in statements:
346 for s in statements:
347 try:
347 try:
348 self.log.msg("Executing statement: '%s'" % s)
348 self.log.msg("Executing statement: '%s'" % s)
349 exec s in globals(), locals()
349 exec s in globals(), locals()
350 except:
350 except:
351 self.log.msg("Error running statement: %s" % s)
351 self.log.msg("Error running statement: %s" % s)
352
352
353 # def start_logging(self):
353 # def start_logging(self):
354 # super(IPControllerApp, self).start_logging()
354 # super(IPControllerApp, self).start_logging()
355 # if self.config.Global.log_url:
355 # if self.config.Global.log_url:
356 # context = self.factory.context
356 # context = self.factory.context
357 # lsock = context.socket(zmq.PUB)
357 # lsock = context.socket(zmq.PUB)
358 # lsock.connect(self.config.Global.log_url)
358 # lsock.connect(self.config.Global.log_url)
359 # handler = PUBHandler(lsock)
359 # handler = PUBHandler(lsock)
360 # handler.root_topic = 'controller'
360 # handler.root_topic = 'controller'
361 # handler.setLevel(self.log_level)
361 # handler.setLevel(self.log_level)
362 # self.log.addHandler(handler)
362 # self.log.addHandler(handler)
363 # #
363 # #
364
365 def initialize(self, argv=None):
366 super(IPControllerApp, self).initialize(argv)
367 self.init_hub()
368 self.init_schedulers()
369
364 def start(self):
370 def start(self):
365 # Start the subprocesses:
371 # Start the subprocesses:
366 self.factory.start()
372 self.factory.start()
367 child_procs = []
373 child_procs = []
368 for child in self.children:
374 for child in self.children:
369 child.start()
375 child.start()
370 if isinstance(child, ProcessMonitoredQueue):
376 if isinstance(child, ProcessMonitoredQueue):
371 child_procs.append(child.launcher)
377 child_procs.append(child.launcher)
372 elif isinstance(child, Process):
378 elif isinstance(child, Process):
373 child_procs.append(child)
379 child_procs.append(child)
374 if child_procs:
380 if child_procs:
375 signal_children(child_procs)
381 signal_children(child_procs)
376
382
377 self.write_pid_file(overwrite=True)
383 self.write_pid_file(overwrite=True)
378
384
379 try:
385 try:
380 self.factory.loop.start()
386 self.factory.loop.start()
381 except KeyboardInterrupt:
387 except KeyboardInterrupt:
382 self.log.critical("Interrupted, Exiting...\n")
388 self.log.critical("Interrupted, Exiting...\n")
383
389
384
390
391
385 def launch_new_instance():
392 def launch_new_instance():
386 """Create and run the IPython controller"""
393 """Create and run the IPython controller"""
387 app = IPControllerApp()
394 app = IPControllerApp()
388 app.parse_command_line()
395 app.initialize()
389 cl_config = app.config
390 # app.load_config_file()
391 app.init_clusterdir()
392 if app.config_file:
393 app.load_config_file(app.config_file)
394 else:
395 app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location)
396 # command-line should *override* config file, but command-line is necessary
397 # to determine clusterdir, etc.
398 app.update_config(cl_config)
399
400 app.to_work_dir()
401 app.init_hub()
402 app.init_schedulers()
403
404 app.start()
396 app.start()
405
397
406
398
407 if __name__ == '__main__':
399 if __name__ == '__main__':
408 launch_new_instance()
400 launch_new_instance()
@@ -1,301 +1,290 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import json
18 import json
19 import os
19 import os
20 import sys
20 import sys
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24
24
25 from IPython.parallel.apps.clusterdir import (
25 from IPython.parallel.apps.clusterdir import (
26 ClusterDirApplication,
26 ClusterApplication,
27 ClusterDir,
27 ClusterDir,
28 base_aliases,
28 base_aliases,
29 # ClusterDirConfigLoader
29 # ClusterDirConfigLoader
30 )
30 )
31 from IPython.zmq.log import EnginePUBHandler
31 from IPython.zmq.log import EnginePUBHandler
32
32
33 from IPython.config.configurable import Configurable
33 from IPython.config.configurable import Configurable
34 from IPython.parallel.streamsession import StreamSession
34 from IPython.parallel.streamsession import StreamSession
35 from IPython.parallel.engine.engine import EngineFactory
35 from IPython.parallel.engine.engine import EngineFactory
36 from IPython.parallel.engine.streamkernel import Kernel
36 from IPython.parallel.engine.streamkernel import Kernel
37 from IPython.parallel.util import disambiguate_url
37 from IPython.parallel.util import disambiguate_url
38
38
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.traitlets import Str, Bool, Unicode, Dict, List, CStr
40 from IPython.utils.traitlets import Str, Bool, Unicode, Dict, List, CStr
41
41
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Module level variables
44 # Module level variables
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 #: The default config file name for this application
47 #: The default config file name for this application
48 default_config_file_name = u'ipengine_config.py'
48 default_config_file_name = u'ipengine_config.py'
49
49
50 _description = """Start an IPython engine for parallel computing.\n\n
50 _description = """Start an IPython engine for parallel computing.\n\n
51
51
52 IPython engines run in parallel and perform computations on behalf of a client
52 IPython engines run in parallel and perform computations on behalf of a client
53 and controller. A controller needs to be started before the engines. The
53 and controller. A controller needs to be started before the engines. The
54 engine can be configured using command line options or using a cluster
54 engine can be configured using command line options or using a cluster
55 directory. Cluster directories contain config, log and security files and are
55 directory. Cluster directories contain config, log and security files and are
56 usually located in your ipython directory and named as "cluster_<profile>".
56 usually located in your ipython directory and named as "cluster_<profile>".
57 See the `profile` and `cluster_dir` options for details.
57 See the `profile` and `cluster_dir` options for details.
58 """
58 """
59
59
60
60
61 #-----------------------------------------------------------------------------
61 #-----------------------------------------------------------------------------
62 # MPI configuration
62 # MPI configuration
63 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
64
64
65 mpi4py_init = """from mpi4py import MPI as mpi
65 mpi4py_init = """from mpi4py import MPI as mpi
66 mpi.size = mpi.COMM_WORLD.Get_size()
66 mpi.size = mpi.COMM_WORLD.Get_size()
67 mpi.rank = mpi.COMM_WORLD.Get_rank()
67 mpi.rank = mpi.COMM_WORLD.Get_rank()
68 """
68 """
69
69
70
70
71 pytrilinos_init = """from PyTrilinos import Epetra
71 pytrilinos_init = """from PyTrilinos import Epetra
72 class SimpleStruct:
72 class SimpleStruct:
73 pass
73 pass
74 mpi = SimpleStruct()
74 mpi = SimpleStruct()
75 mpi.rank = 0
75 mpi.rank = 0
76 mpi.size = 0
76 mpi.size = 0
77 """
77 """
78
78
79 class MPI(Configurable):
79 class MPI(Configurable):
80 """Configurable for MPI initialization"""
80 """Configurable for MPI initialization"""
81 use = Str('', config=True,
81 use = Str('', config=True,
82 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
82 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
83 )
83 )
84
84
85 def _on_use_changed(self, old, new):
85 def _on_use_changed(self, old, new):
86 # load default init script if it's not set
86 # load default init script if it's not set
87 if not self.init_script:
87 if not self.init_script:
88 self.init_script = self.default_inits.get(new, '')
88 self.init_script = self.default_inits.get(new, '')
89
89
90 init_script = Str('', config=True,
90 init_script = Str('', config=True,
91 help="Initialization code for MPI")
91 help="Initialization code for MPI")
92
92
93 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
93 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
94 config=True)
94 config=True)
95
95
96
96
97 #-----------------------------------------------------------------------------
97 #-----------------------------------------------------------------------------
98 # Main application
98 # Main application
99 #-----------------------------------------------------------------------------
99 #-----------------------------------------------------------------------------
100
100
101
101
102 class IPEngineApp(ClusterDirApplication):
102 class IPEngineApp(ClusterApplication):
103
103
104 app_name = Unicode(u'ipengine')
104 app_name = Unicode(u'ipengine')
105 description = Unicode(_description)
105 description = Unicode(_description)
106 default_config_file_name = default_config_file_name
106 default_config_file_name = default_config_file_name
107 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
107 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
108
108
109 auto_create_cluster_dir = Bool(False, config=True,
110 help="whether to create the cluster_dir if it doesn't exist")
111
109 startup_script = Unicode(u'', config=True,
112 startup_script = Unicode(u'', config=True,
110 help='specify a script to be run at startup')
113 help='specify a script to be run at startup')
111 startup_command = Str('', config=True,
114 startup_command = Str('', config=True,
112 help='specify a command to be run at startup')
115 help='specify a command to be run at startup')
113
116
114 url_file = Unicode(u'', config=True,
117 url_file = Unicode(u'', config=True,
115 help="""The full location of the file containing the connection information for
118 help="""The full location of the file containing the connection information for
116 the controller. If this is not given, the file must be in the
119 the controller. If this is not given, the file must be in the
117 security directory of the cluster directory. This location is
120 security directory of the cluster directory. This location is
118 resolved using the `profile` or `cluster_dir` options.""",
121 resolved using the `profile` or `cluster_dir` options.""",
119 )
122 )
120
123
121 url_file_name = Unicode(u'ipcontroller-engine.json')
124 url_file_name = Unicode(u'ipcontroller-engine.json')
122
125
123 aliases = Dict(dict(
126 aliases = Dict(dict(
124 config = 'IPEngineApp.config_file',
127 config = 'IPEngineApp.config_file',
125 file = 'IPEngineApp.url_file',
128 file = 'IPEngineApp.url_file',
126 c = 'IPEngineApp.startup_command',
129 c = 'IPEngineApp.startup_command',
127 s = 'IPEngineApp.startup_script',
130 s = 'IPEngineApp.startup_script',
128
131
129 ident = 'StreamSession.session',
132 ident = 'StreamSession.session',
130 user = 'StreamSession.username',
133 user = 'StreamSession.username',
131 exec_key = 'StreamSession.keyfile',
134 exec_key = 'StreamSession.keyfile',
132
135
133 url = 'EngineFactory.url',
136 url = 'EngineFactory.url',
134 ip = 'EngineFactory.ip',
137 ip = 'EngineFactory.ip',
135 transport = 'EngineFactory.transport',
138 transport = 'EngineFactory.transport',
136 port = 'EngineFactory.regport',
139 port = 'EngineFactory.regport',
137 location = 'EngineFactory.location',
140 location = 'EngineFactory.location',
138
141
139 timeout = 'EngineFactory.timeout',
142 timeout = 'EngineFactory.timeout',
140
143
141 profile = "ClusterDir.profile",
144 profile = "ClusterDir.profile",
142 cluster_dir = 'ClusterDir.location',
145 cluster_dir = 'ClusterDir.location',
143
146
144 mpi = 'MPI.use',
147 mpi = 'MPI.use',
145
148
146 log_level = 'IPEngineApp.log_level',
149 log_level = 'IPEngineApp.log_level',
147 ))
150 ))
148
151
149 # def find_key_file(self):
152 # def find_key_file(self):
150 # """Set the key file.
153 # """Set the key file.
151 #
154 #
152 # Here we don't try to actually see if it exists or is valid as that
155 # Here we don't try to actually see if it exists or is valid as that
153 # is handled by the connection logic.
156 # is handled by the connection logic.
154 # """
157 # """
155 # config = self.master_config
158 # config = self.master_config
156 # # Find the actual controller key file
159 # # Find the actual controller key file
157 # if not config.Global.key_file:
160 # if not config.Global.key_file:
158 # try_this = os.path.join(
161 # try_this = os.path.join(
159 # config.Global.cluster_dir,
162 # config.Global.cluster_dir,
160 # config.Global.security_dir,
163 # config.Global.security_dir,
161 # config.Global.key_file_name
164 # config.Global.key_file_name
162 # )
165 # )
163 # config.Global.key_file = try_this
166 # config.Global.key_file = try_this
164
167
165 def find_url_file(self):
168 def find_url_file(self):
166 """Set the url file.
169 """Set the url file.
167
170
168 Here we don't try to actually see if it exists or is valid as that
171 Here we don't try to actually see if it exists or is valid as that
169 is handled by the connection logic.
172 is handled by the connection logic.
170 """
173 """
171 config = self.config
174 config = self.config
172 # Find the actual controller url file
175 # Find the actual controller url file
173 if not self.url_file:
176 if not self.url_file:
174 self.url_file = os.path.join(
177 self.url_file = os.path.join(
175 self.cluster_dir.security_dir,
178 self.cluster_dir.security_dir,
176 self.url_file_name
179 self.url_file_name
177 )
180 )
178
181
179 def init_engine(self):
182 def init_engine(self):
180 # This is the working dir by now.
183 # This is the working dir by now.
181 sys.path.insert(0, '')
184 sys.path.insert(0, '')
182 config = self.config
185 config = self.config
183 # print config
186 # print config
184 self.find_url_file()
187 self.find_url_file()
185
188
186 # if os.path.exists(config.Global.key_file) and config.Global.secure:
189 # if os.path.exists(config.Global.key_file) and config.Global.secure:
187 # config.SessionFactory.exec_key = config.Global.key_file
190 # config.SessionFactory.exec_key = config.Global.key_file
188 if os.path.exists(self.url_file):
191 if os.path.exists(self.url_file):
189 with open(self.url_file) as f:
192 with open(self.url_file) as f:
190 d = json.loads(f.read())
193 d = json.loads(f.read())
191 for k,v in d.iteritems():
194 for k,v in d.iteritems():
192 if isinstance(v, unicode):
195 if isinstance(v, unicode):
193 d[k] = v.encode()
196 d[k] = v.encode()
194 if d['exec_key']:
197 if d['exec_key']:
195 config.StreamSession.key = d['exec_key']
198 config.StreamSession.key = d['exec_key']
196 d['url'] = disambiguate_url(d['url'], d['location'])
199 d['url'] = disambiguate_url(d['url'], d['location'])
197 config.EngineFactory.url = d['url']
200 config.EngineFactory.url = d['url']
198 config.EngineFactory.location = d['location']
201 config.EngineFactory.location = d['location']
199
202
200 try:
203 try:
201 exec_lines = config.Kernel.exec_lines
204 exec_lines = config.Kernel.exec_lines
202 except AttributeError:
205 except AttributeError:
203 config.Kernel.exec_lines = []
206 config.Kernel.exec_lines = []
204 exec_lines = config.Kernel.exec_lines
207 exec_lines = config.Kernel.exec_lines
205
208
206 if self.startup_script:
209 if self.startup_script:
207 enc = sys.getfilesystemencoding() or 'utf8'
210 enc = sys.getfilesystemencoding() or 'utf8'
208 cmd="execfile(%r)"%self.startup_script.encode(enc)
211 cmd="execfile(%r)"%self.startup_script.encode(enc)
209 exec_lines.append(cmd)
212 exec_lines.append(cmd)
210 if self.startup_command:
213 if self.startup_command:
211 exec_lines.append(self.startup_command)
214 exec_lines.append(self.startup_command)
212
215
213 # Create the underlying shell class and Engine
216 # Create the underlying shell class and Engine
214 # shell_class = import_item(self.master_config.Global.shell_class)
217 # shell_class = import_item(self.master_config.Global.shell_class)
215 # print self.config
218 # print self.config
216 try:
219 try:
217 self.engine = EngineFactory(config=config, log=self.log)
220 self.engine = EngineFactory(config=config, log=self.log)
218 except:
221 except:
219 self.log.error("Couldn't start the Engine", exc_info=True)
222 self.log.error("Couldn't start the Engine", exc_info=True)
220 self.exit(1)
223 self.exit(1)
221
224
222 # self.start_logging()
225 # self.start_logging()
223
226
224 # Create the service hierarchy
227 # Create the service hierarchy
225 # self.main_service = service.MultiService()
228 # self.main_service = service.MultiService()
226 # self.engine_service.setServiceParent(self.main_service)
229 # self.engine_service.setServiceParent(self.main_service)
227 # self.tub_service = Tub()
230 # self.tub_service = Tub()
228 # self.tub_service.setServiceParent(self.main_service)
231 # self.tub_service.setServiceParent(self.main_service)
229 # # This needs to be called before the connection is initiated
232 # # This needs to be called before the connection is initiated
230 # self.main_service.startService()
233 # self.main_service.startService()
231
234
232 # This initiates the connection to the controller and calls
235 # This initiates the connection to the controller and calls
233 # register_engine to tell the controller we are ready to do work
236 # register_engine to tell the controller we are ready to do work
234 # self.engine_connector = EngineConnector(self.tub_service)
237 # self.engine_connector = EngineConnector(self.tub_service)
235
238
236 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
239 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
237
240
238 # reactor.callWhenRunning(self.call_connect)
241 # reactor.callWhenRunning(self.call_connect)
239
242
240 # def start_logging(self):
243 # def start_logging(self):
241 # super(IPEngineApp, self).start_logging()
244 # super(IPEngineApp, self).start_logging()
242 # if self.master_config.Global.log_url:
245 # if self.master_config.Global.log_url:
243 # context = self.engine.context
246 # context = self.engine.context
244 # lsock = context.socket(zmq.PUB)
247 # lsock = context.socket(zmq.PUB)
245 # lsock.connect(self.master_config.Global.log_url)
248 # lsock.connect(self.master_config.Global.log_url)
246 # handler = EnginePUBHandler(self.engine, lsock)
249 # handler = EnginePUBHandler(self.engine, lsock)
247 # handler.setLevel(self.log_level)
250 # handler.setLevel(self.log_level)
248 # self.log.addHandler(handler)
251 # self.log.addHandler(handler)
249 #
252 #
250 def init_mpi(self):
253 def init_mpi(self):
251 global mpi
254 global mpi
252 self.mpi = MPI(config=self.config)
255 self.mpi = MPI(config=self.config)
253
256
254 mpi_import_statement = self.mpi.init_script
257 mpi_import_statement = self.mpi.init_script
255 if mpi_import_statement:
258 if mpi_import_statement:
256 try:
259 try:
257 self.log.info("Initializing MPI:")
260 self.log.info("Initializing MPI:")
258 self.log.info(mpi_import_statement)
261 self.log.info(mpi_import_statement)
259 exec mpi_import_statement in globals()
262 exec mpi_import_statement in globals()
260 except:
263 except:
261 mpi = None
264 mpi = None
262 else:
265 else:
263 mpi = None
266 mpi = None
264
267
268 def initialize(self, argv=None):
269 super(IPEngineApp, self).initialize(argv)
270 self.init_mpi()
271 self.init_engine()
265
272
266 def start(self):
273 def start(self):
267 self.engine.start()
274 self.engine.start()
268 try:
275 try:
269 self.engine.loop.start()
276 self.engine.loop.start()
270 except KeyboardInterrupt:
277 except KeyboardInterrupt:
271 self.log.critical("Engine Interrupted, shutting down...\n")
278 self.log.critical("Engine Interrupted, shutting down...\n")
272
279
273
280
274 def launch_new_instance():
281 def launch_new_instance():
275 """Create and run the IPython engine"""
282 """Create and run the IPython engine"""
276 app = IPEngineApp()
283 app = IPEngineApp()
277 app.parse_command_line()
284 app.initialize()
278 cl_config = app.config
279 app.init_clusterdir()
280 # app.load_config_file()
281 # print app.config
282 if app.config_file:
283 app.load_config_file(app.config_file)
284 else:
285 app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location)
286
287 # command-line should *override* config file, but command-line is necessary
288 # to determine clusterdir, etc.
289 app.update_config(cl_config)
290
291 # print app.config
292 app.to_work_dir()
293 app.init_mpi()
294 app.init_engine()
295 print app.config
296 app.start()
285 app.start()
297
286
298
287
299 if __name__ == '__main__':
288 if __name__ == '__main__':
300 launch_new_instance()
289 launch_new_instance()
301
290
@@ -1,132 +1,132 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 A simple IPython logger application
4 A simple IPython logger application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
8 # Copyright (C) 2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import sys
19 import sys
20
20
21 import zmq
21 import zmq
22
22
23 from IPython.parallel.apps.clusterdir import (
23 from IPython.parallel.apps.clusterdir import (
24 ClusterDirApplication,
24 ClusterApplication,
25 ClusterDirConfigLoader
25 ClusterDirConfigLoader
26 )
26 )
27 from IPython.parallel.apps.logwatcher import LogWatcher
27 from IPython.parallel.apps.logwatcher import LogWatcher
28
28
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30 # Module level variables
30 # Module level variables
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32
32
33 #: The default config file name for this application
33 #: The default config file name for this application
34 default_config_file_name = u'iplogger_config.py'
34 default_config_file_name = u'iplogger_config.py'
35
35
36 _description = """Start an IPython logger for parallel computing.\n\n
36 _description = """Start an IPython logger for parallel computing.\n\n
37
37
38 IPython controllers and engines (and your own processes) can broadcast log messages
38 IPython controllers and engines (and your own processes) can broadcast log messages
39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
40 logger can be configured using command line options or using a cluster
40 logger can be configured using command line options or using a cluster
41 directory. Cluster directories contain config, log and security files and are
41 directory. Cluster directories contain config, log and security files and are
42 usually located in your ipython directory and named as "cluster_<profile>".
42 usually located in your ipython directory and named as "cluster_<profile>".
43 See the --profile and --cluster-dir options for details.
43 See the --profile and --cluster-dir options for details.
44 """
44 """
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Command line options
47 # Command line options
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50
50
51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
52
52
53 def _add_arguments(self):
53 def _add_arguments(self):
54 super(IPLoggerAppConfigLoader, self)._add_arguments()
54 super(IPLoggerAppConfigLoader, self)._add_arguments()
55 paa = self.parser.add_argument
55 paa = self.parser.add_argument
56 # Controller config
56 # Controller config
57 paa('--url',
57 paa('--url',
58 type=str, dest='LogWatcher.url',
58 type=str, dest='LogWatcher.url',
59 help='The url the LogWatcher will listen on',
59 help='The url the LogWatcher will listen on',
60 )
60 )
61 # MPI
61 # MPI
62 paa('--topics',
62 paa('--topics',
63 type=str, dest='LogWatcher.topics', nargs='+',
63 type=str, dest='LogWatcher.topics', nargs='+',
64 help='What topics to subscribe to',
64 help='What topics to subscribe to',
65 metavar='topics')
65 metavar='topics')
66 # Global config
66 # Global config
67 paa('--log-to-file',
67 paa('--log-to-file',
68 action='store_true', dest='Global.log_to_file',
68 action='store_true', dest='Global.log_to_file',
69 help='Log to a file in the log directory (default is stdout)')
69 help='Log to a file in the log directory (default is stdout)')
70
70
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # Main application
73 # Main application
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76
76
77 class IPLoggerApp(ClusterDirApplication):
77 class IPLoggerApp(ClusterApplication):
78
78
79 name = u'iploggerz'
79 name = u'iploggerz'
80 description = _description
80 description = _description
81 command_line_loader = IPLoggerAppConfigLoader
81 command_line_loader = IPLoggerAppConfigLoader
82 default_config_file_name = default_config_file_name
82 default_config_file_name = default_config_file_name
83 auto_create_cluster_dir = True
83 auto_create_cluster_dir = True
84
84
85 def create_default_config(self):
85 def create_default_config(self):
86 super(IPLoggerApp, self).create_default_config()
86 super(IPLoggerApp, self).create_default_config()
87
87
88 # The engine should not clean logs as we don't want to remove the
88 # The engine should not clean logs as we don't want to remove the
89 # active log files of other running engines.
89 # active log files of other running engines.
90 self.default_config.Global.clean_logs = False
90 self.default_config.Global.clean_logs = False
91
91
92 # If given, this is the actual location of the logger's URL file.
92 # If given, this is the actual location of the logger's URL file.
93 # If not, this is computed using the profile, app_dir and url_file_name
93 # If not, this is computed using the profile, app_dir and url_file_name
94 self.default_config.Global.url_file_name = u'iplogger.url'
94 self.default_config.Global.url_file_name = u'iplogger.url'
95 self.default_config.Global.url_file = u''
95 self.default_config.Global.url_file = u''
96
96
97 def post_load_command_line_config(self):
97 def post_load_command_line_config(self):
98 pass
98 pass
99
99
100 def pre_construct(self):
100 def pre_construct(self):
101 super(IPLoggerApp, self).pre_construct()
101 super(IPLoggerApp, self).pre_construct()
102
102
103 def construct(self):
103 def construct(self):
104 # This is the working dir by now.
104 # This is the working dir by now.
105 sys.path.insert(0, '')
105 sys.path.insert(0, '')
106
106
107 self.start_logging()
107 self.start_logging()
108
108
109 try:
109 try:
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
111 except:
111 except:
112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 self.exit(1)
113 self.exit(1)
114
114
115
115
116 def start_app(self):
116 def start_app(self):
117 try:
117 try:
118 self.watcher.start()
118 self.watcher.start()
119 self.watcher.loop.start()
119 self.watcher.loop.start()
120 except KeyboardInterrupt:
120 except KeyboardInterrupt:
121 self.log.critical("Logging Interrupted, shutting down...\n")
121 self.log.critical("Logging Interrupted, shutting down...\n")
122
122
123
123
124 def launch_new_instance():
124 def launch_new_instance():
125 """Create and run the IPython LogWatcher"""
125 """Create and run the IPython LogWatcher"""
126 app = IPLoggerApp()
126 app = IPLoggerApp()
127 app.start()
127 app.start()
128
128
129
129
130 if __name__ == '__main__':
130 if __name__ == '__main__':
131 launch_new_instance()
131 launch_new_instance()
132
132
General Comments 0
You need to be logged in to leave comments. Login now