##// END OF EJS Templates
Reworking how controller and engines startup in ipcluster....
bgranger -
Show More
@@ -1,481 +1,481 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython cluster directory
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import os
21 21 import shutil
22 22 import sys
23 23
24 24 from twisted.python import log
25 25
26 26 from IPython.core import release
27 27 from IPython.config.loader import PyFileConfigLoader
28 28 from IPython.core.application import Application
29 29 from IPython.core.component import Component
30 30 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
31 31 from IPython.utils.traitlets import Unicode, Bool
32 32 from IPython.utils import genutils
33 33
34 34 #-----------------------------------------------------------------------------
35 35 # Imports
36 36 #-----------------------------------------------------------------------------
37 37
38 38
39 39 class ClusterDirError(Exception):
40 40 pass
41 41
42 42
43 43 class PIDFileError(Exception):
44 44 pass
45 45
46 46
47 47 class ClusterDir(Component):
48 48 """An object to manage the cluster directory and its resources.
49 49
50 50 The cluster directory is used by :command:`ipcontroller`,
51 51 :command:`ipcontroller` and :command:`ipcontroller` to manage the
52 52 configuration, logging and security of these applications.
53 53
54 54 This object knows how to find, create and manage these directories. This
55 55 should be used by any code that want's to handle cluster directories.
56 56 """
57 57
58 58 security_dir_name = Unicode('security')
59 59 log_dir_name = Unicode('log')
60 60 pid_dir_name = Unicode('pid')
61 61 security_dir = Unicode(u'')
62 62 log_dir = Unicode(u'')
63 63 pid_dir = Unicode(u'')
64 64 location = Unicode(u'')
65 65
66 66 def __init__(self, location):
67 67 super(ClusterDir, self).__init__(None)
68 68 self.location = location
69 69
70 70 def _location_changed(self, name, old, new):
71 71 if not os.path.isdir(new):
72 72 os.makedirs(new, mode=0777)
73 73 else:
74 74 os.chmod(new, 0777)
75 75 self.security_dir = os.path.join(new, self.security_dir_name)
76 76 self.log_dir = os.path.join(new, self.log_dir_name)
77 77 self.pid_dir = os.path.join(new, self.pid_dir_name)
78 78 self.check_dirs()
79 79
80 80 def _log_dir_changed(self, name, old, new):
81 81 self.check_log_dir()
82 82
83 83 def check_log_dir(self):
84 84 if not os.path.isdir(self.log_dir):
85 85 os.mkdir(self.log_dir, 0777)
86 86 else:
87 87 os.chmod(self.log_dir, 0777)
88 88
89 89 def _security_dir_changed(self, name, old, new):
90 90 self.check_security_dir()
91 91
92 92 def check_security_dir(self):
93 93 if not os.path.isdir(self.security_dir):
94 94 os.mkdir(self.security_dir, 0700)
95 95 else:
96 96 os.chmod(self.security_dir, 0700)
97 97
98 98 def _pid_dir_changed(self, name, old, new):
99 99 self.check_pid_dir()
100 100
101 101 def check_pid_dir(self):
102 102 if not os.path.isdir(self.pid_dir):
103 103 os.mkdir(self.pid_dir, 0700)
104 104 else:
105 105 os.chmod(self.pid_dir, 0700)
106 106
107 107 def check_dirs(self):
108 108 self.check_security_dir()
109 109 self.check_log_dir()
110 110 self.check_pid_dir()
111 111
112 112 def load_config_file(self, filename):
113 113 """Load a config file from the top level of the cluster dir.
114 114
115 115 Parameters
116 116 ----------
117 117 filename : unicode or str
118 118 The filename only of the config file that must be located in
119 119 the top-level of the cluster directory.
120 120 """
121 121 loader = PyFileConfigLoader(filename, self.location)
122 122 return loader.load_config()
123 123
124 124 def copy_config_file(self, config_file, path=None, overwrite=False):
125 125 """Copy a default config file into the active cluster directory.
126 126
127 127 Default configuration files are kept in :mod:`IPython.config.default`.
128 128 This function moves these from that location to the working cluster
129 129 directory.
130 130 """
131 131 if path is None:
132 132 import IPython.config.default
133 133 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
134 134 path = os.path.sep.join(path)
135 135 src = os.path.join(path, config_file)
136 136 dst = os.path.join(self.location, config_file)
137 137 if not os.path.isfile(dst) or overwrite:
138 138 shutil.copy(src, dst)
139 139
140 140 def copy_all_config_files(self, path=None, overwrite=False):
141 141 """Copy all config files into the active cluster directory."""
142 142 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
143 143 u'ipcluster_config.py']:
144 144 self.copy_config_file(f, path=path, overwrite=overwrite)
145 145
146 146 @classmethod
147 147 def create_cluster_dir(csl, cluster_dir):
148 148 """Create a new cluster directory given a full path.
149 149
150 150 Parameters
151 151 ----------
152 152 cluster_dir : str
153 153 The full path to the cluster directory. If it does exist, it will
154 154 be used. If not, it will be created.
155 155 """
156 156 return ClusterDir(cluster_dir)
157 157
158 158 @classmethod
159 159 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
160 160 """Create a cluster dir by profile name and path.
161 161
162 162 Parameters
163 163 ----------
164 164 path : str
165 165 The path (directory) to put the cluster directory in.
166 166 profile : str
167 167 The name of the profile. The name of the cluster directory will
168 168 be "cluster_<profile>".
169 169 """
170 170 if not os.path.isdir(path):
171 171 raise ClusterDirError('Directory not found: %s' % path)
172 172 cluster_dir = os.path.join(path, u'cluster_' + profile)
173 173 return ClusterDir(cluster_dir)
174 174
175 175 @classmethod
176 176 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
177 177 """Find an existing cluster dir by profile name, return its ClusterDir.
178 178
179 179 This searches through a sequence of paths for a cluster dir. If it
180 180 is not found, a :class:`ClusterDirError` exception will be raised.
181 181
182 182 The search path algorithm is:
183 183 1. ``os.getcwd()``
184 184 2. ``ipython_dir``
185 185 3. The directories found in the ":" separated
186 186 :env:`IPCLUSTER_DIR_PATH` environment variable.
187 187
188 188 Parameters
189 189 ----------
190 190 ipython_dir : unicode or str
191 191 The IPython directory to use.
192 192 profile : unicode or str
193 193 The name of the profile. The name of the cluster directory
194 194 will be "cluster_<profile>".
195 195 """
196 196 dirname = u'cluster_' + profile
197 197 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
198 198 if cluster_dir_paths:
199 199 cluster_dir_paths = cluster_dir_paths.split(':')
200 200 else:
201 201 cluster_dir_paths = []
202 202 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
203 203 for p in paths:
204 204 cluster_dir = os.path.join(p, dirname)
205 205 if os.path.isdir(cluster_dir):
206 206 return ClusterDir(cluster_dir)
207 207 else:
208 208 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
209 209
210 210 @classmethod
211 211 def find_cluster_dir(cls, cluster_dir):
212 212 """Find/create a cluster dir and return its ClusterDir.
213 213
214 214 This will create the cluster directory if it doesn't exist.
215 215
216 216 Parameters
217 217 ----------
218 218 cluster_dir : unicode or str
219 219 The path of the cluster directory. This is expanded using
220 220 :func:`IPython.utils.genutils.expand_path`.
221 221 """
222 222 cluster_dir = genutils.expand_path(cluster_dir)
223 223 if not os.path.isdir(cluster_dir):
224 224 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
225 225 return ClusterDir(cluster_dir)
226 226
227 227
228 228 class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader):
229 229 """Default command line options for IPython cluster applications."""
230 230
231 231 def _add_other_arguments(self):
232 232 self.parser.add_argument('--ipython-dir',
233 233 dest='Global.ipython_dir',type=unicode,
234 234 help='Set to override default location of Global.ipython_dir.',
235 235 default=NoConfigDefault,
236 236 metavar='Global.ipython_dir'
237 237 )
238 238 self.parser.add_argument('-p', '--profile',
239 239 dest='Global.profile',type=unicode,
240 240 help='The string name of the profile to be used. This determines '
241 241 'the name of the cluster dir as: cluster_<profile>. The default profile '
242 242 'is named "default". The cluster directory is resolve this way '
243 243 'if the --cluster-dir option is not used.',
244 244 default=NoConfigDefault,
245 245 metavar='Global.profile'
246 246 )
247 247 self.parser.add_argument('--log-level',
248 248 dest="Global.log_level",type=int,
249 249 help='Set the log level (0,10,20,30,40,50). Default is 30.',
250 250 default=NoConfigDefault,
251 251 metavar="Global.log_level"
252 252 )
253 253 self.parser.add_argument('--cluster-dir',
254 254 dest='Global.cluster_dir',type=unicode,
255 255 help='Set the cluster dir. This overrides the logic used by the '
256 256 '--profile option.',
257 257 default=NoConfigDefault,
258 258 metavar='Global.cluster_dir'
259 259 ),
260 260 self.parser.add_argument('--working-dir',
261 261 dest='Global.working_dir',type=unicode,
262 262 help='Set the working dir for the process.',
263 263 default=NoConfigDefault,
264 264 metavar='Global.working_dir'
265 265 )
266 266 self.parser.add_argument('--clean-logs',
267 267 dest='Global.clean_logs', action='store_true',
268 268 help='Delete old log flies before starting.',
269 269 default=NoConfigDefault
270 270 )
271 271 self.parser.add_argument('--no-clean-logs',
272 272 dest='Global.clean_logs', action='store_false',
273 273 help="Don't Delete old log flies before starting.",
274 274 default=NoConfigDefault
275 275 )
276 276
277 277 class ApplicationWithClusterDir(Application):
278 278 """An application that puts everything into a cluster directory.
279 279
280 280 Instead of looking for things in the ipython_dir, this type of application
281 281 will use its own private directory called the "cluster directory"
282 282 for things like config files, log files, etc.
283 283
284 284 The cluster directory is resolved as follows:
285 285
286 286 * If the ``--cluster-dir`` option is given, it is used.
287 287 * If ``--cluster-dir`` is not given, the application directory is
288 288 resolve using the profile name as ``cluster_<profile>``. The search
289 289 path for this directory is then i) cwd if it is found there
290 290 and ii) in ipython_dir otherwise.
291 291
292 292 The config file for the application is to be put in the cluster
293 293 dir and named the value of the ``config_file_name`` class attribute.
294 294 """
295 295
296 296 auto_create_cluster_dir = True
297 297
298 298 def create_default_config(self):
299 299 super(ApplicationWithClusterDir, self).create_default_config()
300 300 self.default_config.Global.profile = u'default'
301 301 self.default_config.Global.cluster_dir = u''
302 302 self.default_config.Global.working_dir = os.getcwd()
303 303 self.default_config.Global.log_to_file = False
304 304 self.default_config.Global.clean_logs = False
305 305
306 306 def create_command_line_config(self):
307 307 """Create and return a command line config loader."""
308 308 return AppWithClusterDirArgParseConfigLoader(
309 309 description=self.description,
310 310 version=release.version
311 311 )
312 312
313 313 def find_resources(self):
314 314 """This resolves the cluster directory.
315 315
316 316 This tries to find the cluster directory and if successful, it will
317 317 have done:
318 318 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
319 319 the application.
320 320 * Sets ``self.cluster_dir`` attribute of the application and config
321 321 objects.
322 322
323 323 The algorithm used for this is as follows:
324 324 1. Try ``Global.cluster_dir``.
325 325 2. Try using ``Global.profile``.
326 326 3. If both of these fail and ``self.auto_create_cluster_dir`` is
327 327 ``True``, then create the new cluster dir in the IPython directory.
328 328 4. If all fails, then raise :class:`ClusterDirError`.
329 329 """
330 330
331 331 try:
332 332 cluster_dir = self.command_line_config.Global.cluster_dir
333 333 except AttributeError:
334 334 cluster_dir = self.default_config.Global.cluster_dir
335 335 cluster_dir = genutils.expand_path(cluster_dir)
336 336 try:
337 337 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
338 338 except ClusterDirError:
339 339 pass
340 340 else:
341 341 self.log.info('Using existing cluster dir: %s' % \
342 342 self.cluster_dir_obj.location
343 343 )
344 344 self.finish_cluster_dir()
345 345 return
346 346
347 347 try:
348 348 self.profile = self.command_line_config.Global.profile
349 349 except AttributeError:
350 350 self.profile = self.default_config.Global.profile
351 351 try:
352 352 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
353 353 self.ipython_dir, self.profile)
354 354 except ClusterDirError:
355 355 pass
356 356 else:
357 357 self.log.info('Using existing cluster dir: %s' % \
358 358 self.cluster_dir_obj.location
359 359 )
360 360 self.finish_cluster_dir()
361 361 return
362 362
363 363 if self.auto_create_cluster_dir:
364 364 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
365 365 self.ipython_dir, self.profile
366 366 )
367 367 self.log.info('Creating new cluster dir: %s' % \
368 368 self.cluster_dir_obj.location
369 369 )
370 370 self.finish_cluster_dir()
371 371 else:
372 372 raise ClusterDirError('Could not find a valid cluster directory.')
373 373
374 374 def finish_cluster_dir(self):
375 375 # Set the cluster directory
376 376 self.cluster_dir = self.cluster_dir_obj.location
377 377
378 378 # These have to be set because they could be different from the one
379 379 # that we just computed. Because command line has the highest
380 380 # priority, this will always end up in the master_config.
381 381 self.default_config.Global.cluster_dir = self.cluster_dir
382 382 self.command_line_config.Global.cluster_dir = self.cluster_dir
383 383
384 384 # Set the search path to the cluster directory
385 385 self.config_file_paths = (self.cluster_dir,)
386 386
387 387 def find_config_file_name(self):
388 388 """Find the config file name for this application."""
389 389 # For this type of Application it should be set as a class attribute.
390 390 if not hasattr(self, 'config_file_name'):
391 391 self.log.critical("No config filename found")
392 392
393 393 def find_config_file_paths(self):
394 394 # Set the search path to the cluster directory
395 395 self.config_file_paths = (self.cluster_dir,)
396 396
397 397 def pre_construct(self):
398 398 # The log and security dirs were set earlier, but here we put them
399 399 # into the config and log them.
400 400 config = self.master_config
401 401 sdir = self.cluster_dir_obj.security_dir
402 402 self.security_dir = config.Global.security_dir = sdir
403 403 ldir = self.cluster_dir_obj.log_dir
404 404 self.log_dir = config.Global.log_dir = ldir
405 405 pdir = self.cluster_dir_obj.pid_dir
406 406 self.pid_dir = config.Global.pid_dir = pdir
407 407 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
408 408 config.Global.working_dir = unicode(genutils.expand_path(config.Global.working_dir))
409 409 # Change to the working directory. We do this just before construct
410 410 # is called so all the components there have the right working dir.
411 411 self.to_working_dir()
412 412
413 413 def to_working_dir(self):
414 414 wd = self.master_config.Global.working_dir
415 415 if unicode(wd) != unicode(os.getcwd()):
416 416 os.chdir(wd)
417 417 self.log.info("Changing to working dir: %s" % wd)
418 418
419 419 def start_logging(self):
420 420 # Remove old log files
421 421 if self.master_config.Global.clean_logs:
422 422 log_dir = self.master_config.Global.log_dir
423 423 for f in os.listdir(log_dir):
424 424 if f.startswith(self.name + u'-') and f.endswith('.log'):
425 425 os.remove(os.path.join(log_dir, f))
426 426 # Start logging to the new log file
427 427 if self.master_config.Global.log_to_file:
428 428 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
429 429 logfile = os.path.join(self.log_dir, log_filename)
430 430 open_log_file = open(logfile, 'w')
431 431 else:
432 432 open_log_file = sys.stdout
433 433 log.startLogging(open_log_file)
434 434
435 435 def write_pid_file(self, overwrite=False):
436 436 """Create a .pid file in the pid_dir with my pid.
437 437
438 438 This must be called after pre_construct, which sets `self.pid_dir`.
439 439 This raises :exc:`PIDFileError` if the pid file exists already.
440 440 """
441 441 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
442 442 if os.path.isfile(pid_file):
443 443 pid = self.get_pid_from_file()
444 444 if not overwrite:
445 445 raise PIDFileError(
446 446 'The pid file [%s] already exists. \nThis could mean that this '
447 447 'server is already running with [pid=%s].' % (pid_file, pid)
448 448 )
449 449 with open(pid_file, 'w') as f:
450 450 self.log.info("Creating pid file: %s" % pid_file)
451 451 f.write(repr(os.getpid())+'\n')
452 452
453 453 def remove_pid_file(self):
454 454 """Remove the pid file.
455 455
456 456 This should be called at shutdown by registering a callback with
457 :func:`reactor.addSystemEventTrigger`.
457 :func:`reactor.addSystemEventTrigger`. This needs to return
458 ``None``.
458 459 """
459 460 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
460 461 if os.path.isfile(pid_file):
461 462 try:
462 463 self.log.info("Removing pid file: %s" % pid_file)
463 464 os.remove(pid_file)
464 465 except:
465 466 self.log.warn("Error removing the pid file: %s" % pid_file)
466 raise
467 467
468 468 def get_pid_from_file(self):
469 469 """Get the pid from the pid file.
470 470
471 471 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
472 472 """
473 473 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
474 474 if os.path.isfile(pid_file):
475 475 with open(pid_file, 'r') as f:
476 476 pid = int(f.read().strip())
477 477 return pid
478 478 else:
479 479 raise PIDFileError('pid file not found: %s' % pid_file)
480 480
481 481
@@ -1,412 +1,454 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import logging
19 19 import os
20 20 import signal
21 21 import sys
22 22
23 23 if os.name=='posix':
24 24 from twisted.scripts._twistd_unix import daemonize
25 25
26 26 from IPython.core import release
27 27 from IPython.external import argparse
28 28 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
29 29 from IPython.utils.importstring import import_item
30 30
31 31 from IPython.kernel.clusterdir import (
32 32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
33 33 )
34 34
35 from twisted.internet import reactor
36 from twisted.python import log
35 from twisted.internet import reactor, defer
36 from twisted.python import log, failure
37 37
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # The ipcluster application
41 41 #-----------------------------------------------------------------------------
42 42
43 43
44 44 # Exit codes for ipcluster
45 45
46 46 # This will be the exit code if the ipcluster appears to be running because
47 47 # a .pid file exists
48 48 ALREADY_STARTED = 10
49 49
50 50 # This will be the exit code if ipcluster stop is run, but there is not .pid
51 51 # file to be found.
52 52 ALREADY_STOPPED = 11
53 53
54 54
55 55 class IPClusterCLLoader(ArgParseConfigLoader):
56 56
57 57 def _add_arguments(self):
58 58 # This has all the common options that all subcommands use
59 59 parent_parser1 = argparse.ArgumentParser(add_help=False)
60 60 parent_parser1.add_argument('--ipython-dir',
61 61 dest='Global.ipython_dir',type=unicode,
62 62 help='Set to override default location of Global.ipython_dir.',
63 63 default=NoConfigDefault,
64 64 metavar='Global.ipython_dir')
65 65 parent_parser1.add_argument('--log-level',
66 66 dest="Global.log_level",type=int,
67 67 help='Set the log level (0,10,20,30,40,50). Default is 30.',
68 68 default=NoConfigDefault,
69 69 metavar='Global.log_level')
70 70
71 71 # This has all the common options that other subcommands use
72 72 parent_parser2 = argparse.ArgumentParser(add_help=False)
73 73 parent_parser2.add_argument('-p','--profile',
74 74 dest='Global.profile',type=unicode,
75 75 default=NoConfigDefault,
76 76 help='The string name of the profile to be used. This determines '
77 77 'the name of the cluster dir as: cluster_<profile>. The default profile '
78 78 'is named "default". The cluster directory is resolve this way '
79 79 'if the --cluster-dir option is not used.',
80 80 default=NoConfigDefault,
81 81 metavar='Global.profile')
82 82 parent_parser2.add_argument('--cluster-dir',
83 83 dest='Global.cluster_dir',type=unicode,
84 84 default=NoConfigDefault,
85 85 help='Set the cluster dir. This overrides the logic used by the '
86 86 '--profile option.',
87 87 default=NoConfigDefault,
88 88 metavar='Global.cluster_dir'),
89 89 parent_parser2.add_argument('--working-dir',
90 90 dest='Global.working_dir',type=unicode,
91 91 help='Set the working dir for the process.',
92 92 default=NoConfigDefault,
93 93 metavar='Global.working_dir')
94 94 parent_parser2.add_argument('--log-to-file',
95 95 action='store_true', dest='Global.log_to_file',
96 96 default=NoConfigDefault,
97 97 help='Log to a file in the log directory (default is stdout)'
98 98 )
99 99
100 100 subparsers = self.parser.add_subparsers(
101 101 dest='Global.subcommand',
102 102 title='ipcluster subcommands',
103 103 description='ipcluster has a variety of subcommands. '
104 104 'The general way of running ipcluster is "ipcluster <cmd> '
105 105 ' [options]""',
106 106 help='For more help, type "ipcluster <cmd> -h"')
107 107
108 108 parser_list = subparsers.add_parser(
109 109 'list',
110 110 help='List all clusters in cwd and ipython_dir.',
111 111 parents=[parent_parser1]
112 112 )
113 113
114 114 parser_create = subparsers.add_parser(
115 115 'create',
116 116 help='Create a new cluster directory.',
117 117 parents=[parent_parser1, parent_parser2]
118 118 )
119 119 parser_create.add_argument(
120 120 '--reset-config',
121 121 dest='Global.reset_config', action='store_true',
122 122 default=NoConfigDefault,
123 123 help='Recopy the default config files to the cluster directory. '
124 124 'You will loose any modifications you have made to these files.'
125 125 )
126 126
127 127 parser_start = subparsers.add_parser(
128 128 'start',
129 129 help='Start a cluster.',
130 130 parents=[parent_parser1, parent_parser2]
131 131 )
132 132 parser_start.add_argument(
133 133 '-n', '--number',
134 134 type=int, dest='Global.n',
135 135 default=NoConfigDefault,
136 136 help='The number of engines to start.',
137 137 metavar='Global.n'
138 138 )
139 139 parser_start.add_argument('--clean-logs',
140 140 dest='Global.clean_logs', action='store_true',
141 141 help='Delete old log flies before starting.',
142 142 default=NoConfigDefault
143 143 )
144 144 parser_start.add_argument('--no-clean-logs',
145 145 dest='Global.clean_logs', action='store_false',
146 146 help="Don't delete old log flies before starting.",
147 147 default=NoConfigDefault
148 148 )
149 149 parser_start.add_argument('--daemon',
150 150 dest='Global.daemonize', action='store_true',
151 151 help='Daemonize the ipcluster program. This implies --log-to-file',
152 152 default=NoConfigDefault
153 153 )
154 154 parser_start.add_argument('--no-daemon',
155 155 dest='Global.daemonize', action='store_false',
156 156 help="Dont't daemonize the ipcluster program.",
157 157 default=NoConfigDefault
158 158 )
159 159
160 160 parser_start = subparsers.add_parser(
161 161 'stop',
162 162 help='Stop a cluster.',
163 163 parents=[parent_parser1, parent_parser2]
164 164 )
165 165 parser_start.add_argument('--signal',
166 166 dest='Global.signal', type=int,
167 167 help="The signal number to use in stopping the cluster (default=2).",
168 168 metavar="Global.signal",
169 169 default=NoConfigDefault
170 170 )
171 171
172 172
173 173 default_config_file_name = u'ipcluster_config.py'
174 174
175 175
176 176 class IPClusterApp(ApplicationWithClusterDir):
177 177
178 178 name = u'ipcluster'
179 179 description = 'Start an IPython cluster (controller and engines).'
180 180 config_file_name = default_config_file_name
181 181 default_log_level = logging.INFO
182 182 auto_create_cluster_dir = False
183 183
184 184 def create_default_config(self):
185 185 super(IPClusterApp, self).create_default_config()
186 186 self.default_config.Global.controller_launcher = \
187 187 'IPython.kernel.launcher.LocalControllerLauncher'
188 188 self.default_config.Global.engine_launcher = \
189 189 'IPython.kernel.launcher.LocalEngineSetLauncher'
190 190 self.default_config.Global.n = 2
191 191 self.default_config.Global.reset_config = False
192 192 self.default_config.Global.clean_logs = True
193 193 self.default_config.Global.signal = 2
194 194 self.default_config.Global.daemonize = False
195 195
196 196 def create_command_line_config(self):
197 197 """Create and return a command line config loader."""
198 198 return IPClusterCLLoader(
199 199 description=self.description,
200 200 version=release.version
201 201 )
202 202
203 203 def find_resources(self):
204 204 subcommand = self.command_line_config.Global.subcommand
205 205 if subcommand=='list':
206 206 self.list_cluster_dirs()
207 207 # Exit immediately because there is nothing left to do.
208 208 self.exit()
209 209 elif subcommand=='create':
210 210 self.auto_create_cluster_dir = True
211 211 super(IPClusterApp, self).find_resources()
212 212 elif subcommand=='start' or subcommand=='stop':
213 213 self.auto_create_cluster_dir = False
214 214 try:
215 215 super(IPClusterApp, self).find_resources()
216 216 except ClusterDirError:
217 217 raise ClusterDirError(
218 218 "Could not find a cluster directory. A cluster dir must "
219 219 "be created before running 'ipcluster start'. Do "
220 220 "'ipcluster create -h' or 'ipcluster list -h' for more "
221 221 "information about creating and listing cluster dirs."
222 222 )
223 223
224 224 def list_cluster_dirs(self):
225 225 # Find the search paths
226 226 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
227 227 if cluster_dir_paths:
228 228 cluster_dir_paths = cluster_dir_paths.split(':')
229 229 else:
230 230 cluster_dir_paths = []
231 231 try:
232 232 ipython_dir = self.command_line_config.Global.ipython_dir
233 233 except AttributeError:
234 234 ipython_dir = self.default_config.Global.ipython_dir
235 235 paths = [os.getcwd(), ipython_dir] + \
236 236 cluster_dir_paths
237 237 paths = list(set(paths))
238 238
239 239 self.log.info('Searching for cluster dirs in paths: %r' % paths)
240 240 for path in paths:
241 241 files = os.listdir(path)
242 242 for f in files:
243 243 full_path = os.path.join(path, f)
244 244 if os.path.isdir(full_path) and f.startswith('cluster_'):
245 245 profile = full_path.split('_')[-1]
246 246 start_cmd = '"ipcluster start -n 4 -p %s"' % profile
247 247 print start_cmd + " ==> " + full_path
248 248
249 249 def pre_construct(self):
250 250 # This is where we cd to the working directory.
251 251 super(IPClusterApp, self).pre_construct()
252 252 config = self.master_config
253 253 try:
254 254 daemon = config.Global.daemonize
255 255 if daemon:
256 256 config.Global.log_to_file = True
257 257 except AttributeError:
258 258 pass
259 259
260 260 def construct(self):
261 261 config = self.master_config
262 262 if config.Global.subcommand=='list':
263 263 pass
264 264 elif config.Global.subcommand=='create':
265 265 self.log.info('Copying default config files to cluster directory '
266 266 '[overwrite=%r]' % (config.Global.reset_config,))
267 267 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
268 268 elif config.Global.subcommand=='start':
269 269 self.start_logging()
270 270 reactor.callWhenRunning(self.start_launchers)
271 271
272 272 def start_launchers(self):
273 273 config = self.master_config
274 274
275 275 # Create the launchers
276 276 el_class = import_item(config.Global.engine_launcher)
277 277 self.engine_launcher = el_class(
278 278 self.cluster_dir, config=config
279 279 )
280 280 cl_class = import_item(config.Global.controller_launcher)
281 281 self.controller_launcher = cl_class(
282 282 self.cluster_dir, config=config
283 283 )
284 284
285 285 # Setup signals
286 signal.signal(signal.SIGINT, self.stop_launchers)
286 signal.signal(signal.SIGINT, self.sigint_handler)
287 287
288 # Setup the observing of stopping
288 # Setup the observing of stopping. If the controller dies, shut
289 # everything down as that will be completely fatal for the engines.
289 290 d1 = self.controller_launcher.observe_stop()
290 d1.addCallback(self.stop_engines)
291 d1.addErrback(self.err_and_stop)
292 # If this triggers, just let them die
293 # d2 = self.engine_launcher.observe_stop()
291 d1.addCallback(self.stop_launchers)
292 # But, we don't monitor the stopping of engines. An engine dying
293 # is just fine and in principle a user could start a new engine.
294 # Also, if we did monitor engine stopping, it is difficult to
295 # know what to do when only some engines die. Currently, the
296 # observing of engine stopping is inconsistent. Some launchers
297 # might trigger on a single engine stopping, other wait until
298 # all stop. TODO: think more about how to handle this.
294 299
295 300 # Start the controller and engines
301 self._stopping = False # Make sure stop_launchers is not called 2x.
302 d = self.start_controller()
303 d.addCallback(self.start_engines)
304 d.addCallback(self.startup_message)
305 # If the controller or engines fail to start, stop everything
306 d.addErrback(self.stop_launchers)
307 return d
308
309 def startup_message(self, r=None):
310 log.msg("IPython cluster: started")
311 return r
312
313 def start_controller(self, r=None):
314 # log.msg("In start_controller")
315 config = self.master_config
296 316 d = self.controller_launcher.start(
297 317 cluster_dir=config.Global.cluster_dir
298 318 )
299 d.addCallback(lambda _: self.start_engines())
300 d.addErrback(self.err_and_stop)
301
302 def err_and_stop(self, f):
303 log.msg('Unexpected error in ipcluster:')
304 log.err(f)
305 reactor.stop()
306
307 def stop_engines(self, r):
308 return self.engine_launcher.stop()
309
310 def start_engines(self):
319 return d
320
321 def start_engines(self, r=None):
322 # log.msg("In start_engines")
311 323 config = self.master_config
312 324 d = self.engine_launcher.start(
313 325 config.Global.n,
314 326 cluster_dir=config.Global.cluster_dir
315 327 )
316 328 return d
317 329
318 def stop_launchers(self, signum, frame):
319 log.msg("Stopping cluster")
320 d1 = self.engine_launcher.stop()
321 d2 = self.controller_launcher.stop()
322 # d1.addCallback(lambda _: self.controller_launcher.stop)
323 d1.addErrback(self.err_and_stop)
324 d2.addErrback(self.err_and_stop)
325 reactor.callLater(2.0, reactor.stop)
330 def stop_controller(self, r=None):
331 # log.msg("In stop_controller")
332 if self.controller_launcher.running:
333 d = self.controller_launcher.stop()
334 d.addErrback(self.log_err)
335 return d
336 else:
337 return defer.succeed(None)
338
339 def stop_engines(self, r=None):
340 # log.msg("In stop_engines")
341 if self.engine_launcher.running:
342 d = self.engine_launcher.stop()
343 d.addErrback(self.log_err)
344 return d
345 else:
346 return defer.succeed(None)
326 347
348 def log_err(self, f):
349 log.msg(f.getTraceback())
350 return None
351
352 def stop_launchers(self, r=None):
353 if not self._stopping:
354 self._stopping = True
355 if isinstance(r, failure.Failure):
356 log.msg('Unexpected error in ipcluster:')
357 log.msg(r.getTraceback())
358 log.msg("IPython cluster: stopping")
359 d= self.stop_engines()
360 d2 = self.stop_controller()
361 # Wait a few seconds to let things shut down.
362 reactor.callLater(3.0, reactor.stop)
363
364 def sigint_handler(self, signum, frame):
365 self.stop_launchers()
366
327 367 def start_logging(self):
328 # Remove old log files
368 # Remove old log files of the controller and engine
329 369 if self.master_config.Global.clean_logs:
330 370 log_dir = self.master_config.Global.log_dir
331 371 for f in os.listdir(log_dir):
332 if f.startswith('ipengine' + '-') and f.endswith('.log'):
333 os.remove(os.path.join(log_dir, f))
334 for f in os.listdir(log_dir):
335 if f.startswith('ipcontroller' + '-') and f.endswith('.log'):
336 os.remove(os.path.join(log_dir, f))
372 if f.startswith('ipengine' + '-'):
373 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
374 os.remove(os.path.join(log_dir, f))
375 if f.startswith('ipcontroller' + '-'):
376 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
377 os.remove(os.path.join(log_dir, f))
378 # This will remote old log files for ipcluster itself
337 379 super(IPClusterApp, self).start_logging()
338 380
339 381 def start_app(self):
340 382 """Start the application, depending on what subcommand is used."""
341 383 subcmd = self.master_config.Global.subcommand
342 384 if subcmd=='create' or subcmd=='list':
343 385 return
344 386 elif subcmd=='start':
345 387 self.start_app_start()
346 388 elif subcmd=='stop':
347 389 self.start_app_stop()
348 390
349 391 def start_app_start(self):
350 392 """Start the app for the start subcommand."""
351 393 config = self.master_config
352 394 # First see if the cluster is already running
353 395 try:
354 396 pid = self.get_pid_from_file()
355 397 except PIDFileError:
356 398 pass
357 399 else:
358 400 self.log.critical(
359 401 'Cluster is already running with [pid=%s]. '
360 402 'use "ipcluster stop" to stop the cluster.' % pid
361 403 )
362 404 # Here I exit with a unusual exit status that other processes
363 405 # can watch for to learn how I existed.
364 406 self.exit(ALREADY_STARTED)
365 407
366 408 # Now log and daemonize
367 409 self.log.info(
368 410 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
369 411 )
370 412 if config.Global.daemonize:
371 413 if os.name=='posix':
372 414 daemonize()
373 415
374 416 # Now write the new pid file AFTER our new forked pid is active.
375 417 self.write_pid_file()
376 418 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
377 419 reactor.run()
378 420
379 421 def start_app_stop(self):
380 422 """Start the app for the stop subcommand."""
381 423 config = self.master_config
382 424 try:
383 425 pid = self.get_pid_from_file()
384 426 except PIDFileError:
385 427 self.log.critical(
386 428 'Problem reading pid file, cluster is probably not running.'
387 429 )
388 430 # Here I exit with a unusual exit status that other processes
389 431 # can watch for to learn how I existed.
390 432 self.exit(ALREADY_STOPPED)
391 433 else:
392 434 if os.name=='posix':
393 435 sig = config.Global.signal
394 436 self.log.info(
395 437 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
396 438 )
397 439 os.kill(pid, sig)
398 440 elif os.name=='nt':
399 441 # As of right now, we don't support daemonize on Windows, so
400 442 # stop will not do anything. Minimally, it should clean up the
401 443 # old .pid files.
402 444 self.remove_pid_file()
403 445
404 446 def launch_new_instance():
405 447 """Create and run the IPython cluster."""
406 448 app = IPClusterApp()
407 449 app.start()
408 450
409 451
410 452 if __name__ == '__main__':
411 453 launch_new_instance()
412 454
@@ -1,306 +1,318 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Job and task components for writing .xml files that the Windows HPC Server
5 5 2008 can use to start jobs.
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2009 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 from __future__ import with_statement
20 20
21 21 import os
22 22 import re
23 23 import uuid
24 24
25 25 from xml.etree import ElementTree as ET
26 26 from xml.dom import minidom
27 27
28 28 from IPython.core.component import Component
29 29 from IPython.external import Itpl
30 30 from IPython.utils.traitlets import (
31 31 Str, Int, List, Unicode, Instance,
32 32 Enum, Bool, CStr
33 33 )
34 34
35 35 #-----------------------------------------------------------------------------
36 36 # Job and Task Component
37 37 #-----------------------------------------------------------------------------
38 38
39 39
40 40 def as_str(value):
41 41 if isinstance(value, str):
42 42 return value
43 43 elif isinstance(value, bool):
44 44 if value:
45 45 return 'true'
46 46 else:
47 47 return 'false'
48 48 elif isinstance(value, (int, float)):
49 49 return repr(value)
50 50 else:
51 51 return value
52 52
53 53
54 54 def indent(elem, level=0):
55 55 i = "\n" + level*" "
56 56 if len(elem):
57 57 if not elem.text or not elem.text.strip():
58 58 elem.text = i + " "
59 59 if not elem.tail or not elem.tail.strip():
60 60 elem.tail = i
61 61 for elem in elem:
62 62 indent(elem, level+1)
63 63 if not elem.tail or not elem.tail.strip():
64 64 elem.tail = i
65 65 else:
66 66 if level and (not elem.tail or not elem.tail.strip()):
67 67 elem.tail = i
68 68
69 69
70 70 def find_username():
71 71 domain = os.environ.get('USERDOMAIN')
72 72 username = os.environ.get('USERNAME','')
73 73 if domain is None:
74 74 return username
75 75 else:
76 76 return '%s\\%s' % (domain, username)
77 77
78 78
79 79 class WinHPCJob(Component):
80 80
81 81 job_id = Str('')
82 82 job_name = Str('MyJob', config=True)
83 83 min_cores = Int(1, config=True)
84 84 max_cores = Int(1, config=True)
85 85 min_sockets = Int(1, config=True)
86 86 max_sockets = Int(1, config=True)
87 87 min_nodes = Int(1, config=True)
88 88 max_nodes = Int(1, config=True)
89 89 unit_type = Str("Core", config=True)
90 90 auto_calculate_min = Bool(True, config=True)
91 91 auto_calculate_max = Bool(True, config=True)
92 92 run_until_canceled = Bool(False, config=True)
93 93 is_exclusive = Bool(False, config=True)
94 94 username = Str(find_username(), config=True)
95 95 job_type = Str('Batch', config=True)
96 96 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
97 97 default_value='Highest', config=True)
98 98 requested_nodes = Str('', config=True)
99 99 project = Str('IPython', config=True)
100 100 xmlns = Str('http://schemas.microsoft.com/HPCS2008/scheduler/')
101 101 version = Str("2.000")
102 102 tasks = List([])
103 103
104 104 @property
105 105 def owner(self):
106 106 return self.username
107 107
108 108 def _write_attr(self, root, attr, key):
109 109 s = as_str(getattr(self, attr, ''))
110 110 if s:
111 111 root.set(key, s)
112 112
113 113 def as_element(self):
114 114 # We have to add _A_ type things to get the right order than
115 115 # the MSFT XML parser expects.
116 116 root = ET.Element('Job')
117 117 self._write_attr(root, 'version', '_A_Version')
118 118 self._write_attr(root, 'job_name', '_B_Name')
119 119 self._write_attr(root, 'unit_type', '_C_UnitType')
120 120 self._write_attr(root, 'min_cores', '_D_MinCores')
121 121 self._write_attr(root, 'max_cores', '_E_MaxCores')
122 122 self._write_attr(root, 'min_sockets', '_F_MinSockets')
123 123 self._write_attr(root, 'max_sockets', '_G_MaxSockets')
124 124 self._write_attr(root, 'min_nodes', '_H_MinNodes')
125 125 self._write_attr(root, 'max_nodes', '_I_MaxNodes')
126 126 self._write_attr(root, 'run_until_canceled', '_J_RunUntilCanceled')
127 127 self._write_attr(root, 'is_exclusive', '_K_IsExclusive')
128 128 self._write_attr(root, 'username', '_L_UserName')
129 129 self._write_attr(root, 'job_type', '_M_JobType')
130 130 self._write_attr(root, 'priority', '_N_Priority')
131 131 self._write_attr(root, 'requested_nodes', '_O_RequestedNodes')
132 132 self._write_attr(root, 'auto_calculate_max', '_P_AutoCalculateMax')
133 133 self._write_attr(root, 'auto_calculate_min', '_Q_AutoCalculateMin')
134 134 self._write_attr(root, 'project', '_R_Project')
135 135 self._write_attr(root, 'owner', '_S_Owner')
136 136 self._write_attr(root, 'xmlns', '_T_xmlns')
137 137 dependencies = ET.SubElement(root, "Dependencies")
138 138 etasks = ET.SubElement(root, "Tasks")
139 139 for t in self.tasks:
140 140 etasks.append(t.as_element())
141 141 return root
142 142
143 143 def tostring(self):
144 144 """Return the string representation of the job description XML."""
145 145 root = self.as_element()
146 146 indent(root)
147 147 txt = ET.tostring(root, encoding="utf-8")
148 148 # Now remove the tokens used to order the attributes.
149 149 txt = re.sub(r'_[A-Z]_','',txt)
150 150 txt = '<?xml version="1.0" encoding="utf-8"?>\n' + txt
151 151 return txt
152 152
153 153 def write(self, filename):
154 154 """Write the XML job description to a file."""
155 155 txt = self.tostring()
156 156 with open(filename, 'w') as f:
157 157 f.write(txt)
158 158
159 159 def add_task(self, task):
160 160 """Add a task to the job.
161 161
162 162 Parameters
163 163 ----------
164 164 task : :class:`WinHPCTask`
165 165 The task object to add.
166 166 """
167 167 self.tasks.append(task)
168 168
169 169
170 170 class WinHPCTask(Component):
171 171
172 172 task_id = Str('')
173 173 task_name = Str('')
174 174 version = Str("2.000")
175 175 min_cores = Int(1, config=True)
176 176 max_cores = Int(1, config=True)
177 177 min_sockets = Int(1, config=True)
178 178 max_sockets = Int(1, config=True)
179 179 min_nodes = Int(1, config=True)
180 180 max_nodes = Int(1, config=True)
181 181 unit_type = Str("Core", config=True)
182 182 command_line = CStr('', config=True)
183 183 work_directory = CStr('', config=True)
184 184 is_rerunnaable = Bool(True, config=True)
185 185 std_out_file_path = CStr('', config=True)
186 186 std_err_file_path = CStr('', config=True)
187 187 is_parametric = Bool(False, config=True)
188 188 environment_variables = Instance(dict, args=(), config=True)
189 189
190 190 def _write_attr(self, root, attr, key):
191 191 s = as_str(getattr(self, attr, ''))
192 192 if s:
193 193 root.set(key, s)
194 194
195 195 def as_element(self):
196 196 root = ET.Element('Task')
197 197 self._write_attr(root, 'version', '_A_Version')
198 198 self._write_attr(root, 'task_name', '_B_Name')
199 199 self._write_attr(root, 'min_cores', '_C_MinCores')
200 200 self._write_attr(root, 'max_cores', '_D_MaxCores')
201 201 self._write_attr(root, 'min_sockets', '_E_MinSockets')
202 202 self._write_attr(root, 'max_sockets', '_F_MaxSockets')
203 203 self._write_attr(root, 'min_nodes', '_G_MinNodes')
204 204 self._write_attr(root, 'max_nodes', '_H_MaxNodes')
205 205 self._write_attr(root, 'command_line', '_I_CommandLine')
206 206 self._write_attr(root, 'work_directory', '_J_WorkDirectory')
207 207 self._write_attr(root, 'is_rerunnaable', '_K_IsRerunnable')
208 208 self._write_attr(root, 'std_out_file_path', '_L_StdOutFilePath')
209 209 self._write_attr(root, 'std_err_file_path', '_M_StdErrFilePath')
210 210 self._write_attr(root, 'is_parametric', '_N_IsParametric')
211 211 self._write_attr(root, 'unit_type', '_O_UnitType')
212 212 root.append(self.get_env_vars())
213 213 return root
214 214
215 215 def get_env_vars(self):
216 216 env_vars = ET.Element('EnvironmentVariables')
217 217 for k, v in self.environment_variables.items():
218 218 variable = ET.SubElement(env_vars, "Variable")
219 219 name = ET.SubElement(variable, "Name")
220 220 name.text = k
221 221 value = ET.SubElement(variable, "Value")
222 222 value.text = v
223 223 return env_vars
224 224
225 225
226 226
227 227 # By declaring these, we can configure the controller and engine separately!
228 228
229 229 class IPControllerJob(WinHPCJob):
230 230 job_name = Str('IPController', config=False)
231 231 is_exclusive = Bool(False, config=True)
232 232 username = Str(find_username(), config=True)
233 233 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
234 234 default_value='Highest', config=True)
235 235 requested_nodes = Str('', config=True)
236 236 project = Str('IPython', config=True)
237 237
238 238
239 239 class IPEngineSetJob(WinHPCJob):
240 240 job_name = Str('IPEngineSet', config=False)
241 241 is_exclusive = Bool(False, config=True)
242 242 username = Str(find_username(), config=True)
243 243 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
244 244 default_value='Highest', config=True)
245 245 requested_nodes = Str('', config=True)
246 246 project = Str('IPython', config=True)
247 247
248 248
249 249 class IPControllerTask(WinHPCTask):
250 250
251 251 task_name = Str('IPController', config=True)
252 252 controller_cmd = List(['ipcontroller.exe'], config=True)
253 253 controller_args = List(['--log-to-file', '--log-level', '40'], config=True)
254 254 # I don't want these to be configurable
255 std_out_file_path = CStr(os.path.join('log','ipcontroller-out.txt'), config=False)
256 std_err_file_path = CStr(os.path.join('log','ipcontroller-err.txt'), config=False)
255 std_out_file_path = CStr('', config=False)
256 std_err_file_path = CStr('', config=False)
257 257 min_cores = Int(1, config=False)
258 258 max_cores = Int(1, config=False)
259 259 min_sockets = Int(1, config=False)
260 260 max_sockets = Int(1, config=False)
261 261 min_nodes = Int(1, config=False)
262 262 max_nodes = Int(1, config=False)
263 263 unit_type = Str("Core", config=False)
264 264 work_directory = CStr('', config=False)
265 265
266 def __init__(self, parent, name=None, config=None):
267 super(IPControllerTask, self).__init__(parent, name, config)
268 the_uuid = uuid.uuid1()
269 self.std_out_file_path = os.path.join('log','ipcontroller-%s.out' % the_uuid)
270 self.std_err_file_path = os.path.join('log','ipcontroller-%s.err' % the_uuid)
271
266 272 @property
267 273 def command_line(self):
268 274 return ' '.join(self.controller_cmd + self.controller_args)
269 275
270 276
271 277 class IPEngineTask(WinHPCTask):
272 278
273 279 task_name = Str('IPEngine', config=True)
274 280 engine_cmd = List(['ipengine.exe'], config=True)
275 281 engine_args = List(['--log-to-file', '--log-level', '40'], config=True)
276 282 # I don't want these to be configurable
277 std_out_file_path = CStr(os.path.join('log','ipengine-out-%s.txt' % uuid.uuid1()), config=False)
278 std_err_file_path = CStr(os.path.join('log','ipengine-err-%s.txt' % uuid.uuid1()), config=False)
283 std_out_file_path = CStr('', config=False)
284 std_err_file_path = CStr('', config=False)
279 285 min_cores = Int(1, config=False)
280 286 max_cores = Int(1, config=False)
281 287 min_sockets = Int(1, config=False)
282 288 max_sockets = Int(1, config=False)
283 289 min_nodes = Int(1, config=False)
284 290 max_nodes = Int(1, config=False)
285 291 unit_type = Str("Core", config=False)
286 292 work_directory = CStr('', config=False)
287 293
294 def __init__(self, parent, name=None, config=None):
295 super(IPEngineTask,self).__init__(parent, name, config)
296 the_uuid = uuid.uuid1()
297 self.std_out_file_path = os.path.join('log','ipengine-%s.out' % the_uuid)
298 self.std_err_file_path = os.path.join('log','ipengine-%s.err' % the_uuid)
299
288 300 @property
289 301 def command_line(self):
290 302 return ' '.join(self.engine_cmd + self.engine_args)
291 303
292 304
293 305 # j = WinHPCJob(None)
294 306 # j.job_name = 'IPCluster'
295 307 # j.username = 'GNET\\bgranger'
296 308 # j.requested_nodes = 'GREEN'
297 309 #
298 310 # t = WinHPCTask(None)
299 311 # t.task_name = 'Controller'
300 312 # t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
301 313 # t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
302 314 # t.std_out_file_path = 'controller-out.txt'
303 315 # t.std_err_file_path = 'controller-err.txt'
304 316 # t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
305 317 # j.add_task(t)
306 318
General Comments 0
You need to be logged in to leave comments. Login now