##// END OF EJS Templates
Reworking how controller and engines startup in ipcluster....
bgranger -
Show More
@@ -1,481 +1,481 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython cluster directory
4 The IPython cluster directory
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import os
20 import os
21 import shutil
21 import shutil
22 import sys
22 import sys
23
23
24 from twisted.python import log
24 from twisted.python import log
25
25
26 from IPython.core import release
26 from IPython.core import release
27 from IPython.config.loader import PyFileConfigLoader
27 from IPython.config.loader import PyFileConfigLoader
28 from IPython.core.application import Application
28 from IPython.core.application import Application
29 from IPython.core.component import Component
29 from IPython.core.component import Component
30 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
30 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
31 from IPython.utils.traitlets import Unicode, Bool
31 from IPython.utils.traitlets import Unicode, Bool
32 from IPython.utils import genutils
32 from IPython.utils import genutils
33
33
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 # Exceptions and classes
35 # Exceptions and classes
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37
37
38
38
class ClusterDirError(Exception):
    """Error raised when a cluster directory cannot be found or used."""
41
41
42
42
class PIDFileError(Exception):
    """Error raised for a missing or already existing pid file."""
45
45
46
46
47 class ClusterDir(Component):
47 class ClusterDir(Component):
48 """An object to manage the cluster directory and its resources.
48 """An object to manage the cluster directory and its resources.
49
49
50 The cluster directory is used by :command:`ipcontroller`,
50 The cluster directory is used by :command:`ipcontroller`,
51 :command:`ipcontroller` and :command:`ipcontroller` to manage the
51 :command:`ipcontroller` and :command:`ipcontroller` to manage the
52 configuration, logging and security of these applications.
52 configuration, logging and security of these applications.
53
53
54 This object knows how to find, create and manage these directories. This
54 This object knows how to find, create and manage these directories. This
55 should be used by any code that want's to handle cluster directories.
55 should be used by any code that want's to handle cluster directories.
56 """
56 """
57
57
58 security_dir_name = Unicode('security')
58 security_dir_name = Unicode('security')
59 log_dir_name = Unicode('log')
59 log_dir_name = Unicode('log')
60 pid_dir_name = Unicode('pid')
60 pid_dir_name = Unicode('pid')
61 security_dir = Unicode(u'')
61 security_dir = Unicode(u'')
62 log_dir = Unicode(u'')
62 log_dir = Unicode(u'')
63 pid_dir = Unicode(u'')
63 pid_dir = Unicode(u'')
64 location = Unicode(u'')
64 location = Unicode(u'')
65
65
66 def __init__(self, location):
66 def __init__(self, location):
67 super(ClusterDir, self).__init__(None)
67 super(ClusterDir, self).__init__(None)
68 self.location = location
68 self.location = location
69
69
70 def _location_changed(self, name, old, new):
70 def _location_changed(self, name, old, new):
71 if not os.path.isdir(new):
71 if not os.path.isdir(new):
72 os.makedirs(new, mode=0777)
72 os.makedirs(new, mode=0777)
73 else:
73 else:
74 os.chmod(new, 0777)
74 os.chmod(new, 0777)
75 self.security_dir = os.path.join(new, self.security_dir_name)
75 self.security_dir = os.path.join(new, self.security_dir_name)
76 self.log_dir = os.path.join(new, self.log_dir_name)
76 self.log_dir = os.path.join(new, self.log_dir_name)
77 self.pid_dir = os.path.join(new, self.pid_dir_name)
77 self.pid_dir = os.path.join(new, self.pid_dir_name)
78 self.check_dirs()
78 self.check_dirs()
79
79
80 def _log_dir_changed(self, name, old, new):
80 def _log_dir_changed(self, name, old, new):
81 self.check_log_dir()
81 self.check_log_dir()
82
82
83 def check_log_dir(self):
83 def check_log_dir(self):
84 if not os.path.isdir(self.log_dir):
84 if not os.path.isdir(self.log_dir):
85 os.mkdir(self.log_dir, 0777)
85 os.mkdir(self.log_dir, 0777)
86 else:
86 else:
87 os.chmod(self.log_dir, 0777)
87 os.chmod(self.log_dir, 0777)
88
88
89 def _security_dir_changed(self, name, old, new):
89 def _security_dir_changed(self, name, old, new):
90 self.check_security_dir()
90 self.check_security_dir()
91
91
92 def check_security_dir(self):
92 def check_security_dir(self):
93 if not os.path.isdir(self.security_dir):
93 if not os.path.isdir(self.security_dir):
94 os.mkdir(self.security_dir, 0700)
94 os.mkdir(self.security_dir, 0700)
95 else:
95 else:
96 os.chmod(self.security_dir, 0700)
96 os.chmod(self.security_dir, 0700)
97
97
98 def _pid_dir_changed(self, name, old, new):
98 def _pid_dir_changed(self, name, old, new):
99 self.check_pid_dir()
99 self.check_pid_dir()
100
100
101 def check_pid_dir(self):
101 def check_pid_dir(self):
102 if not os.path.isdir(self.pid_dir):
102 if not os.path.isdir(self.pid_dir):
103 os.mkdir(self.pid_dir, 0700)
103 os.mkdir(self.pid_dir, 0700)
104 else:
104 else:
105 os.chmod(self.pid_dir, 0700)
105 os.chmod(self.pid_dir, 0700)
106
106
107 def check_dirs(self):
107 def check_dirs(self):
108 self.check_security_dir()
108 self.check_security_dir()
109 self.check_log_dir()
109 self.check_log_dir()
110 self.check_pid_dir()
110 self.check_pid_dir()
111
111
112 def load_config_file(self, filename):
112 def load_config_file(self, filename):
113 """Load a config file from the top level of the cluster dir.
113 """Load a config file from the top level of the cluster dir.
114
114
115 Parameters
115 Parameters
116 ----------
116 ----------
117 filename : unicode or str
117 filename : unicode or str
118 The filename only of the config file that must be located in
118 The filename only of the config file that must be located in
119 the top-level of the cluster directory.
119 the top-level of the cluster directory.
120 """
120 """
121 loader = PyFileConfigLoader(filename, self.location)
121 loader = PyFileConfigLoader(filename, self.location)
122 return loader.load_config()
122 return loader.load_config()
123
123
124 def copy_config_file(self, config_file, path=None, overwrite=False):
124 def copy_config_file(self, config_file, path=None, overwrite=False):
125 """Copy a default config file into the active cluster directory.
125 """Copy a default config file into the active cluster directory.
126
126
127 Default configuration files are kept in :mod:`IPython.config.default`.
127 Default configuration files are kept in :mod:`IPython.config.default`.
128 This function moves these from that location to the working cluster
128 This function moves these from that location to the working cluster
129 directory.
129 directory.
130 """
130 """
131 if path is None:
131 if path is None:
132 import IPython.config.default
132 import IPython.config.default
133 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
133 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
134 path = os.path.sep.join(path)
134 path = os.path.sep.join(path)
135 src = os.path.join(path, config_file)
135 src = os.path.join(path, config_file)
136 dst = os.path.join(self.location, config_file)
136 dst = os.path.join(self.location, config_file)
137 if not os.path.isfile(dst) or overwrite:
137 if not os.path.isfile(dst) or overwrite:
138 shutil.copy(src, dst)
138 shutil.copy(src, dst)
139
139
140 def copy_all_config_files(self, path=None, overwrite=False):
140 def copy_all_config_files(self, path=None, overwrite=False):
141 """Copy all config files into the active cluster directory."""
141 """Copy all config files into the active cluster directory."""
142 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
142 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
143 u'ipcluster_config.py']:
143 u'ipcluster_config.py']:
144 self.copy_config_file(f, path=path, overwrite=overwrite)
144 self.copy_config_file(f, path=path, overwrite=overwrite)
145
145
146 @classmethod
146 @classmethod
147 def create_cluster_dir(csl, cluster_dir):
147 def create_cluster_dir(csl, cluster_dir):
148 """Create a new cluster directory given a full path.
148 """Create a new cluster directory given a full path.
149
149
150 Parameters
150 Parameters
151 ----------
151 ----------
152 cluster_dir : str
152 cluster_dir : str
153 The full path to the cluster directory. If it does exist, it will
153 The full path to the cluster directory. If it does exist, it will
154 be used. If not, it will be created.
154 be used. If not, it will be created.
155 """
155 """
156 return ClusterDir(cluster_dir)
156 return ClusterDir(cluster_dir)
157
157
158 @classmethod
158 @classmethod
159 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
159 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
160 """Create a cluster dir by profile name and path.
160 """Create a cluster dir by profile name and path.
161
161
162 Parameters
162 Parameters
163 ----------
163 ----------
164 path : str
164 path : str
165 The path (directory) to put the cluster directory in.
165 The path (directory) to put the cluster directory in.
166 profile : str
166 profile : str
167 The name of the profile. The name of the cluster directory will
167 The name of the profile. The name of the cluster directory will
168 be "cluster_<profile>".
168 be "cluster_<profile>".
169 """
169 """
170 if not os.path.isdir(path):
170 if not os.path.isdir(path):
171 raise ClusterDirError('Directory not found: %s' % path)
171 raise ClusterDirError('Directory not found: %s' % path)
172 cluster_dir = os.path.join(path, u'cluster_' + profile)
172 cluster_dir = os.path.join(path, u'cluster_' + profile)
173 return ClusterDir(cluster_dir)
173 return ClusterDir(cluster_dir)
174
174
175 @classmethod
175 @classmethod
176 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
176 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
177 """Find an existing cluster dir by profile name, return its ClusterDir.
177 """Find an existing cluster dir by profile name, return its ClusterDir.
178
178
179 This searches through a sequence of paths for a cluster dir. If it
179 This searches through a sequence of paths for a cluster dir. If it
180 is not found, a :class:`ClusterDirError` exception will be raised.
180 is not found, a :class:`ClusterDirError` exception will be raised.
181
181
182 The search path algorithm is:
182 The search path algorithm is:
183 1. ``os.getcwd()``
183 1. ``os.getcwd()``
184 2. ``ipython_dir``
184 2. ``ipython_dir``
185 3. The directories found in the ":" separated
185 3. The directories found in the ":" separated
186 :env:`IPCLUSTER_DIR_PATH` environment variable.
186 :env:`IPCLUSTER_DIR_PATH` environment variable.
187
187
188 Parameters
188 Parameters
189 ----------
189 ----------
190 ipython_dir : unicode or str
190 ipython_dir : unicode or str
191 The IPython directory to use.
191 The IPython directory to use.
192 profile : unicode or str
192 profile : unicode or str
193 The name of the profile. The name of the cluster directory
193 The name of the profile. The name of the cluster directory
194 will be "cluster_<profile>".
194 will be "cluster_<profile>".
195 """
195 """
196 dirname = u'cluster_' + profile
196 dirname = u'cluster_' + profile
197 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
197 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
198 if cluster_dir_paths:
198 if cluster_dir_paths:
199 cluster_dir_paths = cluster_dir_paths.split(':')
199 cluster_dir_paths = cluster_dir_paths.split(':')
200 else:
200 else:
201 cluster_dir_paths = []
201 cluster_dir_paths = []
202 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
202 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
203 for p in paths:
203 for p in paths:
204 cluster_dir = os.path.join(p, dirname)
204 cluster_dir = os.path.join(p, dirname)
205 if os.path.isdir(cluster_dir):
205 if os.path.isdir(cluster_dir):
206 return ClusterDir(cluster_dir)
206 return ClusterDir(cluster_dir)
207 else:
207 else:
208 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
208 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
209
209
210 @classmethod
210 @classmethod
211 def find_cluster_dir(cls, cluster_dir):
211 def find_cluster_dir(cls, cluster_dir):
212 """Find/create a cluster dir and return its ClusterDir.
212 """Find/create a cluster dir and return its ClusterDir.
213
213
214 This will create the cluster directory if it doesn't exist.
214 This will create the cluster directory if it doesn't exist.
215
215
216 Parameters
216 Parameters
217 ----------
217 ----------
218 cluster_dir : unicode or str
218 cluster_dir : unicode or str
219 The path of the cluster directory. This is expanded using
219 The path of the cluster directory. This is expanded using
220 :func:`IPython.utils.genutils.expand_path`.
220 :func:`IPython.utils.genutils.expand_path`.
221 """
221 """
222 cluster_dir = genutils.expand_path(cluster_dir)
222 cluster_dir = genutils.expand_path(cluster_dir)
223 if not os.path.isdir(cluster_dir):
223 if not os.path.isdir(cluster_dir):
224 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
224 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
225 return ClusterDir(cluster_dir)
225 return ClusterDir(cluster_dir)
226
226
227
227
class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader):
    """Default command line options for IPython cluster applications."""

    def _add_other_arguments(self):
        """Register the cluster-related options on ``self.parser``.

        Every option defaults to ``NoConfigDefault`` and stores into a
        ``Global.*`` destination.
        """
        self.parser.add_argument('--ipython-dir',
            dest='Global.ipython_dir',type=unicode,
            help='Set to override default location of Global.ipython_dir.',
            default=NoConfigDefault,
            metavar='Global.ipython_dir'
        )
        self.parser.add_argument('-p', '--profile',
            dest='Global.profile',type=unicode,
            help='The string name of the profile to be used. This determines '
            'the name of the cluster dir as: cluster_<profile>. The default profile '
            'is named "default".  The cluster directory is resolved this way '
            'if the --cluster-dir option is not used.',
            default=NoConfigDefault,
            metavar='Global.profile'
        )
        self.parser.add_argument('--log-level',
            dest="Global.log_level",type=int,
            help='Set the log level (0,10,20,30,40,50).  Default is 30.',
            default=NoConfigDefault,
            metavar="Global.log_level"
        )
        self.parser.add_argument('--cluster-dir',
            dest='Global.cluster_dir',type=unicode,
            help='Set the cluster dir. This overrides the logic used by the '
            '--profile option.',
            default=NoConfigDefault,
            metavar='Global.cluster_dir'
        )
        self.parser.add_argument('--working-dir',
            dest='Global.working_dir',type=unicode,
            help='Set the working dir for the process.',
            default=NoConfigDefault,
            metavar='Global.working_dir'
        )
        # --clean-logs / --no-clean-logs share one destination so that either
        # one can override the configured default.
        self.parser.add_argument('--clean-logs',
            dest='Global.clean_logs', action='store_true',
            help='Delete old log files before starting.',
            default=NoConfigDefault
        )
        self.parser.add_argument('--no-clean-logs',
            dest='Global.clean_logs', action='store_false',
            help="Don't delete old log files before starting.",
            default=NoConfigDefault
        )
276
276
class ApplicationWithClusterDir(Application):
    """An application that puts everything into a cluster directory.

    Instead of looking for things in the ipython_dir, this type of application
    will use its own private directory called the "cluster directory"
    for things like config files, log files, etc.

    The cluster directory is resolved as follows:

    * If the ``--cluster-dir`` option is given, it is used.
    * If ``--cluster-dir`` is not given, the application directory is
      resolved using the profile name as ``cluster_<profile>``. The search
      path for this directory is then i) cwd if it is found there
      and ii) in ipython_dir otherwise.

    The config file for the application is to be put in the cluster
    dir and named the value of the ``config_file_name`` class attribute.
    """

    # If True, find_resources creates a missing cluster dir instead of failing.
    auto_create_cluster_dir = True

    def create_default_config(self):
        """Extend the inherited defaults with the cluster-related settings."""
        super(ApplicationWithClusterDir, self).create_default_config()
        self.default_config.Global.profile = u'default'
        self.default_config.Global.cluster_dir = u''
        self.default_config.Global.working_dir = os.getcwd()
        self.default_config.Global.log_to_file = False
        self.default_config.Global.clean_logs = False

    def create_command_line_config(self):
        """Create and return a command line config loader."""
        return AppWithClusterDirArgParseConfigLoader(
            description=self.description,
            version=release.version
        )

    def find_resources(self):
        """This resolves the cluster directory.

        This tries to find the cluster directory and if successful, it will
        have done:
        * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
          the application.
        * Sets ``self.cluster_dir`` attribute of the application and config
          objects.

        The algorithm used for this is as follows:
        1. Try ``Global.cluster_dir``.
        2. Try using ``Global.profile``.
        3. If both of these fail and ``self.auto_create_cluster_dir`` is
           ``True``, then create the new cluster dir in the IPython directory.
        4. If all fails, then raise :class:`ClusterDirError`.
        """

        # 1. An explicit cluster dir; the command line wins over the default.
        try:
            cluster_dir = self.command_line_config.Global.cluster_dir
        except AttributeError:
            cluster_dir = self.default_config.Global.cluster_dir
        cluster_dir = genutils.expand_path(cluster_dir)
        try:
            self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
        except ClusterDirError:
            pass
        else:
            self.log.info('Using existing cluster dir: %s' % \
                self.cluster_dir_obj.location
            )
            self.finish_cluster_dir()
            return

        # 2. An existing cluster dir located through the profile name.
        try:
            self.profile = self.command_line_config.Global.profile
        except AttributeError:
            self.profile = self.default_config.Global.profile
        try:
            self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
                self.ipython_dir, self.profile)
        except ClusterDirError:
            pass
        else:
            self.log.info('Using existing cluster dir: %s' % \
                self.cluster_dir_obj.location
            )
            self.finish_cluster_dir()
            return

        # 3./4. Nothing found: create one in the IPython directory or give up.
        if self.auto_create_cluster_dir:
            self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
                self.ipython_dir, self.profile
            )
            self.log.info('Creating new cluster dir: %s' % \
                self.cluster_dir_obj.location
            )
            self.finish_cluster_dir()
        else:
            raise ClusterDirError('Could not find a valid cluster directory.')

    def finish_cluster_dir(self):
        """Record the resolved cluster dir on the app and its config objects."""
        # Set the cluster directory
        self.cluster_dir = self.cluster_dir_obj.location

        # These have to be set because they could be different from the one
        # that we just computed.  Because command line has the highest
        # priority, this will always end up in the master_config.
        self.default_config.Global.cluster_dir = self.cluster_dir
        self.command_line_config.Global.cluster_dir = self.cluster_dir

        # Set the search path to the cluster directory
        self.config_file_paths = (self.cluster_dir,)

    def find_config_file_name(self):
        """Find the config file name for this application."""
        # For this type of Application it should be set as a class attribute.
        if not hasattr(self, 'config_file_name'):
            self.log.critical("No config filename found")

    def find_config_file_paths(self):
        """Restrict the config file search path to the cluster directory."""
        # Set the search path to the cluster directory
        self.config_file_paths = (self.cluster_dir,)

    def pre_construct(self):
        """Publish the cluster subdirectories and move to the working dir."""
        # The log and security dirs were set earlier, but here we put them
        # into the config and log them.
        config = self.master_config
        sdir = self.cluster_dir_obj.security_dir
        self.security_dir = config.Global.security_dir = sdir
        ldir = self.cluster_dir_obj.log_dir
        self.log_dir = config.Global.log_dir = ldir
        pdir = self.cluster_dir_obj.pid_dir
        self.pid_dir = config.Global.pid_dir = pdir
        self.log.info("Cluster directory set to: %s" % self.cluster_dir)
        config.Global.working_dir = unicode(genutils.expand_path(config.Global.working_dir))
        # Change to the working directory. We do this just before construct
        # is called so all the components there have the right working dir.
        self.to_working_dir()

    def to_working_dir(self):
        """Change to ``Global.working_dir`` if it differs from the cwd."""
        wd = self.master_config.Global.working_dir
        if unicode(wd) != unicode(os.getcwd()):
            os.chdir(wd)
            self.log.info("Changing to working dir: %s" % wd)

    def start_logging(self):
        """Optionally clean old log files, then start Twisted logging.

        Logging goes to ``<name>-<pid>.log`` in the log directory when
        ``Global.log_to_file`` is set and to ``sys.stdout`` otherwise.
        """
        # Remove old log files
        if self.master_config.Global.clean_logs:
            log_dir = self.master_config.Global.log_dir
            for f in os.listdir(log_dir):
                if f.startswith(self.name + u'-') and f.endswith('.log'):
                    os.remove(os.path.join(log_dir, f))
        # Start logging to the new log file
        if self.master_config.Global.log_to_file:
            log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
            logfile = os.path.join(self.log_dir, log_filename)
            # Left open on purpose: the file object is handed to the logger.
            open_log_file = open(logfile, 'w')
        else:
            open_log_file = sys.stdout
        log.startLogging(open_log_file)

    def write_pid_file(self, overwrite=False):
        """Create a .pid file in the pid_dir with my pid.

        This must be called after pre_construct, which sets `self.pid_dir`.
        This raises :exc:`PIDFileError` if the pid file exists already and
        ``overwrite`` is False.
        """
        pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
        # Only read the old pid when it is needed for the error message, so
        # that a corrupt stale file cannot prevent an explicit overwrite.
        if os.path.isfile(pid_file) and not overwrite:
            pid = self.get_pid_from_file()
            raise PIDFileError(
                'The pid file [%s] already exists. \nThis could mean that this '
                'server is already running with [pid=%s].' % (pid_file, pid)
            )
        with open(pid_file, 'w') as f:
            self.log.info("Creating pid file: %s" % pid_file)
            f.write(repr(os.getpid())+'\n')

    def remove_pid_file(self):
        """Remove the pid file.

        This should be called at shutdown by registering a callback with
        :func:`reactor.addSystemEventTrigger`. This needs to return
        ``None``.
        """
        pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            try:
                self.log.info("Removing pid file: %s" % pid_file)
                os.remove(pid_file)
            except Exception:
                # Best effort: warn and carry on so that this still returns
                # None, but do not swallow KeyboardInterrupt/SystemExit.
                self.log.warn("Error removing the pid file: %s" % pid_file)

    def get_pid_from_file(self):
        """Get the pid from the pid file.

        If the pid file doesn't exist a :exc:`PIDFileError` is raised.
        """
        pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            with open(pid_file, 'r') as f:
                pid = int(f.read().strip())
                return pid
        else:
            raise PIDFileError('pid file not found: %s' % pid_file)
480
480
481
481
@@ -1,412 +1,454 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import logging
18 import logging
19 import os
19 import os
20 import signal
20 import signal
21 import sys
21 import sys
22
22
23 if os.name=='posix':
23 if os.name=='posix':
24 from twisted.scripts._twistd_unix import daemonize
24 from twisted.scripts._twistd_unix import daemonize
25
25
26 from IPython.core import release
26 from IPython.core import release
27 from IPython.external import argparse
27 from IPython.external import argparse
28 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
28 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
29 from IPython.utils.importstring import import_item
29 from IPython.utils.importstring import import_item
30
30
31 from IPython.kernel.clusterdir import (
31 from IPython.kernel.clusterdir import (
32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
33 )
33 )
34
34
35 from twisted.internet import reactor
35 from twisted.internet import reactor, defer
36 from twisted.python import log
36 from twisted.python import log, failure
37
37
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # The ipcluster application
40 # The ipcluster application
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43
43
44 # Exit codes for ipcluster
44 # Exit codes for ipcluster
45
45
46 # This will be the exit code if the ipcluster appears to be running because
46 # This will be the exit code if the ipcluster appears to be running because
47 # a .pid file exists
47 # a .pid file exists
48 ALREADY_STARTED = 10
48 ALREADY_STARTED = 10
49
49
50 # This will be the exit code if ipcluster stop is run, but there is no .pid
50 # This will be the exit code if ipcluster stop is run, but there is no .pid
51 # file to be found.
51 # file to be found.
52 ALREADY_STOPPED = 11
52 ALREADY_STOPPED = 11
53
53
54
54
55 class IPClusterCLLoader(ArgParseConfigLoader):
55 class IPClusterCLLoader(ArgParseConfigLoader):
56
56
57 def _add_arguments(self):
57 def _add_arguments(self):
58 # This has all the common options that all subcommands use
58 # This has all the common options that all subcommands use
59 parent_parser1 = argparse.ArgumentParser(add_help=False)
59 parent_parser1 = argparse.ArgumentParser(add_help=False)
60 parent_parser1.add_argument('--ipython-dir',
60 parent_parser1.add_argument('--ipython-dir',
61 dest='Global.ipython_dir',type=unicode,
61 dest='Global.ipython_dir',type=unicode,
62 help='Set to override default location of Global.ipython_dir.',
62 help='Set to override default location of Global.ipython_dir.',
63 default=NoConfigDefault,
63 default=NoConfigDefault,
64 metavar='Global.ipython_dir')
64 metavar='Global.ipython_dir')
65 parent_parser1.add_argument('--log-level',
65 parent_parser1.add_argument('--log-level',
66 dest="Global.log_level",type=int,
66 dest="Global.log_level",type=int,
67 help='Set the log level (0,10,20,30,40,50). Default is 30.',
67 help='Set the log level (0,10,20,30,40,50). Default is 30.',
68 default=NoConfigDefault,
68 default=NoConfigDefault,
69 metavar='Global.log_level')
69 metavar='Global.log_level')
70
70
71 # This has all the common options that other subcommands use
71 # This has all the common options that other subcommands use
72 parent_parser2 = argparse.ArgumentParser(add_help=False)
72 parent_parser2 = argparse.ArgumentParser(add_help=False)
73 parent_parser2.add_argument('-p','--profile',
73 parent_parser2.add_argument('-p','--profile',
74 dest='Global.profile',type=unicode,
74 dest='Global.profile',type=unicode,
75 default=NoConfigDefault,
75 default=NoConfigDefault,
76 help='The string name of the profile to be used. This determines '
76 help='The string name of the profile to be used. This determines '
77 'the name of the cluster dir as: cluster_<profile>. The default profile '
77 'the name of the cluster dir as: cluster_<profile>. The default profile '
78 'is named "default". The cluster directory is resolve this way '
78 'is named "default". The cluster directory is resolve this way '
79 'if the --cluster-dir option is not used.',
79 'if the --cluster-dir option is not used.',
80 default=NoConfigDefault,
80 default=NoConfigDefault,
81 metavar='Global.profile')
81 metavar='Global.profile')
82 parent_parser2.add_argument('--cluster-dir',
82 parent_parser2.add_argument('--cluster-dir',
83 dest='Global.cluster_dir',type=unicode,
83 dest='Global.cluster_dir',type=unicode,
84 default=NoConfigDefault,
84 default=NoConfigDefault,
85 help='Set the cluster dir. This overrides the logic used by the '
85 help='Set the cluster dir. This overrides the logic used by the '
86 '--profile option.',
86 '--profile option.',
87 default=NoConfigDefault,
87 default=NoConfigDefault,
88 metavar='Global.cluster_dir'),
88 metavar='Global.cluster_dir'),
89 parent_parser2.add_argument('--working-dir',
89 parent_parser2.add_argument('--working-dir',
90 dest='Global.working_dir',type=unicode,
90 dest='Global.working_dir',type=unicode,
91 help='Set the working dir for the process.',
91 help='Set the working dir for the process.',
92 default=NoConfigDefault,
92 default=NoConfigDefault,
93 metavar='Global.working_dir')
93 metavar='Global.working_dir')
94 parent_parser2.add_argument('--log-to-file',
94 parent_parser2.add_argument('--log-to-file',
95 action='store_true', dest='Global.log_to_file',
95 action='store_true', dest='Global.log_to_file',
96 default=NoConfigDefault,
96 default=NoConfigDefault,
97 help='Log to a file in the log directory (default is stdout)'
97 help='Log to a file in the log directory (default is stdout)'
98 )
98 )
99
99
100 subparsers = self.parser.add_subparsers(
100 subparsers = self.parser.add_subparsers(
101 dest='Global.subcommand',
101 dest='Global.subcommand',
102 title='ipcluster subcommands',
102 title='ipcluster subcommands',
103 description='ipcluster has a variety of subcommands. '
103 description='ipcluster has a variety of subcommands. '
104 'The general way of running ipcluster is "ipcluster <cmd> '
104 'The general way of running ipcluster is "ipcluster <cmd> '
105 ' [options]""',
105 ' [options]""',
106 help='For more help, type "ipcluster <cmd> -h"')
106 help='For more help, type "ipcluster <cmd> -h"')
107
107
108 parser_list = subparsers.add_parser(
108 parser_list = subparsers.add_parser(
109 'list',
109 'list',
110 help='List all clusters in cwd and ipython_dir.',
110 help='List all clusters in cwd and ipython_dir.',
111 parents=[parent_parser1]
111 parents=[parent_parser1]
112 )
112 )
113
113
114 parser_create = subparsers.add_parser(
114 parser_create = subparsers.add_parser(
115 'create',
115 'create',
116 help='Create a new cluster directory.',
116 help='Create a new cluster directory.',
117 parents=[parent_parser1, parent_parser2]
117 parents=[parent_parser1, parent_parser2]
118 )
118 )
119 parser_create.add_argument(
119 parser_create.add_argument(
120 '--reset-config',
120 '--reset-config',
121 dest='Global.reset_config', action='store_true',
121 dest='Global.reset_config', action='store_true',
122 default=NoConfigDefault,
122 default=NoConfigDefault,
123 help='Recopy the default config files to the cluster directory. '
123 help='Recopy the default config files to the cluster directory. '
124 'You will loose any modifications you have made to these files.'
124 'You will loose any modifications you have made to these files.'
125 )
125 )
126
126
127 parser_start = subparsers.add_parser(
127 parser_start = subparsers.add_parser(
128 'start',
128 'start',
129 help='Start a cluster.',
129 help='Start a cluster.',
130 parents=[parent_parser1, parent_parser2]
130 parents=[parent_parser1, parent_parser2]
131 )
131 )
132 parser_start.add_argument(
132 parser_start.add_argument(
133 '-n', '--number',
133 '-n', '--number',
134 type=int, dest='Global.n',
134 type=int, dest='Global.n',
135 default=NoConfigDefault,
135 default=NoConfigDefault,
136 help='The number of engines to start.',
136 help='The number of engines to start.',
137 metavar='Global.n'
137 metavar='Global.n'
138 )
138 )
139 parser_start.add_argument('--clean-logs',
139 parser_start.add_argument('--clean-logs',
140 dest='Global.clean_logs', action='store_true',
140 dest='Global.clean_logs', action='store_true',
141 help='Delete old log flies before starting.',
141 help='Delete old log flies before starting.',
142 default=NoConfigDefault
142 default=NoConfigDefault
143 )
143 )
144 parser_start.add_argument('--no-clean-logs',
144 parser_start.add_argument('--no-clean-logs',
145 dest='Global.clean_logs', action='store_false',
145 dest='Global.clean_logs', action='store_false',
146 help="Don't delete old log flies before starting.",
146 help="Don't delete old log flies before starting.",
147 default=NoConfigDefault
147 default=NoConfigDefault
148 )
148 )
149 parser_start.add_argument('--daemon',
149 parser_start.add_argument('--daemon',
150 dest='Global.daemonize', action='store_true',
150 dest='Global.daemonize', action='store_true',
151 help='Daemonize the ipcluster program. This implies --log-to-file',
151 help='Daemonize the ipcluster program. This implies --log-to-file',
152 default=NoConfigDefault
152 default=NoConfigDefault
153 )
153 )
154 parser_start.add_argument('--no-daemon',
154 parser_start.add_argument('--no-daemon',
155 dest='Global.daemonize', action='store_false',
155 dest='Global.daemonize', action='store_false',
156 help="Dont't daemonize the ipcluster program.",
156 help="Dont't daemonize the ipcluster program.",
157 default=NoConfigDefault
157 default=NoConfigDefault
158 )
158 )
159
159
160 parser_start = subparsers.add_parser(
160 parser_start = subparsers.add_parser(
161 'stop',
161 'stop',
162 help='Stop a cluster.',
162 help='Stop a cluster.',
163 parents=[parent_parser1, parent_parser2]
163 parents=[parent_parser1, parent_parser2]
164 )
164 )
165 parser_start.add_argument('--signal',
165 parser_start.add_argument('--signal',
166 dest='Global.signal', type=int,
166 dest='Global.signal', type=int,
167 help="The signal number to use in stopping the cluster (default=2).",
167 help="The signal number to use in stopping the cluster (default=2).",
168 metavar="Global.signal",
168 metavar="Global.signal",
169 default=NoConfigDefault
169 default=NoConfigDefault
170 )
170 )
171
171
172
172
173 default_config_file_name = u'ipcluster_config.py'
173 default_config_file_name = u'ipcluster_config.py'
174
174
175
175
176 class IPClusterApp(ApplicationWithClusterDir):
176 class IPClusterApp(ApplicationWithClusterDir):
177
177
178 name = u'ipcluster'
178 name = u'ipcluster'
179 description = 'Start an IPython cluster (controller and engines).'
179 description = 'Start an IPython cluster (controller and engines).'
180 config_file_name = default_config_file_name
180 config_file_name = default_config_file_name
181 default_log_level = logging.INFO
181 default_log_level = logging.INFO
182 auto_create_cluster_dir = False
182 auto_create_cluster_dir = False
183
183
184 def create_default_config(self):
184 def create_default_config(self):
185 super(IPClusterApp, self).create_default_config()
185 super(IPClusterApp, self).create_default_config()
186 self.default_config.Global.controller_launcher = \
186 self.default_config.Global.controller_launcher = \
187 'IPython.kernel.launcher.LocalControllerLauncher'
187 'IPython.kernel.launcher.LocalControllerLauncher'
188 self.default_config.Global.engine_launcher = \
188 self.default_config.Global.engine_launcher = \
189 'IPython.kernel.launcher.LocalEngineSetLauncher'
189 'IPython.kernel.launcher.LocalEngineSetLauncher'
190 self.default_config.Global.n = 2
190 self.default_config.Global.n = 2
191 self.default_config.Global.reset_config = False
191 self.default_config.Global.reset_config = False
192 self.default_config.Global.clean_logs = True
192 self.default_config.Global.clean_logs = True
193 self.default_config.Global.signal = 2
193 self.default_config.Global.signal = 2
194 self.default_config.Global.daemonize = False
194 self.default_config.Global.daemonize = False
195
195
196 def create_command_line_config(self):
196 def create_command_line_config(self):
197 """Create and return a command line config loader."""
197 """Create and return a command line config loader."""
198 return IPClusterCLLoader(
198 return IPClusterCLLoader(
199 description=self.description,
199 description=self.description,
200 version=release.version
200 version=release.version
201 )
201 )
202
202
203 def find_resources(self):
203 def find_resources(self):
204 subcommand = self.command_line_config.Global.subcommand
204 subcommand = self.command_line_config.Global.subcommand
205 if subcommand=='list':
205 if subcommand=='list':
206 self.list_cluster_dirs()
206 self.list_cluster_dirs()
207 # Exit immediately because there is nothing left to do.
207 # Exit immediately because there is nothing left to do.
208 self.exit()
208 self.exit()
209 elif subcommand=='create':
209 elif subcommand=='create':
210 self.auto_create_cluster_dir = True
210 self.auto_create_cluster_dir = True
211 super(IPClusterApp, self).find_resources()
211 super(IPClusterApp, self).find_resources()
212 elif subcommand=='start' or subcommand=='stop':
212 elif subcommand=='start' or subcommand=='stop':
213 self.auto_create_cluster_dir = False
213 self.auto_create_cluster_dir = False
214 try:
214 try:
215 super(IPClusterApp, self).find_resources()
215 super(IPClusterApp, self).find_resources()
216 except ClusterDirError:
216 except ClusterDirError:
217 raise ClusterDirError(
217 raise ClusterDirError(
218 "Could not find a cluster directory. A cluster dir must "
218 "Could not find a cluster directory. A cluster dir must "
219 "be created before running 'ipcluster start'. Do "
219 "be created before running 'ipcluster start'. Do "
220 "'ipcluster create -h' or 'ipcluster list -h' for more "
220 "'ipcluster create -h' or 'ipcluster list -h' for more "
221 "information about creating and listing cluster dirs."
221 "information about creating and listing cluster dirs."
222 )
222 )
223
223
224 def list_cluster_dirs(self):
224 def list_cluster_dirs(self):
225 # Find the search paths
225 # Find the search paths
226 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
226 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
227 if cluster_dir_paths:
227 if cluster_dir_paths:
228 cluster_dir_paths = cluster_dir_paths.split(':')
228 cluster_dir_paths = cluster_dir_paths.split(':')
229 else:
229 else:
230 cluster_dir_paths = []
230 cluster_dir_paths = []
231 try:
231 try:
232 ipython_dir = self.command_line_config.Global.ipython_dir
232 ipython_dir = self.command_line_config.Global.ipython_dir
233 except AttributeError:
233 except AttributeError:
234 ipython_dir = self.default_config.Global.ipython_dir
234 ipython_dir = self.default_config.Global.ipython_dir
235 paths = [os.getcwd(), ipython_dir] + \
235 paths = [os.getcwd(), ipython_dir] + \
236 cluster_dir_paths
236 cluster_dir_paths
237 paths = list(set(paths))
237 paths = list(set(paths))
238
238
239 self.log.info('Searching for cluster dirs in paths: %r' % paths)
239 self.log.info('Searching for cluster dirs in paths: %r' % paths)
240 for path in paths:
240 for path in paths:
241 files = os.listdir(path)
241 files = os.listdir(path)
242 for f in files:
242 for f in files:
243 full_path = os.path.join(path, f)
243 full_path = os.path.join(path, f)
244 if os.path.isdir(full_path) and f.startswith('cluster_'):
244 if os.path.isdir(full_path) and f.startswith('cluster_'):
245 profile = full_path.split('_')[-1]
245 profile = full_path.split('_')[-1]
246 start_cmd = '"ipcluster start -n 4 -p %s"' % profile
246 start_cmd = '"ipcluster start -n 4 -p %s"' % profile
247 print start_cmd + " ==> " + full_path
247 print start_cmd + " ==> " + full_path
248
248
249 def pre_construct(self):
249 def pre_construct(self):
250 # This is where we cd to the working directory.
250 # This is where we cd to the working directory.
251 super(IPClusterApp, self).pre_construct()
251 super(IPClusterApp, self).pre_construct()
252 config = self.master_config
252 config = self.master_config
253 try:
253 try:
254 daemon = config.Global.daemonize
254 daemon = config.Global.daemonize
255 if daemon:
255 if daemon:
256 config.Global.log_to_file = True
256 config.Global.log_to_file = True
257 except AttributeError:
257 except AttributeError:
258 pass
258 pass
259
259
260 def construct(self):
260 def construct(self):
261 config = self.master_config
261 config = self.master_config
262 if config.Global.subcommand=='list':
262 if config.Global.subcommand=='list':
263 pass
263 pass
264 elif config.Global.subcommand=='create':
264 elif config.Global.subcommand=='create':
265 self.log.info('Copying default config files to cluster directory '
265 self.log.info('Copying default config files to cluster directory '
266 '[overwrite=%r]' % (config.Global.reset_config,))
266 '[overwrite=%r]' % (config.Global.reset_config,))
267 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
267 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
268 elif config.Global.subcommand=='start':
268 elif config.Global.subcommand=='start':
269 self.start_logging()
269 self.start_logging()
270 reactor.callWhenRunning(self.start_launchers)
270 reactor.callWhenRunning(self.start_launchers)
271
271
272 def start_launchers(self):
272 def start_launchers(self):
273 config = self.master_config
273 config = self.master_config
274
274
275 # Create the launchers
275 # Create the launchers
276 el_class = import_item(config.Global.engine_launcher)
276 el_class = import_item(config.Global.engine_launcher)
277 self.engine_launcher = el_class(
277 self.engine_launcher = el_class(
278 self.cluster_dir, config=config
278 self.cluster_dir, config=config
279 )
279 )
280 cl_class = import_item(config.Global.controller_launcher)
280 cl_class = import_item(config.Global.controller_launcher)
281 self.controller_launcher = cl_class(
281 self.controller_launcher = cl_class(
282 self.cluster_dir, config=config
282 self.cluster_dir, config=config
283 )
283 )
284
284
285 # Setup signals
285 # Setup signals
286 signal.signal(signal.SIGINT, self.stop_launchers)
286 signal.signal(signal.SIGINT, self.sigint_handler)
287
287
288 # Setup the observing of stopping
288 # Setup the observing of stopping. If the controller dies, shut
289 # everything down as that will be completely fatal for the engines.
289 d1 = self.controller_launcher.observe_stop()
290 d1 = self.controller_launcher.observe_stop()
290 d1.addCallback(self.stop_engines)
291 d1.addCallback(self.stop_launchers)
291 d1.addErrback(self.err_and_stop)
292 # But, we don't monitor the stopping of engines. An engine dying
292 # If this triggers, just let them die
293 # is just fine and in principle a user could start a new engine.
293 # d2 = self.engine_launcher.observe_stop()
294 # Also, if we did monitor engine stopping, it is difficult to
295 # know what to do when only some engines die. Currently, the
296 # observing of engine stopping is inconsistent. Some launchers
297 # might trigger on a single engine stopping, others wait until
298 # all stop. TODO: think more about how to handle this.
294
299
295 # Start the controller and engines
300 # Start the controller and engines
301 self._stopping = False # Make sure stop_launchers is not called 2x.
302 d = self.start_controller()
303 d.addCallback(self.start_engines)
304 d.addCallback(self.startup_message)
305 # If the controller or engines fail to start, stop everything
306 d.addErrback(self.stop_launchers)
307 return d
308
309 def startup_message(self, r=None):
310 log.msg("IPython cluster: started")
311 return r
312
313 def start_controller(self, r=None):
314 # log.msg("In start_controller")
315 config = self.master_config
296 d = self.controller_launcher.start(
316 d = self.controller_launcher.start(
297 cluster_dir=config.Global.cluster_dir
317 cluster_dir=config.Global.cluster_dir
298 )
318 )
299 d.addCallback(lambda _: self.start_engines())
319 return d
300 d.addErrback(self.err_and_stop)
320
301
321 def start_engines(self, r=None):
302 def err_and_stop(self, f):
322 # log.msg("In start_engines")
303 log.msg('Unexpected error in ipcluster:')
304 log.err(f)
305 reactor.stop()
306
307 def stop_engines(self, r):
308 return self.engine_launcher.stop()
309
310 def start_engines(self):
311 config = self.master_config
323 config = self.master_config
312 d = self.engine_launcher.start(
324 d = self.engine_launcher.start(
313 config.Global.n,
325 config.Global.n,
314 cluster_dir=config.Global.cluster_dir
326 cluster_dir=config.Global.cluster_dir
315 )
327 )
316 return d
328 return d
317
329
318 def stop_launchers(self, signum, frame):
330 def stop_controller(self, r=None):
319 log.msg("Stopping cluster")
331 # log.msg("In stop_controller")
320 d1 = self.engine_launcher.stop()
332 if self.controller_launcher.running:
321 d2 = self.controller_launcher.stop()
333 d = self.controller_launcher.stop()
322 # d1.addCallback(lambda _: self.controller_launcher.stop)
334 d.addErrback(self.log_err)
323 d1.addErrback(self.err_and_stop)
335 return d
324 d2.addErrback(self.err_and_stop)
336 else:
325 reactor.callLater(2.0, reactor.stop)
337 return defer.succeed(None)
338
339 def stop_engines(self, r=None):
340 # log.msg("In stop_engines")
341 if self.engine_launcher.running:
342 d = self.engine_launcher.stop()
343 d.addErrback(self.log_err)
344 return d
345 else:
346 return defer.succeed(None)
326
347
348 def log_err(self, f):
349 log.msg(f.getTraceback())
350 return None
351
352 def stop_launchers(self, r=None):
353 if not self._stopping:
354 self._stopping = True
355 if isinstance(r, failure.Failure):
356 log.msg('Unexpected error in ipcluster:')
357 log.msg(r.getTraceback())
358 log.msg("IPython cluster: stopping")
359 d= self.stop_engines()
360 d2 = self.stop_controller()
361 # Wait a few seconds to let things shut down.
362 reactor.callLater(3.0, reactor.stop)
363
364 def sigint_handler(self, signum, frame):
365 self.stop_launchers()
366
327 def start_logging(self):
367 def start_logging(self):
328 # Remove old log files
368 # Remove old log files of the controller and engine
329 if self.master_config.Global.clean_logs:
369 if self.master_config.Global.clean_logs:
330 log_dir = self.master_config.Global.log_dir
370 log_dir = self.master_config.Global.log_dir
331 for f in os.listdir(log_dir):
371 for f in os.listdir(log_dir):
332 if f.startswith('ipengine' + '-') and f.endswith('.log'):
372 if f.startswith('ipengine' + '-'):
333 os.remove(os.path.join(log_dir, f))
373 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
334 for f in os.listdir(log_dir):
374 os.remove(os.path.join(log_dir, f))
335 if f.startswith('ipcontroller' + '-') and f.endswith('.log'):
375 if f.startswith('ipcontroller' + '-'):
336 os.remove(os.path.join(log_dir, f))
376 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
377 os.remove(os.path.join(log_dir, f))
378 # This will remove old log files for ipcluster itself
337 super(IPClusterApp, self).start_logging()
379 super(IPClusterApp, self).start_logging()
338
380
339 def start_app(self):
381 def start_app(self):
340 """Start the application, depending on what subcommand is used."""
382 """Start the application, depending on what subcommand is used."""
341 subcmd = self.master_config.Global.subcommand
383 subcmd = self.master_config.Global.subcommand
342 if subcmd=='create' or subcmd=='list':
384 if subcmd=='create' or subcmd=='list':
343 return
385 return
344 elif subcmd=='start':
386 elif subcmd=='start':
345 self.start_app_start()
387 self.start_app_start()
346 elif subcmd=='stop':
388 elif subcmd=='stop':
347 self.start_app_stop()
389 self.start_app_stop()
348
390
349 def start_app_start(self):
391 def start_app_start(self):
350 """Start the app for the start subcommand."""
392 """Start the app for the start subcommand."""
351 config = self.master_config
393 config = self.master_config
352 # First see if the cluster is already running
394 # First see if the cluster is already running
353 try:
395 try:
354 pid = self.get_pid_from_file()
396 pid = self.get_pid_from_file()
355 except PIDFileError:
397 except PIDFileError:
356 pass
398 pass
357 else:
399 else:
358 self.log.critical(
400 self.log.critical(
359 'Cluster is already running with [pid=%s]. '
401 'Cluster is already running with [pid=%s]. '
360 'use "ipcluster stop" to stop the cluster.' % pid
402 'use "ipcluster stop" to stop the cluster.' % pid
361 )
403 )
362 # Here I exit with an unusual exit status that other processes
404 # Here I exit with an unusual exit status that other processes
363 # can watch for to learn how I exited.
405 # can watch for to learn how I exited.
364 self.exit(ALREADY_STARTED)
406 self.exit(ALREADY_STARTED)
365
407
366 # Now log and daemonize
408 # Now log and daemonize
367 self.log.info(
409 self.log.info(
368 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
410 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
369 )
411 )
370 if config.Global.daemonize:
412 if config.Global.daemonize:
371 if os.name=='posix':
413 if os.name=='posix':
372 daemonize()
414 daemonize()
373
415
374 # Now write the new pid file AFTER our new forked pid is active.
416 # Now write the new pid file AFTER our new forked pid is active.
375 self.write_pid_file()
417 self.write_pid_file()
376 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
418 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
377 reactor.run()
419 reactor.run()
378
420
379 def start_app_stop(self):
421 def start_app_stop(self):
380 """Start the app for the stop subcommand."""
422 """Start the app for the stop subcommand."""
381 config = self.master_config
423 config = self.master_config
382 try:
424 try:
383 pid = self.get_pid_from_file()
425 pid = self.get_pid_from_file()
384 except PIDFileError:
426 except PIDFileError:
385 self.log.critical(
427 self.log.critical(
386 'Problem reading pid file, cluster is probably not running.'
428 'Problem reading pid file, cluster is probably not running.'
387 )
429 )
388 # Here I exit with an unusual exit status that other processes
430 # Here I exit with an unusual exit status that other processes
389 # can watch for to learn how I exited.
431 # can watch for to learn how I exited.
390 self.exit(ALREADY_STOPPED)
432 self.exit(ALREADY_STOPPED)
391 else:
433 else:
392 if os.name=='posix':
434 if os.name=='posix':
393 sig = config.Global.signal
435 sig = config.Global.signal
394 self.log.info(
436 self.log.info(
395 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
437 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
396 )
438 )
397 os.kill(pid, sig)
439 os.kill(pid, sig)
398 elif os.name=='nt':
440 elif os.name=='nt':
399 # As of right now, we don't support daemonize on Windows, so
441 # As of right now, we don't support daemonize on Windows, so
400 # stop will not do anything. Minimally, it should clean up the
442 # stop will not do anything. Minimally, it should clean up the
401 # old .pid files.
443 # old .pid files.
402 self.remove_pid_file()
444 self.remove_pid_file()
403
445
404 def launch_new_instance():
446 def launch_new_instance():
405 """Create and run the IPython cluster."""
447 """Create and run the IPython cluster."""
406 app = IPClusterApp()
448 app = IPClusterApp()
407 app.start()
449 app.start()
408
450
409
451
410 if __name__ == '__main__':
452 if __name__ == '__main__':
411 launch_new_instance()
453 launch_new_instance()
412
454
@@ -1,306 +1,318 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Job and task components for writing .xml files that the Windows HPC Server
4 Job and task components for writing .xml files that the Windows HPC Server
5 2008 can use to start jobs.
5 2008 can use to start jobs.
6 """
6 """
7
7
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2009 The IPython Development Team
9 # Copyright (C) 2008-2009 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 from __future__ import with_statement
19 from __future__ import with_statement
20
20
21 import os
21 import os
22 import re
22 import re
23 import uuid
23 import uuid
24
24
25 from xml.etree import ElementTree as ET
25 from xml.etree import ElementTree as ET
26 from xml.dom import minidom
26 from xml.dom import minidom
27
27
28 from IPython.core.component import Component
28 from IPython.core.component import Component
29 from IPython.external import Itpl
29 from IPython.external import Itpl
30 from IPython.utils.traitlets import (
30 from IPython.utils.traitlets import (
31 Str, Int, List, Unicode, Instance,
31 Str, Int, List, Unicode, Instance,
32 Enum, Bool, CStr
32 Enum, Bool, CStr
33 )
33 )
34
34
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36 # Job and Task Component
36 # Job and Task Component
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38
38
39
39
def as_str(value):
    """Convert a value into the form used for an XML attribute.

    Booleans become ``'true'`` or ``'false'`` and ints and floats become
    their ``repr``.  Strings, and values of any other type, are returned
    unchanged.
    """
    # bool has to be checked before int because bool is a subclass of int.
    if isinstance(value, bool):
        return 'true' if value else 'false'
    if isinstance(value, (int, float)):
        return repr(value)
    return value
52
52
53
53
def indent(elem, level=0):
    """Add layout whitespace to an element tree, in place.

    The ``text`` and ``tail`` of ``elem`` and of all its descendants are
    filled with a newline followed by one space per nesting level, but only
    where they are currently empty or whitespace-only.

    Parameters
    ----------
    elem : element
        The element whose subtree is modified.
    level : int
        The nesting depth of ``elem``; 0 for the root.
    """
    pad = "\n" + level*" "
    if not len(elem):
        # Leaf element: only nested leaves get a tail.
        if level and (not elem.tail or not elem.tail.strip()):
            elem.tail = pad
        return
    if not elem.text or not elem.text.strip():
        elem.text = pad + " "
    if not elem.tail or not elem.tail.strip():
        elem.tail = pad
    child = None
    for child in elem:
        indent(child, level+1)
    # ``child`` is now the last child; its tail sits directly in front of
    # the closing tag of ``elem``, so it gets the padding of ``elem``.
    if not child.tail or not child.tail.strip():
        child.tail = pad
68
68
69
69
def find_username():
    """Return the user name taken from the environment.

    The result is ``USERDOMAIN\\USERNAME`` when the ``USERDOMAIN``
    environment variable is set and just ``USERNAME`` otherwise.  A missing
    ``USERNAME`` is treated as the empty string.
    """
    user = os.environ.get('USERNAME','')
    domain = os.environ.get('USERDOMAIN')
    return user if domain is None else '%s\\%s' % (domain, user)
77
77
78
78
class WinHPCJob(Component):
    """A job description that can be written as an XML file.

    The traits below are written as attributes of the ``Job`` XML element
    by :meth:`as_element`; the tasks added with :meth:`add_task` are written
    as children of its ``Tasks`` element.
    """

    job_id = Str('')
    job_name = Str('MyJob', config=True)
    min_cores = Int(1, config=True)
    max_cores = Int(1, config=True)
    min_sockets = Int(1, config=True)
    max_sockets = Int(1, config=True)
    min_nodes = Int(1, config=True)
    max_nodes = Int(1, config=True)
    unit_type = Str("Core", config=True)
    auto_calculate_min = Bool(True, config=True)
    auto_calculate_max = Bool(True, config=True)
    run_until_canceled = Bool(False, config=True)
    is_exclusive = Bool(False, config=True)
    username = Str(find_username(), config=True)
    job_type = Str('Batch', config=True)
    priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest', config=True)
    requested_nodes = Str('', config=True)
    project = Str('IPython', config=True)
    xmlns = Str('http://schemas.microsoft.com/HPCS2008/scheduler/')
    version = Str("2.000")
    tasks = List([])

    @property
    def owner(self):
        """The owner of the job; always the same as ``username``."""
        return self.username

    def _write_attr(self, root, attr, key):
        """Set ``root``'s XML attribute ``key`` from the attribute ``attr``.

        The value is converted with :func:`as_str`; nothing is written when
        the converted value is false (for example the empty string).
        """
        s = as_str(getattr(self, attr, ''))
        if s:
            root.set(key, s)

    def as_element(self):
        """Return the job as a ``Job`` element, including all of its tasks."""
        # We have to add _A_ type things to get the right order that
        # the MSFT XML parser expects.
        root = ET.Element('Job')
        self._write_attr(root, 'version', '_A_Version')
        self._write_attr(root, 'job_name', '_B_Name')
        self._write_attr(root, 'unit_type', '_C_UnitType')
        self._write_attr(root, 'min_cores', '_D_MinCores')
        self._write_attr(root, 'max_cores', '_E_MaxCores')
        self._write_attr(root, 'min_sockets', '_F_MinSockets')
        self._write_attr(root, 'max_sockets', '_G_MaxSockets')
        self._write_attr(root, 'min_nodes', '_H_MinNodes')
        self._write_attr(root, 'max_nodes', '_I_MaxNodes')
        self._write_attr(root, 'run_until_canceled', '_J_RunUntilCanceled')
        self._write_attr(root, 'is_exclusive', '_K_IsExclusive')
        self._write_attr(root, 'username', '_L_UserName')
        self._write_attr(root, 'job_type', '_M_JobType')
        self._write_attr(root, 'priority', '_N_Priority')
        self._write_attr(root, 'requested_nodes', '_O_RequestedNodes')
        self._write_attr(root, 'auto_calculate_max', '_P_AutoCalculateMax')
        self._write_attr(root, 'auto_calculate_min', '_Q_AutoCalculateMin')
        self._write_attr(root, 'project', '_R_Project')
        self._write_attr(root, 'owner', '_S_Owner')
        self._write_attr(root, 'xmlns', '_T_xmlns')
        # An (empty) Dependencies element is always written.
        ET.SubElement(root, "Dependencies")
        etasks = ET.SubElement(root, "Tasks")
        for t in self.tasks:
            etasks.append(t.as_element())
        return root

    def tostring(self):
        """Return the string representation of the job description XML."""
        root = self.as_element()
        indent(root)
        txt = ET.tostring(root, encoding="utf-8")
        # Now remove the tokens used to order the attributes.  Only a token
        # in attribute-name position (right after a space and in front of
        # ``name="``) is removed, so that a value which happens to contain
        # something like ``_A_`` (a job name, a path, ...) is left intact.
        # ElementTree escapes double quotes inside attribute values, so the
        # ``="`` in the lookahead cannot match inside such a value.
        txt = re.sub(r'(?<= )_[A-Z]_(?=\w+=")', '', txt)
        txt = '<?xml version="1.0" encoding="utf-8"?>\n' + txt
        return txt

    def write(self, filename):
        """Write the XML job description to a file."""
        txt = self.tostring()
        with open(filename, 'w') as f:
            f.write(txt)

    def add_task(self, task):
        """Add a task to the job.

        Parameters
        ----------
        task : :class:`WinHPCTask`
            The task object to add.
        """
        self.tasks.append(task)
168
168
169
169
class WinHPCTask(Component):
    """A task description that can be turned into a ``Task`` XML element.

    The traits below are written as attributes of the ``Task`` element by
    :meth:`as_element`; ``environment_variables`` is written as a child
    ``EnvironmentVariables`` element.
    """

    task_id = Str('')
    task_name = Str('')
    version = Str("2.000")
    min_cores = Int(1, config=True)
    max_cores = Int(1, config=True)
    min_sockets = Int(1, config=True)
    max_sockets = Int(1, config=True)
    min_nodes = Int(1, config=True)
    max_nodes = Int(1, config=True)
    unit_type = Str("Core", config=True)
    command_line = CStr('', config=True)
    work_directory = CStr('', config=True)
    is_rerunnaable = Bool(True, config=True)
    std_out_file_path = CStr('', config=True)
    std_err_file_path = CStr('', config=True)
    is_parametric = Bool(False, config=True)
    environment_variables = Instance(dict, args=(), config=True)

    def _write_attr(self, root, attr, key):
        """Set ``root``'s XML attribute ``key`` from the attribute ``attr``.

        The value is converted with :func:`as_str`; a false result (such as
        the empty string) is not written at all.
        """
        text = as_str(getattr(self, attr, ''))
        if not text:
            return
        root.set(key, text)

    def as_element(self):
        """Return the task as a ``Task`` element."""
        # (attribute of self, XML attribute name) pairs, in the order in
        # which they are written.  The XML names carry a ``_X_`` prefix.
        ordered_attrs = (
            ('version', '_A_Version'),
            ('task_name', '_B_Name'),
            ('min_cores', '_C_MinCores'),
            ('max_cores', '_D_MaxCores'),
            ('min_sockets', '_E_MinSockets'),
            ('max_sockets', '_F_MaxSockets'),
            ('min_nodes', '_G_MinNodes'),
            ('max_nodes', '_H_MaxNodes'),
            ('command_line', '_I_CommandLine'),
            ('work_directory', '_J_WorkDirectory'),
            ('is_rerunnaable', '_K_IsRerunnable'),
            ('std_out_file_path', '_L_StdOutFilePath'),
            ('std_err_file_path', '_M_StdErrFilePath'),
            ('is_parametric', '_N_IsParametric'),
            ('unit_type', '_O_UnitType'),
        )
        task = ET.Element('Task')
        for attr, key in ordered_attrs:
            self._write_attr(task, attr, key)
        task.append(self.get_env_vars())
        return task

    def get_env_vars(self):
        """Return ``environment_variables`` as an ``EnvironmentVariables`` element.

        Every item becomes a ``Variable`` child holding a ``Name`` and a
        ``Value`` element.
        """
        container = ET.Element('EnvironmentVariables')
        for var_name, var_value in self.environment_variables.items():
            entry = ET.SubElement(container, "Variable")
            ET.SubElement(entry, "Name").text = var_name
            ET.SubElement(entry, "Value").text = var_value
        return container
224
224
225
225
226
226
227 # By declaring these, we can configure the controller and engine separately!
227 # By declaring these, we can configure the controller and engine separately!
228
228
class IPControllerJob(WinHPCJob):
    """The :class:`WinHPCJob` subclass used for the controller.

    Its traits are declared again here so that this job has configurable
    traits of its own.
    """

    # The job name is fixed, not configurable.
    job_name = Str('IPController', config=False)

    # Configurable traits.
    is_exclusive = Bool(False, config=True)
    username = Str(find_username(), config=True)
    priority = Enum(
        ('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest',
        config=True
    )
    requested_nodes = Str('', config=True)
    project = Str('IPython', config=True)
237
237
238
238
class IPEngineSetJob(WinHPCJob):
    """The :class:`WinHPCJob` subclass used for the set of engines.

    Its traits are declared again here so that this job has configurable
    traits of its own.
    """

    # The job name is fixed, not configurable.
    job_name = Str('IPEngineSet', config=False)

    # Configurable traits.
    is_exclusive = Bool(False, config=True)
    username = Str(find_username(), config=True)
    priority = Enum(
        ('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest',
        config=True
    )
    requested_nodes = Str('', config=True)
    project = Str('IPython', config=True)
247
247
248
248
class IPControllerTask(WinHPCTask):
    """The :class:`WinHPCTask` that runs the controller.

    The command line is built from ``controller_cmd`` and
    ``controller_args``.  The stdout/stderr file paths are generated for
    each instance in ``__init__``.
    """

    task_name = Str('IPController', config=True)
    controller_cmd = List(['ipcontroller.exe'], config=True)
    controller_args = List(['--log-to-file', '--log-level', '40'], config=True)
    # I don't want these to be configurable
    std_out_file_path = CStr('', config=False)
    std_err_file_path = CStr('', config=False)
    min_cores = Int(1, config=False)
    max_cores = Int(1, config=False)
    min_sockets = Int(1, config=False)
    max_sockets = Int(1, config=False)
    min_nodes = Int(1, config=False)
    max_nodes = Int(1, config=False)
    unit_type = Str("Core", config=False)
    work_directory = CStr('', config=False)

    def __init__(self, parent, name=None, config=None):
        """Initialize the task and give it its own output file names.

        Both file names live in the ``log`` directory and share one uuid
        that is generated here with ``uuid.uuid1()``.
        """
        super(IPControllerTask, self).__init__(parent, name, config)
        stem = 'ipcontroller-%s' % uuid.uuid1()
        self.std_out_file_path = os.path.join('log', stem + '.out')
        self.std_err_file_path = os.path.join('log', stem + '.err')

    @property
    def command_line(self):
        """The controller command followed by its arguments, space separated."""
        argv = self.controller_cmd + self.controller_args
        return ' '.join(argv)
269
275
270
276
class IPEngineTask(WinHPCTask):
    """The :class:`WinHPCTask` that runs an engine.

    The command line is built from ``engine_cmd`` and ``engine_args``.  The
    stdout/stderr file paths are generated for each instance in
    ``__init__``.
    """

    task_name = Str('IPEngine', config=True)
    engine_cmd = List(['ipengine.exe'], config=True)
    engine_args = List(['--log-to-file', '--log-level', '40'], config=True)
    # I don't want these to be configurable
    std_out_file_path = CStr('', config=False)
    std_err_file_path = CStr('', config=False)
    min_cores = Int(1, config=False)
    max_cores = Int(1, config=False)
    min_sockets = Int(1, config=False)
    max_sockets = Int(1, config=False)
    min_nodes = Int(1, config=False)
    max_nodes = Int(1, config=False)
    unit_type = Str("Core", config=False)
    work_directory = CStr('', config=False)

    def __init__(self, parent, name=None, config=None):
        """Initialize the task and give it its own output file names.

        Both file names live in the ``log`` directory and share one uuid
        that is generated here with ``uuid.uuid1()``.
        """
        super(IPEngineTask,self).__init__(parent, name, config)
        stem = 'ipengine-%s' % uuid.uuid1()
        self.std_out_file_path = os.path.join('log', stem + '.out')
        self.std_err_file_path = os.path.join('log', stem + '.err')

    @property
    def command_line(self):
        """The engine command followed by its arguments, space separated."""
        argv = self.engine_cmd + self.engine_args
        return ' '.join(argv)
291
303
292
304
293 # j = WinHPCJob(None)
305 # j = WinHPCJob(None)
294 # j.job_name = 'IPCluster'
306 # j.job_name = 'IPCluster'
295 # j.username = 'GNET\\bgranger'
307 # j.username = 'GNET\\bgranger'
296 # j.requested_nodes = 'GREEN'
308 # j.requested_nodes = 'GREEN'
297 #
309 #
298 # t = WinHPCTask(None)
310 # t = WinHPCTask(None)
299 # t.task_name = 'Controller'
311 # t.task_name = 'Controller'
300 # t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
312 # t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
301 # t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
313 # t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
302 # t.std_out_file_path = 'controller-out.txt'
314 # t.std_out_file_path = 'controller-out.txt'
303 # t.std_err_file_path = 'controller-err.txt'
315 # t.std_err_file_path = 'controller-err.txt'
304 # t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
316 # t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
305 # j.add_task(t)
317 # j.add_task(t)
306
318
General Comments 0
You need to be logged in to leave comments. Login now